diff --git a/datafusion/functions-nested/src/array_has.rs b/datafusion/functions-nested/src/array_has.rs index 5eebd7b7268c..d5b2e3e6cf67 100644 --- a/datafusion/functions-nested/src/array_has.rs +++ b/datafusion/functions-nested/src/array_has.rs @@ -21,7 +21,7 @@ use arrow::array::{ Array, ArrayRef, AsArray, BooleanArray, BooleanBufferBuilder, Datum, Scalar, StringArrayType, }; -use arrow::buffer::BooleanBuffer; +use arrow::buffer::{BooleanBuffer, NullBuffer}; use arrow::datatypes::DataType; use arrow::row::{RowConverter, Rows, SortField}; use datafusion_common::cast::{as_fixed_size_list_array, as_generic_list_array}; @@ -314,7 +314,7 @@ impl<'a> ArrayWrapper<'a> { } } - fn nulls(&self) -> Option<&arrow::buffer::NullBuffer> { + fn nulls(&self) -> Option<&NullBuffer> { match self { ArrayWrapper::FixedSizeList(arr) => arr.nulls(), ArrayWrapper::List(arr) => arr.nulls(), @@ -327,20 +327,21 @@ fn array_has_dispatch_for_array<'a>( haystack: ArrayWrapper<'a>, needle: &ArrayRef, ) -> Result { - let mut boolean_builder = BooleanArray::builder(haystack.len()); + let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls()); + let mut result = BooleanBufferBuilder::new(haystack.len()); for (i, arr) in haystack.iter().enumerate() { - if arr.is_none() || needle.is_null(i) { - boolean_builder.append_null(); + if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) { + result.append(false); continue; } let arr = arr.unwrap(); let is_nested = arr.data_type().is_nested(); let needle_row = Scalar::new(needle.slice(i, 1)); let eq_array = compare_with_eq(&arr, &needle_row, is_nested)?; - boolean_builder.append_value(eq_array.true_count() > 0); + result.append(eq_array.true_count() > 0); } - Ok(Arc::new(boolean_builder.finish())) + Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls))) } fn array_has_dispatch_for_scalar( @@ -435,9 +436,8 @@ fn general_array_has_for_all_and_any<'a>( let h_offsets: Vec = haystack.offsets().collect(); let n_offsets: Vec = needle.offsets().collect(); - let h_nulls = haystack.nulls(); - let n_nulls = needle.nulls(); - let mut builder = BooleanArray::builder(num_rows); + let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls()); + let mut result = BooleanBufferBuilder::new(num_rows); for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) { let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows); @@ -460,13 +460,11 @@ fn general_array_has_for_all_and_any<'a>( let chunk_n_rows = converter.convert_columns(&[n_vals])?; for i in chunk_start..chunk_end { - if h_nulls.is_some_and(|n| n.is_null(i)) - || n_nulls.is_some_and(|n| n.is_null(i)) - { - builder.append_null(); + if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) { + result.append(false); continue; } - builder.append_value(general_array_has_all_and_any_kernel( + result.append(general_array_has_all_and_any_kernel( &chunk_h_rows, (h_offsets[i] - h_elem_start)..(h_offsets[i + 1] - h_elem_start), &chunk_n_rows, @@ -476,7 +474,7 @@ fn general_array_has_for_all_and_any<'a>( } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls))) } // String comparison for array_has_all and array_has_any @@ -490,9 +488,8 @@ fn array_has_all_and_any_string_internal<'a>( let h_offsets: Vec = haystack.offsets().collect(); let n_offsets: Vec = needle.offsets().collect(); - let h_nulls = haystack.nulls(); - let n_nulls = needle.nulls(); - let mut builder = BooleanArray::builder(num_rows); + let combined_nulls = NullBuffer::union(haystack.nulls(), needle.nulls()); + let mut result = BooleanBufferBuilder::new(num_rows); for chunk_start in (0..num_rows).step_by(ROW_CONVERSION_CHUNK_SIZE) { let chunk_end = (chunk_start + ROW_CONVERSION_CHUNK_SIZE).min(num_rows); @@ -513,17 +510,15 @@ fn array_has_all_and_any_string_internal<'a>( let chunk_n_strings = string_array_to_vec(n_vals.as_ref()); for i in chunk_start..chunk_end { - if h_nulls.is_some_and(|n| n.is_null(i)) - || n_nulls.is_some_and(|n| n.is_null(i)) - { - builder.append_null(); + if combined_nulls.as_ref().is_some_and(|n| n.is_null(i)) { + result.append(false); continue; } let h_start = h_offsets[i] - h_elem_start; let h_end = h_offsets[i + 1] - h_elem_start; let n_start = n_offsets[i] - n_elem_start; let n_end = n_offsets[i + 1] - n_elem_start; - builder.append_value(array_has_string_kernel( + result.append(array_has_string_kernel( &chunk_h_strings[h_start..h_end], &chunk_n_strings[n_start..n_end], comparison_type, @@ -531,7 +526,7 @@ fn array_has_all_and_any_string_internal<'a>( } } - Ok(Arc::new(builder.finish())) + Ok(Arc::new(BooleanArray::new(result.finish(), combined_nulls))) } fn array_has_all_and_any_dispatch<'a>( @@ -687,16 +682,16 @@ impl<'a> ScalarStringLookup<'a> { fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>( col_strings: C, col_offsets: &[usize], - col_nulls: Option<&arrow::buffer::NullBuffer>, + col_nulls: Option<&NullBuffer>, has_null_scalar: bool, scalar_lookup: &ScalarStringLookup<'_>, ) -> ArrayRef { let num_rows = col_offsets.len() - 1; - let mut builder = BooleanArray::builder(num_rows); + let mut result = BooleanBufferBuilder::new(num_rows); for i in 0..num_rows { if col_nulls.is_some_and(|v| v.is_null(i)) { - builder.append_null(); + result.append(false); continue; } let start = col_offsets[i]; @@ -708,10 +703,10 @@ fn array_has_any_string_inner<'a, C: StringArrayType<'a> + Copy>( scalar_lookup.contains(col_strings.value(j)) } }); - builder.append_value(found); + result.append(found); } - Arc::new(builder.finish()) + Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned())) } /// General scalar fast path for `array_has_any`, using RowConverter for @@ -734,7 +729,7 @@ fn array_has_any_with_scalar_general( let col_offsets: Vec = col_list.offsets().collect(); let col_nulls = col_list.nulls(); - let mut builder = BooleanArray::builder(col_list.len()); + let mut result = BooleanBufferBuilder::new(col_list.len()); let num_scalar = scalar_rows.num_rows(); if num_scalar > SCALAR_SMALL_THRESHOLD { @@ -745,38 +740,39 @@ fn array_has_any_with_scalar_general( for i in 0..col_list.len() { if col_nulls.is_some_and(|v| v.is_null(i)) { - builder.append_null(); + result.append(false); continue; } let start = col_offsets[i]; let end = col_offsets[i + 1]; let found = (start..end).any(|j| scalar_set.contains(col_rows.row(j).as_ref())); - builder.append_value(found); + result.append(found); } } else { // Small scalar: linear scan avoids HashSet hashing overhead for i in 0..col_list.len() { if col_nulls.is_some_and(|v| v.is_null(i)) { - builder.append_null(); + result.append(false); continue; } let start = col_offsets[i]; let end = col_offsets[i + 1]; let found = (start..end) .any(|j| (0..num_scalar).any(|k| col_rows.row(j) == scalar_rows.row(k))); - builder.append_value(found); + result.append(found); } } - let result: ArrayRef = Arc::new(builder.finish()); + let output: ArrayRef = + Arc::new(BooleanArray::new(result.finish(), col_nulls.cloned())); if is_scalar_output { Ok(ColumnarValue::Scalar(ScalarValue::try_from_array( - &result, 0, + &output, 0, )?)) } else { - Ok(ColumnarValue::Array(result)) + Ok(ColumnarValue::Array(output)) } }