From 8d49dfe7d82a14b5a44566209a8aeba63deb40e0 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 7 Apr 2026 22:28:35 -0700 Subject: [PATCH 01/10] bitmap_smaller_datatypes --- .../src/approx_distinct.rs | 363 +++++++++++++++++- 1 file changed, 358 insertions(+), 5 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 40da98c3eb3a..6f590d9502ee 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,7 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, BinaryArray, StringViewArray}; +use arrow::array::{Array, AsArray, BinaryArray, BooleanArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -160,6 +160,358 @@ where } } +#[derive(Debug)] +struct BoolDistinctAccumulator { + seen_true: bool, + seen_false: bool, +} + +impl BoolDistinctAccumulator { + fn new() -> Self { + Self { + seen_true: false, + seen_false: false, + } + } +} + +impl Accumulator for BoolDistinctAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array: &BooleanArray = downcast_value!(values[0], BooleanArray); + for value in array.iter().flatten() { + if value { + self.seen_true = true; + } else { + self.seen_false = true; + } + } + Ok(()) + } + + fn evaluate(&mut self) -> Result { + let count = (self.seen_true as u64) + (self.seen_false as u64); + Ok(ScalarValue::UInt64(Some(count))) + } + + fn size(&self) -> usize { + size_of::() + } + + fn state(&mut self) -> Result> { + // Pack into 1 byte: bit 0 = seen_false, bit 1 = seen_true + let packed = (self.seen_false as u8) | ((self.seen_true as u8) << 1); + Ok(vec![ScalarValue::Binary(Some(vec![packed]))]) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if !data.is_empty() { + self.seen_false |= (data[0] & 1) != 0; + self.seen_true |= (data[0] & 2) != 0; + } + } + Ok(()) + } +} + +#[derive(Debug)] +struct Bitmap256Accumulator { + /// 256 bits = 4 x u64, tracks values 0-255 + bitmap: [u64; 4], +} + +impl Bitmap256Accumulator { + fn new() -> Self { + Self { bitmap: [0; 4] } + } + + #[inline] + fn set_bit(&mut self, value: u8) { + let word = (value / 64) as usize; + let bit = value % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 4]) { + for i in 0..4 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap256Accumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 32 { + // Convert &[u8] to [u64; 4] + let mut other = [0u64; 4]; + for i in 0..4 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + // Serialize [u64; 4] as 32 bytes + let mut bytes = Vec::with_capacity(32); + for word in &self.bitmap { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + } +} + +#[derive(Debug)] +struct Bitmap256AccumulatorI8 { + bitmap: [u64; 4], +} + +impl Bitmap256AccumulatorI8 { + fn new() -> Self { + Self { bitmap: [0; 4] } + } + + #[inline] + fn set_bit(&mut self, value: i8) { + // Convert i8 to u8 by reinterpreting bits + let idx = value as u8; + let word = (idx / 64) as usize; + let bit = idx % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 4]) { + for i in 0..4 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap256AccumulatorI8 { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 32 { + let mut other = [0u64; 4]; + for i in 0..4 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let mut bytes = Vec::with_capacity(32); + for word in &self.bitmap { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + } +} + +/// Accumulator for u16 distinct counting using a 65536-bit bitmap +#[derive(Debug)] +struct Bitmap65536Accumulator { + /// 65536 bits = 1024 x u64, tracks values 0-65535 + bitmap: Box<[u64; 1024]>, +} + +impl Bitmap65536Accumulator { + fn new() -> Self { + Self { + bitmap: Box::new([0; 1024]), + } + } + + #[inline] + fn set_bit(&mut self, value: u16) { + let word = (value / 64) as usize; + let bit = value % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 1024]) { + for i in 0..1024 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap65536Accumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 8192 { + let mut other = [0u64; 1024]; + for i in 0..1024 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let mut bytes = Vec::with_capacity(8192); + for word in self.bitmap.iter() { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 8192 + } +} + +/// Accumulator for i16 distinct counting using a 65536-bit bitmap +#[derive(Debug)] +struct Bitmap65536AccumulatorI16 { + bitmap: Box<[u64; 1024]>, +} + +impl Bitmap65536AccumulatorI16 { + fn new() -> Self { + Self { + bitmap: Box::new([0; 1024]), + } + } + + #[inline] + fn set_bit(&mut self, value: i16) { + let idx = value as u16; + let word = (idx / 64) as usize; + let bit = idx % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> u64 { + self.bitmap.iter().map(|w| w.count_ones() as u64).sum() + } + + fn merge(&mut self, other: &[u64; 1024]) { + for i in 0..1024 { + self.bitmap[i] |= other[i]; + } + } +} + +impl Accumulator for Bitmap65536AccumulatorI16 { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 8192 { + let mut other = [0u64; 1024]; + for i in 0..1024 { + let offset = i * 8; + other[i] = + u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); + } + self.merge(&other); + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let mut bytes = Vec::with_capacity(8192); + for word in self.bitmap.iter() { + bytes.extend_from_slice(&word.to_le_bytes()); + } + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 8192 + } +} + macro_rules! default_accumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -344,12 +696,12 @@ impl AggregateUDFImpl for ApproxDistinct { // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL // TODO support for boolean (trivial case) // https://github.com/apache/datafusion/issues/1109 - DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), - DataType::UInt16 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt8 => Box::new(Bitmap256Accumulator::new()), + DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int8 => Box::new(Bitmap256AccumulatorI8::new()), + DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), @@ -383,6 +735,7 @@ impl AggregateUDFImpl for ApproxDistinct { DataType::Utf8View => Box::new(StringViewHLLAccumulator::new()), DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), + DataType::Boolean => Box::new(BoolDistinctAccumulator::new()), DataType::Null => { Box::new(NoopAccumulator::new(ScalarValue::UInt64(Some(0)))) } From 0b179ff8054f17b94c26b1533381980fb6a14f48 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 7 Apr 2026 22:43:29 -0700 Subject: [PATCH 02/10] bitmap_smaller_datatypes --- .../benches/approx_distinct.rs | 24 ++++++++++++++++--- 1 file changed, 21 insertions(+), 3 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index cc85c2163c18..61650aaa2e3f 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use arrow::array::{ - ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray, - UInt8Array, UInt16Array, + ArrayRef, BooleanArray, Int8Array, Int16Array, Int64Array, StringArray, + StringViewArray, UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -91,6 +91,13 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } +fn create_bool_array() -> BooleanArray { + let mut rng = StdRng::seed_from_u64(42); + (0..BATCH_SIZE) + .map(|_| Some(rng.random_bool(0.5))) + .collect() +} + /// Creates a pool of `n_distinct` random strings of the given length. fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -169,7 +176,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { } } - // Small integer types + // --- Bitmap type benchmarks (our optimization) --- // UInt8 let values = Arc::new(create_u8_array(200)) as ArrayRef; @@ -214,6 +221,17 @@ fn approx_distinct_benchmark(c: &mut Criterion) { .unwrap() }) }); + + // Boolean + let values = Arc::new(create_bool_array()) as ArrayRef; + c.bench_function("approx_distinct bool bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Boolean); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); } criterion_group!(benches, approx_distinct_benchmark); From c6095ab7b584d2a563329a17b07377fc5ddcb6e3 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Tue, 7 Apr 2026 23:35:25 -0700 Subject: [PATCH 03/10] bitmap_instead_of_hll_smaller_datatypes --- .../benches/approx_distinct.rs | 20 +++++++++---------- .../src/approx_distinct.rs | 8 ++++---- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 61650aaa2e3f..d73794c4fd6c 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -222,16 +222,16 @@ fn approx_distinct_benchmark(c: &mut Criterion) { }) }); - // Boolean - let values = Arc::new(create_bool_array()) as ArrayRef; - c.bench_function("approx_distinct bool bitmap", |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::Boolean); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); + // // Boolean - commented out for main comparison (not supported on main) + // let values = Arc::new(create_bool_array()) as ArrayRef; + // c.bench_function("approx_distinct bool bitmap", |b| { + // b.iter(|| { + // let mut accumulator = prepare_accumulator(DataType::Boolean); + // accumulator + // .update_batch(std::slice::from_ref(&values)) + // .unwrap() + // }) + // }); } criterion_group!(benches, approx_distinct_benchmark); diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 6f590d9502ee..04b27cc24796 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -228,8 +228,8 @@ impl Bitmap256Accumulator { #[inline] fn set_bit(&mut self, value: u8) { - let word = (value / 64) as usize; - let bit = value % 64; + let word = (value >> 6) as usize; + let bit = value & 63; self.bitmap[word] |= 1u64 << bit; } @@ -303,8 +303,8 @@ impl Bitmap256AccumulatorI8 { fn set_bit(&mut self, value: i8) { // Convert i8 to u8 by reinterpreting bits let idx = value as u8; - let word = (idx / 64) as usize; - let bit = idx % 64; + let word = (idx >> 6) as usize; + let bit = idx & 63; self.bitmap[word] |= 1u64 << bit; } From 9d06408eb8cc4f4f7478e4cf318d79f5f7c9a7e6 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 00:00:42 -0700 Subject: [PATCH 04/10] bitmap_instead_of_hll_smaller_datatypes --- .../src/approx_distinct.rs | 171 ++---------------- 1 file changed, 11 insertions(+), 160 deletions(-) diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 04b27cc24796..5858f295c0a9 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -215,153 +215,6 @@ impl Accumulator for BoolDistinctAccumulator { } } -#[derive(Debug)] -struct Bitmap256Accumulator { - /// 256 bits = 4 x u64, tracks values 0-255 - bitmap: [u64; 4], -} - -impl Bitmap256Accumulator { - fn new() -> Self { - Self { bitmap: [0; 4] } - } - - #[inline] - fn set_bit(&mut self, value: u8) { - let word = (value >> 6) as usize; - let bit = value & 63; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 4]) { - for i in 0..4 { - self.bitmap[i] |= other[i]; - } - } -} - -impl Accumulator for Bitmap256Accumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 32 { - // Convert &[u8] to [u64; 4] - let mut other = [0u64; 4]; - for i in 0..4 { - let offset = i * 8; - other[i] = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - // Serialize [u64; 4] as 32 bytes - let mut bytes = Vec::with_capacity(32); - for word in &self.bitmap { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() - } -} - -#[derive(Debug)] -struct Bitmap256AccumulatorI8 { - bitmap: [u64; 4], -} - -impl Bitmap256AccumulatorI8 { - fn new() -> Self { - Self { bitmap: [0; 4] } - } - - #[inline] - fn set_bit(&mut self, value: i8) { - // Convert i8 to u8 by reinterpreting bits - let idx = value as u8; - let word = (idx >> 6) as usize; - let bit = idx & 63; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 4]) { - for i in 0..4 { - self.bitmap[i] |= other[i]; - } - } -} - -impl Accumulator for Bitmap256AccumulatorI8 { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 32 { - let mut other = [0u64; 4]; - for i in 0..4 { - let offset = i * 8; - other[i] = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let mut bytes = Vec::with_capacity(32); - for word in &self.bitmap { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() - } -} - /// Accumulator for u16 distinct counting using a 65536-bit bitmap #[derive(Debug)] struct Bitmap65536Accumulator { @@ -389,8 +242,8 @@ impl Bitmap65536Accumulator { } fn merge(&mut self, other: &[u64; 1024]) { - for i in 0..1024 { - self.bitmap[i] |= other[i]; + for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { + *dst |= src; } } } @@ -409,9 +262,9 @@ impl Accumulator for Bitmap65536Accumulator { for data in array.iter().flatten() { if data.len() == 8192 { let mut other = [0u64; 1024]; - for i in 0..1024 { + for (i, word) in other.iter_mut().enumerate() { let offset = i * 8; - other[i] = + *word = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); } self.merge(&other); @@ -464,8 +317,8 @@ impl Bitmap65536AccumulatorI16 { } fn merge(&mut self, other: &[u64; 1024]) { - for i in 0..1024 { - self.bitmap[i] |= other[i]; + for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { + *dst |= src; } } } @@ -484,9 +337,9 @@ impl Accumulator for Bitmap65536AccumulatorI16 { for data in array.iter().flatten() { if data.len() == 8192 { let mut other = [0u64; 1024]; - for i in 0..1024 { + for (i, word) in other.iter_mut().enumerate() { let offset = i * 8; - other[i] = + *word = u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); } self.merge(&other); @@ -693,14 +546,12 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL - // TODO support for boolean (trivial case) - // https://github.com/apache/datafusion/issues/1109 - DataType::UInt8 => Box::new(Bitmap256Accumulator::new()), + // Benchmarked HLL to be faster than bitmap for u8/i8 + DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(Bitmap256AccumulatorI8::new()), + DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), From f185fdcc93d6966e127c51bac616c1c8b3bd2158 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 00:49:34 -0700 Subject: [PATCH 05/10] bitmap_instead_of_hll_smaller_datatypes --- .../benches/approx_distinct.rs | 22 +--- .../src/approx_distinct.rs | 118 +++++++++++++++++- 2 files changed, 117 insertions(+), 23 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index d73794c4fd6c..25235f87fe03 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -18,8 +18,8 @@ use std::sync::Arc; use arrow::array::{ - ArrayRef, BooleanArray, Int8Array, Int16Array, Int64Array, StringArray, - StringViewArray, UInt8Array, UInt16Array, + ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray, + UInt8Array, UInt16Array, }; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; @@ -91,13 +91,6 @@ fn create_i16_array(n_distinct: usize) -> Int16Array { .collect() } -fn create_bool_array() -> BooleanArray { - let mut rng = StdRng::seed_from_u64(42); - (0..BATCH_SIZE) - .map(|_| Some(rng.random_bool(0.5))) - .collect() -} - /// Creates a pool of `n_distinct` random strings of the given length. fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -221,17 +214,6 @@ fn approx_distinct_benchmark(c: &mut Criterion) { .unwrap() }) }); - - // // Boolean - commented out for main comparison (not supported on main) - // let values = Arc::new(create_bool_array()) as ArrayRef; - // c.bench_function("approx_distinct bool bitmap", |b| { - // b.iter(|| { - // let mut accumulator = prepare_accumulator(DataType::Boolean); - // accumulator - // .update_batch(std::slice::from_ref(&values)) - // .unwrap() - // }) - // }); } criterion_group!(benches, approx_distinct_benchmark); diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 5858f295c0a9..428f72e3417a 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -215,6 +215,118 @@ impl Accumulator for BoolDistinctAccumulator { } } +/// Accumulator for u8 distinct counting using a bool array +#[derive(Debug)] +struct BoolArray256Accumulator { + seen: Box<[bool; 256]>, +} + +impl BoolArray256Accumulator { + fn new() -> Self { + Self { + seen: Box::new([false; 256]), + } + } + + #[inline] + fn count(&self) -> u64 { + self.seen.iter().filter(|&&b| b).count() as u64 + } +} + +impl Accumulator for BoolArray256Accumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.seen[value as usize] = true; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 256 { + for (i, &b) in data.iter().enumerate() { + if b != 0 { + self.seen[i] = true; + } + } + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 256 + } +} + +/// Accumulator for i8 distinct counting using a bool array +#[derive(Debug)] +struct BoolArray256AccumulatorI8 { + seen: Box<[bool; 256]>, +} + +impl BoolArray256AccumulatorI8 { + fn new() -> Self { + Self { + seen: Box::new([false; 256]), + } + } + + #[inline] + fn count(&self) -> u64 { + self.seen.iter().filter(|&&b| b).count() as u64 + } +} + +impl Accumulator for BoolArray256AccumulatorI8 { + fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { + let array = values[0].as_primitive::(); + for value in array.iter().flatten() { + self.seen[value as u8 as usize] = true; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { + let array = downcast_value!(states[0], BinaryArray); + for data in array.iter().flatten() { + if data.len() == 256 { + for (i, &b) in data.iter().enumerate() { + if b != 0 { + self.seen[i] = true; + } + } + } + } + Ok(()) + } + + fn state(&mut self) -> Result> { + let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); + Ok(vec![ScalarValue::Binary(Some(bytes))]) + } + + fn evaluate(&mut self) -> Result { + Ok(ScalarValue::UInt64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of::() + 256 + } +} + /// Accumulator for u16 distinct counting using a 65536-bit bitmap #[derive(Debug)] struct Bitmap65536Accumulator { @@ -546,12 +658,12 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // Benchmarked HLL to be faster than bitmap for u8/i8 - DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), + // Testing bool array for u8 + DataType::UInt8 => Box::new(BoolArray256Accumulator::new()), DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int8 => Box::new(BoolArray256AccumulatorI8::new()), DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), From f7c487a10711459a5f5398f8600820850f482374 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 01:15:58 -0700 Subject: [PATCH 06/10] bitmap_instead_smaller_datatypes --- .../src/aggregate/count_distinct.rs | 4 + .../src/aggregate/count_distinct/native.rs | 353 ++++++++++++++++++ datafusion/functions-aggregate/Cargo.toml | 4 - .../benches/count_distinct.rs | 16 +- .../src/approx_distinct.rs | 2 +- datafusion/functions-aggregate/src/count.rs | 27 +- 6 files changed, 377 insertions(+), 29 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs index 25b40382299b..4f26d8683902 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct.rs @@ -22,5 +22,9 @@ mod native; pub use bytes::BytesDistinctCountAccumulator; pub use bytes::BytesViewDistinctCountAccumulator; pub use dict::DictionaryCountAccumulator; +pub use native::Bitmap65536DistinctCountAccumulator; +pub use native::Bitmap65536DistinctCountAccumulatorI16; +pub use native::BoolArray256DistinctCountAccumulator; +pub use native::BoolArray256DistinctCountAccumulatorI8; pub use native::FloatDistinctCountAccumulator; pub use native::PrimitiveDistinctCountAccumulator; diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index e506b4acb141..4667ff9a25fc 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -165,3 +165,356 @@ impl Accumulator for FloatDistinctCountAccumulato size_of_val(self) + self.values.size() } } + +/// Optimized COUNT DISTINCT accumulator for u8 using a bool array. +/// Uses 256 bytes to track all possible u8 values. +#[derive(Debug)] +pub struct BoolArray256DistinctCountAccumulator { + seen: Box<[bool; 256]>, +} + +impl BoolArray256DistinctCountAccumulator { + pub fn new() -> Self { + Self { + seen: Box::new([false; 256]), + } + } + + #[inline] + fn count(&self) -> i64 { + self.seen.iter().filter(|&&b| b).count() as i64 + } +} + +impl Default for BoolArray256DistinctCountAccumulator { + fn default() -> Self { + Self::new() + } +} + +impl Accumulator for BoolArray256DistinctCountAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + for value in arr.iter().flatten() { + self.seen[value as usize] = true; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + for value in list.values().iter() { + self.seen[*value as usize] = true; + } + }; + Ok(()) + }) + } + + fn state(&mut self) -> datafusion_common::Result> { + let values: Vec = self + .seen + .iter() + .enumerate() + .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None }) + .collect(); + + let arr = Arc::new(PrimitiveArray::::from_iter_values( + values, + )); + Ok(vec![ + SingleRowListArrayBuilder::new(arr).build_list_scalar(), + ]) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of_val(self) + 256 + } +} + +/// Optimized COUNT DISTINCT accumulator for i8 using a bool array. +/// Uses 256 bytes to track all possible i8 values (mapped to 0..255). +#[derive(Debug)] +pub struct BoolArray256DistinctCountAccumulatorI8 { + seen: Box<[bool; 256]>, +} + +impl BoolArray256DistinctCountAccumulatorI8 { + pub fn new() -> Self { + Self { + seen: Box::new([false; 256]), + } + } + + #[inline] + fn count(&self) -> i64 { + self.seen.iter().filter(|&&b| b).count() as i64 + } +} + +impl Default for BoolArray256DistinctCountAccumulatorI8 { + fn default() -> Self { + Self::new() + } +} + +impl Accumulator for BoolArray256DistinctCountAccumulatorI8 { + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + for value in arr.iter().flatten() { + self.seen[value as u8 as usize] = true; + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + for value in list.values().iter() { + self.seen[*value as u8 as usize] = true; + } + }; + Ok(()) + }) + } + + fn state(&mut self) -> datafusion_common::Result> { + let values: Vec = self + .seen + .iter() + .enumerate() + .filter_map(|(idx, &seen)| { + if seen { + Some(idx as u8 as i8) + } else { + None + } + }) + .collect(); + + let arr = Arc::new(PrimitiveArray::::from_iter_values( + values, + )); + Ok(vec![ + SingleRowListArrayBuilder::new(arr).build_list_scalar(), + ]) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of_val(self) + 256 + } +} + +/// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap. +/// Uses 8KB (1024 x u64) to track all possible u16 values. +#[derive(Debug)] +pub struct Bitmap65536DistinctCountAccumulator { + bitmap: Box<[u64; 1024]>, +} + +impl Bitmap65536DistinctCountAccumulator { + pub fn new() -> Self { + Self { + bitmap: Box::new([0; 1024]), + } + } + + #[inline] + fn set_bit(&mut self, value: u16) { + let word = (value / 64) as usize; + let bit = value % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> i64 { + self.bitmap.iter().map(|w| w.count_ones() as i64).sum() + } +} + +impl Default for Bitmap65536DistinctCountAccumulator { + fn default() -> Self { + Self::new() + } +} + +impl Accumulator for Bitmap65536DistinctCountAccumulator { + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + for value in arr.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + for value in list.values().iter() { + self.set_bit(*value); + } + }; + Ok(()) + }) + } + + fn state(&mut self) -> datafusion_common::Result> { + let mut values = Vec::new(); + for (word_idx, &word) in self.bitmap.iter().enumerate() { + if word != 0 { + for bit in 0..64 { + if (word & (1u64 << bit)) != 0 { + values.push((word_idx as u16) * 64 + bit); + } + } + } + } + + let arr = Arc::new(PrimitiveArray::::from_iter_values( + values, + )); + Ok(vec![ + SingleRowListArrayBuilder::new(arr).build_list_scalar(), + ]) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of_val(self) + 8192 + } +} + +/// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap. +/// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535). +#[derive(Debug)] +pub struct Bitmap65536DistinctCountAccumulatorI16 { + bitmap: Box<[u64; 1024]>, +} + +impl Bitmap65536DistinctCountAccumulatorI16 { + pub fn new() -> Self { + Self { + bitmap: Box::new([0; 1024]), + } + } + + #[inline] + fn set_bit(&mut self, value: i16) { + let idx = value as u16; + let word = (idx / 64) as usize; + let bit = idx % 64; + self.bitmap[word] |= 1u64 << bit; + } + + #[inline] + fn count(&self) -> i64 { + self.bitmap.iter().map(|w| w.count_ones() as i64).sum() + } +} + +impl Default for Bitmap65536DistinctCountAccumulatorI16 { + fn default() -> Self { + Self::new() + } +} + +impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 { + fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> { + if values.is_empty() { + return Ok(()); + } + + let arr = as_primitive_array::(&values[0])?; + for value in arr.iter().flatten() { + self.set_bit(value); + } + Ok(()) + } + + fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> { + if states.is_empty() { + return Ok(()); + } + + let arr = as_list_array(&states[0])?; + arr.iter().try_for_each(|maybe_list| { + if let Some(list) = maybe_list { + let list = as_primitive_array::(&list)?; + for value in list.values().iter() { + self.set_bit(*value); + } + }; + Ok(()) + }) + } + + fn state(&mut self) -> datafusion_common::Result> { + let mut values = Vec::new(); + for (word_idx, &word) in self.bitmap.iter().enumerate() { + if word != 0 { + for bit in 0..64 { + if (word & (1u64 << bit)) != 0 { + values.push(((word_idx as u16) * 64 + bit) as i16); + } + } + } + } + + let arr = Arc::new(PrimitiveArray::::from_iter_values( + values, + )); + Ok(vec![ + SingleRowListArrayBuilder::new(arr).build_list_scalar(), + ]) + } + + fn evaluate(&mut self) -> datafusion_common::Result { + Ok(ScalarValue::Int64(Some(self.count()))) + } + + fn size(&self) -> usize { + size_of_val(self) + 8192 + } +} diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 406f0a0e32cc..595687cd4276 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -80,10 +80,6 @@ name = "min_max_bytes" name = "approx_distinct" harness = false -[[bench]] -name = "first_last" -harness = false - [[bench]] name = "count_distinct" harness = false diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index e742a3e5c126..0bc8a31f0bff 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -17,9 +17,7 @@ use std::sync::Arc; -use arrow::array::{ - ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, -}; +use arrow::array::{ArrayRef, Int16Array, Int64Array, Int8Array, UInt16Array, UInt8Array}; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; @@ -91,7 +89,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { for pct in [80, 99] { let n_distinct = BATCH_SIZE * pct / 100; - // Int64 + // --- Int64 benchmarks (HashSet-based) --- let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; c.bench_function(&format!("count_distinct i64 {pct}% distinct"), |b| { b.iter(|| { @@ -103,9 +101,9 @@ fn count_distinct_benchmark(c: &mut Criterion) { }); } - // Small integer types + // --- Small integer type benchmarks (our optimization) --- - // UInt8 + // UInt8 - bool array based let values = Arc::new(create_u8_array(200)) as ArrayRef; c.bench_function("count_distinct u8 bitmap", |b| { b.iter(|| { @@ -116,7 +114,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { }) }); - // Int8 + // Int8 - bool array based let values = Arc::new(create_i8_array(200)) as ArrayRef; c.bench_function("count_distinct i8 bitmap", |b| { b.iter(|| { @@ -127,7 +125,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { }) }); - // UInt16 + // UInt16 - bitmap based let values = Arc::new(create_u16_array(50000)) as ArrayRef; c.bench_function("count_distinct u16 bitmap", |b| { b.iter(|| { @@ -138,7 +136,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { }) }); - // Int16 + // Int16 - bitmap based let values = Arc::new(create_i16_array(50000)) as ArrayRef; c.bench_function("count_distinct i16 bitmap", |b| { b.iter(|| { diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 428f72e3417a..3fe6a45d022b 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -658,7 +658,7 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // Testing bool array for u8 + // Using bitmaps for u8/u16/i8/i16 DataType::UInt8 => Box::new(BoolArray256Accumulator::new()), DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 81b6eda3f9b1..18f58dd7051b 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -21,11 +21,11 @@ use arrow::{ compute, datatypes::{ DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, - FieldRef, Float16Type, Float32Type, Float64Type, Int8Type, Int16Type, Int32Type, + FieldRef, Float16Type, Float32Type, Float64Type, Int32Type, Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, - UInt8Type, UInt16Type, UInt32Type, UInt64Type, + UInt32Type, UInt64Type, }, }; use datafusion_common::hash_utils::RandomState; @@ -42,6 +42,10 @@ use datafusion_expr::{ utils::format_state_name, }; use datafusion_functions_aggregate_common::aggregate::{ + count_distinct::Bitmap65536DistinctCountAccumulator, + count_distinct::Bitmap65536DistinctCountAccumulatorI16, + count_distinct::BoolArray256DistinctCountAccumulator, + count_distinct::BoolArray256DistinctCountAccumulatorI8, count_distinct::BytesDistinctCountAccumulator, count_distinct::BytesViewDistinctCountAccumulator, count_distinct::DictionaryCountAccumulator, @@ -170,25 +174,18 @@ impl Count { } fn get_count_accumulator(data_type: &DataType) -> Box { match data_type { - // try and use a specialized accumulator if possible, otherwise fall back to generic accumulator - DataType::Int8 => Box::new(PrimitiveDistinctCountAccumulator::::new( - data_type, - )), - DataType::Int16 => Box::new(PrimitiveDistinctCountAccumulator::::new( - data_type, - )), + // Use optimized bitmap/bool array accumulators for small integer types + DataType::UInt8 => Box::new(BoolArray256DistinctCountAccumulator::new()), + DataType::Int8 => Box::new(BoolArray256DistinctCountAccumulatorI8::new()), + DataType::UInt16 => Box::new(Bitmap65536DistinctCountAccumulator::new()), + DataType::Int16 => Box::new(Bitmap65536DistinctCountAccumulatorI16::new()), + // Use HashSet-based accumulator for larger integer types DataType::Int32 => Box::new(PrimitiveDistinctCountAccumulator::::new( data_type, )), DataType::Int64 => Box::new(PrimitiveDistinctCountAccumulator::::new( data_type, )), - DataType::UInt8 => Box::new(PrimitiveDistinctCountAccumulator::::new( - data_type, - )), - DataType::UInt16 => Box::new( - PrimitiveDistinctCountAccumulator::::new(data_type), - ), DataType::UInt32 => Box::new( PrimitiveDistinctCountAccumulator::::new(data_type), ), From 3f091d96dddd61d0816778b63e767ea1fc966f0f Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 08:18:15 -0700 Subject: [PATCH 07/10] bitmap_instead_smaller_datatypes --- .../src/aggregate/count_distinct/native.rs | 36 +++++++++---------- .../benches/count_distinct.rs | 4 ++- datafusion/functions-aggregate/src/count.rs | 4 +-- 3 files changed, 22 insertions(+), 22 deletions(-) diff --git a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs index 4667ff9a25fc..0fbc61c48f4a 100644 --- a/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs +++ b/datafusion/functions-aggregate-common/src/aggregate/count_distinct/native.rs @@ -230,9 +230,9 @@ impl Accumulator for BoolArray256DistinctCountAccumulator { .filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None }) .collect(); - let arr = Arc::new(PrimitiveArray::::from_iter_values( - values, - )); + let arr = Arc::new( + PrimitiveArray::::from_iter_values(values), + ); Ok(vec![ SingleRowListArrayBuilder::new(arr).build_list_scalar(), ]) @@ -308,18 +308,16 @@ impl Accumulator for BoolArray256DistinctCountAccumulatorI8 { .seen .iter() .enumerate() - .filter_map(|(idx, &seen)| { - if seen { - Some(idx as u8 as i8) - } else { - None - } - }) + .filter_map( + |(idx, &seen)| { + if seen { Some(idx as u8 as i8) } else { None } + }, + ) .collect(); - let arr = Arc::new(PrimitiveArray::::from_iter_values( - values, - )); + let arr = Arc::new( + PrimitiveArray::::from_iter_values(values), + ); Ok(vec![ SingleRowListArrayBuilder::new(arr).build_list_scalar(), ]) @@ -409,9 +407,9 @@ impl Accumulator for Bitmap65536DistinctCountAccumulator { } } - let arr = Arc::new(PrimitiveArray::::from_iter_values( - values, - )); + let arr = Arc::new( + PrimitiveArray::::from_iter_values(values), + ); Ok(vec![ SingleRowListArrayBuilder::new(arr).build_list_scalar(), ]) @@ -502,9 +500,9 @@ impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 { } } - let arr = Arc::new(PrimitiveArray::::from_iter_values( - values, - )); + let arr = Arc::new( + PrimitiveArray::::from_iter_values(values), + ); Ok(vec![ SingleRowListArrayBuilder::new(arr).build_list_scalar(), ]) diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 0bc8a31f0bff..22cca4f3cd73 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -17,7 +17,9 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, Int16Array, Int64Array, Int8Array, UInt16Array, UInt8Array}; +use arrow::array::{ + ArrayRef, Int8Array, Int16Array, Int64Array, UInt8Array, UInt16Array, +}; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; diff --git a/datafusion/functions-aggregate/src/count.rs b/datafusion/functions-aggregate/src/count.rs index 18f58dd7051b..3587badc763e 100644 --- a/datafusion/functions-aggregate/src/count.rs +++ b/datafusion/functions-aggregate/src/count.rs @@ -21,8 +21,8 @@ use arrow::{ compute, datatypes::{ DataType, Date32Type, Date64Type, Decimal128Type, Decimal256Type, Field, - FieldRef, Float16Type, Float32Type, Float64Type, Int32Type, - Int64Type, Time32MillisecondType, Time32SecondType, Time64MicrosecondType, + FieldRef, Float16Type, Float32Type, Float64Type, Int32Type, Int64Type, + Time32MillisecondType, Time32SecondType, Time64MicrosecondType, Time64NanosecondType, TimeUnit, TimestampMicrosecondType, TimestampMillisecondType, TimestampNanosecondType, TimestampSecondType, UInt32Type, UInt64Type, From e09f68af5a279379714874ea3dcfb9f5805a6ec8 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 08:35:50 -0700 Subject: [PATCH 08/10] bitmap_instead_smaller_datatypes --- .../functions-aggregate/benches/approx_distinct.rs | 8 ++++---- .../functions-aggregate/benches/count_distinct.rs | 12 ++++++------ 2 files changed, 10 insertions(+), 10 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 25235f87fe03..d34dfed33119 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -123,7 +123,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { for pct in [80, 99] { let n_distinct = BATCH_SIZE * pct / 100; - // --- Int64 benchmarks --- + // Int64 let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; c.bench_function(&format!("approx_distinct i64 {pct}% distinct"), |b| { b.iter(|| { @@ -139,7 +139,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { { let string_pool = create_string_pool(n_distinct, str_len); - // --- Utf8 benchmarks --- + // Utf8 let values = Arc::new(create_string_array(&string_pool)) as ArrayRef; c.bench_function( &format!("approx_distinct utf8 {label} {pct}% distinct"), @@ -153,7 +153,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { }, ); - // --- Utf8View benchmarks --- + // Utf8View let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef; c.bench_function( &format!("approx_distinct utf8view {label} {pct}% distinct"), @@ -169,7 +169,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { } } - // --- Bitmap type benchmarks (our optimization) --- + // Small integer types // UInt8 let values = Arc::new(create_u8_array(200)) as ArrayRef; diff --git a/datafusion/functions-aggregate/benches/count_distinct.rs b/datafusion/functions-aggregate/benches/count_distinct.rs index 22cca4f3cd73..e742a3e5c126 100644 --- a/datafusion/functions-aggregate/benches/count_distinct.rs +++ b/datafusion/functions-aggregate/benches/count_distinct.rs @@ -91,7 +91,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { for pct in [80, 99] { let n_distinct = BATCH_SIZE * pct / 100; - // --- Int64 benchmarks (HashSet-based) --- + // Int64 let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; c.bench_function(&format!("count_distinct i64 {pct}% distinct"), |b| { b.iter(|| { @@ -103,9 +103,9 @@ fn count_distinct_benchmark(c: &mut Criterion) { }); } - // --- Small integer type benchmarks (our optimization) --- + // Small integer types - // UInt8 - bool array based + // UInt8 let values = Arc::new(create_u8_array(200)) as ArrayRef; c.bench_function("count_distinct u8 bitmap", |b| { b.iter(|| { @@ -116,7 +116,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { }) }); - // Int8 - bool array based + // Int8 let values = Arc::new(create_i8_array(200)) as ArrayRef; c.bench_function("count_distinct i8 bitmap", |b| { b.iter(|| { @@ -127,7 +127,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { }) }); - // UInt16 - bitmap based + // UInt16 let values = Arc::new(create_u16_array(50000)) as ArrayRef; c.bench_function("count_distinct u16 bitmap", |b| { b.iter(|| { @@ -138,7 +138,7 @@ fn count_distinct_benchmark(c: &mut Criterion) { }) }); - // Int16 - bitmap based + // Int16 let values = Arc::new(create_i16_array(50000)) as ArrayRef; c.bench_function("count_distinct i16 bitmap", |b| { b.iter(|| { From f8c01a1d55449bc3f7e6633622e4b5b899d0c757 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Wed, 8 Apr 2026 13:38:28 -0700 Subject: [PATCH 09/10] cleanup_pr --- .../benches/approx_distinct.rs | 89 +---- .../src/approx_distinct.rs | 332 +----------------- 2 files changed, 12 insertions(+), 409 deletions(-) diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index d34dfed33119..9c22194e0384 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -17,10 +17,7 @@ use std::sync::Arc; -use arrow::array::{ - ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray, - UInt8Array, UInt16Array, -}; +use arrow::array::{ArrayRef, Int64Array, StringArray, StringViewArray}; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; @@ -59,38 +56,6 @@ fn create_i64_array(n_distinct: usize) -> Int64Array { .collect() } -fn create_u8_array(n_distinct: usize) -> UInt8Array { - let mut rng = StdRng::seed_from_u64(42); - let max_val = n_distinct.min(256) as u8; - (0..BATCH_SIZE) - .map(|_| Some(rng.random_range(0..max_val))) - .collect() -} - -fn create_i8_array(n_distinct: usize) -> Int8Array { - let mut rng = StdRng::seed_from_u64(42); - let max_val = (n_distinct.min(256) / 2) as i8; - (0..BATCH_SIZE) - .map(|_| Some(rng.random_range(-max_val..max_val))) - .collect() -} - -fn create_u16_array(n_distinct: usize) -> UInt16Array { - let mut rng = StdRng::seed_from_u64(42); - let max_val = n_distinct.min(65536) as u16; - (0..BATCH_SIZE) - .map(|_| Some(rng.random_range(0..max_val))) - .collect() -} - -fn create_i16_array(n_distinct: usize) -> Int16Array { - let mut rng = StdRng::seed_from_u64(42); - let max_val = (n_distinct.min(65536) / 2) as i16; - (0..BATCH_SIZE) - .map(|_| Some(rng.random_range(-max_val..max_val))) - .collect() -} - /// Creates a pool of `n_distinct` random strings of the given length. fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -123,7 +88,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { for pct in [80, 99] { let n_distinct = BATCH_SIZE * pct / 100; - // Int64 + // --- Int64 benchmarks --- let values = Arc::new(create_i64_array(n_distinct)) as ArrayRef; c.bench_function(&format!("approx_distinct i64 {pct}% distinct"), |b| { b.iter(|| { @@ -139,7 +104,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { { let string_pool = create_string_pool(n_distinct, str_len); - // Utf8 + // --- Utf8 benchmarks --- let values = Arc::new(create_string_array(&string_pool)) as ArrayRef; c.bench_function( &format!("approx_distinct utf8 {label} {pct}% distinct"), @@ -153,7 +118,7 @@ fn approx_distinct_benchmark(c: &mut Criterion) { }, ); - // Utf8View + // --- Utf8View benchmarks --- let values = Arc::new(create_string_view_array(&string_pool)) as ArrayRef; c.bench_function( &format!("approx_distinct utf8view {label} {pct}% distinct"), @@ -168,52 +133,6 @@ fn approx_distinct_benchmark(c: &mut Criterion) { ); } } - - // Small integer types - - // UInt8 - let values = Arc::new(create_u8_array(200)) as ArrayRef; - c.bench_function("approx_distinct u8 bitmap", |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::UInt8); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); - - // Int8 - let values = Arc::new(create_i8_array(200)) as ArrayRef; - c.bench_function("approx_distinct i8 bitmap", |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::Int8); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); - - // UInt16 - let values = Arc::new(create_u16_array(50000)) as ArrayRef; - c.bench_function("approx_distinct u16 bitmap", |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::UInt16); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); - - // Int16 - let values = Arc::new(create_i16_array(50000)) as ArrayRef; - c.bench_function("approx_distinct i16 bitmap", |b| { - b.iter(|| { - let mut accumulator = prepare_accumulator(DataType::Int16); - accumulator - .update_batch(std::slice::from_ref(&values)) - .unwrap() - }) - }); } criterion_group!(benches, approx_distinct_benchmark); diff --git a/datafusion/functions-aggregate/src/approx_distinct.rs b/datafusion/functions-aggregate/src/approx_distinct.rs index 3fe6a45d022b..40da98c3eb3a 100644 --- a/datafusion/functions-aggregate/src/approx_distinct.rs +++ b/datafusion/functions-aggregate/src/approx_distinct.rs @@ -18,7 +18,7 @@ //! Defines physical expressions that can evaluated at runtime during query execution use crate::hyperloglog::{HLL_HASH_STATE, HyperLogLog}; -use arrow::array::{Array, AsArray, BinaryArray, BooleanArray, StringViewArray}; +use arrow::array::{Array, BinaryArray, StringViewArray}; use arrow::array::{ GenericBinaryArray, GenericStringArray, OffsetSizeTrait, PrimitiveArray, }; @@ -160,323 +160,6 @@ where } } -#[derive(Debug)] -struct BoolDistinctAccumulator { - seen_true: bool, - seen_false: bool, -} - -impl BoolDistinctAccumulator { - fn new() -> Self { - Self { - seen_true: false, - seen_false: false, - } - } -} - -impl Accumulator for BoolDistinctAccumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array: &BooleanArray = downcast_value!(values[0], BooleanArray); - for value in array.iter().flatten() { - if value { - self.seen_true = true; - } else { - self.seen_false = true; - } - } - Ok(()) - } - - fn evaluate(&mut self) -> Result { - let count = (self.seen_true as u64) + (self.seen_false as u64); - Ok(ScalarValue::UInt64(Some(count))) - } - - fn size(&self) -> usize { - size_of::() - } - - fn state(&mut self) -> Result> { - // Pack into 1 byte: bit 0 = seen_false, bit 1 = seen_true - let packed = (self.seen_false as u8) | ((self.seen_true as u8) << 1); - Ok(vec![ScalarValue::Binary(Some(vec![packed]))]) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if !data.is_empty() { - self.seen_false |= (data[0] & 1) != 0; - self.seen_true |= (data[0] & 2) != 0; - } - } - Ok(()) - } -} - -/// Accumulator for u8 distinct counting using a bool array -#[derive(Debug)] -struct BoolArray256Accumulator { - seen: Box<[bool; 256]>, -} - -impl BoolArray256Accumulator { - fn new() -> Self { - Self { - seen: Box::new([false; 256]), - } - } - - #[inline] - fn count(&self) -> u64 { - self.seen.iter().filter(|&&b| b).count() as u64 - } -} - -impl Accumulator for BoolArray256Accumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.seen[value as usize] = true; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 256 { - for (i, &b) in data.iter().enumerate() { - if b != 0 { - self.seen[i] = true; - } - } - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 256 - } -} - -/// Accumulator for i8 distinct counting using a bool array -#[derive(Debug)] -struct BoolArray256AccumulatorI8 { - seen: Box<[bool; 256]>, -} - -impl BoolArray256AccumulatorI8 { - fn new() -> Self { - Self { - seen: Box::new([false; 256]), - } - } - - #[inline] - fn count(&self) -> u64 { - self.seen.iter().filter(|&&b| b).count() as u64 - } -} - -impl Accumulator for BoolArray256AccumulatorI8 { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.seen[value as u8 as usize] = true; - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 256 { - for (i, &b) in data.iter().enumerate() { - if b != 0 { - self.seen[i] = true; - } - } - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let bytes: Vec = self.seen.iter().map(|&b| b as u8).collect(); - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 256 - } -} - -/// Accumulator for u16 distinct counting using a 65536-bit bitmap -#[derive(Debug)] -struct Bitmap65536Accumulator { - /// 65536 bits = 1024 x u64, tracks values 0-65535 - bitmap: Box<[u64; 1024]>, -} - -impl Bitmap65536Accumulator { - fn new() -> Self { - Self { - bitmap: Box::new([0; 1024]), - } - } - - #[inline] - fn set_bit(&mut self, value: u16) { - let word = (value / 64) as usize; - let bit = value % 64; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 1024]) { - for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { - *dst |= src; - } - } -} - -impl Accumulator for Bitmap65536Accumulator { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 8192 { - let mut other = [0u64; 1024]; - for (i, word) in other.iter_mut().enumerate() { - let offset = i * 8; - *word = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let mut bytes = Vec::with_capacity(8192); - for word in self.bitmap.iter() { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 8192 - } -} - -/// Accumulator for i16 distinct counting using a 65536-bit bitmap -#[derive(Debug)] -struct Bitmap65536AccumulatorI16 { - bitmap: Box<[u64; 1024]>, -} - -impl Bitmap65536AccumulatorI16 { - fn new() -> Self { - Self { - bitmap: Box::new([0; 1024]), - } - } - - #[inline] - fn set_bit(&mut self, value: i16) { - let idx = value as u16; - let word = (idx / 64) as usize; - let bit = idx % 64; - self.bitmap[word] |= 1u64 << bit; - } - - #[inline] - fn count(&self) -> u64 { - self.bitmap.iter().map(|w| w.count_ones() as u64).sum() - } - - fn merge(&mut self, other: &[u64; 1024]) { - for (dst, src) in self.bitmap.iter_mut().zip(other.iter()) { - *dst |= src; - } - } -} - -impl Accumulator for Bitmap65536AccumulatorI16 { - fn update_batch(&mut self, values: &[ArrayRef]) -> Result<()> { - let array = values[0].as_primitive::(); - for value in array.iter().flatten() { - self.set_bit(value); - } - Ok(()) - } - - fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { - let array = downcast_value!(states[0], BinaryArray); - for data in array.iter().flatten() { - if data.len() == 8192 { - let mut other = [0u64; 1024]; - for (i, word) in other.iter_mut().enumerate() { - let offset = i * 8; - *word = - u64::from_le_bytes(data[offset..offset + 8].try_into().unwrap()); - } - self.merge(&other); - } - } - Ok(()) - } - - fn state(&mut self) -> Result> { - let mut bytes = Vec::with_capacity(8192); - for word in self.bitmap.iter() { - bytes.extend_from_slice(&word.to_le_bytes()); - } - Ok(vec![ScalarValue::Binary(Some(bytes))]) - } - - fn evaluate(&mut self) -> Result { - Ok(ScalarValue::UInt64(Some(self.count()))) - } - - fn size(&self) -> usize { - size_of::() + 8192 - } -} - macro_rules! default_accumulator_impl { () => { fn merge_batch(&mut self, states: &[ArrayRef]) -> Result<()> { @@ -658,13 +341,15 @@ impl AggregateUDFImpl for ApproxDistinct { let data_type = acc_args.expr_fields[0].data_type(); let accumulator: Box = match data_type { - // Using bitmaps for u8/u16/i8/i16 - DataType::UInt8 => Box::new(BoolArray256Accumulator::new()), - DataType::UInt16 => Box::new(Bitmap65536Accumulator::new()), + // TODO u8, i8, u16, i16 shall really be done using bitmap, not HLL + // TODO support for boolean (trivial case) + // https://github.com/apache/datafusion/issues/1109 + DataType::UInt8 => Box::new(NumericHLLAccumulator::::new()), + DataType::UInt16 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt32 => Box::new(NumericHLLAccumulator::::new()), DataType::UInt64 => Box::new(NumericHLLAccumulator::::new()), - DataType::Int8 => Box::new(BoolArray256AccumulatorI8::new()), - DataType::Int16 => Box::new(Bitmap65536AccumulatorI16::new()), + DataType::Int8 => Box::new(NumericHLLAccumulator::::new()), + DataType::Int16 => Box::new(NumericHLLAccumulator::::new()), DataType::Int32 => Box::new(NumericHLLAccumulator::::new()), DataType::Int64 => Box::new(NumericHLLAccumulator::::new()), DataType::Date32 => Box::new(NumericHLLAccumulator::::new()), @@ -698,7 +383,6 @@ impl AggregateUDFImpl for ApproxDistinct { DataType::Utf8View => Box::new(StringViewHLLAccumulator::new()), DataType::Binary => Box::new(BinaryHLLAccumulator::::new()), DataType::LargeBinary => Box::new(BinaryHLLAccumulator::::new()), - DataType::Boolean => Box::new(BoolDistinctAccumulator::new()), DataType::Null => { Box::new(NoopAccumulator::new(ScalarValue::UInt64(Some(0)))) } From df90ef58ff9a4a6637d8e473e3b155acac027408 Mon Sep 17 00:00:00 2001 From: B Vadlamani Date: Thu, 9 Apr 2026 14:37:21 -0700 Subject: [PATCH 10/10] rebase_main --- datafusion/functions-aggregate/Cargo.toml | 4 + .../benches/approx_distinct.rs | 83 ++++++++++++++++++- 2 files changed, 86 insertions(+), 1 deletion(-) diff --git a/datafusion/functions-aggregate/Cargo.toml b/datafusion/functions-aggregate/Cargo.toml index 595687cd4276..406f0a0e32cc 100644 --- a/datafusion/functions-aggregate/Cargo.toml +++ b/datafusion/functions-aggregate/Cargo.toml @@ -80,6 +80,10 @@ name = "min_max_bytes" name = "approx_distinct" harness = false +[[bench]] +name = "first_last" +harness = false + [[bench]] name = "count_distinct" harness = false diff --git a/datafusion/functions-aggregate/benches/approx_distinct.rs b/datafusion/functions-aggregate/benches/approx_distinct.rs index 9c22194e0384..cc85c2163c18 100644 --- a/datafusion/functions-aggregate/benches/approx_distinct.rs +++ b/datafusion/functions-aggregate/benches/approx_distinct.rs @@ -17,7 +17,10 @@ use std::sync::Arc; -use arrow::array::{ArrayRef, Int64Array, StringArray, StringViewArray}; +use arrow::array::{ + ArrayRef, Int8Array, Int16Array, Int64Array, StringArray, StringViewArray, + UInt8Array, UInt16Array, +}; use arrow::datatypes::{DataType, Field, Schema}; use criterion::{Criterion, criterion_group, criterion_main}; use datafusion_expr::function::AccumulatorArgs; @@ -56,6 +59,38 @@ fn create_i64_array(n_distinct: usize) -> Int64Array { .collect() } +fn create_u8_array(n_distinct: usize) -> UInt8Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = n_distinct.min(256) as u8; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(0..max_val))) + .collect() +} + +fn create_i8_array(n_distinct: usize) -> Int8Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = (n_distinct.min(256) / 2) as i8; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(-max_val..max_val))) + .collect() +} + +fn create_u16_array(n_distinct: usize) -> UInt16Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = n_distinct.min(65536) as u16; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(0..max_val))) + .collect() +} + +fn create_i16_array(n_distinct: usize) -> Int16Array { + let mut rng = StdRng::seed_from_u64(42); + let max_val = (n_distinct.min(65536) / 2) as i16; + (0..BATCH_SIZE) + .map(|_| Some(rng.random_range(-max_val..max_val))) + .collect() +} + /// Creates a pool of `n_distinct` random strings of the given length. fn create_string_pool(n_distinct: usize, string_length: usize) -> Vec { let mut rng = StdRng::seed_from_u64(42); @@ -133,6 +168,52 @@ fn approx_distinct_benchmark(c: &mut Criterion) { ); } } + + // Small integer types + + // UInt8 + let values = Arc::new(create_u8_array(200)) as ArrayRef; + c.bench_function("approx_distinct u8 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::UInt8); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // Int8 + let values = Arc::new(create_i8_array(200)) as ArrayRef; + c.bench_function("approx_distinct i8 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Int8); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // UInt16 + let values = Arc::new(create_u16_array(50000)) as ArrayRef; + c.bench_function("approx_distinct u16 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::UInt16); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); + + // Int16 + let values = Arc::new(create_i16_array(50000)) as ArrayRef; + c.bench_function("approx_distinct i16 bitmap", |b| { + b.iter(|| { + let mut accumulator = prepare_accumulator(DataType::Int16); + accumulator + .update_batch(std::slice::from_ref(&values)) + .unwrap() + }) + }); } criterion_group!(benches, approx_distinct_benchmark);