diff --git a/native/Cargo.lock b/native/Cargo.lock
index 598b18d58c..0cf1f20318 100644
--- a/native/Cargo.lock
+++ b/native/Cargo.lock
@@ -1953,6 +1953,7 @@ dependencies = [
  "arrow",
  "async-trait",
  "bytes",
+ "crc32c",
  "crc32fast",
  "criterion",
  "datafusion",
diff --git a/native/shuffle/Cargo.toml b/native/shuffle/Cargo.toml
index 94ed5f30a5..5cd7cd43ef 100644
--- a/native/shuffle/Cargo.toml
+++ b/native/shuffle/Cargo.toml
@@ -32,6 +32,7 @@ publish = false
 arrow = { workspace = true }
 async-trait = { workspace = true }
 bytes = { workspace = true }
+crc32c = "0.6.8"
 crc32fast = "1.3.2"
 datafusion = { workspace = true }
 datafusion-comet-common = { workspace = true }
diff --git a/native/shuffle/src/lib.rs b/native/shuffle/src/lib.rs
index f29588f2e1..dd3b900272 100644
--- a/native/shuffle/src/lib.rs
+++ b/native/shuffle/src/lib.rs
@@ -20,6 +20,7 @@ pub mod ipc;
 pub(crate) mod metrics;
 pub(crate) mod partitioners;
 mod shuffle_writer;
+mod spark_crc32c_hasher;
 pub mod spark_unsafe;
 pub(crate) mod writers;
 
diff --git a/native/shuffle/src/spark_crc32c_hasher.rs b/native/shuffle/src/spark_crc32c_hasher.rs
new file mode 100644
index 0000000000..19f794e7b5
--- /dev/null
+++ b/native/shuffle/src/spark_crc32c_hasher.rs
@@ -0,0 +1,55 @@
+//! Provide a CRC-32C implementor of [Hasher].
+use std::hash::Hasher;
+
+use crc32c::crc32c_append;
+
+/// Implementor of [Hasher] for CRC-32C.
+///
+/// Note that CRC-32C produces a 32-bit hash (as [u32]),
+/// but the trait requires that the output value be [u64].
+///
+/// This implementation is necessary because the existing [Hasher] implementation does not support
+/// [Clone].
+#[derive(Default, Clone)]
+pub struct SparkCrc32cHasher {
+    checksum: u32,
+}
+
+impl SparkCrc32cHasher {
+    /// Create the [Hasher] pre-loaded with a particular checksum.
+    ///
+    /// Use the [Default::default()] constructor for a clean start.
+    pub fn new(initial: u32) -> Self {
+        Self { checksum: initial }
+    }
+
+    pub fn finalize(&self) -> u32 {
+        self.checksum
+    }
+}
+
+impl Hasher for SparkCrc32cHasher {
+    fn finish(&self) -> u64 {
+        self.checksum as u64
+    }
+
+    fn write(&mut self, bytes: &[u8]) {
+        self.checksum = crc32c_append(self.checksum, bytes);
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    const TEST_STRING: &[u8] =
+        b"This is a very long string which is used to test the CRC-32-Castagnoli function.";
+    const CHECKSUM: u32 = 0x20_CB_1E_59;
+
+    #[test]
+    fn can_hash() {
+        let mut hasher = SparkCrc32cHasher::default();
+        hasher.write(TEST_STRING);
+        assert_eq!(hasher.finish(), CHECKSUM as u64);
+    }
+}
diff --git a/native/shuffle/src/writers/checksum.rs b/native/shuffle/src/writers/checksum.rs
index b240302e66..cbd8719e9d 100644
--- a/native/shuffle/src/writers/checksum.rs
+++ b/native/shuffle/src/writers/checksum.rs
@@ -16,18 +16,22 @@
 // under the License.
 
 use bytes::Buf;
-use crc32fast::Hasher;
 use datafusion_comet_jni_bridge::errors::{CometError, CometResult};
 use simd_adler32::Adler32;
+use std::hash::Hasher;
 use std::io::{Cursor, SeekFrom};
+use std::default::Default;
+use crate::spark_crc32c_hasher::SparkCrc32cHasher;
 
 /// Checksum algorithms for writing IPC bytes.
 #[derive(Clone)]
 pub(crate) enum Checksum {
     /// CRC32 checksum algorithm.
-    CRC32(Hasher),
+    CRC32(crc32fast::Hasher),
     /// Adler32 checksum algorithm.
     Adler32(Adler32),
+    /// CRC32C checksum algorithm.
+    CRC32C(SparkCrc32cHasher),
 }
 
 impl Checksum {
@@ -35,9 +39,9 @@ impl Checksum {
         match algo {
             0 => {
                 let hasher = if let Some(initial) = initial_opt {
-                    Hasher::new_with_initial(initial)
+                    crc32fast::Hasher::new_with_initial(initial)
                 } else {
-                    Hasher::new()
+                    crc32fast::Hasher::new()
                 };
                 Ok(Checksum::CRC32(hasher))
             }
@@ -51,6 +55,14 @@ impl Checksum {
                 };
                 Ok(Checksum::Adler32(hasher))
             }
+            2 => {
+                let hasher = if let Some(initial) = initial_opt {
+                    SparkCrc32cHasher::new(initial)
+                } else {
+                    Default::default()
+                };
+                Ok(Checksum::CRC32C(hasher))
+            }
             _ => Err(CometError::Internal(
                 "Unsupported checksum algorithm".to_string(),
             )),
@@ -69,6 +81,11 @@ impl Checksum {
                 hasher.write(cursor.chunk());
                 Ok(())
             }
+            Checksum::CRC32C(hasher) => {
+                std::io::Seek::seek(cursor, SeekFrom::Start(0))?;
+                hasher.write(cursor.chunk());
+                Ok(())
+            }
         }
     }
 
@@ -76,6 +93,59 @@ impl Checksum {
         match self {
             Checksum::CRC32(hasher) => hasher.finalize(),
             Checksum::Adler32(hasher) => hasher.finish(),
+            Checksum::CRC32C(hasher) => hasher.finalize(),
         }
     }
 }
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use std::io::Cursor;
+
+    #[test]
+    fn test_crc32() {
+        let mut checksum = Checksum::try_new(0, None).unwrap();
+        let message = b"123456789";
+
+        let mut vector: Vec<u8> = message.to_vec();
+        let mut buff = Cursor::new(&mut vector);
+
+        checksum.update(&mut buff).unwrap();
+        let result = checksum.finalize();
+
+        let expected_crc = 0xcbf43926u32;
+        assert_eq!(result, expected_crc)
+    }
+
+    #[test]
+    fn test_adler32() {
+        let mut checksum = Checksum::try_new(1, None).unwrap();
+        let message = b"123456789";
+
+        let mut vector: Vec<u8> = message.to_vec();
+        let mut buff = Cursor::new(&mut vector);
+
+        checksum.update(&mut buff).unwrap();
+        let result = checksum.finalize();
+
+        let expected_crc = 0x091e01deu32;
+        assert_eq!(result, expected_crc)
+    }
+
+    #[test]
+    fn test_crc32c() {
+        let mut checksum = Checksum::try_new(2, None).unwrap();
+        let message = b"123456789";
+
+        let mut vector: Vec<u8> = message.to_vec();
+        let mut buff = Cursor::new(&mut vector);
+
+        checksum.update(&mut buff).unwrap();
+        let result = checksum.finalize();
+
+        let expected_crc = 0xe3069283u32;
+        assert_eq!(result, expected_crc)
+    }
+
+}
\ No newline at end of file
diff --git a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
index 044c7842f0..fd3ff65a03 100644
--- a/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
+++ b/spark/src/main/java/org/apache/spark/sql/comet/execution/shuffle/SpillWriter.java
@@ -68,7 +68,7 @@ public abstract class SpillWriter {
 
   protected byte[][] dataTypes;
 
-  // 0: CRC32, 1: Adler32. Spark uses Adler32 by default.
+  // 0: CRC32, 1: Adler32, or 2: CRC32C. Spark uses Adler32 by default.
   protected int checksumAlgo = 1;
 
   protected long checksum = -1;
@@ -98,6 +98,8 @@ protected void setChecksumAlgo(String checksumAlgo) {
       this.checksumAlgo = 0;
     } else if (algo.equals("adler32")) {
       this.checksumAlgo = 1;
+    } else if (algo.equals("crc32c")) {
+      this.checksumAlgo = 2;
     } else {
       throw new UnsupportedOperationException(
           "Unsupported shuffle checksum algorithm: " + checksumAlgo);