diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs index 25b40382299b..4f26d8683902 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs @@ -22,5 +22,9 @@ mod native; pub use bytes::BytesDistinctCountAccumulator; pub use bytes::BytesViewDistinctCountAccumulator; pub use dict::DictionaryCountAccumulator; +pub use native::Bitmap65536DistinctCountAccumulator; +pub use native::Bitmap65536DistinctCountAccumulatorI16; +pub use native::BoolArray256DistinctCountAccumulator; +pub use native::BoolArray256DistinctCountAccumulatorI8; pub use native::FloatDistinctCountAccumulator; pub use native::PrimitiveDistinctCountAccumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index e506b4acb141..0fbc61c48f4a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -165,3 +165,354 @@ impl Accumulator for FloatDistinctCountAccumulato size_of_val(self) + self.values.size() } } + +/// Optimized COUNT DISTINCT accumulator for u8 using a bool array. +/// Uses 256 bytes to track all possible u8 values. 
+#[derive(Debug)]
+pub struct BoolArray256DistinctCountAccumulator {
+    seen: Box<[bool; 256]>, // seen[v] is true once value v has been observed
+}
+
+impl BoolArray256DistinctCountAccumulator {
+    pub fn new() -> Self {
+        Self {
+            seen: Box::new([false; 256]),
+        }
+    }
+
+    #[inline]
+    fn count(&self) -> i64 {
+        self.seen.iter().filter(|&&b| b).count() as i64 // at most 256
+    }
+}
+
+impl Default for BoolArray256DistinctCountAccumulator {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Accumulator for BoolArray256DistinctCountAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
+        for value in arr.iter().flatten() { // flatten() skips null entries
+            self.seen[value as usize] = true;
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_list_array(&states[0])?; // each non-null row is a list of u8
+        arr.iter().try_for_each(|maybe_list| {
+            if let Some(list) = maybe_list {
+                let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
+                for value in list.values().iter() {
+                    self.seen[*value as usize] = true;
+                }
+            };
+            Ok(())
+        })
+    }
+
+    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
+        let values: Vec<u8> = self
+            .seen
+            .iter()
+            .enumerate()
+            .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
+            .collect();
+
+        let arr = Arc::new(
+            PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values),
+        );
+        Ok(vec![
+            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
+        ])
+    }
+
+    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.count())))
+    }
+
+    fn size(&self) -> usize {
+        size_of_val(self) + 256 // struct (the Box pointer) + boxed 256-byte array
+    }
+}
+
+/// Optimized COUNT DISTINCT accumulator for i8 using a bool array.
+/// Uses 256 bytes to track all possible i8 values (mapped to 0..255).
+#[derive(Debug)]
+pub struct BoolArray256DistinctCountAccumulatorI8 {
+    seen: Box<[bool; 256]>, // indexed by the value's bits reinterpreted as u8
+}
+
+impl BoolArray256DistinctCountAccumulatorI8 {
+    pub fn new() -> Self {
+        Self {
+            seen: Box::new([false; 256]),
+        }
+    }
+
+    #[inline]
+    fn count(&self) -> i64 {
+        self.seen.iter().filter(|&&b| b).count() as i64 // at most 256
+    }
+}
+
+impl Default for BoolArray256DistinctCountAccumulatorI8 {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
+        for value in arr.iter().flatten() { // flatten() skips null entries
+            self.seen[value as u8 as usize] = true; // i8 bits as u8: -1 -> slot 255
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_list_array(&states[0])?; // each non-null row is a list of i8
+        arr.iter().try_for_each(|maybe_list| {
+            if let Some(list) = maybe_list {
+                let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
+                for value in list.values().iter() {
+                    self.seen[*value as u8 as usize] = true;
+                }
+            };
+            Ok(())
+        })
+    }
+
+    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
+        let values: Vec<i8> = self
+            .seen
+            .iter()
+            .enumerate()
+            .filter_map(
+                |(idx, &seen)| {
+                    if seen { Some(idx as u8 as i8) } else { None } // slot -> i8
+                },
+            )
+            .collect();
+
+        let arr = Arc::new(
+            PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values),
+        );
+        Ok(vec![
+            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
+        ])
+    }
+
+    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.count())))
+    }
+
+    fn size(&self) -> usize {
+        size_of_val(self) + 256 // struct (the Box pointer) + boxed 256-byte array
+    }
+}
+
+/// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap.
+/// Uses 8KB (1024 x u64) to track all possible u16 values.
+#[derive(Debug)]
+pub struct Bitmap65536DistinctCountAccumulator {
+    bitmap: Box<[u64; 1024]>, // bit v is set once value v has been observed
+}
+
+impl Bitmap65536DistinctCountAccumulator {
+    pub fn new() -> Self {
+        Self {
+            bitmap: Box::new([0; 1024]),
+        }
+    }
+
+    #[inline]
+    fn set_bit(&mut self, value: u16) {
+        let word = (value / 64) as usize; // index of the u64 holding the bit
+        let bit = value % 64; // bit position inside that word
+        self.bitmap[word] |= 1u64 << bit;
+    }
+
+    #[inline]
+    fn count(&self) -> i64 {
+        self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
+    }
+}
+
+impl Default for Bitmap65536DistinctCountAccumulator {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Accumulator for Bitmap65536DistinctCountAccumulator {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
+        for value in arr.iter().flatten() { // flatten() skips null entries
+            self.set_bit(value);
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_list_array(&states[0])?; // each non-null row is a list of u16
+        arr.iter().try_for_each(|maybe_list| {
+            if let Some(list) = maybe_list {
+                let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
+                for value in list.values().iter() {
+                    self.set_bit(*value);
+                }
+            };
+            Ok(())
+        })
+    }
+
+    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
+        let mut values = Vec::new();
+        for (word_idx, &word) in self.bitmap.iter().enumerate() {
+            if word != 0 { // skip words with no bits set
+                for bit in 0..64 {
+                    if (word & (1u64 << bit)) != 0 {
+                        values.push((word_idx as u16) * 64 + bit); // max 65535
+                    }
+                }
+            }
+        }
+
+        let arr = Arc::new(
+            PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values),
+        );
+        Ok(vec![
+            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
+        ])
+    }
+
+    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.count())))
+    }
+
+    fn size(&self) -> usize {
+        size_of_val(self) + 8192 // struct (Box pointer) + 1024 * 8 bitmap bytes
+    }
+}
+
+/// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap.
+/// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535).
+#[derive(Debug)]
+pub struct Bitmap65536DistinctCountAccumulatorI16 {
+    bitmap: Box<[u64; 1024]>, // bit index is the value's bits reinterpreted as u16
+}
+
+impl Bitmap65536DistinctCountAccumulatorI16 {
+    pub fn new() -> Self {
+        Self {
+            bitmap: Box::new([0; 1024]),
+        }
+    }
+
+    #[inline]
+    fn set_bit(&mut self, value: i16) {
+        let idx = value as u16; // i16 bits as u16: -1 -> bit 65535
+        let word = (idx / 64) as usize; // index of the u64 holding the bit
+        let bit = idx % 64; // bit position inside that word
+        self.bitmap[word] |= 1u64 << bit;
+    }
+
+    #[inline]
+    fn count(&self) -> i64 {
+        self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
+    }
+}
+
+impl Default for Bitmap65536DistinctCountAccumulatorI16 {
+    fn default() -> Self {
+        Self::new()
+    }
+}
+
+impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
+    fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if values.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
+        for value in arr.iter().flatten() { // flatten() skips null entries
+            self.set_bit(value);
+        }
+        Ok(())
+    }
+
+    fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
+        if states.is_empty() {
+            return Ok(());
+        }
+
+        let arr = as_list_array(&states[0])?; // each non-null row is a list of i16
+        arr.iter().try_for_each(|maybe_list| {
+            if let Some(list) = maybe_list {
+                let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
+                for value in list.values().iter() {
+                    self.set_bit(*value);
+                }
+            };
+            Ok(())
+        })
+    }
+
+    fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
+        let mut values = Vec::new();
+        for (word_idx, &word) in self.bitmap.iter().enumerate() {
+            if word != 0 { // skip words with no bits set
+                for bit in 0..64 {
+                    if (word & (1u64 << bit)) != 0 {
+                        values.push(((word_idx as u16) * 64 + bit) as i16); // back to i16
+                    }
+                }
+            }
+        }
+
+        let arr = Arc::new(
+            PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values),
+        );
+        Ok(vec![
+            SingleRowListArrayBuilder::new(arr).build_list_scalar(),
+        ])
+    }
+
+    fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
+        Ok(ScalarValue::Int64(Some(self.count())))
+    }
+
+    fn size(&self) -> usize {
+        size_of_val(self) + 8192 // struct (Box pointer) + 1024 * 8 bitmap bytes
+    }
+}
diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs
index 81b6eda3f9b1..3587badc763e 100644
--- a/datafusion/functions-aggregate/src/count.rs
+++ b/datafusion/functions-aggregate/src/count.rs
@@ -21,11 +21,11 @@ use arrow::{
     compute,
     datatypes::{
         DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field,
-        FieldRef, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type,
-        Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
+        FieldRef, Float16Type, Float32Type, Float64Type, Int32Type, Int64Type,
+        Time32MillisecondType, Time32SecondType, Time64MicrosecondType,
         Time64NanosecondType, TimeUnit, TimestampMicrosecondType,
         TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType,
-        UInt8Type, UInt16Type, UInt32Type, UInt64Type,
+        UInt32Type, UInt64Type,
     },
 };
 use datafusion_common::hash_utils::RandomState;
@@ -42,6 +42,10 @@ use datafusion_expr::{
     utils::format_state_name,
 };
 use datafusion_functions_aggregate_common::aggregate::{
+    count_distinct::Bitmap65536DistinctCountAccumulator,
+    count_distinct::Bitmap65536DistinctCountAccumulatorI16,
+    count_distinct::BoolArray256DistinctCountAccumulator,
+    count_distinct::BoolArray256DistinctCountAccumulatorI8,
     count_distinct::BytesDistinctCountAccumulator,
     count_distinct::BytesViewDistinctCountAccumulator,
     count_distinct::DictionaryCountAccumulator,
@@ -170,25 +174,18 @@ impl Count {
 }
 fn get_count_accumulator(data_type: &DataType) -> Box<dyn Accumulator> {
     match data_type {
-        // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator
-        DataType::Int8 => Box::new(PrimitiveDistinctCountAccumulator::<Int8Type>::new(
-            data_type,
-        )),
-        DataType::Int16 => Box::new(PrimitiveDistinctCountAccumulator::<Int16Type>::new(
-            data_type,
-        )),
+        // Use optimized bitmap/bool array accumulators for small integer types
+        DataType::UInt8 => Box::new(BoolArray256DistinctCountAccumulator::new()),
+        DataType::Int8 => Box::new(BoolArray256DistinctCountAccumulatorI8::new()),
+        DataType::UInt16 => Box::new(Bitmap65536DistinctCountAccumulator::new()),
+        DataType::Int16 => Box::new(Bitmap65536DistinctCountAccumulatorI16::new()),
+        // Use HashSet-based accumulator for larger integer types
         DataType::Int32 => Box::new(PrimitiveDistinctCountAccumulator::<Int32Type>::new(
             data_type,
         )),
         DataType::Int64 => Box::new(PrimitiveDistinctCountAccumulator::<Int64Type>::new(
             data_type,
         )),
-        DataType::UInt8 => Box::new(PrimitiveDistinctCountAccumulator::<UInt8Type>::new(
-            data_type,
-        )),
-        DataType::UInt16 => Box::new(
-            PrimitiveDistinctCountAccumulator::<UInt16Type>::new(data_type),
-        ),
         DataType::UInt32 => Box::new(
             PrimitiveDistinctCountAccumulator::<UInt32Type>::new(data_type),
         ),