Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 27 additions & 22 deletions datafusion/functions-nested/src/extract.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@

use arrow::array::{
Array, ArrayRef, Capacities, GenericListArray, GenericListViewArray, Int64Array,
MutableArrayData, NullArray, NullBufferBuilder, OffsetSizeTrait,
MutableArrayData, NullArray, OffsetSizeTrait,
};
use arrow::buffer::{OffsetBuffer, ScalarBuffer};
use arrow::buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use arrow::datatypes::DataType;
use arrow::datatypes::{
DataType::{FixedSizeList, LargeList, LargeListView, List, ListView, Null},
Expand Down Expand Up @@ -595,6 +595,23 @@ where
}
}

/// Merge the null masks of the list array and every slice argument.
///
/// A row in the output slice must be NULL whenever the row is NULL in the
/// source array, the `from` bound, the `to` bound, or (when supplied) the
/// `stride`. `NullBuffer::union` treats `None` as "all valid", so the result
/// is `None` only when none of the inputs carry a null bitmap.
fn combine_input_nulls(
    array: &dyn Array,
    from_array: &Int64Array,
    to_array: &Int64Array,
    stride: Option<&Int64Array>,
) -> Option<NullBuffer> {
    // Fold the masks together pairwise; each union ANDs the validity bits.
    let mut combined = NullBuffer::union(array.nulls(), from_array.nulls());
    combined = NullBuffer::union(combined.as_ref(), to_array.nulls());
    let stride_nulls = stride.and_then(|s| s.nulls());
    combined = NullBuffer::union(combined.as_ref(), stride_nulls);
    combined
}

fn general_array_slice<O: OffsetSizeTrait>(
array: &GenericListArray<O>,
from_array: &Int64Array,
Expand All @@ -615,25 +632,19 @@ where
// The rule `adjusted_from_index` and `adjusted_to_index` follows the rule of array_slice in duckdb.

let mut offsets = vec![O::usize_as(0)];
let mut null_builder = NullBufferBuilder::new(array.len());

let nulls = combine_input_nulls(array, from_array, to_array, stride);

for (row_index, offset_window) in array.offsets().windows(2).enumerate() {
let start = offset_window[0];
let end = offset_window[1];
let len = end - start;

// If any input is null, return null.
if array.is_null(row_index)
|| from_array.is_null(row_index)
|| to_array.is_null(row_index)
|| stride.is_some_and(|s| s.is_null(row_index))
{
if nulls.as_ref().is_some_and(|n| n.is_null(row_index)) {
mutable.extend_nulls(1);
offsets.push(offsets[row_index] + O::usize_as(1));
null_builder.append_null();
continue;
}
null_builder.append_non_null();

// Empty arrays always return an empty array.
if len == O::usize_as(0) {
Expand Down Expand Up @@ -676,7 +687,7 @@ where
Arc::new(Field::new_list_field(array.value_type(), true)),
OffsetBuffer::<O>::new(offsets.into()),
arrow::array::make_array(data),
null_builder.finish(),
nulls,
)?))
}

Expand Down Expand Up @@ -707,21 +718,15 @@ where
let mut offsets = Vec::with_capacity(array.len());
let mut sizes = Vec::with_capacity(array.len());
let mut current_offset = O::usize_as(0);
let mut null_builder = NullBufferBuilder::new(array.len());

let nulls = combine_input_nulls(array, from_array, to_array, stride);

for row_index in 0..array.len() {
// Propagate NULL semantics: any NULL input yields a NULL output slot.
if array.is_null(row_index)
|| from_array.is_null(row_index)
|| to_array.is_null(row_index)
|| stride.is_some_and(|s| s.is_null(row_index))
{
null_builder.append_null();
if nulls.as_ref().is_some_and(|n| n.is_null(row_index)) {
offsets.push(current_offset);
sizes.push(O::usize_as(0));
continue;
}
null_builder.append_non_null();

let len = array.value_size(row_index);

Expand Down Expand Up @@ -777,7 +782,7 @@ where
ScalarBuffer::from(offsets),
ScalarBuffer::from(sizes),
arrow::array::make_array(data),
null_builder.finish(),
nulls,
)?))
}

Expand Down
Loading