Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

43 changes: 43 additions & 0 deletions arrow-buffer/src/util/bit_util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,49 @@

use crate::bit_chunk_iterator::BitChunks;

/// Parallel bit extract: for each set bit in `mask`, extract the
/// corresponding bit from `value` and pack them contiguously into the low
/// bits of the return value.
///
/// Equivalent to the x86 BMI2 `PEXT` instruction. When compiled with the
/// `bmi2` target feature enabled (for example `-C target-cpu=x86-64-v3`)
/// this lowers to the hardware `pext` instruction; otherwise it falls back
/// to a portable scalar loop.
///
/// Replace with `value.compress(mask)` when `uint_gather_scatter_bits`
/// is stabilised: <https://github.com/rust-lang/rust/issues/149069>
#[inline]
pub fn compress(value: u64, mask: u64) -> u64 {

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

#[cfg(all(target_arch = "x86_64", target_feature = "bmi2"))]
{
// SAFETY: the `bmi2` target feature is statically enabled for this
// build, so the `pext` instruction is guaranteed to be available.
unsafe { std::arch::x86_64::_pext_u64(value, mask) }
}

#[cfg(not(all(target_arch = "x86_64", target_feature = "bmi2")))]
{
let mut mask = mask;
let mut result: u64 = 0;
let mut dest_bit: u64 = 1;
while mask != 0 {
let lowest = mask & mask.wrapping_neg();
if value & lowest != 0 {
result |= dest_bit;
}
dest_bit <<= 1;
mask ^= lowest;
}
result
}
}

/// Returns true if [`compress`] lowers to the hardware `pext` instruction
#[inline]
pub fn compress_available() -> bool {
cfg!(all(target_arch = "x86_64", target_feature = "bmi2"))
}

/// Returns the nearest number that is `>=` than `num` and is a multiple of 64
#[inline]
pub fn round_upto_multiple_of_64(num: usize) -> usize {
Expand Down
5 changes: 5 additions & 0 deletions arrow-select/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -44,4 +44,9 @@ num-traits = { version = "0.2.19", default-features = false, features = ["std"]
ahash = { version = "0.8", default-features = false}

[dev-dependencies]
criterion = { workspace = true, default-features = false }
rand = { version = "0.9", default-features = false, features = ["std", "std_rng", "thread_rng"] }

[[bench]]
name = "filter_bits"
harness = false
90 changes: 90 additions & 0 deletions arrow-select/benches/filter_bits.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Benchmarks for the internal `filter_bits` kernel.
//!
//! `filter_bits` is private, so it is exercised through
//! [`FilterPredicate::filter`] on a [`BooleanArray`] without nulls, which
//! dispatches directly to `filter_bits` on the array's value buffer.
//!
//! The filter selectivity determines which `IterationStrategy` is used:
//! selectivity above 0.8 selects `SlicesIterator`, below selects
//! `IndexIterator`, and [`FilterBuilder::optimize`] converts these into their
//! precomputed `Slices` / `Indices` counterparts.

use arrow_array::BooleanArray;
use arrow_select::filter::{FilterBuilder, FilterPredicate};
use criterion::{Criterion, criterion_group, criterion_main};
use rand::rngs::StdRng;
use rand::{Rng, SeedableRng};
use std::hint;

fn create_boolean_array(size: usize, true_density: f64, rng: &mut StdRng) -> BooleanArray {
(0..size)
.map(|_| Some(rng.random_bool(true_density)))
.collect()
}

fn bench_filter_bits(predicate: &FilterPredicate, array: &BooleanArray) {
hint::black_box(predicate.filter(array).unwrap());
}

fn add_benchmark(c: &mut Criterion) {
const SIZE: usize = 65536;
let mut rng = StdRng::seed_from_u64(42);

let data = create_boolean_array(SIZE, 0.5, &mut rng);

// Slice off a non-byte-aligned prefix to exercise the bit offset handling
// in `filter_bits`
let padded = create_boolean_array(SIZE + 3, 0.5, &mut rng);
let sliced = padded.slice(3, SIZE);

// (label, true_density): densities above the 0.8 selectivity threshold use
// the slices strategies, those below use the index strategies
let cases = [
("slices, kept 1023/1024", 1.0 - 1.0 / 1024.0),
("slices, kept 9/10", 0.9),
("indices, kept 1/2", 0.5),
("indices, kept 1/10", 0.1),
("indices, kept 1/1024", 1.0 / 1024.0),
];

for (label, true_density) in cases {
let filter_array = create_boolean_array(SIZE, true_density, &mut rng);

// Lazy strategies: SlicesIterator / IndexIterator
let lazy = FilterBuilder::new(&filter_array).build();
// Precomputed strategies: Slices / Indices
let optimized = FilterBuilder::new(&filter_array).optimize().build();

c.bench_function(&format!("filter_bits ({label})"), |b| {
b.iter(|| bench_filter_bits(&lazy, &data))
});

c.bench_function(&format!("filter_bits optimized ({label})"), |b| {
b.iter(|| bench_filter_bits(&optimized, &data))
});

c.bench_function(&format!("filter_bits sliced ({label})"), |b| {
b.iter(|| bench_filter_bits(&lazy, &sliced))
});
}
}

criterion_group!(benches, add_benchmark);
criterion_main!(benches);
56 changes: 56 additions & 0 deletions arrow-select/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use arrow_array::types::{
ArrowDictionaryKeyType, ArrowPrimitiveType, ByteArrayType, ByteViewType, RunEndIndexType,
};
use arrow_array::*;
use arrow_buffer::bit_chunk_iterator::BitChunks;
use arrow_buffer::{
ArrowNativeType, BooleanBuffer, NullBuffer, OffsetBuffer, RunEndBuffer, ScalarBuffer, bit_util,
};
Expand Down Expand Up @@ -679,6 +680,20 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
let offset = buffer.offset();
assert!(buffer.len() >= predicate.filter.len());

// The compress path scans the entire filter mask a word at a time, so it
// is only worthwhile when `pext` is available in hardware and the filter
// is neither very sparse nor very dense: it must keep more than one bit
// per word on average (otherwise visiting each kept bit individually is
// faster) and drop more than one bit per word on average (otherwise
// copying whole ranges via the slices strategies is faster)
let len = predicate.filter.len();
if bit_util::compress_available()
&& predicate.count > len / 64
&& predicate.count < len - len / 64
{
return filter_bits_compress(buffer, predicate);
}

match &predicate.strategy {
IterationStrategy::IndexIterator => {
let bits =
Expand Down Expand Up @@ -716,6 +731,47 @@ fn filter_bits(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
}
}

/// Filter the packed bitmask `buffer` with `predicate` by extracting the kept
/// bits of each 64-bit word with [`bit_util::compress`] (`pext`)
fn filter_bits_compress(buffer: &BooleanBuffer, predicate: &FilterPredicate) -> Buffer {
let mask_chunks = predicate.filter.values().bit_chunks();
let value_chunks = BitChunks::new(buffer.values(), buffer.offset(), predicate.filter.len());

let mut out = MutableBuffer::new(bit_util::ceil(predicate.count, 8));
// Bits extracted from each chunk are packed into `current` until it
// contains a complete word, which is then flushed to `out`
let mut current = 0_u64;
let mut filled = 0_u32;

let chunks = value_chunks
.iter()
.zip(mask_chunks.iter())
.chain([(value_chunks.remainder_bits(), mask_chunks.remainder_bits())]);

for (values, mask) in chunks {
let bits = bit_util::compress(values, mask);
let count = mask.count_ones();
current |= bits << filled;
if filled + count >= 64 {
out.extend_from_slice(&current.to_le_bytes());
filled = filled + count - 64;
// The bits of `bits` that did not fit in `current`, if any
current = if filled == 0 {
0
} else {
bits >> (count - filled)
};
} else {
filled += count;
}
}

if filled > 0 {
out.extend_from_slice(&current.to_le_bytes()[..bit_util::ceil(filled as usize, 8)]);
}
out.into()
}

/// `filter` implementation for boolean buffers
fn filter_boolean(array: &BooleanArray, predicate: &FilterPredicate) -> BooleanArray {
let buffer = filter_bits(array.values(), predicate);
Expand Down