Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

148 changes: 105 additions & 43 deletions datafusion/common/src/hash_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,54 @@ use itertools::Itertools;
use std::collections::HashMap;
use std::hash::{BuildHasher, Hash, Hasher};

/// The hash random state used throughout DataFusion for hashing.
/// [`RandomState`] is optimized for speed and suitable for hash tables and
/// bloom filters. [`QualityRandomState`] is optimized for statistical quality
/// and suitable for algorithms such as HyperLogLog. The tradeoff is that the
/// fast variant gives up some statistical quality, while the quality variant
/// is slightly slower.
///
/// See: <https://docs.rs/foldhash/0.2.0/src/foldhash/lib.rs.html#17-21>
pub type RandomState = FixedState;
pub type QualityRandomState = foldhash::quality::FixedState;

/// Fixed quality hash state used by HyperLogLog sketches.
///
/// The seed is part of the HLL wire/storage semantics: serialized sketches only
/// remain mergeable if every producer uses the same hash state.
pub const HLL_RANDOM_STATE: QualityRandomState = QualityRandomState::with_seed(0);

/// Hash state used by [`create_hashes`].
///
/// Multi-column hashing folds the previous column hash into a fresh hasher
/// before hashing the next column. This trait keeps that seeded hasher in the
/// same foldhash tier as the top-level hash state.
pub trait HashState: BuildHasher {
type SeededState: BuildHasher;

fn seeded_state(&self, seed: u64) -> Self::SeededState;
}

impl HashState for FixedState {
type SeededState = foldhash::fast::SeedableRandomState;

fn seeded_state(&self, seed: u64) -> Self::SeededState {
foldhash::fast::SeedableRandomState::with_seed(
seed,
foldhash::SharedSeed::global_fixed(),
)
}
}

impl HashState for foldhash::quality::FixedState {
type SeededState = foldhash::quality::SeedableRandomState;

fn seeded_state(&self, seed: u64) -> Self::SeededState {
foldhash::quality::SeedableRandomState::with_seed(
seed,
foldhash::SharedSeed::global_fixed(),
)
}
}

#[cfg(not(feature = "force_hash_collisions"))]
use crate::cast::{
Expand Down Expand Up @@ -99,7 +145,7 @@ thread_local! {
/// ```
pub fn with_hashes<I, T, F, R>(
arrays: I,
random_state: &RandomState,
random_state: &impl HashState,
callback: F,
) -> Result<R>
where
Expand Down Expand Up @@ -141,7 +187,11 @@ where
}

#[cfg(not(feature = "force_hash_collisions"))]
fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col: bool) {
fn hash_null<S: HashState>(
random_state: &S,
hashes_buffer: &'_ mut [u64],
mul_col: bool,
) {
if mul_col {
hashes_buffer.iter_mut().for_each(|hash| {
// stable hash for null value
Expand All @@ -155,13 +205,13 @@ fn hash_null(random_state: &RandomState, hashes_buffer: &'_ mut [u64], mul_col:
}

pub trait HashValue {
fn hash_one(&self, state: &RandomState) -> u64;
fn hash_one<S: BuildHasher>(&self, state: &S) -> u64;
/// Write this value into an existing hasher (same data as `hash_one`).
fn hash_write(&self, hasher: &mut impl Hasher);
}

impl<T: HashValue + ?Sized> HashValue for &T {
fn hash_one(&self, state: &RandomState) -> u64 {
fn hash_one<S: BuildHasher>(&self, state: &S) -> u64 {
T::hash_one(self, state)
}
fn hash_write(&self, hasher: &mut impl Hasher) {
Expand All @@ -172,7 +222,7 @@ impl<T: HashValue + ?Sized> HashValue for &T {
macro_rules! hash_value {
($($t:ty),+) => {
$(impl HashValue for $t {
fn hash_one(&self, state: &RandomState) -> u64 {
fn hash_one<S: BuildHasher>(&self, state: &S) -> u64 {
state.hash_one(self)
}
fn hash_write(&self, hasher: &mut impl Hasher) {
Expand All @@ -187,7 +237,7 @@ hash_value!(bool, str, [u8], IntervalDayTime, IntervalMonthDayNano);
macro_rules! hash_float_value {
($(($t:ty, $i:ty)),+) => {
$(impl HashValue for $t {
fn hash_one(&self, state: &RandomState) -> u64 {
fn hash_one<S: BuildHasher>(&self, state: &S) -> u64 {
// +0.0 and -0.0 differ only in the sign bit but compare equal
// under IEEE 754; normalize -0.0 → +0.0 so Hash agrees with Eq.
let bits = <$i>::from_ne_bytes(self.to_ne_bytes());
Expand All @@ -204,25 +254,13 @@ macro_rules! hash_float_value {
}
hash_float_value!((half::f16, u16), (f32, u32), (f64, u64));

/// Create a `SeedableRandomState` whose per-hasher seed incorporates `seed`.
/// This folds the previous hash into the hasher's initial state so only the
/// new value needs to pass through the hash function — same cost as `hash_one`.
#[cfg(not(feature = "force_hash_collisions"))]
#[inline]
fn seeded_state(seed: u64) -> foldhash::fast::SeedableRandomState {
foldhash::fast::SeedableRandomState::with_seed(
seed,
foldhash::SharedSeed::global_fixed(),
)
}

/// Builds hash values of PrimitiveArray and writes them into `hashes_buffer`
/// If `rehash==true` this folds the existing hash into the hasher state
/// and hashes only the new value (avoiding a separate combine step).
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array_primitive<T>(
array: &PrimitiveArray<T>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
rehash: bool,
) where
Expand All @@ -237,7 +275,7 @@ fn hash_array_primitive<T>(
if array.null_count() == 0 {
if rehash {
for (hash, &value) in hashes_buffer.iter_mut().zip(array.values().iter()) {
let mut hasher = seeded_state(*hash).build_hasher();
let mut hasher = random_state.seeded_state(*hash).build_hasher();
value.hash_write(&mut hasher);
*hash = hasher.finish();
}
Expand All @@ -249,7 +287,7 @@ fn hash_array_primitive<T>(
} else if rehash {
for i in array.nulls().unwrap().valid_indices() {
let value = unsafe { array.value_unchecked(i) };
let mut hasher = seeded_state(hashes_buffer[i]).build_hasher();
let mut hasher = random_state.seeded_state(hashes_buffer[i]).build_hasher();
value.hash_write(&mut hasher);
hashes_buffer[i] = hasher.finish();
}
Expand All @@ -267,7 +305,7 @@ fn hash_array_primitive<T>(
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_array<T>(
array: &T,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
rehash: bool,
) where
Expand Down Expand Up @@ -322,7 +360,7 @@ fn hash_string_view_array_inner<
const REHASH: bool,
>(
array: &GenericByteViewArray<T>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) {
assert_eq!(
Expand Down Expand Up @@ -351,7 +389,7 @@ fn hash_string_view_array_inner<
// all views are inlined, no need to access external buffers
if !HAS_BUFFERS || view_len <= 12 {
if REHASH {
let mut hasher = seeded_state(*hash).build_hasher();
let mut hasher = random_state.seeded_state(*hash).build_hasher();
v.hash_write(&mut hasher);
*hash = hasher.finish();
} else {
Expand All @@ -362,7 +400,7 @@ fn hash_string_view_array_inner<
// view is not inlined, so we need to hash the bytes as well
let value = view_bytes(view_len, v);
if REHASH {
let mut hasher = seeded_state(*hash).build_hasher();
let mut hasher = random_state.seeded_state(*hash).build_hasher();
value.hash_write(&mut hasher);
*hash = hasher.finish();
} else {
Expand All @@ -377,7 +415,7 @@ fn hash_string_view_array_inner<
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_generic_byte_view_array<T: ByteViewType>(
array: &GenericByteViewArray<T>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
rehash: bool,
) {
Expand All @@ -396,7 +434,7 @@ fn hash_generic_byte_view_array<T: ByteViewType>(
}
(false, false, true) => {
for (hash, &view) in hashes_buffer.iter_mut().zip(array.views().iter()) {
let mut hasher = seeded_state(*hash).build_hasher();
let mut hasher = random_state.seeded_state(*hash).build_hasher();
view.hash_write(&mut hasher);
*hash = hasher.finish();
}
Expand Down Expand Up @@ -449,7 +487,7 @@ fn hash_dictionary_inner<
const MULTI_COL: bool,
>(
array: &DictionaryArray<K>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// Hash each dictionary value once, and then use that computed
Expand Down Expand Up @@ -491,7 +529,7 @@ fn hash_dictionary_inner<
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_dictionary<K: ArrowDictionaryKeyType>(
array: &DictionaryArray<K>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
multi_col: bool,
) -> Result<()> {
Expand Down Expand Up @@ -547,7 +585,7 @@ fn hash_dictionary<K: ArrowDictionaryKeyType>(
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_struct_array(
array: &StructArray,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let nulls = array.nulls();
Expand Down Expand Up @@ -577,7 +615,7 @@ fn hash_struct_array(
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_map_array(
array: &MapArray,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let nulls = array.nulls();
Expand Down Expand Up @@ -628,7 +666,7 @@ fn hash_map_array(
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_array<OffsetSize>(
array: &GenericListArray<OffsetSize>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()>
where
Expand Down Expand Up @@ -679,7 +717,7 @@ where
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_list_view_array<OffsetSize>(
array: &GenericListViewArray<OffsetSize>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()>
where
Expand Down Expand Up @@ -718,7 +756,7 @@ where
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_union_array(
array: &UnionArray,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let DataType::Union(union_fields, _mode) = array.data_type() else {
Expand Down Expand Up @@ -750,7 +788,7 @@ fn hash_union_array(
fn hash_union_array_default(
array: &UnionArray,
union_fields: &UnionFields,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let mut child_hashes: HashMap<i8, Vec<u64>> =
Expand Down Expand Up @@ -791,7 +829,7 @@ fn hash_union_array_default(
fn hash_sparse_union_array(
array: &UnionArray,
union_fields: &UnionFields,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
use std::collections::HashMap;
Expand Down Expand Up @@ -846,7 +884,7 @@ fn hash_sparse_union_array(
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_fixed_list_array(
array: &FixedSizeListArray,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
let values = array.values();
Expand Down Expand Up @@ -885,7 +923,7 @@ fn hash_run_array_inner<
const REHASH: bool,
>(
array: &RunArray<R>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
) -> Result<()> {
// We find the relevant runs that cover potentially sliced arrays, so we can only hash those
Expand Down Expand Up @@ -952,7 +990,7 @@ fn hash_run_array_inner<
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_run_array<R: RunEndIndexType>(
array: &RunArray<R>,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
rehash: bool,
) -> Result<()> {
Expand All @@ -979,7 +1017,7 @@ fn hash_run_array<R: RunEndIndexType>(
#[cfg(not(feature = "force_hash_collisions"))]
fn hash_single_array(
array: &dyn Array,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &mut [u64],
rehash: bool,
) -> Result<()> {
Expand Down Expand Up @@ -1052,7 +1090,7 @@ fn hash_single_array(
#[cfg(feature = "force_hash_collisions")]
fn hash_single_array(
_array: &dyn Array,
_random_state: &RandomState,
_random_state: &impl HashState,
hashes_buffer: &mut [u64],
_rehash: bool,
) -> Result<()> {
Expand Down Expand Up @@ -1105,7 +1143,7 @@ impl AsDynArray for &ArrayRef {
/// `hashes_buffer` should be pre-sized appropriately.
pub fn create_hashes<'a, I, T>(
arrays: I,
random_state: &RandomState,
random_state: &impl HashState,
hashes_buffer: &'a mut [u64],
) -> Result<&'a mut [u64]>
where
Expand Down Expand Up @@ -1814,6 +1852,30 @@ mod tests {
assert_eq!(hashes1, hashes2);
}

#[test]
fn test_create_hashes_with_quality_hash_state() {
let int_array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
let str_array: ArrayRef = Arc::new(StringArray::from(vec!["a", "b", "c", "d"]));
let quality_state = foldhash::quality::FixedState::with_seed(0);

let mut one_col_hashes = vec![0; int_array.len()];
create_hashes([&int_array], &quality_state, &mut one_col_hashes).unwrap();
let expected_hashes: Vec<_> = [1i32, 2, 3, 4]
.iter()
.map(|value| quality_state.hash_one(value))
.collect();
assert_eq!(one_col_hashes, expected_hashes);

let mut two_col_hashes = vec![0; int_array.len()];
create_hashes(
[&int_array, &str_array],
&quality_state,
&mut two_col_hashes,
)
.unwrap();
assert_ne!(two_col_hashes, one_col_hashes);
}

#[test]
fn test_with_hashes() {
let array: ArrayRef = Arc::new(Int32Array::from(vec![1, 2, 3, 4]));
Expand Down
1 change: 0 additions & 1 deletion datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ datafusion-functions-aggregate-common = { workspace = true }
datafusion-macros = { workspace = true }
datafusion-physical-expr = { workspace = true }
datafusion-physical-expr-common = { workspace = true }
foldhash = "0.2"
half = { workspace = true }
log = { workspace = true }
num-traits = { workspace = true }
Expand Down
Loading
Loading