Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
130 changes: 112 additions & 18 deletions arrow-select/src/interleave.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
use arrow_array::cast::AsArray;
use arrow_array::types::*;
use arrow_array::*;
use arrow_buffer::bit_mask::set_bits;
use arrow_buffer::bit_util;
use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
use arrow_data::ByteView;
use arrow_data::transform::MutableArrayData;
Expand Down Expand Up @@ -373,13 +375,88 @@ fn interleave_struct(
Ok(Arc::new(struct_array))
}

fn interleave_list_primitive_child<O: OffsetSizeTrait, T: ArrowPrimitiveType>(

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I previously used MutableArrayData, but it was about 15% slower than this implementation.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Vec is very fast

interleaved: &Interleave<'_, GenericListArray<O>>,
indices: &[(usize, usize)],
capacity: usize,
data_type: &DataType,
) -> ArrayRef {
let child_arrays: Vec<&PrimitiveArray<T>> = interleaved
.arrays
.iter()
.map(|list| list.values().as_primitive::<T>())
.collect();

let has_child_nulls = child_arrays.iter().any(|a| a.null_count() > 0);

// Build values buffer by copying contiguous slices
let mut values: Vec<T::Native> = Vec::with_capacity(capacity);
for &(array, row) in indices {
let o = interleaved.arrays[array].value_offsets();
let start = o[row].as_usize();
let end = o[row + 1].as_usize();
if end > start {
values.extend_from_slice(&child_arrays[array].values()[start..end]);
}
}

// Build null buffer. Pre-allocate with 0x00 (all null), then:
// - Sources with nulls: set_bits copies the source validity bits into the destination range.
// - Sources without nulls: set the bit range to all 1s directly.
let nulls = if has_child_nulls {
let null_byte_len = bit_util::ceil(capacity, 8);
let mut output_null_buf = MutableBuffer::from_len_zeroed(null_byte_len);

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks quite similar to BooleanBufferBuilder -- maybe we could use it instead?

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I tried BooleanBufferBuilder, but it seemed a bit slower; I'll retry it tonight.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I found that using BooleanBufferBuilder value by value is slower, since it ends up calling set_bits on each append_n; it is about 10% slower than using set_bits directly as done here.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In the benchmark, calling BooleanBufferBuilder's append_packed_range has a higher cost, especially when the lists are short. Calling set_bits directly can perform better (about 5-20% faster).

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

makes sense -- thank you for checking


let mut offset_write = 0;
let mut output_null_count = 0usize;
for &(array, row) in indices {
let o = interleaved.arrays[array].value_offsets();
let start = o[row].as_usize();
let end = o[row + 1].as_usize();
let len = end - start;
if len > 0 {
match child_arrays[array].nulls() {
Some(null_buffer) => {
output_null_count += set_bits(
output_null_buf.as_slice_mut(),
null_buffer.validity(),
offset_write,
null_buffer.offset() + start,
len,
);
}
None => {
// For a non-nullable source, set the bit range to all 1s directly.
let buf = output_null_buf.as_slice_mut();
(offset_write..offset_write + len).for_each(|i| bit_util::set_bit(buf, i));
}
}
}
offset_write += len;
}

if output_null_count > 0 {
let bool_buf = BooleanBuffer::new(output_null_buf.into(), 0, capacity);
// SAFETY: null_count is accumulated from set_bits which correctly counts unset bits
Some(unsafe { NullBuffer::new_unchecked(bool_buf, output_null_count) })
} else {
None
}
} else {
None
};

Arc::new(PrimitiveArray::<T>::new(values.into(), nulls).with_data_type(data_type.clone()))
}

fn interleave_list<O: OffsetSizeTrait>(
values: &[&dyn Array],
indices: &[(usize, usize)],
field: &FieldRef,
) -> Result<ArrayRef, ArrowError> {
let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);

// Step 1: compute output offsets and total child capacity
let mut capacity = 0usize;
let mut offsets = Vec::with_capacity(indices.len() + 1);
offsets.push(O::from_usize(0).unwrap());
Expand All @@ -392,29 +469,46 @@ fn interleave_list<O: OffsetSizeTrait>(
);
}

let mut child_indices = Vec::with_capacity(capacity);
for (array, row) in indices {
let list = interleaved.arrays[*array];
let start = list.value_offsets()[*row].as_usize();
let end = list.value_offsets()[*row + 1].as_usize();
child_indices.extend((start..end).map(|i| (*array, i)));
// Step 2: build child values.
macro_rules! list_primitive_helper {
($t:ty) => {
interleave_list_primitive_child::<O, $t>(
&interleaved,
indices,
capacity,
field.data_type(),
)
};
}

let child_arrays: Vec<&dyn Array> = interleaved
.arrays
.iter()
.map(|list| list.values().as_ref())
.collect();
let child_values = downcast_primitive! {
// For primitive child types, directly copy typed value slices and null bit
// ranges, avoiding both the intermediate child_indices Vec allocation and
// MutableArrayData's function pointer indirection.
field.data_type() => (list_primitive_helper),

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this only covers types that can be copied quickly; for List<List<...>> we still need further optimizations.

_ => {
// For complex child types (nested lists, structs, views, dictionaries, etc.),
// use recursive interleave to benefit from type-specific optimizations.
let mut child_indices = Vec::with_capacity(capacity);

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This keeps the previous code

for (array, row) in indices {
let list = interleaved.arrays[*array];
let start = list.value_offsets()[*row].as_usize();
let end = list.value_offsets()[*row + 1].as_usize();
child_indices.extend((start..end).map(|i| (*array, i)));
}

let interleaved_values = interleave(&child_arrays, &child_indices)?;
let child_arrays: Vec<&dyn Array> = interleaved
.arrays
.iter()
.map(|list| list.values().as_ref())
.collect();
interleave(&child_arrays, &child_indices)?
}
};

let offsets = OffsetBuffer::new(offsets.into());
let list_array = GenericListArray::<O>::new(
field.clone(),
offsets,
interleaved_values,
interleaved.nulls,
);
let list_array =
GenericListArray::<O>::new(field.clone(), offsets, child_values, interleaved.nulls);

Ok(Arc::new(list_array))
}
Expand Down
Loading