Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,9 @@ mod native;
pub use bytes::BytesDistinctCountAccumulator;
pub use bytes::BytesViewDistinctCountAccumulator;
pub use dict::DictionaryCountAccumulator;
pub use native::Bitmap65536DistinctCountAccumulator;
pub use native::Bitmap65536DistinctCountAccumulatorI16;
pub use native::BoolArray256DistinctCountAccumulator;
pub use native::BoolArray256DistinctCountAccumulatorI8;
pub use native::FloatDistinctCountAccumulator;
pub use native::PrimitiveDistinctCountAccumulator;
Original file line number Diff line number Diff line change
Expand Up @@ -165,3 +165,354 @@ impl<T: ArrowPrimitiveType + Debug> Accumulator for FloatDistinctCountAccumulato
size_of_val(self) + self.values.size()
}
}

/// Optimized COUNT DISTINCT accumulator for u8 using a bool array.
/// Uses 256 bytes to track all possible u8 values.
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulator {
seen: Box<[bool; 256]>,
}

impl BoolArray256DistinctCountAccumulator {
pub fn new() -> Self {
Self {
seen: Box::new([false; 256]),
}
}

#[inline]
fn count(&self) -> i64 {
self.seen.iter().filter(|&&b| b).count() as i64
}
}

impl Default for BoolArray256DistinctCountAccumulator {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for BoolArray256DistinctCountAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::UInt8Type>(&values[0])?;
for value in arr.iter().flatten() {
self.seen[value as usize] = true;
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::UInt8Type>(&list)?;
for value in list.values().iter() {
self.seen[*value as usize] = true;
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let values: Vec<u8> = self
.seen
.iter()
.enumerate()
.filter_map(|(idx, &seen)| if seen { Some(idx as u8) } else { None })
.collect();

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::UInt8Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 256
}
}

/// Optimized COUNT DISTINCT accumulator for i8 using a bool array.
/// Uses 256 bytes to track all possible i8 values (mapped to 0..255).
#[derive(Debug)]
pub struct BoolArray256DistinctCountAccumulatorI8 {
seen: Box<[bool; 256]>,
}

impl BoolArray256DistinctCountAccumulatorI8 {
pub fn new() -> Self {
Self {
seen: Box::new([false; 256]),
}
}

#[inline]
fn count(&self) -> i64 {
self.seen.iter().filter(|&&b| b).count() as i64
}
}

impl Default for BoolArray256DistinctCountAccumulatorI8 {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for BoolArray256DistinctCountAccumulatorI8 {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::Int8Type>(&values[0])?;
for value in arr.iter().flatten() {
self.seen[value as u8 as usize] = true;
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::Int8Type>(&list)?;
for value in list.values().iter() {
self.seen[*value as u8 as usize] = true;
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let values: Vec<i8> = self
.seen
.iter()
.enumerate()
.filter_map(
|(idx, &seen)| {
if seen { Some(idx as u8 as i8) } else { None }
},
)
.collect();

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::Int8Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 256
}
}

/// Optimized COUNT DISTINCT accumulator for u16 using a 65536-bit bitmap.
/// Uses 8KB (1024 x u64) to track all possible u16 values.
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulator {
bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulator {
pub fn new() -> Self {
Self {
bitmap: Box::new([0; 1024]),
}
}

#[inline]
fn set_bit(&mut self, value: u16) {
let word = (value / 64) as usize;
let bit = value % 64;
self.bitmap[word] |= 1u64 << bit;
}

#[inline]
fn count(&self) -> i64 {
self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
}
}

impl Default for Bitmap65536DistinctCountAccumulator {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for Bitmap65536DistinctCountAccumulator {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::UInt16Type>(&values[0])?;
for value in arr.iter().flatten() {
self.set_bit(value);
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::UInt16Type>(&list)?;
for value in list.values().iter() {
self.set_bit(*value);
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let mut values = Vec::new();
for (word_idx, &word) in self.bitmap.iter().enumerate() {
if word != 0 {
for bit in 0..64 {
if (word & (1u64 << bit)) != 0 {
values.push((word_idx as u16) * 64 + bit);
}
}
}
}

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::UInt16Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 8192
}
}

/// Optimized COUNT DISTINCT accumulator for i16 using a 65536-bit bitmap.
/// Uses 8KB (1024 x u64) to track all possible i16 values (mapped to 0..65535).
#[derive(Debug)]
pub struct Bitmap65536DistinctCountAccumulatorI16 {
bitmap: Box<[u64; 1024]>,
}

impl Bitmap65536DistinctCountAccumulatorI16 {
pub fn new() -> Self {
Self {
bitmap: Box::new([0; 1024]),
}
}

#[inline]
fn set_bit(&mut self, value: i16) {
let idx = value as u16;
let word = (idx / 64) as usize;
let bit = idx % 64;
self.bitmap[word] |= 1u64 << bit;
}

#[inline]
fn count(&self) -> i64 {
self.bitmap.iter().map(|w| w.count_ones() as i64).sum()
}
}

impl Default for Bitmap65536DistinctCountAccumulatorI16 {
fn default() -> Self {
Self::new()
}
}

impl Accumulator for Bitmap65536DistinctCountAccumulatorI16 {
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {
if values.is_empty() {
return Ok(());
}

let arr = as_primitive_array::<arrow::datatypes::Int16Type>(&values[0])?;
for value in arr.iter().flatten() {
self.set_bit(value);
}
Ok(())
}

fn merge_batch(&mut self, states: &[ArrayRef]) -> datafusion_common::Result<()> {
if states.is_empty() {
return Ok(());
}

let arr = as_list_array(&states[0])?;
arr.iter().try_for_each(|maybe_list| {
if let Some(list) = maybe_list {
let list = as_primitive_array::<arrow::datatypes::Int16Type>(&list)?;
for value in list.values().iter() {
self.set_bit(*value);
}
};
Ok(())
})
}

fn state(&mut self) -> datafusion_common::Result<Vec<ScalarValue>> {
let mut values = Vec::new();
for (word_idx, &word) in self.bitmap.iter().enumerate() {
if word != 0 {
for bit in 0..64 {
if (word & (1u64 << bit)) != 0 {
values.push(((word_idx as u16) * 64 + bit) as i16);
}
}
}
}

let arr = Arc::new(
PrimitiveArray::<arrow::datatypes::Int16Type>::from_iter_values(values),
);
Ok(vec![
SingleRowListArrayBuilder::new(arr).build_list_scalar(),
])
}

fn evaluate(&mut self) -> datafusion_common::Result<ScalarValue> {
Ok(ScalarValue::Int64(Some(self.count())))
}

fn size(&self) -> usize {
size_of_val(self) + 8192
}
}
4 changes: 4 additions & 0 deletions datafusion/functions-aggregate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -83,3 +83,7 @@ harness = false
[[bench]]
name = "first_last"
harness = false

[[bench]]
name = "count_distinct"
harness = false
Loading
Loading