/// Parallel bit extract: gathers the bits of `value` selected by `mask` and
/// packs them contiguously into the low bits of the return value.
///
/// The bit of `value` under the lowest set bit of `mask` becomes bit 0 of the
/// result, the bit under the next set bit becomes bit 1, and so on. Every
/// result bit at or above position `mask.count_ones()` is zero.
///
/// Equivalent to the x86 BMI2 `PEXT` instruction. When compiled with the
/// `bmi2` target feature enabled (for example `-C target-cpu=x86-64-v3`)
/// this lowers to the hardware `pext` instruction; otherwise it falls back
/// to a portable scalar loop that iterates once per set bit of `mask`.
///
/// Replace with `value.compress(mask)` once the `uint_gather_scatter_bits`
/// standard library feature is stabilised.
///
/// # Examples
///
/// ```
/// use arrow_buffer::bit_util::compress;
///
/// // The mask selects bits 2, 3, 5 and 6, which hold 1, 0, 1 and 0
/// assert_eq!(compress(0b1011_0110, 0b0110_1100), 0b0101);
/// ```
#[inline]
pub fn compress(value: u64, mask: u64) -> u64 {
    #[cfg(all(target_arch = "x86_64", target_feature = "bmi2"))]
    {
        // SAFETY: the `bmi2` target feature is statically enabled for this
        // build, so the `pext` instruction is guaranteed to be available.
        unsafe { std::arch::x86_64::_pext_u64(value, mask) }
    }

    #[cfg(not(all(target_arch = "x86_64", target_feature = "bmi2")))]
    {
        let mut remaining = mask;
        let mut packed = 0_u64;
        // Position in `packed` that receives the next extracted bit. It only
        // reaches 64 after the final iteration, so the shift below never
        // overflows.
        let mut dest = 0_u32;
        while remaining != 0 {
            let src = remaining.trailing_zeros();
            packed |= ((value >> src) & 1) << dest;
            dest += 1;
            // Clear the lowest set bit; `remaining` is non-zero here so the
            // subtraction cannot underflow.
            remaining &= remaining - 1;
        }
        packed
    }
}

/// Returns true if [`compress`] lowers to the hardware `pext` instruction
///
/// The answer is fixed at compile time from the statically enabled target
/// features (no runtime CPU detection is performed), so this is a `const fn`
/// and may also be used to initialise constants.
#[inline]
pub const fn compress_available() -> bool {
    cfg!(all(target_arch = "x86_64", target_feature = "bmi2"))
}
You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Benchmarks for the internal `filter_bits` kernel. +//! +//! `filter_bits` is private, so it is exercised through +//! [`FilterPredicate::filter`] on a [`BooleanArray`] without nulls, which +//! dispatches directly to `filter_bits` on the array's value buffer. +//! +//! The filter selectivity determines which `IterationStrategy` is used: +//! selectivity above 0.8 selects `SlicesIterator`, below selects +//! `IndexIterator`, and [`FilterBuilder::optimize`] converts these into their +//! precomputed `Slices` / `Indices` counterparts. + +use arrow_array::BooleanArray; +use arrow_select::filter::{FilterBuilder, FilterPredicate}; +use criterion::{Criterion, criterion_group, criterion_main}; +use rand::rngs::StdRng; +use rand::{Rng, SeedableRng}; +use std::hint; + +fn create_boolean_array(size: usize, true_density: f64, rng: &mut StdRng) -> BooleanArray { + (0..size) + .map(|_| Some(rng.random_bool(true_density))) + .collect() +} + +fn bench_filter_bits(predicate: &FilterPredicate, array: &BooleanArray) { + hint::black_box(predicate.filter(array).unwrap()); +} + +fn add_benchmark(c: &mut Criterion) { + const SIZE: usize = 65536; + let mut rng = StdRng::seed_from_u64(42); + + let data = create_boolean_array(SIZE, 0.5, &mut rng); + + // Slice off a non-byte-aligned prefix to exercise the bit offset handling + // in `filter_bits` + let padded = create_boolean_array(SIZE + 3, 0.5, &mut rng); + let sliced = padded.slice(3, SIZE); + + // (label, true_density): densities above the 0.8 selectivity threshold use + // the slices strategies, those below use 
the index strategies + let cases = [ + ("slices, kept 1023/1024", 1.0 - 1.0 / 1024.0), + ("slices, kept 9/10", 0.9), + ("indices, kept 1/2", 0.5), + ("indices, kept 1/10", 0.1), + ("indices, kept 1/1024", 1.0 / 1024.0), + ]; + + for (label, true_density) in cases { + let filter_array = create_boolean_array(SIZE, true_density, &mut rng); + + // Lazy strategies: SlicesIterator / IndexIterator + let lazy = FilterBuilder::new(&filter_array).build(); + // Precomputed strategies: Slices / Indices + let optimized = FilterBuilder::new(&filter_array).optimize().build(); + + c.bench_function(&format!("filter_bits ({label})"), |b| { + b.iter(|| bench_filter_bits(&lazy, &data)) + }); + + c.bench_function(&format!("filter_bits optimized ({label})"), |b| { + b.iter(|| bench_filter_bits(&optimized, &data)) + }); + + c.bench_function(&format!("filter_bits sliced ({label})"), |b| { + b.iter(|| bench_filter_bits(&lazy, &sliced)) + }); + } +} + +criterion_group!(benches, add_benchmark); +criterion_main!(benches); diff --git a/arrow-select/src/filter.rs b/arrow-select/src/filter.rs index b1f3a21a3ed3..e8c181db9a86 100644 --- a/arrow-select/src/filter.rs +++ b/arrow-select/src/filter.rs @@ -26,6 +26,7 @@ use arrow_array::types::{ ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, ByteViewType, RunEndIndexType, }; use arrow_array::*; +use arrow_buffer::bit_chunk_iterator::BitChunks; use arrow_buffer::{ ArrowNativeType, BooleanBuffer, NullBuffer, OffsetBuffer, RunEndBuffer, ScalarBuffer, bit_util, }; @@ -679,6 +680,20 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { let offset = buffer.offset(); assert!(buffer.len() >= predicate.filter.len()); + // The compress path scans the entire filter mask a word at a time, so it + // is only worthwhile when `pext` is available in hardware and the filter + // is neither very sparse nor very dense: it must keep more than one bit + // per word on average (otherwise visiting each kept bit individually is + // 
faster) and drop more than one bit per word on average (otherwise + // copying whole ranges via the slices strategies is faster) + let len = predicate.filter.len(); + if bit_util::compress_available() + && predicate.count > len / 64 + && predicate.count < len - len / 64 + { + return filter_bits_compress(buffer, predicate); + } + match &predicate.strategy { IterationStrategy::IndexIterator => { let bits = @@ -716,6 +731,47 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { } } +/// Filter the packed bitmask `buffer` with `predicate` by extracting the kept +/// bits of each 64-bit word with [`bit_util::compress`] (`pext`) +fn filter_bits_compress(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer { + let mask_chunks = predicate.filter.values().bit_chunks(); + let value_chunks = BitChunks::new(buffer.values(), buffer.offset(), predicate.filter.len()); + + let mut out = MutableBuffer::new(bit_util::ceil(predicate.count, 8)); + // Bits extracted from each chunk are packed into `current` until it + // contains a complete word, which is then flushed to `out` + let mut current = 0_u64; + let mut filled = 0_u32; + + let chunks = value_chunks + .iter() + .zip(mask_chunks.iter()) + .chain([(value_chunks.remainder_bits(), mask_chunks.remainder_bits())]); + + for (values, mask) in chunks { + let bits = bit_util::compress(values, mask); + let count = mask.count_ones(); + current |= bits << filled; + if filled + count >= 64 { + out.extend_from_slice(¤t.to_le_bytes()); + filled = filled + count - 64; + // The bits of `bits` that did not fit in `current`, if any + current = if filled == 0 { + 0 + } else { + bits >> (count - filled) + }; + } else { + filled += count; + } + } + + if filled > 0 { + out.extend_from_slice(¤t.to_le_bytes()[..bit_util::ceil(filled as usize, 8)]); + } + out.into() +} + /// `filter` implementation for boolean buffers fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanArray { let buffer 
= filter_bits(array.values(), predicate);