Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 78 additions & 24 deletions datafusion/physical-plan/src/joins/array_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ macro_rules! downcast_supported_integer {
/// ```
/// The resulting `range` (10) correctly represents the size of the interval `[-5, 5]`.
///
/// **2. Index Lookup (in `get_matched_indices`)**
/// **2. Index Lookup (in `get_matched_indices_with_limit_offset`)**
///
/// For a probe value of `0` (which is stored as `0u64`):
/// ```text
Expand Down Expand Up @@ -157,13 +157,23 @@ impl ArrayMap {
max_val.wrapping_sub(min_val)
}

#[inline]
fn key_to_index(key: u64, offset: u64, data_len: usize) -> Option<usize> {
let idx = key.wrapping_sub(offset);
if idx < data_len as u64 {
Some(idx as usize)
} else {
None
}
}

/// Creates a new [`ArrayMap`] from the given array of join keys.
///
/// Note: This function processes only the non-null values in the input `array`,
/// ignoring any rows where the key is `NULL`.
///
pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result<Self> {
let range = max_val.wrapping_sub(min_val);
let range = Self::calculate_range(min_val, max_val);
if range >= usize::MAX as u64 {
return internal_err!("ArrayMap key range is too large to be allocated.");
}
Expand Down Expand Up @@ -207,10 +217,9 @@ impl ArrayMap {
for (i, val) in arr.iter().enumerate().rev() {
if let Some(val) = val {
let key: u64 = val.as_();
let idx = key.wrapping_sub(offset_val) as usize;
if idx >= data.len() {
let Some(idx) = Self::key_to_index(key, offset_val, data.len()) else {
return internal_err!("failed build Array idx >= data.len()");
}
};

if data[idx] != 0 {
if next.is_empty() {
Expand Down Expand Up @@ -264,6 +273,16 @@ impl ArrayMap {
)
}

/// Looks up `key` (a raw probe value cast to `u64`) in the build side,
/// returning the 1-based build-side slot if the key maps to a non-empty
/// bucket, or `None` otherwise.
#[inline]
fn get_value(&self, key: u64) -> Option<u32> {
let idx = Self::key_to_index(key, self.offset, self.data.len())?;
let value = self.data[idx];
(value != 0).then_some(value)
}

fn lookup_and_get_indices<T: ArrowNumericType>(
&self,
array: &ArrayRef,
Expand Down Expand Up @@ -294,14 +313,10 @@ impl ArrayMap {
}
// SAFETY: prob_idx is guaranteed to be within bounds by the loop range.
let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_();
let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize;

if idx_in_build_side >= self.data.len()
|| self.data[idx_in_build_side] == 0
{
let Some(build_value) = self.get_value(prob_val) else {
continue;
}
build_indices.push((self.data[idx_in_build_side] - 1) as u64);
};
build_indices.push((build_value - 1) as u64);
probe_indices.push(prob_idx as u32);
}
Ok(None)
Expand Down Expand Up @@ -337,22 +352,17 @@ impl ArrayMap {
return Ok(Some((prob_side_idx, None)));
}

if arr.is_null(prob_side_idx) {
if have_null && arr.is_null(prob_side_idx) {
continue;
}

let is_last = prob_side_idx == arr.len() - 1;

// SAFETY: prob_idx is guaranteed to be within bounds by the loop range.
let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_();
let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize;
if idx_in_build_side >= self.data.len()
|| self.data[idx_in_build_side] == 0
{
let Some(build_idx) = self.get_value(prob_val) else {
continue;
}

let build_idx = self.data[idx_in_build_side];
};

if let Some(offset) = traverse_chain(
&self.next,
Expand Down Expand Up @@ -381,14 +391,14 @@ impl ArrayMap {

downcast_supported_integer!(
array.data_type() => (
contain_hashes_helper,
contain_keys_helper,
self,
array
)
)
}

fn contain_hashes_helper<T: ArrowNumericType>(
fn contain_keys_helper<T: ArrowNumericType>(
&self,
array: &ArrayRef,
) -> Result<BooleanArray>
Expand All @@ -402,8 +412,7 @@ impl ArrayMap {
}
// SAFETY: i is within bounds [0, arr.len())
let key: u64 = unsafe { arr.value_unchecked(i) }.as_();
let idx = key.wrapping_sub(self.offset) as usize;
idx < self.data.len() && self.data[idx] != 0
self.get_value(key).is_some()
});
Ok(BooleanArray::new(buffer, None))
}
Expand All @@ -414,6 +423,7 @@ mod tests {
use super::*;
use arrow::array::Int32Array;
use arrow::array::Int64Array;
use arrow::array::UInt64Array;
use std::sync::Arc;

#[test]
Expand Down Expand Up @@ -506,6 +516,50 @@ mod tests {
Ok(())
}

#[test]
fn test_array_map_rejects_large_out_of_range_probe_key() -> Result<()> {
let build: ArrayRef =
Arc::new(UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10]));
let map = ArrayMap::try_new(&build, 0, 10)?;

assert_eq!(ArrayMap::key_to_index(3, 0, 11), Some(3));

// Pick a key for which the computed bucket offset is larger than
// u32::MAX but has low 32 bits equal to 3. It must be bounds-checked
// before casting to usize, otherwise 32-bit targets can truncate it
// into range.
let out_of_range_key = (1_u64 << 32) + 3;
assert_eq!(ArrayMap::key_to_index(out_of_range_key, 0, 11), None);

let probe = [Arc::new(UInt64Array::from(vec![
Some(3),
Some(out_of_range_key),
Some(11),
None,
])) as ArrayRef];

let mut matched_probe_indices = vec![];
let mut matched_build_indices = vec![];
let next = map.get_matched_indices_with_limit_offset(
&probe,
10,
(0, None),
&mut matched_probe_indices,
&mut matched_build_indices,
)?;
assert_eq!(matched_probe_indices, vec![0]);
assert_eq!(matched_build_indices, vec![3]);
assert!(next.is_none());

let contains = map.contain_keys(&probe)?;
assert!(contains.value(0));
assert!(!contains.value(1));
assert!(!contains.value(2));
assert!(!contains.value(3));

Ok(())
}

#[test]
fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> {
// Build array with a mix of negative and positive i64 values, no duplicates
Expand Down
Loading