Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 34 additions & 38 deletions datafusion/functions-nested/src/array_has.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use arrow::array::{
Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar,
StringArrayType,
};
use arrow::buffer::BooleanBuffer;
use arrow::buffer::{BooleanBuffer, NullBuffer};
use arrow::datatypes::DataType;
use arrow::row::{RowConverter, Rows, SortField};
use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array};
Expand Down Expand Up @@ -314,7 +314,7 @@ impl<'a> ArrayWrapper<'a> {
}
}

fn nulls(&self) -> Option<&arrow::buffer::NullBuffer> {
fn nulls(&self) -> Option<&NullBuffer> {
match self {
ArrayWrapper::FixedSizeList(arr) => arr.nulls(),
ArrayWrapper::List(arr) => arr.nulls(),
Expand All @@ -327,20 +327,21 @@ fn array_has_dispatch_for_array<'a>(
haystack: ArrayWrapper<'a>,
needle: &ArrayRef,
) -> Result<ArrayRef> {
let mut boolean_builder = BooleanArray::builder(haystack.len());
let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
let mut result = BooleanBufferBuilder::new(haystack.len());
for (i, arr) in haystack.iter().enumerate() {
if arr.is_none() || needle.is_null(i) {
boolean_builder.append_null();
if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append(false);
continue;
}
let arr = arr.unwrap();
let is_nested = arr.data_type().is_nested();
let needle_row = Scalar::new(needle.slice(i, 1));
let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?;
boolean_builder.append_value(eq_array.true_count() > 0);
result.append(eq_array.true_count() > 0);
}

Ok(Arc::new(boolean_builder.finish()))
Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
}

fn array_has_dispatch_for_scalar(
Expand Down Expand Up @@ -435,9 +436,8 @@ fn general_array_has_for_all_and_any<'a>(
let h_offsets: Vec<usize> = haystack.offsets().collect();
let n_offsets: Vec<usize> = needle.offsets().collect();

let h_nulls = haystack.nulls();
let n_nulls = needle.nulls();
let mut builder = BooleanArray::builder(num_rows);
let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
let mut result = BooleanBufferBuilder::new(num_rows);

for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
Expand All @@ -460,13 +460,11 @@ fn general_array_has_for_all_and_any<'a>(
let chunk_n_rows = converter.convert_columns(&[n_vals])?;

for i in chunk_start..chunk_end {
if h_nulls.is_some_and(|n| n.is_null(i))
|| n_nulls.is_some_and(|n| n.is_null(i))
{
builder.append_null();
if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append(false);
continue;
}
builder.append_value(general_array_has_all_and_any_kernel(
result.append(general_array_has_all_and_any_kernel(
&chunk_h_rows,
(h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - h_elem_start),
&chunk_n_rows,
Expand All @@ -476,7 +474,7 @@ fn general_array_has_for_all_and_any<'a>(
}
}

Ok(Arc::new(builder.finish()))
Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
}

// String comparison for array_has_all and array_has_any
Expand All @@ -490,9 +488,8 @@ fn array_has_all_and_any_string_internal<'a>(
let h_offsets: Vec<usize> = haystack.offsets().collect();
let n_offsets: Vec<usize> = needle.offsets().collect();

let h_nulls = haystack.nulls();
let n_nulls = needle.nulls();
let mut builder = BooleanArray::builder(num_rows);
let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls());
let mut result = BooleanBufferBuilder::new(num_rows);

for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) {
let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows);
Expand All @@ -513,25 +510,23 @@ fn array_has_all_and_any_string_internal<'a>(
let chunk_n_strings = string_array_to_vec(n_vals.as_ref());

for i in chunk_start..chunk_end {
if h_nulls.is_some_and(|n| n.is_null(i))
|| n_nulls.is_some_and(|n| n.is_null(i))
{
builder.append_null();
if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) {
result.append(false);
continue;
}
let h_start = h_offsets[i] - h_elem_start;
let h_end = h_offsets[i + 1] - h_elem_start;
let n_start = n_offsets[i] - n_elem_start;
let n_end = n_offsets[i + 1] - n_elem_start;
builder.append_value(array_has_string_kernel(
result.append(array_has_string_kernel(
&chunk_h_strings[h_start..h_end],
&chunk_n_strings[n_start..n_end],
comparison_type,
));
}
}

Ok(Arc::new(builder.finish()))
Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls)))
}

fn array_has_all_and_any_dispatch<'a>(
Expand Down Expand Up @@ -687,16 +682,16 @@ impl<'a> ScalarStringLookup<'a> {
fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
col_strings: C,
col_offsets: &[usize],
col_nulls: Option<&arrow::buffer::NullBuffer>,
col_nulls: Option<&NullBuffer>,
has_null_scalar: bool,
scalar_lookup: &ScalarStringLookup<'_>,
) -> ArrayRef {
let num_rows = col_offsets.len() - 1;
let mut builder = BooleanArray::builder(num_rows);
let mut result = BooleanBufferBuilder::new(num_rows);

for i in 0..num_rows {
if col_nulls.is_some_and(|v| v.is_null(i)) {
builder.append_null();
result.append(false);
continue;
}
let start = col_offsets[i];
Expand All @@ -708,10 +703,10 @@ fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>(
scalar_lookup.contains(col_strings.value(j))
}
});
builder.append_value(found);
result.append(found);
}

Arc::new(builder.finish())
Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()))
}

/// General scalar fast path for `array_has_any`, using RowConverter for
Expand All @@ -734,7 +729,7 @@ fn array_has_any_with_scalar_general(
let col_offsets: Vec<usize> = col_list.offsets().collect();
let col_nulls = col_list.nulls();

let mut builder = BooleanArray::builder(col_list.len());
let mut result = BooleanBufferBuilder::new(col_list.len());
let num_scalar = scalar_rows.num_rows();

if num_scalar > SCALAR_SMALL_THRESHOLD {
Expand All @@ -745,38 +740,39 @@ fn array_has_any_with_scalar_general(

for i in 0..col_list.len() {
if col_nulls.is_some_and(|v| v.is_null(i)) {
builder.append_null();
result.append(false);
continue;
}
let start = col_offsets[i];
let end = col_offsets[i + 1];
let found =
(start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref()));
builder.append_value(found);
result.append(found);
}
} else {
// Small scalar: linear scan avoids HashSet hashing overhead
for i in 0..col_list.len() {
if col_nulls.is_some_and(|v| v.is_null(i)) {
builder.append_null();
result.append(false);
continue;
}
let start = col_offsets[i];
let end = col_offsets[i + 1];
let found = (start..end)
.any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k)));
builder.append_value(found);
result.append(found);
}
}

let result: ArrayRef = Arc::new(builder.finish());
let output: ArrayRef =
Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned()));

if is_scalar_output {
Ok(ColumnarValue::Scalar(ScalarValue::try_from_array(
&result, 0,
&output, 0,
)?))
} else {
Ok(ColumnarValue::Array(result))
Ok(ColumnarValue::Array(output))
}
}

Expand Down
Loading