diff --git a/common/src/main/scala/org/apache/comet/CometConf.scala b/common/src/main/scala/org/apache/comet/CometConf.scala index bfe90181ff..3f8d6bd221 100644 --- a/common/src/main/scala/org/apache/comet/CometConf.scala +++ b/common/src/main/scala/org/apache/comet/CometConf.scala @@ -534,6 +534,19 @@ object CometConf extends ShimCometConf { .checkValue(v => v > 0, "Write buffer size must be positive") .createWithDefault(1) + val COMET_NATIVE_SHUFFLE_MODE: ConfigEntry[String] = + conf(s"$COMET_EXEC_CONFIG_PREFIX.shuffle.nativeMode") + .category(CATEGORY_SHUFFLE) + .doc( + "Selects which native shuffle implementation to use for multi-partition shuffles. " + + "'buffered' buffers input batches and tracks per-partition row indices, writing all " + + "partitions at the end with memory-pressure-driven spilling. " + + "'immediate' repartitions each incoming batch using take and writes per-partition " + + "data directly to individual files, avoiding in-memory buffering of input batches.") + .stringConf + .checkValues(Set("buffered", "immediate")) + .createWithDefault("buffered") + val COMET_SHUFFLE_PREFER_DICTIONARY_RATIO: ConfigEntry[Double] = conf( "spark.comet.shuffle.preferDictionary.ratio") .category(CATEGORY_SHUFFLE) diff --git a/native/Cargo.lock b/native/Cargo.lock index 598b18d58c..2c117631a4 100644 --- a/native/Cargo.lock +++ b/native/Cargo.lock @@ -96,11 +96,55 @@ version = "0.1.6" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" +[[package]] +name = "anstream" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "824a212faf96e9acacdbd09febd34438f8f711fb84e09a8916013cd7815ca28d" +dependencies = [ + "anstyle", + "anstyle-parse", + "anstyle-query", + "anstyle-wincon", + "colorchoice", + "is_terminal_polyfill", + "utf8parse", +] + [[package]] name = "anstyle" -version = "1.0.13" +version = "1.0.14" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "940b3a0ca603d1eade50a4846a2afffd5ef57a9feac2c0e2ec2e14f9ead76000" + +[[package]] +name = "anstyle-parse" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "52ce7f38b242319f7cabaa6813055467063ecdc9d355bbb4ce0c68908cd8130e" +dependencies = [ + "utf8parse", +] + +[[package]] +name = "anstyle-query" +version = "1.1.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5192cca8006f1fd4f7237516f40fa183bb07f8fbdfedaa0036de5ea9b0b45e78" +checksum = "40c48f72fd53cd289104fc64099abca73db4166ad86ea0b4341abe65af83dadc" +dependencies = [ + "windows-sys 0.61.2", +] + +[[package]] +name = "anstyle-wincon" +version = "3.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "291e6a250ff86cd4a820112fb8898808a366d8f9f58ce16d1f538353ad55747d" +dependencies = [ + "anstyle", + "once_cell_polyfill", + "windows-sys 0.61.2", +] [[package]] name = "anyhow" @@ -135,9 +179,9 @@ dependencies = [ [[package]] name = "arc-swap" -version = "1.8.2" +version = "1.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f9f3647c145568cec02c42054e07bdf9a5a698e15b466fb2341bfc393cd24aa5" +checksum = "a07d1f37ff60921c83bdfc7407723bdefe89b44b98a9b772f225c8f9d67141a6" dependencies = [ "rustversion", ] @@ -600,9 +644,9 @@ dependencies = [ [[package]] name = "aws-lc-rs" -version = "1.16.1" +version = "1.16.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "94bffc006df10ac2a68c83692d734a465f8ee6c5b384d8545a636f81d858f4bf" +checksum = "a054912289d18629dc78375ba2c3726a3afe3ff71b4edba9dedfca0e3446d1fc" dependencies = [ "aws-lc-sys", "zeroize", @@ -610,9 +654,9 @@ dependencies = [ [[package]] name = "aws-lc-sys" -version = "0.38.0" +version = "0.39.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"4321e568ed89bb5a7d291a7f37997c2c0df89809d7b6d12062c81ddb54aa782e" +checksum = "1fa7e52a4c5c547c741610a2c6f123f3881e409b714cd27e6798ef020c514f0a" dependencies = [ "cc", "cmake", @@ -647,9 +691,9 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.96.0" +version = "1.97.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f64a6eded248c6b453966e915d32aeddb48ea63ad17932682774eb026fbef5b1" +checksum = "9aadc669e184501caaa6beafb28c6267fc1baef0810fb58f9b205485ca3f2567" dependencies = [ "aws-credential-types", "aws-runtime", @@ -671,9 +715,9 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.98.0" +version = "1.99.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "db96d720d3c622fcbe08bae1c4b04a72ce6257d8b0584cb5418da00ae20a344f" +checksum = "1342a7db8f358d3de0aed2007a0b54e875458e39848d54cc1d46700b2bfcb0a8" dependencies = [ "aws-credential-types", "aws-runtime", @@ -695,9 +739,9 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.100.0" +version = "1.101.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fafbdda43b93f57f699c5dfe8328db590b967b8a820a13ccdd6687355dfcc7ca" +checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" dependencies = [ "aws-credential-types", "aws-runtime", @@ -868,9 +912,9 @@ dependencies = [ [[package]] name = "aws-smithy-types" -version = "1.4.6" +version = "1.4.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d2b1117b3b2bbe166d11199b540ceed0d0f7676e36e7b962b5a437a9971eac75" +checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" dependencies = [ "base64-simd", "bytes", @@ -1100,9 +1144,9 @@ dependencies = [ [[package]] name = "bon" -version = "3.9.0" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2d13a61f2963b88eef9c1be03df65d42f6996dfeac1054870d950fcf66686f83" +checksum = 
"f47dbe92550676ee653353c310dfb9cf6ba17ee70396e1f7cf0a2020ad49b2fe" dependencies = [ "bon-macros", "rustversion", @@ -1110,9 +1154,9 @@ dependencies = [ [[package]] name = "bon-macros" -version = "3.9.0" +version = "3.9.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d314cc62af2b6b0c65780555abb4d02a03dd3b799cd42419044f0c38d99738c0" +checksum = "519bd3116aeeb42d5372c29d982d16d0170d3d4a5ed85fc7dd91642ffff3c67c" dependencies = [ "darling 0.23.0", "ident_case", @@ -1326,38 +1370,59 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2797f34da339ce31042b27d23607e051786132987f595b02ba4f6a6dffb7030a" +checksum = "b193af5b67834b676abd72466a96c1024e6a6ad978a1f484bd90b85c94041351" dependencies = [ "clap_builder", + "clap_derive", ] [[package]] name = "clap_builder" -version = "4.5.60" +version = "4.6.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "24a241312cea5059b13574bb9b3861cabf758b879c15190b37b6d6fd63ab6876" +checksum = "714a53001bf66416adb0e2ef5ac857140e7dc3a0c48fb28b2f10762fc4b5069f" dependencies = [ + "anstream", "anstyle", "clap_lex", + "strsim", +] + +[[package]] +name = "clap_derive" +version = "4.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1110bd8a634a1ab8cb04345d8d878267d57c3cf1b38d91b71af6686408bbca6a" +dependencies = [ + "heck", + "proc-macro2", + "quote", + "syn 2.0.117", ] [[package]] name = "clap_lex" -version = "1.0.0" +version = "1.1.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a822ea5bc7590f9d40f1ba12c0dc3c2760f3482c6984db1573ad11031420831" +checksum = "c8d4a3bb8b1e0c1050499d1815f5ab16d04f0959b233085fb31653fbfc9d98f9" [[package]] name = "cmake" -version = "0.1.57" +version = "0.1.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"75443c44cd6b379beb8c5b45d85d0773baf31cce901fe7bb252f4eff3008ef7d" +checksum = "c0f78a02292a74a88ac736019ab962ece0bc380e3f977bf72e376c5d78ff0678" dependencies = [ "cc", ] +[[package]] +name = "colorchoice" +version = "1.0.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1d07550c9036bf2ae0c684c4297d503f838287c83c53686d05370d0e139ae570" + [[package]] name = "combine" version = "4.6.7" @@ -1617,16 +1682,6 @@ dependencies = [ "darling_macro 0.20.11", ] -[[package]] -name = "darling" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9cdf337090841a411e2a7f3deb9187445851f91b309c0c0a29e05f74a00a48c0" -dependencies = [ - "darling_core 0.21.3", - "darling_macro 0.21.3", -] - [[package]] name = "darling" version = "0.23.0" @@ -1651,20 +1706,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "darling_core" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1247195ecd7e3c85f83c8d2a366e4210d588e802133e1e355180a9870b517ea4" -dependencies = [ - "fnv", - "ident_case", - "proc-macro2", - "quote", - "strsim", - "syn 2.0.117", -] - [[package]] name = "darling_core" version = "0.23.0" @@ -1689,17 +1730,6 @@ dependencies = [ "syn 2.0.117", ] -[[package]] -name = "darling_macro" -version = "0.21.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "d38308df82d1080de0afee5d069fa14b0326a88c14f15c5ccda35b4a6c414c81" -dependencies = [ - "darling_core 0.21.3", - "quote", - "syn 2.0.117", -] - [[package]] name = "darling_macro" version = "0.23.0" @@ -1953,6 +1983,7 @@ dependencies = [ "arrow", "async-trait", "bytes", + "clap", "crc32fast", "criterion", "datafusion", @@ -1964,6 +1995,7 @@ dependencies = [ "jni", "log", "lz4_flex 0.13.0", + "parquet", "simd-adler32", "snap", "tempfile", @@ -2675,9 +2707,9 @@ dependencies = [ [[package]] name = "dissimilar" -version = "1.0.10" +version = "1.0.11" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8975ffdaa0ef3661bfe02dbdcc06c9f829dfafe6a3c474de366a8d5e44276921" +checksum = "aeda16ab4059c5fd2a83f2b9c9e9c981327b18aa8e3b313f7e6563799d4f093e" [[package]] name = "dlv-list" @@ -3612,9 +3644,9 @@ checksum = "d98f6fed1fde3f8c21bc40a1abb88dd75e67924f9cffc3ef95607bad8017f8e2" [[package]] name = "iri-string" -version = "0.7.10" +version = "0.7.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c91338f0783edbd6195decb37bae672fd3b165faffb89bf7b9e6942f8b1a731a" +checksum = "d8e7418f59cc01c88316161279a7f665217ae316b388e58a0d10e29f54f1e5eb" dependencies = [ "memchr", "serde", @@ -3631,6 +3663,12 @@ dependencies = [ "windows-sys 0.61.2", ] +[[package]] +name = "is_terminal_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a6cb138bb79a146c1bd460005623e142ef0181e3d0219cb493e02f7d08a35695" + [[package]] name = "itertools" version = "0.13.0" @@ -3651,9 +3689,9 @@ dependencies = [ [[package]] name = "itoa" -version = "1.0.17" +version = "1.0.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "92ecc6618181def0457392ccd0ee51198e065e016d1d527a7ac1b6dc7c1f09d2" +checksum = "8f42a60cbdf9a97f5d2305f08a87dc4e09308d1276d28c869c684d7777685682" [[package]] name = "java-locator" @@ -3715,7 +3753,7 @@ dependencies = [ "cfg-if", "combine", "java-locator", - "jni-sys", + "jni-sys 0.3.1", "libloading 0.7.4", "log", "thiserror 1.0.69", @@ -3725,9 +3763,31 @@ dependencies = [ [[package]] name = "jni-sys" -version = "0.3.0" +version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8eaf4bc02d17cbdd7ff4c7438cafcdf7fb9a4613313ad11b4f8fefe7d3fa0130" +checksum = "41a652e1f9b6e0275df1f15b32661cf0d4b78d4d87ddec5e0c3c20f097433258" +dependencies = [ + "jni-sys 0.4.1", +] + +[[package]] +name = "jni-sys" +version = "0.4.1" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "c6377a88cb3910bee9b0fa88d4f42e1d2da8e79915598f65fb0c7ee14c878af2" +dependencies = [ + "jni-sys-macros", +] + +[[package]] +name = "jni-sys-macros" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38c0b942f458fe50cdac086d2f946512305e5631e720728f2a61aabcd47a6264" +dependencies = [ + "quote", + "syn 2.0.117", +] [[package]] name = "jobserver" @@ -4067,9 +4127,9 @@ dependencies = [ [[package]] name = "mio" -version = "1.1.1" +version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a69bcab0ad47271a0234d9422b131806bf3968021e5dc9328caf2d4cd58557fc" +checksum = "50b7e5b27aa02a74bac8c3f23f448f8d87ff11f92d3aac1a6ed369ee08cc56c1" dependencies = [ "libc", "wasi", @@ -4084,9 +4144,9 @@ checksum = "dce6dd36094cac388f119d2e9dc82dc730ef91c32a6222170d630e5414b956e6" [[package]] name = "moka" -version = "0.12.14" +version = "0.12.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "85f8024e1c8e71c778968af91d43700ce1d11b219d127d79fb2934153b82b42b" +checksum = "957228ad12042ee839f93c8f257b62b4c0ab5eaae1d4fa60de53b27c9d7c5046" dependencies = [ "async-lock", "crossbeam-channel", @@ -4187,9 +4247,9 @@ dependencies = [ [[package]] name = "num-conv" -version = "0.2.0" +version = "0.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cf97ec579c3c42f953ef76dbf8d55ac91fb219dde70e49aa4a6b7d74e9919050" +checksum = "c6673768db2d862beb9b39a78fdcb1a69439615d5794a1be50caa9bc92c81967" [[package]] name = "num-format" @@ -4311,6 +4371,12 @@ version = "1.21.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9f7c3e4beb33f85d45ae3e3a1792185706c8e16d043238c593331cc7cd313b50" +[[package]] +name = "once_cell_polyfill" +version = "1.70.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = 
"384b8ab6d37215f3c5301a95a4accb5d64aa607f1fcb26a11b5303878451b4fe" + [[package]] name = "oorandom" version = "11.1.5" @@ -4715,9 +4781,9 @@ checksum = "c33a9471896f1c69cecef8d20cbe2f7accd12527ce60845ff44c153bb2a21b49" [[package]] name = "portable-atomic-util" -version = "0.2.5" +version = "0.2.6" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a9db96d7fa8782dd8c15ce32ffe8680bbd1e978a43bf51a34d39483540495f5" +checksum = "091397be61a01d4be58e7841595bd4bfedb15f1cd54977d79b8271e94ed799a3" dependencies = [ "portable-atomic", ] @@ -5595,9 +5661,9 @@ dependencies = [ [[package]] name = "serde_with" -version = "3.17.0" +version = "3.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "381b283ce7bc6b476d903296fb59d0d36633652b633b27f64db4fb46dcbfc3b9" +checksum = "dd5414fad8e6907dbdd5bc441a50ae8d6e26151a03b1de04d89a5576de61d01f" dependencies = [ "base64", "chrono", @@ -5614,11 +5680,11 @@ dependencies = [ [[package]] name = "serde_with_macros" -version = "3.17.0" +version = "3.18.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a6d4e30573c8cb306ed6ab1dca8423eec9a463ea0e155f45399455e0368b27e0" +checksum = "d3db8978e608f1fe7357e211969fd9abdcae80bac1ba7a3369bb7eb6b404eb65" dependencies = [ - "darling 0.21.3", + "darling 0.23.0", "proc-macro2", "quote", "syn 2.0.117", @@ -5836,9 +5902,9 @@ checksum = "13c2bddecc57b384dee18652358fb23172facb8a2c51ccc10d74c157bdea3292" [[package]] name = "symbolic-common" -version = "12.17.2" +version = "12.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "751a2823d606b5d0a7616499e4130a516ebd01a44f39811be2b9600936509c23" +checksum = "52ca086c1eb5c7ee74b151ba83c6487d5d33f8c08ad991b86f3f58f6629e68d5" dependencies = [ "debugid", "memmap2", @@ -5848,9 +5914,9 @@ dependencies = [ [[package]] name = "symbolic-demangle" -version = "12.17.2" +version = "12.17.3" source = "registry+https://github.com/rust-lang/crates.io-index" 
-checksum = "79b237cfbe320601dd24b4ac817a5b68bb28f5508e33f08d42be0682cadc8ac9" +checksum = "baa911a28a62823aaf2cc2e074212492a3ee69d0d926cc8f5b12b4a108ff5c0c" dependencies = [ "cpp_demangle", "rustc-demangle", @@ -6072,9 +6138,9 @@ dependencies = [ [[package]] name = "tinyvec" -version = "1.10.0" +version = "1.11.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa5fdc3bce6191a1dbc8c02d5c8bffcf557bafa17c124c5264a458f1b0613fa" +checksum = "3e61e67053d25a4e82c844e8424039d9745781b3fc4f32b8d55ed50f5f667ef3" dependencies = [ "tinyvec_macros", ] @@ -6300,9 +6366,9 @@ checksum = "e6e4313cd5fcd3dad5cafa179702e2b244f760991f45397d14d4ebf38247da75" [[package]] name = "unicode-segmentation" -version = "1.12.0" +version = "1.13.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f6ccf251212114b54433ec949fd6a7841275f9ada20dddd2f29e9ceea4501493" +checksum = "9629274872b2bfaf8d66f5f15725007f635594914870f65218920345aa11aa8c" [[package]] name = "unicode-width" @@ -6361,6 +6427,12 @@ version = "1.0.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be" +[[package]] +name = "utf8parse" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" + [[package]] name = "uuid" version = "1.23.0" @@ -7027,18 +7099,18 @@ dependencies = [ [[package]] name = "zerocopy" -version = "0.8.42" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "f2578b716f8a7a858b7f02d5bd870c14bf4ddbbcf3a4c05414ba6503640505e3" +checksum = "efbb2a062be311f2ba113ce66f697a4dc589f85e78a4aea276200804cea0ed87" dependencies = [ "zerocopy-derive", ] [[package]] name = "zerocopy-derive" -version = "0.8.42" +version = "0.8.47" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = 
"7e6cc098ea4d3bd6246687de65af3f920c430e236bee1e3bf2e441463f08a02f" +checksum = "0e8bc7269b54418e7aeeef514aa68f8690b8c0489a06b0136e5f57c4c5ccab89" dependencies = [ "proc-macro2", "quote", diff --git a/native/core/src/execution/planner.rs b/native/core/src/execution/planner.rs index 5af31fcc22..7b9bf5a3e5 100644 --- a/native/core/src/execution/planner.rs +++ b/native/core/src/execution/planner.rs @@ -29,7 +29,7 @@ use crate::execution::{ planner::expression_registry::ExpressionRegistry, planner::operator_registry::OperatorRegistry, serde::to_arrow_datatype, - shuffle::ShuffleWriterExec, + shuffle::{ShuffleMode, ShuffleWriterExec}, }; use arrow::compute::CastOptions; use arrow::datatypes::{DataType, Field, FieldRef, Schema, TimeUnit, DECIMAL128_MAX_PRECISION}; @@ -116,7 +116,8 @@ use datafusion_comet_proto::{ spark_operator::{ self, lower_window_frame_bound::LowerFrameBoundStruct, operator::OpStruct, upper_window_frame_bound::UpperFrameBoundStruct, BuildSide, - CompressionCodec as SparkCompressionCodec, JoinType, Operator, WindowFrameType, + CompressionCodec as SparkCompressionCodec, JoinType, Operator, + ShuffleMode as SparkShuffleMode, WindowFrameType, }, spark_partitioning::{partitioning::PartitioningStruct, Partitioning as SparkPartitioning}, }; @@ -1352,6 +1353,10 @@ impl PhysicalPlanner { }?; let write_buffer_size = writer.write_buffer_size as usize; + let shuffle_mode = match writer.shuffle_mode.try_into() { + Ok(SparkShuffleMode::BufferedShuffle) => ShuffleMode::Buffered, + _ => ShuffleMode::Immediate, + }; let shuffle_writer = Arc::new(ShuffleWriterExec::try_new( Arc::clone(&child.native_plan), partitioning, @@ -1360,6 +1365,7 @@ impl PhysicalPlanner { writer.output_index_file.clone(), writer.tracing_enabled, write_buffer_size, + shuffle_mode, )?); Ok(( diff --git a/native/proto/src/proto/operator.proto b/native/proto/src/proto/operator.proto index 344b9f0f21..7f6d947315 100644 --- a/native/proto/src/proto/operator.proto +++ 
b/native/proto/src/proto/operator.proto @@ -284,6 +284,12 @@ enum CompressionCodec { Snappy = 3; } +// Selects which shuffle implementation to use for multi-partition shuffles. +enum ShuffleMode { + BufferedShuffle = 0; + ImmediateShuffle = 1; +} + message ShuffleWriter { spark.spark_partitioning.Partitioning partitioning = 1; string output_data_file = 3; @@ -294,6 +300,8 @@ message ShuffleWriter { // Size of the write buffer in bytes used when writing shuffle data to disk. // Larger values may improve write performance but use more memory. int32 write_buffer_size = 8; + // Which shuffle implementation to use for multi-partition shuffles. + ShuffleMode shuffle_mode = 9; } message ParquetWriter { diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml index 94ed5f30a5..a5982c05fa 100644 --- a/native/shuffle/Cargo.toml +++ b/native/shuffle/Cargo.toml @@ -32,6 +32,7 @@ publish = false arrow = { workspace = true } async-trait = { workspace = true } bytes = { workspace = true } +clap = { version = "4", features = ["derive"], optional = true } crc32fast = "1.3.2" datafusion = { workspace = true } datafusion-comet-common = { workspace = true } @@ -42,6 +43,8 @@ itertools = "0.14.0" jni = "0.21" log = "0.4" lz4_flex = { version = "0.13.0", default-features = false, features = ["frame"] } +# parquet is only used by the shuffle_bench binary (shuffle-bench feature) +parquet = { workspace = true, optional = true } simd-adler32 = "0.3.9" snap = "1.1" tokio = { version = "1", features = ["rt-multi-thread"] } @@ -53,10 +56,18 @@ datafusion = { workspace = true, features = ["parquet_encryption", "sql"] } itertools = "0.14.0" tempfile = "3.26.0" +[features] +shuffle-bench = ["clap", "parquet"] + [lib] name = "datafusion_comet_shuffle" path = "src/lib.rs" +[[bin]] +name = "shuffle_bench" +path = "src/bin/shuffle_bench.rs" +required-features = ["shuffle-bench"] + [[bench]] name = "shuffle_writer" harness = false diff --git a/native/shuffle/README.md 
b/native/shuffle/README.md index 8fba6b0323..74b8dbe656 100644 --- a/native/shuffle/README.md +++ b/native/shuffle/README.md @@ -23,3 +23,46 @@ This crate provides the shuffle writer and reader implementation for Apache Data of the [Apache DataFusion Comet] subproject. [Apache DataFusion Comet]: https://github.com/apache/datafusion-comet/ + +## Shuffle Benchmark Tool + +A standalone benchmark binary (`shuffle_bench`) is included for profiling shuffle write and read +performance outside of Spark. It streams input data directly from Parquet files. + +### Basic usage + +```sh +cargo run --release --features shuffle-bench --bin shuffle_bench -- \ + --input /data/tpch-sf100/lineitem/ \ + --partitions 200 \ + --codec zstd --zstd-level 1 \ + --hash-columns 0,3 +``` + +### Options + +| Option | Default | Description | +| ------------------------ | -------------------------- | ------------------------------------------------------------ | +| `--input` | _(required)_ | Path to a Parquet file or directory of Parquet files | +| `--partitions` | `200` | Number of output shuffle partitions | +| `--partitioning` | `hash` | Partitioning scheme: `hash`, `single`, `round-robin` | +| `--hash-columns` | `0` | Comma-separated column indices to hash on (e.g. 
`0,3`) | +| `--codec` | `zstd` | Compression codec: `none`, `lz4`, `zstd`, `snappy` | +| `--zstd-level` | `1` | Zstd compression level (1–22) | +| `--batch-size` | `8192` | Batch size for reading Parquet data | +| `--memory-limit` | _(none)_ | Memory limit in bytes; triggers spilling when exceeded | +| `--max-buffered-batches` | `0` | Max batches to buffer before spilling (0 = memory-pool-only) | +| `--write-buffer-size` | `1048576` | Write buffer size in bytes | +| `--limit` | `0` | Limit rows processed per iteration (0 = no limit) | +| `--iterations` | `1` | Number of timed iterations | +| `--warmup` | `0` | Number of warmup iterations before timing | +| `--read-back` | `false` | Also benchmark reading back the shuffle output | +| `--output-dir` | `/tmp/comet_shuffle_bench` | Directory for temporary shuffle output files | + +### Profiling with flamegraph + +```sh +cargo flamegraph --release --features shuffle-bench --bin shuffle_bench -- \ + --input /data/tpch-sf100/lineitem/ \ + --partitions 200 --codec zstd --zstd-level 1 +``` diff --git a/native/shuffle/benches/shuffle_writer.rs b/native/shuffle/benches/shuffle_writer.rs index 27abd919fa..fcb2e592f1 100644 --- a/native/shuffle/benches/shuffle_writer.rs +++ b/native/shuffle/benches/shuffle_writer.rs @@ -30,7 +30,7 @@ use datafusion::{ prelude::SessionContext, }; use datafusion_comet_shuffle::{ - CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleWriterExec, + CometPartitioning, CompressionCodec, ShuffleBlockWriter, ShuffleMode, ShuffleWriterExec, }; use itertools::Itertools; use std::io::Cursor; @@ -153,6 +153,7 @@ fn create_shuffle_writer_exec( "/tmp/index.out".to_string(), false, 1024 * 1024, + ShuffleMode::Buffered, ) .unwrap() } diff --git a/native/shuffle/src/bin/shuffle_bench.rs b/native/shuffle/src/bin/shuffle_bench.rs new file mode 100644 index 0000000000..079a890a19 --- /dev/null +++ b/native/shuffle/src/bin/shuffle_bench.rs @@ -0,0 +1,660 @@ +// Licensed to the Apache Software Foundation 
(ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Standalone shuffle benchmark tool for profiling Comet shuffle write and read +//! outside of Spark. Streams input directly from Parquet files. +//! +//! # Usage +//! +//! ```sh +//! cargo run --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 \ +//! --codec zstd --zstd-level 1 \ +//! --hash-columns 0,3 \ +//! --read-back +//! ``` +//! +//! Profile with flamegraph: +//! ```sh +//! cargo flamegraph --release --bin shuffle_bench -- \ +//! --input /data/tpch-sf100/lineitem/ \ +//! --partitions 200 --codec zstd --zstd-level 1 +//! 
``` + +use arrow::datatypes::{DataType, SchemaRef}; +use clap::Parser; +use datafusion::execution::config::SessionConfig; +use datafusion::execution::runtime_env::RuntimeEnvBuilder; +use datafusion::physical_expr::expressions::Column; +use datafusion::physical_plan::coalesce_partitions::CoalescePartitionsExec; +use datafusion::physical_plan::common::collect; +use datafusion::physical_plan::ExecutionPlan; +use datafusion::prelude::{ParquetReadOptions, SessionContext}; +use datafusion_comet_shuffle::{ + read_ipc_compressed, CometPartitioning, CompressionCodec, ShuffleMode, ShuffleWriterExec, +}; +use parquet::arrow::arrow_reader::ParquetRecordBatchReaderBuilder; +use std::fs; +use std::path::PathBuf; +use std::sync::Arc; +use std::time::Instant; + +#[derive(Parser, Debug)] +#[command( + name = "shuffle_bench", + about = "Standalone benchmark for Comet shuffle write and read performance" +)] +struct Args { + /// Path to input Parquet file or directory of Parquet files + #[arg(long)] + input: PathBuf, + + /// Batch size for reading Parquet data + #[arg(long, default_value_t = 8192)] + batch_size: usize, + + /// Number of output shuffle partitions + #[arg(long, default_value_t = 200)] + partitions: usize, + + /// Partitioning scheme: hash, single, round-robin + #[arg(long, default_value = "hash")] + partitioning: String, + + /// Column indices to hash on (comma-separated, e.g. 
"0,3") + #[arg(long, default_value = "0")] + hash_columns: String, + + /// Compression codec: none, lz4, zstd, snappy + #[arg(long, default_value = "zstd")] + codec: String, + + /// Zstd compression level (1-22) + #[arg(long, default_value_t = 1)] + zstd_level: i32, + + /// Memory limit in bytes (triggers spilling when exceeded) + #[arg(long)] + memory_limit: Option, + + /// Also benchmark reading back the shuffle output + #[arg(long, default_value_t = false)] + read_back: bool, + + /// Number of iterations to run + #[arg(long, default_value_t = 1)] + iterations: usize, + + /// Number of warmup iterations before timing + #[arg(long, default_value_t = 0)] + warmup: usize, + + /// Output directory for shuffle data/index files + #[arg(long, default_value = "/tmp/comet_shuffle_bench")] + output_dir: PathBuf, + + /// Write buffer size in bytes + #[arg(long, default_value_t = 1048576)] + write_buffer_size: usize, + + /// Limit rows processed per iteration (0 = no limit) + #[arg(long, default_value_t = 0)] + limit: usize, + + /// Shuffle mode: buffered or immediate + #[arg(long, default_value = "immediate")] + shuffle_mode: String, + + /// Number of concurrent shuffle tasks to simulate executor parallelism. + /// Each task reads the same input and writes to its own output files. 
+ #[arg(long, default_value_t = 1)] + concurrent_tasks: usize, +} + +fn main() { + let args = Args::parse(); + + // Create output directory + fs::create_dir_all(&args.output_dir).expect("Failed to create output directory"); + let data_file = args.output_dir.join("data.out"); + let index_file = args.output_dir.join("index.out"); + + let (schema, total_rows) = read_parquet_metadata(&args.input, args.limit); + + let codec = parse_codec(&args.codec, args.zstd_level); + let hash_col_indices = parse_hash_columns(&args.hash_columns); + + println!("=== Shuffle Benchmark ==="); + println!("Input: {}", args.input.display()); + println!( + "Schema: {} columns ({})", + schema.fields().len(), + describe_schema(&schema) + ); + println!("Total rows: {}", format_number(total_rows as usize)); + println!("Batch size: {}", format_number(args.batch_size)); + println!("Partitioning: {}", args.partitioning); + println!("Partitions: {}", args.partitions); + println!("Codec: {:?}", codec); + println!("Hash columns: {:?}", hash_col_indices); + println!("Shuffle mode: {}", args.shuffle_mode); + if let Some(mem_limit) = args.memory_limit { + println!("Memory limit: {}", format_bytes(mem_limit)); + } + if args.concurrent_tasks > 1 { + println!("Concurrent: {} tasks", args.concurrent_tasks); + } + println!( + "Iterations: {} (warmup: {})", + args.iterations, args.warmup + ); + println!(); + + let total_iters = args.warmup + args.iterations; + let mut write_times = Vec::with_capacity(args.iterations); + let mut read_times = Vec::with_capacity(args.iterations); + let mut data_file_sizes = Vec::with_capacity(args.iterations); + + for i in 0..total_iters { + let is_warmup = i < args.warmup; + let label = if is_warmup { + format!("warmup {}/{}", i + 1, args.warmup) + } else { + format!("iter {}/{}", i - args.warmup + 1, args.iterations) + }; + + let write_elapsed = if args.concurrent_tasks > 1 { + run_concurrent_shuffle_writes(&args.input, &schema, &codec, &hash_col_indices, &args) + } else { + 
run_shuffle_write(
+                &args.input,
+                &schema,
+                &codec,
+                &hash_col_indices,
+                &args,
+                data_file.to_str().unwrap(),
+                index_file.to_str().unwrap(),
+            )
+        };
+        let data_size = fs::metadata(&data_file).map(|m| m.len()).unwrap_or(0);
+
+        if !is_warmup {
+            write_times.push(write_elapsed);
+            data_file_sizes.push(data_size);
+        }
+
+        print!(" [{label}] write: {:.3}s", write_elapsed);
+        if args.concurrent_tasks <= 1 {
+            print!(" output: {}", format_bytes(data_size as usize));
+        }
+
+        if args.read_back && args.concurrent_tasks <= 1 {
+            let read_elapsed = run_shuffle_read(
+                data_file.to_str().unwrap(),
+                index_file.to_str().unwrap(),
+                args.partitions,
+            );
+            if !is_warmup {
+                read_times.push(read_elapsed);
+            }
+            print!(" read: {:.3}s", read_elapsed);
+        }
+        println!();
+    }
+
+    if args.iterations > 0 {
+        println!();
+        println!("=== Results ===");
+
+        let avg_write = write_times.iter().sum::<f64>() / write_times.len() as f64;
+        let write_throughput_rows = (total_rows as f64 * args.concurrent_tasks as f64) / avg_write;
+
+        println!("Write:");
+        println!(" avg time: {:.3}s", avg_write);
+        if write_times.len() > 1 {
+            let min = write_times.iter().cloned().fold(f64::INFINITY, f64::min);
+            let max = write_times
+                .iter()
+                .cloned()
+                .fold(f64::NEG_INFINITY, f64::max);
+            println!(" min/max: {:.3}s / {:.3}s", min, max);
+        }
+        println!(
+            " throughput: {} rows/s (total across {} tasks)",
+            format_number(write_throughput_rows as usize),
+            args.concurrent_tasks
+        );
+        if args.concurrent_tasks <= 1 {
+            let avg_data_size = data_file_sizes.iter().sum::<u64>() / data_file_sizes.len() as u64;
+            println!(
+                " output size: {}",
+                format_bytes(avg_data_size as usize)
+            );
+        }
+
+        if !read_times.is_empty() {
+            let avg_data_size = data_file_sizes.iter().sum::<u64>() / data_file_sizes.len() as u64;
+            let avg_read = read_times.iter().sum::<f64>() / read_times.len() as f64;
+            let read_throughput_bytes = avg_data_size as f64 / avg_read;
+
+            println!("Read:");
+            println!(" avg time: {:.3}s", avg_read);
+            if read_times.len() > 1 {
+                let min = read_times.iter().cloned().fold(f64::INFINITY, f64::min);
+                let max = read_times.iter().cloned().fold(f64::NEG_INFINITY, f64::max);
+                println!(" min/max: {:.3}s / {:.3}s", min, max);
+            }
+            println!(
+                " throughput: {}/s (from compressed)",
+                format_bytes(read_throughput_bytes as usize)
+            );
+        }
+    }
+
+    let _ = fs::remove_file(&data_file);
+    let _ = fs::remove_file(&index_file);
+}
+
+/// Read schema and total row count from Parquet metadata without loading any data.
+fn read_parquet_metadata(path: &PathBuf, limit: usize) -> (SchemaRef, u64) {
+    let paths = collect_parquet_paths(path);
+    let mut schema = None;
+    let mut total_rows = 0u64;
+
+    for file_path in &paths {
+        let file = fs::File::open(file_path)
+            .unwrap_or_else(|e| panic!("Failed to open {}: {}", file_path.display(), e));
+        let builder = ParquetRecordBatchReaderBuilder::try_new(file).unwrap_or_else(|e| {
+            panic!(
+                "Failed to read Parquet metadata from {}: {}",
+                file_path.display(),
+                e
+            )
+        });
+        if schema.is_none() {
+            schema = Some(Arc::clone(builder.schema()));
+        }
+        total_rows += builder.metadata().file_metadata().num_rows() as u64;
+        if limit > 0 && total_rows >= limit as u64 {
+            total_rows = total_rows.min(limit as u64);
+            break;
+        }
+    }
+
+    (schema.expect("No parquet files found"), total_rows)
+}
+
+fn collect_parquet_paths(path: &PathBuf) -> Vec<PathBuf> {
+    if path.is_dir() {
+        let mut files: Vec<PathBuf> = fs::read_dir(path)
+            .unwrap_or_else(|e| panic!("Failed to read directory {}: {}", path.display(), e))
+            .filter_map(|entry| {
+                let p = entry.ok()?.path();
+                if p.extension().and_then(|e| e.to_str()) == Some("parquet") {
+                    Some(p)
+                } else {
+                    None
+                }
+            })
+            .collect();
+        files.sort();
+        if files.is_empty() {
+            panic!("No .parquet files found in {}", path.display());
+        }
+        files
+    } else {
+        vec![path.clone()]
+    }
+}
+
+fn run_shuffle_write(
+    input_path: &PathBuf,
+    schema: &SchemaRef,
+    codec: &CompressionCodec,
+    hash_col_indices: &[usize],
+    args: &Args,
+    data_file: &str,
+    index_file: &str,
+) -> f64 {
+    let partitioning = build_partitioning(
+        &args.partitioning,
+        args.partitions,
+        hash_col_indices,
+        schema,
+    );
+
+    let rt = tokio::runtime::Runtime::new().unwrap();
+    rt.block_on(async {
+        let config = SessionConfig::new().with_batch_size(args.batch_size);
+        let mut runtime_builder = RuntimeEnvBuilder::new();
+        if let Some(mem_limit) = args.memory_limit {
+            runtime_builder = runtime_builder.with_memory_limit(mem_limit, 1.0);
+        }
+        let runtime_env = Arc::new(runtime_builder.build().unwrap());
+        let ctx = SessionContext::new_with_config_rt(config, runtime_env);
+
+        let path_str = input_path.to_str().unwrap();
+        let mut df = ctx
+            .read_parquet(path_str, ParquetReadOptions::default())
+            .await
+            .expect("Failed to create Parquet scan");
+        if args.limit > 0 {
+            df = df.limit(0, Some(args.limit)).unwrap();
+        }
+
+        let parquet_plan = df
+            .create_physical_plan()
+            .await
+            .expect("Failed to create physical plan");
+
+        // ShuffleWriterExec reads from a single input partition
+        let input: Arc<dyn ExecutionPlan> = if parquet_plan
+            .properties()
+            .output_partitioning()
+            .partition_count()
+            > 1
+        {
+            Arc::new(CoalescePartitionsExec::new(parquet_plan))
+        } else {
+            parquet_plan
+        };
+
+        let shuffle_mode = match args.shuffle_mode.as_str() {
+            "buffered" => ShuffleMode::Buffered,
+            _ => ShuffleMode::Immediate,
+        };
+
+        let exec = ShuffleWriterExec::try_new(
+            input,
+            partitioning,
+            codec.clone(),
+            data_file.to_string(),
+            index_file.to_string(),
+            false,
+            args.write_buffer_size,
+            shuffle_mode,
+        )
+        .expect("Failed to create ShuffleWriterExec");
+
+        let task_ctx = ctx.task_ctx();
+        let start = Instant::now();
+        let stream = exec.execute(0, task_ctx).unwrap();
+        collect(stream).await.unwrap();
+        start.elapsed().as_secs_f64()
+    })
+}
+
+/// Run N concurrent shuffle writes to simulate executor parallelism.
+/// Returns wall-clock time for all tasks to complete.
+fn run_concurrent_shuffle_writes( + input_path: &PathBuf, + schema: &SchemaRef, + codec: &CompressionCodec, + hash_col_indices: &[usize], + args: &Args, +) -> f64 { + let rt = tokio::runtime::Runtime::new().unwrap(); + rt.block_on(async { + let start = Instant::now(); + + let mut handles = Vec::with_capacity(args.concurrent_tasks); + for task_id in 0..args.concurrent_tasks { + let task_dir = args.output_dir.join(format!("task_{task_id}")); + fs::create_dir_all(&task_dir).expect("Failed to create task output directory"); + let data_file = task_dir.join("data.out").to_str().unwrap().to_string(); + let index_file = task_dir.join("index.out").to_str().unwrap().to_string(); + + let input_path = input_path.clone(); + let schema = Arc::clone(schema); + let codec = codec.clone(); + let hash_col_indices = hash_col_indices.to_vec(); + let partitioning_scheme = args.partitioning.clone(); + let num_partitions = args.partitions; + let batch_size = args.batch_size; + let memory_limit = args.memory_limit; + let write_buffer_size = args.write_buffer_size; + let shuffle_mode_str = args.shuffle_mode.clone(); + let limit = args.limit; + + let handle = tokio::spawn(async move { + let config = SessionConfig::new().with_batch_size(batch_size); + let mut runtime_builder = RuntimeEnvBuilder::new(); + if let Some(mem_limit) = memory_limit { + runtime_builder = runtime_builder.with_memory_limit(mem_limit, 1.0); + } + let runtime_env = Arc::new(runtime_builder.build().unwrap()); + let ctx = SessionContext::new_with_config_rt(config, runtime_env); + + let path_str = input_path.to_str().unwrap().to_string(); + let mut df = ctx + .read_parquet(&path_str, ParquetReadOptions::default()) + .await + .expect("Failed to create Parquet scan"); + if limit > 0 { + df = df.limit(0, Some(limit)).unwrap(); + } + + let parquet_plan = df + .create_physical_plan() + .await + .expect("Failed to create physical plan"); + + let input: Arc = if parquet_plan + .properties() + .output_partitioning() + 
.partition_count() + > 1 + { + Arc::new(CoalescePartitionsExec::new(parquet_plan)) + } else { + parquet_plan + }; + + let partitioning = build_partitioning( + &partitioning_scheme, + num_partitions, + &hash_col_indices, + &schema, + ); + + let shuffle_mode = match shuffle_mode_str.as_str() { + "buffered" => ShuffleMode::Buffered, + _ => ShuffleMode::Immediate, + }; + + let exec = ShuffleWriterExec::try_new( + input, + partitioning, + codec, + data_file, + index_file, + false, + write_buffer_size, + shuffle_mode, + ) + .expect("Failed to create ShuffleWriterExec"); + + let task_ctx = ctx.task_ctx(); + let stream = exec.execute(0, task_ctx).unwrap(); + collect(stream).await.unwrap(); + }); + handles.push(handle); + } + + for handle in handles { + handle.await.expect("Task panicked"); + } + + // Clean up task output directories + for task_id in 0..args.concurrent_tasks { + let task_dir = args.output_dir.join(format!("task_{task_id}")); + let _ = fs::remove_dir_all(&task_dir); + } + + start.elapsed().as_secs_f64() + }) +} + +fn run_shuffle_read(data_file: &str, index_file: &str, num_partitions: usize) -> f64 { + let start = Instant::now(); + + let index_bytes = fs::read(index_file).expect("Failed to read index file"); + let num_offsets = index_bytes.len() / 8; + let offsets: Vec = (0..num_offsets) + .map(|i| { + let bytes: [u8; 8] = index_bytes[i * 8..(i + 1) * 8].try_into().unwrap(); + i64::from_le_bytes(bytes) + }) + .collect(); + + let data_bytes = fs::read(data_file).expect("Failed to read data file"); + + let mut total_rows = 0usize; + let mut total_batches = 0usize; + + for p in 0..num_partitions.min(offsets.len().saturating_sub(1)) { + let start_offset = offsets[p] as usize; + let end_offset = offsets[p + 1] as usize; + + if start_offset >= end_offset { + continue; + } + + let mut offset = start_offset; + while offset < end_offset { + let ipc_length = + u64::from_le_bytes(data_bytes[offset..offset + 8].try_into().unwrap()) as usize; + let block_data = 
&data_bytes[offset + 16..offset + 8 + ipc_length]; + let batch = read_ipc_compressed(block_data).expect("Failed to decode shuffle block"); + total_rows += batch.num_rows(); + total_batches += 1; + offset += 8 + ipc_length; + } + } + + let elapsed = start.elapsed().as_secs_f64(); + eprintln!( + " read back {} rows in {} batches from {} partitions", + format_number(total_rows), + total_batches, + num_partitions + ); + elapsed +} + +fn build_partitioning( + scheme: &str, + num_partitions: usize, + hash_col_indices: &[usize], + schema: &SchemaRef, +) -> CometPartitioning { + match scheme { + "single" => CometPartitioning::SinglePartition, + "round-robin" => CometPartitioning::RoundRobin(num_partitions, 0), + "hash" => { + let exprs: Vec> = hash_col_indices + .iter() + .map(|&idx| { + let field = schema.field(idx); + Arc::new(Column::new(field.name(), idx)) + as Arc + }) + .collect(); + CometPartitioning::Hash(exprs, num_partitions) + } + other => { + eprintln!("Unknown partitioning scheme: {other}. Using hash."); + build_partitioning("hash", num_partitions, hash_col_indices, schema) + } + } +} + +fn parse_codec(codec: &str, zstd_level: i32) -> CompressionCodec { + match codec.to_lowercase().as_str() { + "none" => CompressionCodec::None, + "lz4" => CompressionCodec::Lz4Frame, + "zstd" => CompressionCodec::Zstd(zstd_level), + "snappy" => CompressionCodec::Snappy, + other => { + eprintln!("Unknown codec: {other}. 
Using zstd."); + CompressionCodec::Zstd(zstd_level) + } + } +} + +fn parse_hash_columns(s: &str) -> Vec { + s.split(',') + .filter(|s| !s.is_empty()) + .map(|s| s.trim().parse::().expect("Invalid column index")) + .collect() +} + +fn describe_schema(schema: &arrow::datatypes::Schema) -> String { + let mut counts = std::collections::HashMap::new(); + for field in schema.fields() { + let type_name = match field.data_type() { + DataType::Int8 + | DataType::Int16 + | DataType::Int32 + | DataType::Int64 + | DataType::UInt8 + | DataType::UInt16 + | DataType::UInt32 + | DataType::UInt64 => "int", + DataType::Float16 | DataType::Float32 | DataType::Float64 => "float", + DataType::Utf8 | DataType::LargeUtf8 => "string", + DataType::Boolean => "bool", + DataType::Date32 | DataType::Date64 => "date", + DataType::Decimal128(_, _) | DataType::Decimal256(_, _) => "decimal", + DataType::Timestamp(_, _) => "timestamp", + DataType::Binary | DataType::LargeBinary | DataType::FixedSizeBinary(_) => "binary", + _ => "other", + }; + *counts.entry(type_name).or_insert(0) += 1; + } + let mut parts: Vec = counts + .into_iter() + .map(|(k, v)| format!("{}x{}", v, k)) + .collect(); + parts.sort(); + parts.join(", ") +} + +fn format_number(n: usize) -> String { + let s = n.to_string(); + let mut result = String::new(); + for (i, c) in s.chars().rev().enumerate() { + if i > 0 && i % 3 == 0 { + result.push(','); + } + result.push(c); + } + result.chars().rev().collect() +} + +fn format_bytes(bytes: usize) -> String { + if bytes >= 1024 * 1024 * 1024 { + format!("{:.2} GiB", bytes as f64 / (1024.0 * 1024.0 * 1024.0)) + } else if bytes >= 1024 * 1024 { + format!("{:.2} MiB", bytes as f64 / (1024.0 * 1024.0)) + } else if bytes >= 1024 { + format!("{:.2} KiB", bytes as f64 / 1024.0) + } else { + format!("{bytes} B") + } +} diff --git a/native/shuffle/src/lib.rs b/native/shuffle/src/lib.rs index f29588f2e1..fb961752f6 100644 --- a/native/shuffle/src/lib.rs +++ b/native/shuffle/src/lib.rs @@ -27,3 
+27,15 @@ pub use comet_partitioning::CometPartitioning; pub use ipc::read_ipc_compressed; pub use shuffle_writer::ShuffleWriterExec; pub use writers::{CompressionCodec, ShuffleBlockWriter}; + +/// Selects which shuffle implementation to use for multi-partition shuffles. +#[derive(Debug, Clone, Copy, PartialEq, Eq)] +pub enum ShuffleMode { + /// Buffered: buffer input batches, track per-partition row indices, and write + /// all partitions at the end with memory-pressure-driven spilling. + Buffered, + /// Experimental: repartition each incoming batch immediately using `take` and + /// write per-partition data directly to individual spill files, avoiding + /// in-memory buffering of input batches. + Immediate, +} diff --git a/native/shuffle/src/partitioners/immediate_partition.rs b/native/shuffle/src/partitioners/immediate_partition.rs new file mode 100644 index 0000000000..1bad02155f --- /dev/null +++ b/native/shuffle/src/partitioners/immediate_partition.rs @@ -0,0 +1,350 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Immediate-mode shuffle partitioner: repartitions each incoming batch via per-partition +//! Arrow `take`, serializes through per-partition `BufBatchWriter`s into in-memory +//! 
compressed IPC buffers, and spills to disk under memory pressure. + +use crate::metrics::ShufflePartitionerMetrics; +use crate::partitioners::scratch::ScratchSpace; +use crate::partitioners::ShufflePartitioner; +use crate::writers::{write_index_file, BufBatchWriter}; +use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use arrow::array::{ArrayRef, RecordBatch, UInt32Array}; +use arrow::compute::take; +use arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use datafusion::execution::disk_manager::RefCountedTempFile; +use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; +use datafusion::execution::runtime_env::RuntimeEnv; +use datafusion_comet_common::tracing::{with_trace, with_trace_async}; +use std::fmt; +use std::fmt::{Debug, Formatter}; +use std::fs::{File, OpenOptions}; +use std::io::{BufWriter, Cursor, Seek, Write}; +use std::sync::Arc; +use tokio::time::Instant; + +struct PartitionBuffer { + writer: BufBatchWriter>>, + spill_file: Option, +} + +struct SpillFile { + /// Kept alive to prevent temp file deletion (RAII guard). 
+ temp_file: RefCountedTempFile, + file: File, +} + +pub(crate) struct ImmediateShufflePartitioner { + output_data_file: String, + output_index_file: String, + partition_buffers: Vec>, + shuffle_block_writer: ShuffleBlockWriter, + partitioning: CometPartitioning, + runtime: Arc, + metrics: ShufflePartitionerMetrics, + scratch: ScratchSpace, + batch_size: usize, + reservation: MemoryReservation, + tracing_enabled: bool, + write_buffer_size: usize, +} + +impl ImmediateShufflePartitioner { + #[allow(clippy::too_many_arguments)] + pub(crate) fn try_new( + partition: usize, + output_data_file: String, + output_index_file: String, + schema: SchemaRef, + partitioning: CometPartitioning, + metrics: ShufflePartitionerMetrics, + runtime: Arc, + batch_size: usize, + codec: CompressionCodec, + tracing_enabled: bool, + write_buffer_size: usize, + ) -> datafusion::common::Result { + let num_output_partitions = partitioning.partition_count(); + assert!( + num_output_partitions > 1, + "Use SinglePartitionShufflePartitioner for 1 output partition." 
+ ); + + let scratch = ScratchSpace::new(&partitioning, batch_size, num_output_partitions); + let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec)?; + let partition_buffers = (0..num_output_partitions).map(|_| None).collect(); + + let reservation = MemoryConsumer::new(format!("ImmediateShufflePartitioner[{partition}]")) + .with_can_spill(true) + .register(&runtime.memory_pool); + + Ok(Self { + output_data_file, + output_index_file, + partition_buffers, + shuffle_block_writer, + partitioning, + runtime, + metrics, + scratch, + batch_size, + reservation, + tracing_enabled, + write_buffer_size, + }) + } + + fn ensure_partition_buffer(&mut self, partition_id: usize) { + if self.partition_buffers[partition_id].is_none() { + let writer = BufBatchWriter::new( + self.shuffle_block_writer.clone(), + Cursor::new(Vec::new()), + self.write_buffer_size, + self.batch_size, + ); + self.partition_buffers[partition_id] = Some(PartitionBuffer { + writer, + spill_file: None, + }); + } + } + + fn spill(&mut self) -> datafusion::common::Result<()> { + log::info!( + "ImmediateShufflePartitioner spilling to disk ({} time(s) so far)", + self.metrics.spill_count.value() + ); + + let mut spilled_bytes = 0usize; + + for pb in self.partition_buffers.iter_mut().flatten() { + pb.writer + .flush(&self.metrics.encode_time, &self.metrics.write_time)?; + + let buf = pb.writer.output_bytes(); + if buf.is_empty() { + continue; + } + + if pb.spill_file.is_none() { + let temp_file = self + .runtime + .disk_manager + .create_tmp_file("immediate_shuffle_spill")?; + let file = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(temp_file.path()) + .map_err(|e| { + DataFusionError::Execution(format!("Error creating spill file: {e}")) + })?; + pb.spill_file = Some(SpillFile { temp_file, file }); + } + + let spill = pb.spill_file.as_mut().unwrap(); + let mut write_timer = self.metrics.write_time.timer(); + spill.file.write_all(buf)?; + write_timer.stop(); + 
spilled_bytes += buf.len(); + + pb.writer.reset_output_buffer(); + } + + self.reservation.free(); + self.metrics.spill_count.add(1); + self.metrics.spilled_bytes.add(spilled_bytes); + + Ok(()) + } + + fn partitioning_batch(&mut self, input: RecordBatch) -> datafusion::common::Result<()> { + if input.num_rows() == 0 { + return Ok(()); + } + + if input.num_rows() > self.batch_size { + return Err(DataFusionError::Internal( + "Input batch size exceeds configured batch size. Call `insert_batch` instead." + .to_string(), + )); + } + + self.metrics.data_size.add(input.get_array_memory_size()); + self.metrics.baseline.record_output(input.num_rows()); + + let num_output_partitions = self.partitioning.partition_count(); + + let mut scratch = std::mem::take(&mut self.scratch); + { + let mut timer = self.metrics.repart_time.timer(); + scratch.compute_partition_ids(&self.partitioning, &input)?; + timer.stop(); + } + + // Reorder the entire batch once by partition using a single take(), then + // use zero-copy slice() per partition. This replaces N per-partition take() + // calls (each allocating new arrays) with 1 take() + N free slice() calls. 
+ let num_rows = input.num_rows(); + let all_indices = UInt32Array::from_iter_values( + scratch.partition_row_indices[..num_rows].iter().copied(), + ); + + let reordered_columns: Vec = input + .columns() + .iter() + .map(|col| { + take(col, &all_indices, None) + .map_err(|e| DataFusionError::ArrowError(Box::from(e), None)) + }) + .collect::>>()?; + let reordered = RecordBatch::try_new(input.schema(), reordered_columns)?; + + let size_before = self.total_buffer_size(); + + for partition_id in 0..num_output_partitions { + let start = scratch.partition_starts[partition_id] as usize; + let end = scratch.partition_starts[partition_id + 1] as usize; + if start == end { + continue; + } + + let partition_batch = reordered.slice(start, end - start); + + self.ensure_partition_buffer(partition_id); + let pb = self.partition_buffers[partition_id].as_mut().unwrap(); + pb.writer.write( + &partition_batch, + &self.metrics.encode_time, + &self.metrics.write_time, + )?; + } + + let mem_growth = self.total_buffer_size().saturating_sub(size_before); + if mem_growth > 0 && self.reservation.try_grow(mem_growth).is_err() { + self.spill()?; + } + + self.scratch = scratch; + Ok(()) + } + + fn total_buffer_size(&self) -> usize { + self.partition_buffers + .iter() + .filter_map(|pb| pb.as_ref()) + .map(|pb| pb.writer.buffered_output_size()) + .sum() + } +} + +#[async_trait::async_trait] +impl ShufflePartitioner for ImmediateShufflePartitioner { + async fn insert_batch(&mut self, batch: RecordBatch) -> datafusion::common::Result<()> { + with_trace_async( + "immediate_shuffle_insert_batch", + self.tracing_enabled, + || async { + let start_time = Instant::now(); + let mut start = 0; + while start < batch.num_rows() { + let end = (start + self.batch_size).min(batch.num_rows()); + let slice = batch.slice(start, end - start); + self.partitioning_batch(slice)?; + start = end; + } + self.metrics.input_batches.add(1); + self.metrics + .baseline + .elapsed_compute() + 
.add_duration(start_time.elapsed()); + Ok(()) + }, + ) + .await + } + + fn shuffle_write(&mut self) -> datafusion::common::Result<()> { + with_trace("immediate_shuffle_write", self.tracing_enabled, || { + let start_time = Instant::now(); + + let num_output_partitions = self.partition_buffers.len(); + let mut offsets = vec![0u64; num_output_partitions + 1]; + + let output_data = OpenOptions::new() + .write(true) + .create(true) + .truncate(true) + .open(&self.output_data_file) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; + + let mut output_data = BufWriter::new(output_data); + + for (partition_id, pb_slot) in self.partition_buffers.iter_mut().enumerate() { + offsets[partition_id] = output_data.stream_position()?; + + if let Some(mut pb) = pb_slot.take() { + if let Some(mut spill) = pb.spill_file.take() { + spill.file.flush()?; + let mut spill_reader = File::open(spill.temp_file.path())?; + let mut write_timer = self.metrics.write_time.timer(); + std::io::copy(&mut spill_reader, &mut output_data)?; + write_timer.stop(); + } + + pb.writer + .flush(&self.metrics.encode_time, &self.metrics.write_time)?; + + let buf = pb.writer.into_writer().into_inner(); + if !buf.is_empty() { + let mut write_timer = self.metrics.write_time.timer(); + output_data.write_all(&buf)?; + write_timer.stop(); + } + } + } + + let mut write_timer = self.metrics.write_time.timer(); + output_data.flush()?; + write_timer.stop(); + + offsets[num_output_partitions] = output_data.stream_position()?; + + let mut write_timer = self.metrics.write_time.timer(); + write_index_file(&self.output_index_file, &offsets)?; + write_timer.stop(); + + self.metrics + .baseline + .elapsed_compute() + .add_duration(start_time.elapsed()); + + Ok(()) + }) + } +} + +impl Debug for ImmediateShufflePartitioner { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("ImmediateShufflePartitioner") + .field("num_partitions", &self.partition_buffers.len()) + 
.field("memory_used", &self.reservation.size()) + .finish() + } +} diff --git a/native/shuffle/src/partitioners/mod.rs b/native/shuffle/src/partitioners/mod.rs index 3eedef62c7..bb3d7b8c88 100644 --- a/native/shuffle/src/partitioners/mod.rs +++ b/native/shuffle/src/partitioners/mod.rs @@ -15,11 +15,14 @@ // specific language governing permissions and limitations // under the License. +mod immediate_partition; mod multi_partition; mod partitioned_batch_iterator; +pub(crate) mod scratch; mod single_partition; mod traits; +pub(crate) use immediate_partition::ImmediateShufflePartitioner; pub(crate) use multi_partition::MultiPartitionShuffleRepartitioner; pub(crate) use partitioned_batch_iterator::PartitionedBatchIterator; pub(crate) use single_partition::SinglePartitionShufflePartitioner; diff --git a/native/shuffle/src/partitioners/multi_partition.rs b/native/shuffle/src/partitioners/multi_partition.rs index 655bee3511..ccc3ed225e 100644 --- a/native/shuffle/src/partitioners/multi_partition.rs +++ b/native/shuffle/src/partitioners/multi_partition.rs @@ -19,10 +19,11 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::partitioned_batch_iterator::{ PartitionedBatchIterator, PartitionedBatchesProducer, }; +use crate::partitioners::scratch::ScratchSpace; use crate::partitioners::ShufflePartitioner; use crate::writers::{BufBatchWriter, PartitionWriter}; -use crate::{comet_partitioning, CometPartitioning, CompressionCodec, ShuffleBlockWriter}; -use arrow::array::{ArrayRef, RecordBatch}; +use crate::{CometPartitioning, CompressionCodec, ShuffleBlockWriter}; +use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use datafusion::common::utils::proxy::VecAllocExt; use datafusion::common::DataFusionError; @@ -30,7 +31,6 @@ use datafusion::execution::memory_pool::{MemoryConsumer, MemoryReservation}; use datafusion::execution::runtime_env::RuntimeEnv; use datafusion::physical_plan::metrics::Time; use datafusion_comet_common::tracing::{with_trace, 
with_trace_async}; -use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; use itertools::Itertools; use std::fmt; use std::fmt::{Debug, Formatter}; @@ -39,71 +39,6 @@ use std::io::{BufReader, BufWriter, Seek, Write}; use std::sync::Arc; use tokio::time::Instant; -/// Reusable scratch buffers for computing row-to-partition assignments. -#[derive(Default)] -struct ScratchSpace { - /// Hashes for each row in the current batch. - hashes_buf: Vec, - /// Partition ids for each row in the current batch. - partition_ids: Vec, - /// The row indices of the rows in each partition. This array is conceptually divided into - /// partitions, where each partition contains the row indices of the rows in that partition. - /// The length of this array is the same as the number of rows in the batch. - partition_row_indices: Vec, - /// The start indices of partitions in partition_row_indices. partition_starts[K] and - /// partition_starts[K + 1] are the start and end indices of partition K in partition_row_indices. - /// The length of this array is 1 + the number of partitions. - partition_starts: Vec, -} - -impl ScratchSpace { - fn map_partition_ids_to_starts_and_indices( - &mut self, - num_output_partitions: usize, - num_rows: usize, - ) { - let partition_ids = &mut self.partition_ids[..num_rows]; - - // count each partition size, while leaving the last extra element as 0 - let partition_counters = &mut self.partition_starts; - partition_counters.resize(num_output_partitions + 1, 0); - partition_counters.fill(0); - partition_ids - .iter() - .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); - - // accumulate partition counters into partition ends - // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7] - let partition_ends = partition_counters; - let mut accum = 0; - partition_ends.iter_mut().for_each(|v| { - *v += accum; - accum = *v; - }); - - // calculate partition row indices and partition starts - // e.g. 
partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices - // and partition_starts arrays: - // - // partition_row_indices: [6, 1, 2, 3, 4, 5, 0] - // partition_starts: [0, 1, 4, 6, 7] - // - // partition_starts conceptually splits partition_row_indices into smaller slices. - // Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the - // row indices of the input batch that are partitioned into partition K. For example, - // first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. - let partition_row_indices = &mut self.partition_row_indices; - partition_row_indices.resize(num_rows, 0); - for (index, partition_id) in partition_ids.iter().enumerate().rev() { - partition_ends[*partition_id as usize] -= 1; - let end = partition_ends[*partition_id as usize]; - partition_row_indices[end as usize] = index as u32; - } - - // after calculating, partition ends become partition starts - } -} - /// A partitioner that uses a hash function to partition data into multiple partitions pub(crate) struct MultiPartitionShuffleRepartitioner { output_data_file: String, @@ -148,22 +83,7 @@ impl MultiPartitionShuffleRepartitioner { "Use SinglePartitionShufflePartitioner for 1 output partition." ); - // Vectors in the scratch space will be filled with valid values before being used, this - // initialization code is simply initializing the vectors to the desired size. - // The initial values are not used. - let scratch = ScratchSpace { - hashes_buf: match partitioning { - // Allocate hashes_buf for hash and round robin partitioning. - // Round robin hashes all columns to achieve even, deterministic distribution. 
- CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { - vec![0; batch_size] - } - _ => vec![], - }, - partition_ids: vec![0; batch_size], - partition_row_indices: vec![0; batch_size], - partition_starts: vec![0; num_output_partitions + 1], - }; + let scratch = ScratchSpace::new(&partitioning, batch_size, num_output_partitions); let shuffle_block_writer = ShuffleBlockWriter::try_new(schema.as_ref(), codec.clone())?; @@ -217,178 +137,20 @@ impl MultiPartitionShuffleRepartitioner { // number of rows those are written to output data file. self.metrics.baseline.record_output(input.num_rows()); - match &self.partitioning { - CometPartitioning::Hash(exprs, num_output_partitions) => { - let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { - let mut timer = self.metrics.repart_time.timer(); - - // Evaluate partition expressions to get rows to apply partitioning scheme. - let arrays = exprs - .iter() - .map(|expr| expr.evaluate(&input)?.into_array(input.num_rows())) - .collect::>>()?; - - let num_rows = arrays[0].len(); - - // Use identical seed as Spark hash partitioning. - let hashes_buf = &mut scratch.hashes_buf[..num_rows]; - hashes_buf.fill(42_u32); - - // Generate partition ids for every row. - { - // Hash arrays and compute partition ids based on number of partitions. - let partition_ids = &mut scratch.partition_ids[..num_rows]; - create_murmur3_hashes(&arrays, hashes_buf)? - .iter() - .enumerate() - .for_each(|(idx, hash)| { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - }); - } - - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. 
- scratch - .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); - - timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; - self.scratch = scratch; - } - CometPartitioning::RangePartitioning( - lex_ordering, - num_output_partitions, - row_converter, - bounds, - ) => { - let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { - let mut timer = self.metrics.repart_time.timer(); - - // Evaluate partition expressions for values to apply partitioning scheme on. - let arrays = lex_ordering - .iter() - .map(|expr| expr.expr.evaluate(&input)?.into_array(input.num_rows())) - .collect::>>()?; - - let num_rows = arrays[0].len(); - - // Generate partition ids for every row, first by converting the partition - // arrays to Rows, and then doing binary search for each Row against the - // bounds Rows. - { - let row_batch = row_converter.convert_columns(arrays.as_slice())?; - let partition_ids = &mut scratch.partition_ids[..num_rows]; - - row_batch.iter().enumerate().for_each(|(row_idx, row)| { - partition_ids[row_idx] = bounds - .as_slice() - .partition_point(|bound| bound.row() <= row) - as u32 - }); - } - - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually right these rows to partition buffers. 
- scratch - .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); - - timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; - self.scratch = scratch; - } - CometPartitioning::RoundRobin(num_output_partitions, max_hash_columns) => { - // Comet implements "round robin" as hash partitioning on columns. - // This achieves the same goal as Spark's round robin (even distribution - // without semantic grouping) while being deterministic for fault tolerance. - // - // Note: This produces different partition assignments than Spark's round robin, - // which sorts by UnsafeRow binary representation before assigning partitions. - // However, both approaches provide even distribution and determinism. - let mut scratch = std::mem::take(&mut self.scratch); - let (partition_starts, partition_row_indices): (&Vec, &Vec) = { - let mut timer = self.metrics.repart_time.timer(); - - let num_rows = input.num_rows(); - - // Collect columns for hashing, respecting max_hash_columns limit - // max_hash_columns of 0 means no limit (hash all columns) - // Negative values are normalized to 0 in the planner - let num_columns_to_hash = if *max_hash_columns == 0 { - input.num_columns() - } else { - (*max_hash_columns).min(input.num_columns()) - }; - let columns_to_hash: Vec = (0..num_columns_to_hash) - .map(|i| Arc::clone(input.column(i))) - .collect(); - - // Use identical seed as Spark hash partitioning. 
- let hashes_buf = &mut scratch.hashes_buf[..num_rows]; - hashes_buf.fill(42_u32); - - // Compute hash for selected columns - create_murmur3_hashes(&columns_to_hash, hashes_buf)?; - - // Assign partition IDs based on hash (same as hash partitioning) - let partition_ids = &mut scratch.partition_ids[..num_rows]; - hashes_buf.iter().enumerate().for_each(|(idx, hash)| { - partition_ids[idx] = - comet_partitioning::pmod(*hash, *num_output_partitions) as u32; - }); - - // We now have partition ids for every input row, map that to partition starts - // and partition indices to eventually write these rows to partition buffers. - scratch - .map_partition_ids_to_starts_and_indices(*num_output_partitions, num_rows); - - timer.stop(); - Ok::<(&Vec, &Vec), DataFusionError>(( - &scratch.partition_starts, - &scratch.partition_row_indices, - )) - }?; - - self.buffer_partitioned_batch_may_spill( - input, - partition_row_indices, - partition_starts, - ) - .await?; - self.scratch = scratch; - } - other => { - // this should be unreachable as long as the validation logic - // in the constructor is kept up-to-date - return Err(DataFusionError::NotImplemented(format!( - "Unsupported shuffle partitioning scheme {other:?}" - ))); - } + let mut scratch = std::mem::take(&mut self.scratch); + { + let mut timer = self.metrics.repart_time.timer(); + scratch.compute_partition_ids(&self.partitioning, &input)?; + timer.stop(); } + + self.buffer_partitioned_batch_may_spill( + input, + &scratch.partition_row_indices, + &scratch.partition_starts, + ) + .await?; + self.scratch = scratch; Ok(()) } @@ -609,14 +371,7 @@ impl ShufflePartitioner for MultiPartitionShuffleRepartitioner { offsets[num_output_partitions] = output_data.stream_position()?; let mut write_timer = self.metrics.write_time.timer(); - let mut output_index = - BufWriter::new(File::create(index_file).map_err(|e| { - DataFusionError::Execution(format!("shuffle write error: {e:?}")) - })?); - for offset in offsets { - 
output_index.write_all(&(offset as i64).to_le_bytes()[..])?; - } - output_index.flush()?; + crate::writers::write_index_file(&index_file, &offsets)?; write_timer.stop(); self.metrics diff --git a/native/shuffle/src/partitioners/scratch.rs b/native/shuffle/src/partitioners/scratch.rs new file mode 100644 index 0000000000..af0ede558f --- /dev/null +++ b/native/shuffle/src/partitioners/scratch.rs @@ -0,0 +1,183 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Shared scratch buffers and partition ID computation for shuffle partitioners. + +use crate::{comet_partitioning, CometPartitioning}; +use arrow::array::{ArrayRef, RecordBatch}; +use datafusion::common::DataFusionError; +use datafusion_comet_spark_expr::murmur3::create_murmur3_hashes; +use std::sync::Arc; + +/// Reusable scratch buffers for computing row-to-partition assignments. +#[derive(Default)] +pub(crate) struct ScratchSpace { + /// Hashes for each row in the current batch. + pub(crate) hashes_buf: Vec<u32>, + /// Partition ids for each row in the current batch. + pub(crate) partition_ids: Vec<u32>, + /// The row indices of the rows in each partition.
This array is conceptually divided into + /// partitions, where each partition contains the row indices of the rows in that partition. + /// The length of this array is the same as the number of rows in the batch. + pub(crate) partition_row_indices: Vec<u32>, + /// The start indices of partitions in partition_row_indices. partition_starts[K] and + /// partition_starts[K + 1] are the start and end indices of partition K in partition_row_indices. + /// The length of this array is 1 + the number of partitions. + pub(crate) partition_starts: Vec<u32>, +} + +impl ScratchSpace { + /// Create a new ScratchSpace with pre-allocated buffers for the given partitioning scheme. + pub(crate) fn new( + partitioning: &CometPartitioning, + batch_size: usize, + num_output_partitions: usize, + ) -> Self { + Self { + hashes_buf: match partitioning { + CometPartitioning::Hash(_, _) | CometPartitioning::RoundRobin(_, _) => { + vec![0; batch_size] + } + _ => vec![], + }, + partition_ids: vec![0; batch_size], + partition_row_indices: vec![0; batch_size], + partition_starts: vec![0; num_output_partitions + 1], + } + } + + pub(crate) fn map_partition_ids_to_starts_and_indices( + &mut self, + num_output_partitions: usize, + num_rows: usize, + ) { + let partition_ids = &mut self.partition_ids[..num_rows]; + + // count each partition size, while leaving the last extra element as 0 + let partition_counters = &mut self.partition_starts; + partition_counters.resize(num_output_partitions + 1, 0); + partition_counters.fill(0); + partition_ids + .iter() + .for_each(|partition_id| partition_counters[*partition_id as usize] += 1); + + // accumulate partition counters into partition ends + // e.g. partition counter: [1, 3, 2, 1, 0] => [1, 4, 6, 7, 7] + let partition_ends = partition_counters; + let mut accum = 0; + partition_ends.iter_mut().for_each(|v| { + *v += accum; + accum = *v; + }); + + // calculate partition row indices and partition starts + // e.g.
partition ids: [3, 1, 1, 1, 2, 2, 0] will produce the following partition_row_indices + // and partition_starts arrays: + // + // partition_row_indices: [6, 1, 2, 3, 4, 5, 0] + // partition_starts: [0, 1, 4, 6, 7] + // + // partition_starts conceptually splits partition_row_indices into smaller slices. + // Each slice partition_row_indices[partition_starts[K]..partition_starts[K + 1]] contains the + // row indices of the input batch that are partitioned into partition K. For example, + // first partition 0 has one row index [6], partition 1 has row indices [1, 2, 3], etc. + let partition_row_indices = &mut self.partition_row_indices; + partition_row_indices.resize(num_rows, 0); + for (index, partition_id) in partition_ids.iter().enumerate().rev() { + partition_ends[*partition_id as usize] -= 1; + let end = partition_ends[*partition_id as usize]; + partition_row_indices[end as usize] = index as u32; + } + + // after calculating, partition ends become partition starts + } + + /// Compute partition IDs for the given batch and populate `partition_starts` and + /// `partition_row_indices` for the rows of `input`. + pub(crate) fn compute_partition_ids( + &mut self, + partitioning: &CometPartitioning, + input: &RecordBatch, + ) -> datafusion::common::Result<()> { + match partitioning { + CometPartitioning::Hash(exprs, num_partitions) => { + let arrays = exprs + .iter() + .map(|expr| expr.evaluate(input)?.into_array(input.num_rows())) + .collect::<datafusion::common::Result<Vec<ArrayRef>>>()?; + let num_rows = arrays[0].len(); + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + let partition_ids = &mut self.partition_ids[..num_rows]; + create_murmur3_hashes(&arrays, hashes_buf)?
+ .iter() + .enumerate() + .for_each(|(idx, hash)| { + partition_ids[idx] = + comet_partitioning::pmod(*hash, *num_partitions) as u32; + }); + self.map_partition_ids_to_starts_and_indices(*num_partitions, num_rows); + } + CometPartitioning::RangePartitioning( + lex_ordering, + num_partitions, + row_converter, + bounds, + ) => { + let arrays = lex_ordering + .iter() + .map(|expr| expr.expr.evaluate(input)?.into_array(input.num_rows())) + .collect::<datafusion::common::Result<Vec<ArrayRef>>>()?; + let num_rows = arrays[0].len(); + let row_batch = row_converter.convert_columns(arrays.as_slice())?; + let partition_ids = &mut self.partition_ids[..num_rows]; + row_batch.iter().enumerate().for_each(|(row_idx, row)| { + partition_ids[row_idx] = bounds + .as_slice() + .partition_point(|bound| bound.row() <= row) + as u32 + }); + self.map_partition_ids_to_starts_and_indices(*num_partitions, num_rows); + } + CometPartitioning::RoundRobin(num_partitions, max_hash_columns) => { + let num_rows = input.num_rows(); + let num_columns_to_hash = if *max_hash_columns == 0 { + input.num_columns() + } else { + (*max_hash_columns).min(input.num_columns()) + }; + let columns_to_hash: Vec<ArrayRef> = (0..num_columns_to_hash) + .map(|i| Arc::clone(input.column(i))) + .collect(); + let hashes_buf = &mut self.hashes_buf[..num_rows]; + hashes_buf.fill(42_u32); + create_murmur3_hashes(&columns_to_hash, hashes_buf)?; + let partition_ids = &mut self.partition_ids[..num_rows]; + hashes_buf.iter().enumerate().for_each(|(idx, hash)| { + partition_ids[idx] = comet_partitioning::pmod(*hash, *num_partitions) as u32; + }); + self.map_partition_ids_to_starts_and_indices(*num_partitions, num_rows); + } + other => { + return Err(DataFusionError::NotImplemented(format!( + "Unsupported shuffle partitioning scheme {other:?}" + ))); + } + } + Ok(()) + } +} diff --git a/native/shuffle/src/partitioners/single_partition.rs b/native/shuffle/src/partitioners/single_partition.rs index 5801ef613b..778c1d9416 100644 ---
a/native/shuffle/src/partitioners/single_partition.rs +++ b/native/shuffle/src/partitioners/single_partition.rs @@ -23,7 +23,6 @@ use arrow::array::RecordBatch; use arrow::datatypes::SchemaRef; use datafusion::common::DataFusionError; use std::fs::{File, OpenOptions}; -use std::io::{BufWriter, Write}; use tokio::time::Instant; /// A partitioner that writes all shuffle data to a single file and a single index file @@ -169,19 +168,8 @@ impl ShufflePartitioner for SinglePartitionShufflePartitioner { self.output_data_writer .flush(&self.metrics.encode_time, &self.metrics.write_time)?; - // Write index file. It should only contain 2 entries: 0 and the total number of bytes written - let index_file = OpenOptions::new() - .write(true) - .create(true) - .truncate(true) - .open(self.output_index_path.clone()) - .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?; - let mut index_buf_writer = BufWriter::new(index_file); let data_file_length = self.output_data_writer.writer_stream_position()?; - for offset in [0, data_file_length] { - index_buf_writer.write_all(&(offset as i64).to_le_bytes()[..])?; - } - index_buf_writer.flush()?; + crate::writers::write_index_file(&self.output_index_path, &[0, data_file_length])?; self.metrics .baseline diff --git a/native/shuffle/src/shuffle_writer.rs b/native/shuffle/src/shuffle_writer.rs index e649aaac69..67314b4961 100644 --- a/native/shuffle/src/shuffle_writer.rs +++ b/native/shuffle/src/shuffle_writer.rs @@ -19,9 +19,10 @@ use crate::metrics::ShufflePartitionerMetrics; use crate::partitioners::{ - MultiPartitionShuffleRepartitioner, ShufflePartitioner, SinglePartitionShufflePartitioner, + ImmediateShufflePartitioner, MultiPartitionShuffleRepartitioner, ShufflePartitioner, + SinglePartitionShufflePartitioner, }; -use crate::{CometPartitioning, CompressionCodec}; +use crate::{CometPartitioning, CompressionCodec, ShuffleMode}; use async_trait::async_trait; use datafusion::common::exec_datafusion_err; use 
datafusion::physical_expr::{EquivalenceProperties, Partitioning}; @@ -68,6 +69,8 @@ pub struct ShuffleWriterExec { tracing_enabled: bool, /// Size of the write buffer in bytes write_buffer_size: usize, + /// Which shuffle implementation to use for multi-partition shuffles + shuffle_mode: ShuffleMode, } impl ShuffleWriterExec { @@ -81,6 +84,7 @@ impl ShuffleWriterExec { output_index_file: String, tracing_enabled: bool, write_buffer_size: usize, + shuffle_mode: ShuffleMode, ) -> Result { let cache = PlanProperties::new( EquivalenceProperties::new(Arc::clone(&input.schema())), @@ -99,6 +103,7 @@ impl ShuffleWriterExec { codec, tracing_enabled, write_buffer_size, + shuffle_mode, }) } } @@ -163,6 +168,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.output_index_file.clone(), self.tracing_enabled, self.write_buffer_size, + self.shuffle_mode, )?)), _ => panic!("ShuffleWriterExec wrong number of children"), } @@ -190,6 +196,7 @@ impl ExecutionPlan for ShuffleWriterExec { self.codec.clone(), self.tracing_enabled, self.write_buffer_size, + self.shuffle_mode, ) .map_err(|e| ArrowError::ExternalError(Box::new(e))), ) @@ -210,6 +217,7 @@ async fn external_shuffle( codec: CompressionCodec, tracing_enabled: bool, write_buffer_size: usize, + shuffle_mode: ShuffleMode, ) -> Result { with_trace_async("external_shuffle", tracing_enabled, || async { let schema = input.schema(); @@ -226,6 +234,21 @@ async fn external_shuffle( write_buffer_size, )?) } + _ if shuffle_mode == ShuffleMode::Immediate => { + Box::new(ImmediateShufflePartitioner::try_new( + partition, + output_data_file, + output_index_file, + Arc::clone(&schema), + partitioning, + metrics, + context.runtime_env(), + context.session_config().batch_size(), + codec, + tracing_enabled, + write_buffer_size, + )?) 
+ } _ => Box::new(MultiPartitionShuffleRepartitioner::try_new( partition, output_data_file, @@ -466,6 +489,7 @@ mod test { "/tmp/index.out".to_string(), false, 1024 * 1024, // write_buffer_size: 1MB default + ShuffleMode::Buffered, ) .unwrap(); @@ -525,6 +549,7 @@ mod test { index_file.clone(), false, 1024 * 1024, + ShuffleMode::Buffered, ) .unwrap(); diff --git a/native/shuffle/src/writers/buf_batch_writer.rs b/native/shuffle/src/writers/buf_batch_writer.rs index cfddb46539..0b10396ead 100644 --- a/native/shuffle/src/writers/buf_batch_writer.rs +++ b/native/shuffle/src/writers/buf_batch_writer.rs @@ -135,6 +135,31 @@ impl, W: Write> BufBatchWriter { } } +impl, W: Write> BufBatchWriter { + /// Consume this BufBatchWriter and return the underlying writer. + pub(crate) fn into_writer(self) -> W { + self.writer + } +} + +impl> BufBatchWriter>> { + /// Total bytes buffered: staging buffer + accumulated compressed IPC output. + pub(crate) fn buffered_output_size(&self) -> usize { + self.buffer.len() + self.writer.get_ref().len() + } + + /// Reset the output buffer, keeping allocated capacity for reuse. + pub(crate) fn reset_output_buffer(&mut self) { + self.writer.get_mut().clear(); + self.writer.set_position(0); + } + + /// Returns a reference to the accumulated compressed IPC output. 
+ pub(crate) fn output_bytes(&self) -> &[u8] { + self.writer.get_ref() + } +} + impl, W: Write + Seek> BufBatchWriter { pub(crate) fn writer_stream_position(&mut self) -> datafusion::common::Result { self.writer.stream_position().map_err(Into::into) diff --git a/native/shuffle/src/writers/mod.rs b/native/shuffle/src/writers/mod.rs index 75caf9f3a3..8de3c7ec9d 100644 --- a/native/shuffle/src/writers/mod.rs +++ b/native/shuffle/src/writers/mod.rs @@ -24,3 +24,20 @@ pub(crate) use buf_batch_writer::BufBatchWriter; pub(crate) use checksum::Checksum; pub use shuffle_block_writer::{CompressionCodec, ShuffleBlockWriter}; pub(crate) use spill::PartitionWriter; + +use datafusion::common::DataFusionError; +use std::fs::File; +use std::io::{BufWriter, Write}; + +/// Write shuffle index file: an array of i64 little-endian byte offsets. +pub(crate) fn write_index_file(path: &str, offsets: &[u64]) -> datafusion::common::Result<()> { + let mut writer = BufWriter::new( + File::create(path) + .map_err(|e| DataFusionError::Execution(format!("shuffle write error: {e:?}")))?, + ); + for &offset in offsets { + writer.write_all(&(offset as i64).to_le_bytes()[..])?; + } + writer.flush()?; + Ok(()) +} diff --git a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala index 3fc222bd19..13c34bd10c 100644 --- a/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala +++ b/spark/src/main/scala/org/apache/spark/sql/comet/execution/shuffle/CometNativeShuffleWriter.scala @@ -193,6 +193,12 @@ class CometNativeShuffleWriter[K, V]( shuffleWriterBuilder.setWriteBufferSize( CometConf.COMET_SHUFFLE_WRITE_BUFFER_SIZE.get().max(Int.MaxValue).toInt) + val shuffleMode = CometConf.COMET_NATIVE_SHUFFLE_MODE.get() match { + case "buffered" => OperatorOuterClass.ShuffleMode.BufferedShuffle + case _ => 
OperatorOuterClass.ShuffleMode.ImmediateShuffle + } + shuffleWriterBuilder.setShuffleMode(shuffleMode) + outputPartitioning match { case p if isSinglePartitioning(p) => val partitioning = PartitioningOuterClass.SinglePartition.newBuilder()