diff --git a/Cargo.lock b/Cargo.lock index ed90dd25bda7b..878f03e4151c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2240,7 +2240,6 @@ dependencies = [ "datafusion-macros", "datafusion-physical-expr", "datafusion-physical-expr-common", - "foldhash 0.2.0", "half", "log", "num-traits", diff --git a/datafusion/common/src/hash_utils.rs b/datafusion/common/src/hash_utils.rs index 02db75498af49..e9c4c26e37482 100644 --- a/datafusion/common/src/hash_utils.rs +++ b/datafusion/common/src/hash_utils.rs @@ -31,8 +31,54 @@ use itertools::Itertools; use std::collections::HashMap; use std::hash::{BuildHasher, Hash, Hasher}; -/// The hash random state used throughout DataFusion for hashing. +/// [`RandomState`] is optimized for speed and suitable for hash tables and +/// bloom filters. [`QualityRandomState`] is optimized for statistical quality +/// and suitable for algorithms such as HyperLogLog. The tradeoff is that the +/// fast variant gives up some statistical quality, while the quality variant +/// is slightly slower. +/// +/// See: pub type RandomState = FixedState; +pub type QualityRandomState = foldhash::quality::FixedState; + +/// Fixed quality hash state used by HyperLogLog sketches. +/// +/// The seed is part of the HLL wire/storage semantics: serialized sketches only +/// remain mergeable if every producer uses the same hash state. +pub const HLL_RANDOM_STATE: QualityRandomState = QualityRandomState::with_seed(0); + +/// Hash state used by [`create_hashes`]. +/// +/// Multi-column hashing folds the previous column hash into a fresh hasher +/// before hashing the next column. This trait keeps that seeded hasher in the +/// same foldhash tier as the top-level hash state. +pub trait HashState: BuildHasher { + type SeededState: BuildHasher; + + fn seeded_state(&self, seed: u64) -> Self::SeededState; +} + +impl HashState for FixedState { + type SeededState = foldhash::fast::SeedableRandomState; + + fn seeded_state(&self, seed: u64) -> Self::SeededState { + foldhash::fast::SeedableRandomState::with_seed( + seed, + foldhash::SharedSeed::global_fixed(), + ) + } +} + +impl HashState for foldhash::quality::FixedState { + type SeededState = foldhash::quality::SeedableRandomState; + + fn seeded_state(&self, seed: u64) -> Self::SeededState { + foldhash::quality::SeedableRandomState::with_seed( + seed, + foldhash::SharedSeed::global_fixed(), + ) + } +} #[cfg(not(feature = "force_hash_collisions"))] use crate::cast::{ @@ -99,7 +145,7 @@ thread_local! { /// ``` pub fn with_hashes( arrays: I, - random_state: &RandomState, + random_state: &impl HashState, callback: F, ) -> Result where @@ -141,7 +187,11 @@ where } #[cfg(not(feature = "force_hash_collisions"))] -fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) { +fn hash_null( + random_state: &S, + hashes_buffer: &'_ mut [u64], + mul_col: bool, +) { if mul_col { hashes_buffer.iter_mut().for_each(|hash| { // stable hash for null value @@ -155,13 +205,13 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: } pub trait HashValue { - fn hash_one(&self, state: &RandomState) -> u64; + fn hash_one(&self, state: &S) -> u64; /// Write this value into an existing hasher (same data as `hash_one`). fn hash_write(&self, hasher: &mut impl Hasher); } impl HashValue for &T { - fn hash_one(&self, state: &RandomState) -> u64 { + fn hash_one(&self, state: &S) -> u64 { T::hash_one(self, state) } fn hash_write(&self, hasher: &mut impl Hasher) { @@ -172,7 +222,7 @@ impl HashValue for &T { macro_rules! hash_value { ($($t:ty),+) => { $(impl HashValue for $t { - fn hash_one(&self, state: &RandomState) -> u64 { + fn hash_one(&self, state: &S) -> u64 { state.hash_one(self) } fn hash_write(&self, hasher: &mut impl Hasher) { @@ -187,7 +237,7 @@ hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano); macro_rules! hash_float_value { ($(($t:ty, $i:ty)),+) => { $(impl HashValue for $t { - fn hash_one(&self, state: &RandomState) -> u64 { + fn hash_one(&self, state: &S) -> u64 { // +0.0 and -0.0 differ only in the sign bit but compare equal // under IEEE 754; normalize -0.0 → +0.0 so Hash agrees with Eq. let bits = <$i>::from_ne_bytes(self.to_ne_bytes()); @@ -204,25 +254,13 @@ macro_rules! hash_float_value { } hash_float_value!((half::f16, u16), (f32, u32), (f64, u64)); -/// Create a `SeedableRandomState` whose per-hasher seed incorporates `seed`. -/// This folds the previous hash into the hasher's initial state so only the -/// new value needs to pass through the hash function — same cost as `hash_one`. -#[cfg(not(feature = "force_hash_collisions"))] -#[inline] -fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState { - foldhash::fast::SeedableRandomState::with_seed( - seed, - foldhash::SharedSeed::global_fixed(), - ) -} - /// Builds hash values of PrimitiveArray and writes them into `hashes_buffer` /// If `rehash==true` this folds the existing hash into the hasher state /// and hashes only the new value (avoiding a separate combine step). #[cfg(not(feature = "force_hash_collisions"))] fn hash_array_primitive( array: &PrimitiveArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], rehash: bool, ) where @@ -237,7 +275,7 @@ fn hash_array_primitive( if array.null_count() == 0 { if rehash { for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) { - let mut hasher = seeded_state(*hash).build_hasher(); + let mut hasher = random_state.seeded_state(*hash).build_hasher(); value.hash_write(&mut hasher); *hash = hasher.finish(); } @@ -249,7 +287,7 @@ fn hash_array_primitive( } else if rehash { for i in array.nulls().unwrap().valid_indices() { let value = unsafe { array.value_unchecked(i) }; - let mut hasher = seeded_state(hashes_buffer[i]).build_hasher(); + let mut hasher = random_state.seeded_state(hashes_buffer[i]).build_hasher(); value.hash_write(&mut hasher); hashes_buffer[i] = hasher.finish(); } @@ -267,7 +305,7 @@ fn hash_array_primitive( #[cfg(not(feature = "force_hash_collisions"))] fn hash_array( array: &T, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], rehash: bool, ) where @@ -322,7 +360,7 @@ fn hash_string_view_array_inner< const REHASH: bool, >( array: &GenericByteViewArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) { assert_eq!( @@ -351,7 +389,7 @@ fn hash_string_view_array_inner< // all views are inlined, no need to access external buffers if !HAS_BUFFERS || view_len <= 12 { if REHASH { - let mut hasher = seeded_state(*hash).build_hasher(); + let mut hasher = random_state.seeded_state(*hash).build_hasher(); v.hash_write(&mut hasher); *hash = hasher.finish(); } else { @@ -362,7 +400,7 @@ fn hash_string_view_array_inner< // view is not inlined, so we need to hash the bytes as well let value = view_bytes(view_len, v); if REHASH { - let mut hasher = seeded_state(*hash).build_hasher(); + let mut hasher = random_state.seeded_state(*hash).build_hasher(); value.hash_write(&mut hasher); *hash = hasher.finish(); } else { @@ -377,7 +415,7 @@ fn hash_string_view_array_inner< #[cfg(not(feature = "force_hash_collisions"))] fn hash_generic_byte_view_array( array: &GenericByteViewArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], rehash: bool, ) { @@ -396,7 +434,7 @@ fn hash_generic_byte_view_array( } (false, false, true) => { for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) { - let mut hasher = seeded_state(*hash).build_hasher(); + let mut hasher = random_state.seeded_state(*hash).build_hasher(); view.hash_write(&mut hasher); *hash = hasher.finish(); } @@ -449,7 +487,7 @@ fn hash_dictionary_inner< const MULTI_COL: bool, >( array: &DictionaryArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { // Hash each dictionary value once, and then use that computed @@ -491,7 +529,7 @@ fn hash_dictionary_inner< #[cfg(not(feature = "force_hash_collisions"))] fn hash_dictionary( array: &DictionaryArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], multi_col: bool, ) -> Result<()> { @@ -547,7 +585,7 @@ fn hash_dictionary( #[cfg(not(feature = "force_hash_collisions"))] fn hash_struct_array( array: &StructArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -577,7 +615,7 @@ fn hash_struct_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_map_array( array: &MapArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { let nulls = array.nulls(); @@ -628,7 +666,7 @@ fn hash_map_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_list_array( array: &GenericListArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> where @@ -679,7 +717,7 @@ where #[cfg(not(feature = "force_hash_collisions"))] fn hash_list_view_array( array: &GenericListViewArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> where @@ -718,7 +756,7 @@ where #[cfg(not(feature = "force_hash_collisions"))] fn hash_union_array( array: &UnionArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { let DataType::Union(union_fields, _mode) = array.data_type() else { @@ -750,7 +788,7 @@ fn hash_union_array( fn hash_union_array_default( array: &UnionArray, union_fields: &UnionFields, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { let mut child_hashes: HashMap> = @@ -791,7 +829,7 @@ fn hash_union_array_default( fn hash_sparse_union_array( array: &UnionArray, union_fields: &UnionFields, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { use std::collections::HashMap; @@ -846,7 +884,7 @@ fn hash_sparse_union_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_fixed_list_array( array: &FixedSizeListArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { let values = array.values(); @@ -885,7 +923,7 @@ fn hash_run_array_inner< const REHASH: bool, >( array: &RunArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], ) -> Result<()> { // We find the relevant runs that cover potentially sliced arrays, so we can only hash those @@ -952,7 +990,7 @@ fn hash_run_array_inner< #[cfg(not(feature = "force_hash_collisions"))] fn hash_run_array( array: &RunArray, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { @@ -979,7 +1017,7 @@ fn hash_run_array( #[cfg(not(feature = "force_hash_collisions"))] fn hash_single_array( array: &dyn Array, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &mut [u64], rehash: bool, ) -> Result<()> { @@ -1052,7 +1090,7 @@ fn hash_single_array( #[cfg(feature = "force_hash_collisions")] fn hash_single_array( _array: &dyn Array, - _random_state: &RandomState, + _random_state: &impl HashState, hashes_buffer: &mut [u64], _rehash: bool, ) -> Result<()> { @@ -1105,7 +1143,7 @@ impl AsDynArray for &ArrayRef { /// `hashes_buffer` should be pre-sized appropriately. pub fn create_hashes<'a, I, T>( arrays: I, - random_state: &RandomState, + random_state: &impl HashState, hashes_buffer: &'a mut [u64], ) -> Result<&'a mut [u64]> where @@ -1814,6 +1852,30 @@ mod tests { assert_eq!(hashes1, hashes2); } + #[test] + fn test_create_hashes_with_quality_hash_state() { + let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); + let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"])); + let quality_state = foldhash::quality::FixedState::with_seed(0); + + let mut one_col_hashes = vec![0; int_array.len()]; + create_hashes([&int_array], &quality_state, &mut one_col_hashes).unwrap(); + let expected_hashes: Vec<_> = [1i32, 2, 3, 4] + .iter() + .map(|value| quality_state.hash_one(value)) + .collect(); + assert_eq!(one_col_hashes, expected_hashes); + + let mut two_col_hashes = vec![0; int_array.len()]; + create_hashes( + [&int_array, &str_array], + &quality_state, + &mut two_col_hashes, + ) + .unwrap(); + assert_ne!(two_col_hashes, one_col_hashes); + } + #[test] fn test_with_hashes() { let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4])); diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 778e6a24bf00e..ff89808f0b81b 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -50,7 +50,6 @@ datafusion-functions-aggregate-common = { workspace = true } datafusion-macros = { workspace = true } datafusion-physical-expr = { workspace = true } datafusion-physical-expr-common = { workspace = true } -foldhash = "0.2" half = { workspace = true } log = { workspace = true } num-traits = { workspace = true } diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 38b902964f546..531e6bb4f1a9a 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,20 +18,16 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog, NUM_REGISTERS, count_from_hashes}; -use arrow::array::{Array, BinaryArray, StringViewArray}; use arrow::array::{ - AsArray, BinaryBuilder, BooleanArray, GenericBinaryArray, GenericStringArray, - OffsetSizeTrait, PrimitiveArray, UInt64Array, + Array, ArrayRef, BinaryArray, BinaryBuilder, BooleanArray, PrimitiveArray, + UInt64Array, }; use arrow::buffer::NullBuffer; use arrow::datatypes::{ - ArrowPrimitiveType, Date32Type, Date64Type, FieldRef, Int32Type, Int64Type, - Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, - TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, - TimestampNanosecondType, TimestampSecondType, UInt32Type, UInt64Type, + ArrowPrimitiveType, DataType, Field, FieldRef, Int64Type, TimeUnit, }; -use arrow::{array::ArrayRef, datatypes::DataType, datatypes::Field}; use datafusion_common::ScalarValue; +use datafusion_common::hash_utils::create_hashes; use datafusion_common::{ DataFusionError, Result, downcast_value, internal_datafusion_err, internal_err, not_impl_err, @@ -51,8 +47,8 @@ use datafusion_functions_aggregate_common::aggregate::groups_accumulator::nulls: use datafusion_functions_aggregate_common::noop_accumulator::NoopAccumulator; use datafusion_macros::user_doc; use std::fmt::{Debug, Formatter}; -use std::hash::{BuildHasher, Hash}; -use std::marker::PhantomData; +use std::hash::Hash; +use std::mem::{size_of, size_of_val}; use std::sync::Arc; make_udaf_expr_and_func!( @@ -124,192 +120,129 @@ impl Accumulator for ApproxDistinctBitmapWrapper { } #[derive(Debug)] -struct NumericHLLAccumulator -where - T: ArrowPrimitiveType, - T::Native: Hash, -{ - hll: HyperLogLog, +struct HLLAccumulator { + hll: HyperLogLog, + hashes: Vec, } -impl NumericHLLAccumulator -where - T: ArrowPrimitiveType, - T::Native: Hash, -{ +impl HLLAccumulator { pub fn new() -> Self { Self { hll: HyperLogLog::new(), + hashes: Vec::new(), } } } -#[derive(Debug)] -struct StringHLLAccumulator -where - T: OffsetSizeTrait, -{ - hll: HyperLogLog, - phantom_data: PhantomData, -} +impl Accumulator for HLLAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_ref(); + self.hashes.clear(); + self.hashes.resize(array.len(), 0); + create_hashes([array], &HLL_HASH_STATE, &mut self.hashes)?; -impl StringHLLAccumulator -where - T: OffsetSizeTrait, -{ - pub fn new() -> Self { - Self { - hll: HyperLogLog::new(), - phantom_data: PhantomData, + match array.logical_nulls() { + None => { + for &hash in &self.hashes { + self.hll.add_hashed(hash); + } + } + Some(nulls) => { + for row in 0..array.len() { + if nulls.is_valid(row) { + self.hll.add_hashed(self.hashes[row]); + } + } + } } + Ok(()) } -} - -#[derive(Debug)] -struct StringViewHLLAccumulator { - hll: HyperLogLog, -} -impl StringViewHLLAccumulator { - pub fn new() -> Self { - Self { - hll: HyperLogLog::new(), + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + assert_eq!(1, states.len(), "expect only 1 element in the states"); + let binary_array = downcast_value!(states[0], BinaryArray); + for v in binary_array.iter() { + let v = v.ok_or_else(|| { + internal_datafusion_err!("Impossibly got empty binary array from states") + })?; + let other = v.try_into()?; + self.hll.merge(&other); } + Ok(()) + } + + fn state(&mut self) -> Result> { + let value = ScalarValue::from(&self.hll); + Ok(vec![value]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.hll.count() as u64))) + } + + fn size(&self) -> usize { + size_of_val(self) + self.hashes.capacity() * size_of::() } } +/// Specialize the numeric case for extra performance. #[derive(Debug)] -struct BinaryHLLAccumulator +struct NumericHLLAccumulator where - T: OffsetSizeTrait, + T: ArrowPrimitiveType, + T::Native: Hash, { - hll: HyperLogLog<[u8]>, - phantom_data: PhantomData, + hll: HyperLogLog, } -impl BinaryHLLAccumulator +impl NumericHLLAccumulator where - T: OffsetSizeTrait, + T: ArrowPrimitiveType, + T::Native: Hash, { pub fn new() -> Self { Self { hll: HyperLogLog::new(), - phantom_data: PhantomData, } } } -macro_rules! default_accumulator_impl { - () => { - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - assert_eq!(1, states.len(), "expect only 1 element in the states"); - let binary_array = downcast_value!(states[0], BinaryArray); - for v in binary_array.iter() { - let v = v.ok_or_else(|| { - internal_datafusion_err!( - "Impossibly got empty binary array from states" - ) - })?; - let other = v.try_into()?; - self.hll.merge(&other); - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let value = ScalarValue::from(&self.hll); - Ok(vec![value]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.hll.count() as u64))) - } - - fn size(&self) -> usize { - // HLL has static size - std::mem::size_of_val(self) - } - }; -} - -impl Accumulator for BinaryHLLAccumulator +impl Accumulator for NumericHLLAccumulator where - T: OffsetSizeTrait, + T: ArrowPrimitiveType + Debug, + T::Native: Hash, { fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &GenericBinaryArray = - downcast_value!(values[0], GenericBinaryArray, T); - // flatten because we would skip nulls + let array: &PrimitiveArray = downcast_value!(values[0], PrimitiveArray, T); self.hll.extend(array.into_iter().flatten()); Ok(()) } - default_accumulator_impl!(); -} - -impl Accumulator for StringViewHLLAccumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &StringViewArray = downcast_value!(values[0], StringViewArray); - - if array.data_buffers().is_empty() { - // Fast path: with no data buffers every value is inline, so they all - // take the u128 path — no need to check the length per row. - for (i, &view) in array.views().iter().enumerate() { - if !array.is_null(i) { - self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); - } - } - } else { - // Mixed batch: decide per row by length. Short strings still use the - // u128 path so they match how they'd be hashed in an all-inline - // batch; only the genuinely out-of-line strings materialize a &str. - for (i, &view) in array.views().iter().enumerate() { - if array.is_null(i) { - continue; - } - // The low 32 bits of the u128 view encode the string length. - if (view as u32) <= 12 { - self.hll.add_hashed(HLL_HASH_STATE.hash_one(view)); - } else { - self.hll.add(array.value(i)); - } - } + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + assert_eq!(1, states.len(), "expect only 1 element in the states"); + let binary_array = downcast_value!(states[0], BinaryArray); + for v in binary_array.iter() { + let v = v.ok_or_else(|| { + internal_datafusion_err!("Impossibly got empty binary array from states") + })?; + let other = v.try_into()?; + self.hll.merge(&other); } - Ok(()) } - default_accumulator_impl!(); -} - -impl Accumulator for StringHLLAccumulator -where - T: OffsetSizeTrait, -{ - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &GenericStringArray = - downcast_value!(values[0], GenericStringArray, T); - // flatten because we would skip nulls - self.hll.extend(array.into_iter().flatten()); - Ok(()) + fn state(&mut self) -> Result> { + let value = ScalarValue::from(&self.hll); + Ok(vec![value]) } - default_accumulator_impl!(); -} - -impl Accumulator for NumericHLLAccumulator -where - T: ArrowPrimitiveType + Debug, - T::Native: Hash, -{ - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &PrimitiveArray = downcast_value!(values[0], PrimitiveArray, T); - // flatten because we would skip nulls - self.hll.extend(array.into_iter().flatten()); - Ok(()) + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.hll.count() as u64))) } - default_accumulator_impl!(); + fn size(&self) -> usize { + size_of_val(self) + } } /// Maximum number of distinct hashes kept in the sparse representation of a @@ -496,125 +429,6 @@ impl GroupHll { } } -/// Computes HyperLogLog hashes for the rows of an input array, type by type. -/// -/// The hashing matches the per-group [`Accumulator`] implementations exactly so -/// that the grouped and ungrouped paths produce identical estimates. -trait HllValueHasher: Send + Sync + 'static { - /// Invoke `f(row_index, hash)` for every row that is valid according to - /// `nulls`. `nulls = None` means every row is valid (caller has - /// pre-combined value-nulls and filter into a single buffer). - fn for_each_hash( - array: &dyn Array, - nulls: Option<&NullBuffer>, - f: impl FnMut(usize, u64), - ); -} - -struct NumericHasher(PhantomData); - -impl HllValueHasher for NumericHasher -where - T: ArrowPrimitiveType + Send + Sync + 'static, - T::Native: Hash, -{ - #[inline] - fn for_each_hash( - array: &dyn Array, - nulls: Option<&NullBuffer>, - mut f: impl FnMut(usize, u64), - ) { - let array: &PrimitiveArray = array.as_primitive::(); - match nulls { - None => { - for (i, v) in array.values().iter().enumerate() { - f(i, HLL_HASH_STATE.hash_one(v)); - } - } - Some(nulls) => { - for i in 0..array.len() { - if nulls.is_valid(i) { - f(i, HLL_HASH_STATE.hash_one(array.value(i))); - } - } - } - } - } -} - -struct Utf8Hasher(PhantomData); - -impl HllValueHasher for Utf8Hasher { - #[inline] - fn for_each_hash( - array: &dyn Array, - nulls: Option<&NullBuffer>, - mut f: impl FnMut(usize, u64), - ) { - let array: &GenericStringArray = array.as_string::(); - for i in 0..array.len() { - if nulls.is_none_or(|n| n.is_valid(i)) { - f(i, HLL_HASH_STATE.hash_one(array.value(i))); - } - } - } -} - -struct Utf8ViewHasher; - -impl HllValueHasher for Utf8ViewHasher { - #[inline] - fn for_each_hash( - array: &dyn Array, - nulls: Option<&NullBuffer>, - mut f: impl FnMut(usize, u64), - ) { - let array: &StringViewArray = array.as_string_view(); - // Mirror `StringViewHLLAccumulator`: hash the raw inline view when all - // strings are stored inline (≤ 12 bytes), avoiding `&str` materialization. - if array.data_buffers().is_empty() { - let views = array.views(); - for i in 0..array.len() { - if nulls.is_none_or(|n| n.is_valid(i)) { - f(i, HLL_HASH_STATE.hash_one(views[i])); - } - } - } else { - // Mixed batch: short strings (≤ 12 bytes) are still inline and must - // be hashed as the raw u128 view to match the all-inline fast path. - let views = array.views(); - for i in 0..array.len() { - if nulls.is_none_or(|n| n.is_valid(i)) { - let view = views[i]; - if (view as u32) <= 12 { - f(i, HLL_HASH_STATE.hash_one(view)); - } else { - f(i, HLL_HASH_STATE.hash_one(array.value(i))); - } - } - } - } - } -} - -struct BinaryHasher(PhantomData); - -impl HllValueHasher for BinaryHasher { - #[inline] - fn for_each_hash( - array: &dyn Array, - nulls: Option<&NullBuffer>, - mut f: impl FnMut(usize, u64), - ) { - let array: &GenericBinaryArray = array.as_binary::(); - for i in 0..array.len() { - if nulls.is_none_or(|n| n.is_valid(i)) { - f(i, HLL_HASH_STATE.hash_one(array.value(i))); - } - } - } -} - /// A [`GroupsAccumulator`] for `approx_distinct` that keeps one adaptive /// (sparse → dense) HyperLogLog sketch per group. /// @@ -646,20 +460,21 @@ impl HllValueHasher for BinaryHasher { /// replayed into a dense sketch. New values for `b` update the dense registers /// directly, and serialized state is the raw [`NUM_REGISTERS`]-byte register /// array. -struct HllGroupsAccumulator { +struct HllGroupsAccumulator { /// Per-group sketches, indexed by `group_index`. groups: Vec, /// Incrementally maintained estimate of heap bytes used by `groups`. allocated_bytes: usize, - phantom: PhantomData, + /// Reused workspace for vectorized value hashing. + hashes: Vec, } -impl HllGroupsAccumulator { +impl HllGroupsAccumulator { fn new() -> Self { Self { groups: Vec::new(), allocated_bytes: 0, - phantom: PhantomData, + hashes: Vec::new(), } } @@ -677,7 +492,7 @@ impl HllGroupsAccumulator { } } -impl GroupsAccumulator for HllGroupsAccumulator { +impl GroupsAccumulator for HllGroupsAccumulator { fn update_batch( &mut self, values: &[ArrayRef], @@ -686,17 +501,30 @@ impl GroupsAccumulator for HllGroupsAccumulator { total_num_groups: usize, ) -> Result<()> { self.ensure_groups(total_num_groups); - let groups = &mut self.groups; + let array = values[0].as_ref(); + self.hashes.clear(); + self.hashes.resize(array.len(), 0); + create_hashes([array], &HLL_HASH_STATE, &mut self.hashes)?; + let mut delta: isize = 0; - // Pre-combine value-nulls and filter into one mask so the callback - // needs no per-row branching. + // Pre-combine value-nulls and filter into one mask so the update loop + // only visits rows that should affect the sketch. let filter_nulls = opt_filter.map(filter_to_nulls); - let value_nulls = values[0].logical_nulls(); + let value_nulls = array.logical_nulls(); let combined_nulls = NullBuffer::union(filter_nulls.as_ref(), value_nulls.as_ref()); - H::for_each_hash(values[0].as_ref(), combined_nulls.as_ref(), |row, hash| { - delta += groups[group_indices[row]].add_hash(hash); - }); + match combined_nulls { + None => { + for (row, &hash) in self.hashes.iter().enumerate() { + delta += self.groups[group_indices[row]].add_hash(hash); + } + } + Some(nulls) => { + for row in nulls.valid_indices() { + delta += self.groups[group_indices[row]].add_hash(self.hashes[row]); + } + } + } self.apply_delta(delta); Ok(()) } @@ -757,7 +585,9 @@ impl GroupsAccumulator for HllGroupsAccumulator { } fn size(&self) -> usize { - self.groups.capacity() * size_of::() + self.allocated_bytes + self.groups.capacity() * size_of::() + + self.allocated_bytes + + self.hashes.capacity() * size_of::() } } @@ -901,41 +731,8 @@ impl AggregateUDFImpl for ApproxDistinct { | DataType::Int16 => { return get_fixed_domain_approx_accumulator(data_type); } - DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), - DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), - DataType::Date64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Time32(TimeUnit::Second) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Time32(TimeUnit::Millisecond) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Time64(TimeUnit::Microsecond) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Time64(TimeUnit::Nanosecond) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Timestamp(TimeUnit::Second, _) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Timestamp(TimeUnit::Millisecond, _) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => { - Box::new(NumericHLLAccumulator::::new()) - } - DataType::Utf8 => Box::new(StringHLLAccumulator::::new()), - DataType::LargeUtf8 => Box::new(StringHLLAccumulator::::new()), - DataType::Utf8View => Box::new(StringViewHLLAccumulator::new()), - DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), - DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), + data_type if is_hll_type(data_type) => Box::new(HLLAccumulator::new()), DataType::Null => { Box::new(NoopAccumulator::new(ScalarValue::UInt64(Some(0)))) } @@ -949,7 +746,7 @@ impl AggregateUDFImpl for ApproxDistinct { } fn groups_accumulator_supported(&self, args: AccumulatorArgs) -> bool { - is_hll_groups_type(args.expr_fields[0].data_type()) + is_hll_type(args.expr_fields[0].data_type()) } fn create_groups_accumulator( @@ -957,71 +754,13 @@ impl AggregateUDFImpl for ApproxDistinct { args: AccumulatorArgs, ) -> Result> { let data_type = args.expr_fields[0].data_type(); - let accumulator: Box = match data_type { - DataType::UInt32 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::UInt64 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Int32 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Int64 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Date32 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Date64 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Time32(TimeUnit::Second) => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Time32(TimeUnit::Millisecond) => Box::new(HllGroupsAccumulator::< - NumericHasher, - >::new()), - DataType::Time64(TimeUnit::Microsecond) => Box::new(HllGroupsAccumulator::< - NumericHasher, - >::new()), - DataType::Time64(TimeUnit::Nanosecond) => Box::new(HllGroupsAccumulator::< - NumericHasher, - >::new()), - DataType::Timestamp(TimeUnit::Second, _) => Box::new(HllGroupsAccumulator::< - NumericHasher, - >::new()), - DataType::Timestamp(TimeUnit::Millisecond, _) => { - Box::new(HllGroupsAccumulator::< - NumericHasher, - >::new()) - } - DataType::Timestamp(TimeUnit::Microsecond, _) => { - Box::new(HllGroupsAccumulator::< - NumericHasher, - >::new()) - } - DataType::Timestamp(TimeUnit::Nanosecond, _) => Box::new( - HllGroupsAccumulator::>::new(), - ), - DataType::Utf8 => Box::new(HllGroupsAccumulator::>::new()), - DataType::LargeUtf8 => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::Utf8View => Box::new(HllGroupsAccumulator::::new()), - DataType::Binary => { - Box::new(HllGroupsAccumulator::>::new()) - } - DataType::LargeBinary => { - Box::new(HllGroupsAccumulator::>::new()) - } - other => { - return not_impl_err!( - "GroupsAccumulator for 'approx_distinct' is not implemented for data type {other}" - ); - } - }; - Ok(accumulator) + if is_hll_type(data_type) { + Ok(Box::new(HllGroupsAccumulator::new())) + } else { + not_impl_err!( + "GroupsAccumulator for 'approx_distinct' is not implemented for data type {data_type}" + ) + } } fn documentation(&self) -> Option<&Documentation> { @@ -1029,10 +768,9 @@ impl AggregateUDFImpl for ApproxDistinct { } } -/// Returns true for the data types backed by the HyperLogLog -/// [`HllGroupsAccumulator`]. The fixed-domain types (booleans / small ints) and -/// `Null` fall back to the per-group [`Accumulator`] path. -fn is_hll_groups_type(data_type: &DataType) -> bool { +/// Returns true for the data types backed by HyperLogLog. The fixed-domain +/// types (booleans / small ints) and `Null` use separate accumulator paths. +fn is_hll_type(data_type: &DataType) -> bool { matches!( data_type, DataType::UInt32 @@ -1061,6 +799,7 @@ fn is_hll_groups_type(data_type: &DataType) -> bool { mod tests { use super::*; use arrow::array::{AsArray, Int64Array, StringViewArray}; + use std::hash::BuildHasher; use std::sync::Arc; // A string longer than the 12-byte inline limit @@ -1086,7 +825,7 @@ mod tests { buf } - fn distinct_count(acc: &mut StringViewHLLAccumulator) -> u64 { + fn distinct_count(acc: &mut HLLAccumulator) -> u64 { match acc.evaluate().unwrap() { ScalarValue::UInt64(Some(v)) => v, other => panic!("unexpected evaluate result: {other:?}"), @@ -1221,7 +960,7 @@ mod tests { let filter = BooleanArray::from(vec![Some(true), None, Some(false), None, Some(true)]); - let mut acc = HllGroupsAccumulator::>::new(); + let mut acc = HllGroupsAccumulator::new(); // put all rows in group 0 let group_indices = vec![0usize; 5]; acc.update_batch(&[values], &group_indices, Some(&filter), 1) @@ -1250,7 +989,7 @@ mod tests { assert!(!batch2.as_string_view().data_buffers().is_empty()); let group_indices = vec![0usize, 0]; - let mut acc = HllGroupsAccumulator::::new(); + let mut acc = HllGroupsAccumulator::new(); acc.update_batch(&[batch1], &group_indices, None, 1) .unwrap(); acc.update_batch(&[batch2], &group_indices, None, 1) @@ -1269,7 +1008,7 @@ mod tests { // Multiset: {"aaa" x2, "bbb", LONG}, so 3 distinct values. let mixed: ArrayRef = Arc::new(StringViewArray::from(vec!["aaa", "bbb", LONG, "aaa"])); - let mut acc_single = StringViewHLLAccumulator::new(); + let mut acc_single = HLLAccumulator::new(); acc_single.update_batch(&[mixed]).unwrap(); // Same multiset, but split so "aaa" lands in both an all-inline batch @@ -1279,7 +1018,7 @@ mod tests { assert!(inline_only.as_string_view().data_buffers().is_empty()); assert!(!with_buffer.as_string_view().data_buffers().is_empty()); - let mut acc_split = StringViewHLLAccumulator::new(); + let mut acc_split = HLLAccumulator::new(); acc_split.update_batch(&[inline_only]).unwrap(); acc_split.update_batch(&[with_buffer]).unwrap(); diff --git a/datafusion/functions-aggregate/src/hyperloglog.rs b/datafusion/functions-aggregate/src/hyperloglog.rs index 182fe15cf0f24..9968e5a98194f 100644 --- a/datafusion/functions-aggregate/src/hyperloglog.rs +++ b/datafusion/functions-aggregate/src/hyperloglog.rs @@ -55,14 +55,7 @@ where phantom: PhantomData, } -/// Fixed seed for the hashing so that values are consistent across runs -/// -/// Note that when we later move on to have serialized HLL register binaries -/// shared across cluster, this HLL_HASH_STATE will have to be consistent across all -/// parties otherwise we might have corruption. So ideally for later this seed -/// shall be part of the serialized form (or stay unchanged across versions). -pub(crate) const HLL_HASH_STATE: foldhash::quality::FixedState = - foldhash::quality::FixedState::with_seed(0); +pub(crate) use datafusion_common::hash_utils::HLL_RANDOM_STATE as HLL_HASH_STATE; impl Default for HyperLogLog where @@ -93,9 +86,8 @@ where } } - /// choice of hash function: foldhash is already an dependency - /// and it fits the requirements of being a 64bit hash with - /// reasonable performance. + /// The HLL hash state is shared through `datafusion_common::hash_utils` + /// so sketches remain compatible across accumulators. #[inline] fn hash_value(&self, obj: &T) -> u64 { HLL_HASH_STATE.hash_one(obj)