diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index ad40d6776df4f..4e56cf013c8f7 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -89,7 +89,7 @@ macro_rules! downcast_supported_integer { /// ``` /// The resulting `range` (10) correctly represents the size of the interval `[-5, 5]`. /// -/// **2. Index Lookup (in `get_matched_indices`)** +/// **2. Index Lookup (in `get_matched_indices_with_limit_offset`)** /// /// For a probe value of `0` (which is stored as `0u64`): /// ```text @@ -157,13 +157,23 @@ impl ArrayMap { max_val.wrapping_sub(min_val) } + #[inline] + fn key_to_index(key: u64, offset: u64, data_len: usize) -> Option { + let idx = key.wrapping_sub(offset); + if idx < data_len as u64 { + Some(idx as usize) + } else { + None + } + } + /// Creates a new [`ArrayMap`] from the given array of join keys. /// /// Note: This function processes only the non-null values in the input `array`, /// ignoring any rows where the key is `NULL`. /// pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result { - let range = max_val.wrapping_sub(min_val); + let range = Self::calculate_range(min_val, max_val); if range >= usize::MAX as u64 { return internal_err!("ArrayMap key range is too large to be allocated."); } @@ -207,10 +217,9 @@ impl ArrayMap { for (i, val) in arr.iter().enumerate().rev() { if let Some(val) = val { let key: u64 = val.as_(); - let idx = key.wrapping_sub(offset_val) as usize; - if idx >= data.len() { + let Some(idx) = Self::key_to_index(key, offset_val, data.len()) else { return internal_err!("failed build Array idx >= data.len()"); - } + }; if data[idx] != 0 { if next.is_empty() { @@ -264,6 +273,16 @@ impl ArrayMap { ) } + /// Looks up `key` (a raw probe value cast to `u64`) in the build side, + /// returning the 1-based build-side slot if the key maps to a non-empty + /// bucket, or `None` otherwise. + #[inline] + fn get_value(&self, key: u64) -> Option { + let idx = Self::key_to_index(key, self.offset, self.data.len())?; + let value = self.data[idx]; + (value != 0).then_some(value) + } + fn lookup_and_get_indices( &self, array: &ArrayRef, @@ -294,14 +313,10 @@ impl ArrayMap { } // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_(); - let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; - - if idx_in_build_side >= self.data.len() - || self.data[idx_in_build_side] == 0 - { + let Some(build_value) = self.get_value(prob_val) else { continue; - } - build_indices.push((self.data[idx_in_build_side] - 1) as u64); + }; + build_indices.push((build_value - 1) as u64); probe_indices.push(prob_idx as u32); } Ok(None) @@ -337,7 +352,7 @@ impl ArrayMap { return Ok(Some((prob_side_idx, None))); } - if arr.is_null(prob_side_idx) { + if have_null && arr.is_null(prob_side_idx) { continue; } @@ -345,14 +360,9 @@ impl ArrayMap { // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_(); - let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; - if idx_in_build_side >= self.data.len() - || self.data[idx_in_build_side] == 0 - { + let Some(build_idx) = self.get_value(prob_val) else { continue; - } - - let build_idx = self.data[idx_in_build_side]; + }; if let Some(offset) = traverse_chain( &self.next, @@ -381,14 +391,14 @@ impl ArrayMap { downcast_supported_integer!( array.data_type() => ( - contain_hashes_helper, + contain_keys_helper, self, array ) ) } - fn contain_hashes_helper( + fn contain_keys_helper( &self, array: &ArrayRef, ) -> Result @@ -402,8 +412,7 @@ impl ArrayMap { } // SAFETY: i is within bounds [0, arr.len()) let key: u64 = unsafe { arr.value_unchecked(i) }.as_(); - let idx = key.wrapping_sub(self.offset) as usize; - idx < self.data.len() && self.data[idx] != 0 + self.get_value(key).is_some() }); Ok(BooleanArray::new(buffer, None)) } @@ -414,6 +423,7 @@ mod tests { use super::*; use arrow::array::Int32Array; use arrow::array::Int64Array; + use arrow::array::UInt64Array; use std::sync::Arc; #[test] @@ -506,6 +516,50 @@ mod tests { Ok(()) } + #[test] + fn test_array_map_rejects_large_out_of_range_probe_key() -> Result<()> { + let build: ArrayRef = + Arc::new(UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + let map = ArrayMap::try_new(&build, 0, 10)?; + + assert_eq!(ArrayMap::key_to_index(3, 0, 11), Some(3)); + + // Pick a key for which the computed bucket offset is larger than + // u32::MAX but has low 32 bits equal to 3. It must be bounds-checked + // before casting to usize, otherwise 32-bit targets can truncate it + // into range. + let out_of_range_key = (1_u64 << 32) + 3; + assert_eq!(ArrayMap::key_to_index(out_of_range_key, 0, 11), None); + + let probe = [Arc::new(UInt64Array::from(vec![ + Some(3), + Some(out_of_range_key), + Some(11), + None, + ])) as ArrayRef]; + + let mut matched_probe_indices = vec![]; + let mut matched_build_indices = vec![]; + let next = map.get_matched_indices_with_limit_offset( + &probe, + 10, + (0, None), + &mut matched_probe_indices, + &mut matched_build_indices, + )?; + assert_eq!(matched_probe_indices, vec![0]); + assert_eq!(matched_build_indices, vec![3]); + assert!(next.is_none()); + + let contains = map.contain_keys(&probe)?; + assert!(contains.value(0)); + assert!(!contains.value(1)); + assert!(!contains.value(2)); + assert!(!contains.value(3)); + + Ok(()) + } + #[test] fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> { // Build array with a mix of negative and positive i64 values, no duplicates