From 9de744aff210dd90f6086923693fedc3e57260cc Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Thu, 11 Jun 2026 10:48:32 -0400 Subject: [PATCH 1/3] fix: Reject out-of-range ArrayMap probe keys on 32-bit targets ArrayMap bucket lookups computed `key.wrapping_sub(offset) as usize` and bounds-checked the result after the cast. On 32-bit targets (e.g. wasm32) the cast truncates the 64-bit ring distance, so an out-of-range probe key whose low 32 bits land inside the bucket array (e.g. probe key 2^32 + 3 against build keys 0..=10) wraps back into range and reports a false match, producing incorrect join results. Consolidate the four lookup sites (fill_data, both loops of lookup_and_get_indices, contain_keys) onto a key_to_index helper that bounds-checks in u64 before narrowing to usize, and add a regression test. No behavior change on 64-bit targets. --- .../physical-plan/src/joins/array_map.rs | 93 ++++++++++++++++--- 1 file changed, 80 insertions(+), 13 deletions(-) diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index ad40d6776df4f..ee70ef5bba604 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -157,6 +157,16 @@ impl ArrayMap { max_val.wrapping_sub(min_val) } + #[inline] + fn key_to_index(key: u64, offset: u64, data_len: usize) -> Option { + let idx = key.wrapping_sub(offset); + if idx < data_len as u64 { + Some(idx as usize) + } else { + None + } + } + /// Creates a new [`ArrayMap`] from the given array of join keys. /// /// Note: This function processes only the non-null values in the input `array`, @@ -207,10 +217,9 @@ impl ArrayMap { for (i, val) in arr.iter().enumerate().rev() { if let Some(val) = val { let key: u64 = val.as_(); - let idx = key.wrapping_sub(offset_val) as usize; - if idx >= data.len() { + let Some(idx) = Self::key_to_index(key, offset_val, data.len()) else { return internal_err!("failed build Array idx >= data.len()"); - } + }; if data[idx] != 0 { if next.is_empty() { @@ -264,6 +273,16 @@ impl ArrayMap { ) } + /// Returns whether `key` (a raw probe value cast to `u64`) is present in the + /// build side, i.e. maps to a non-empty bucket. + #[inline] + fn key_present(&self, key: u64) -> bool { + let Some(idx) = Self::key_to_index(key, self.offset, self.data.len()) else { + return false; + }; + self.data[idx] != 0 + } + fn lookup_and_get_indices( &self, array: &ArrayRef, @@ -294,11 +313,13 @@ impl ArrayMap { } // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_(); - let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; + let Some(idx_in_build_side) = + Self::key_to_index(prob_val, self.offset, self.data.len()) + else { + continue; + }; - if idx_in_build_side >= self.data.len() - || self.data[idx_in_build_side] == 0 - { + if self.data[idx_in_build_side] == 0 { continue; } build_indices.push((self.data[idx_in_build_side] - 1) as u64); @@ -345,10 +366,12 @@ impl ArrayMap { // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_(); - let idx_in_build_side = prob_val.wrapping_sub(self.offset) as usize; - if idx_in_build_side >= self.data.len() - || self.data[idx_in_build_side] == 0 - { + let Some(idx_in_build_side) = + Self::key_to_index(prob_val, self.offset, self.data.len()) + else { + continue; + }; + if self.data[idx_in_build_side] == 0 { continue; } @@ -402,8 +425,7 @@ impl ArrayMap { } // SAFETY: i is within bounds [0, arr.len()) let key: u64 = unsafe { arr.value_unchecked(i) }.as_(); - let idx = key.wrapping_sub(self.offset) as usize; - idx < self.data.len() && self.data[idx] != 0 + self.key_present(key) }); Ok(BooleanArray::new(buffer, None)) } @@ -414,6 +436,7 @@ mod tests { use super::*; use arrow::array::Int32Array; use arrow::array::Int64Array; + use arrow::array::UInt64Array; use std::sync::Arc; #[test] @@ -506,6 +529,50 @@ mod tests { Ok(()) } + #[test] + fn test_array_map_rejects_large_out_of_range_probe_key() -> Result<()> { + let build: ArrayRef = + Arc::new(UInt64Array::from(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10])); + let map = ArrayMap::try_new(&build, 0, 10)?; + + assert_eq!(ArrayMap::key_to_index(3, 0, 11), Some(3)); + + // Pick a key for which the computed bucket offset is larger than + // u32::MAX but has low 32 bits equal to 3. It must be bounds-checked + // before casting to usize, otherwise 32-bit targets can truncate it + // into range. + let out_of_range_key = (1_u64 << 32) + 3; + assert_eq!(ArrayMap::key_to_index(out_of_range_key, 0, 11), None); + + let probe = [Arc::new(UInt64Array::from(vec![ + Some(3), + Some(out_of_range_key), + Some(11), + None, + ])) as ArrayRef]; + + let mut matched_probe_indices = vec![]; + let mut matched_build_indices = vec![]; + let next = map.get_matched_indices_with_limit_offset( + &probe, + 10, + (0, None), + &mut matched_probe_indices, + &mut matched_build_indices, + )?; + assert_eq!(matched_probe_indices, vec![0]); + assert_eq!(matched_build_indices, vec![3]); + assert!(next.is_none()); + + let contains = map.contain_keys(&probe)?; + assert!(contains.value(0)); + assert!(!contains.value(1)); + assert!(!contains.value(2)); + assert!(!contains.value(3)); + + Ok(()) + } + #[test] fn test_array_map_i64_with_negative_and_positive_numbers() -> Result<()> { // Build array with a mix of negative and positive i64 values, no duplicates From ac75f3f3e6f8e6629f73b3a7489edc3ddda059a2 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sun, 14 Jun 2026 10:00:27 -0400 Subject: [PATCH 2/3] key_present -> get_value, per code review --- .../physical-plan/src/joins/array_map.rs | 35 ++++++------------- 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index ee70ef5bba604..811cf51bcf0d1 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -273,14 +273,14 @@ impl ArrayMap { ) } - /// Returns whether `key` (a raw probe value cast to `u64`) is present in the - /// build side, i.e. maps to a non-empty bucket. + /// Looks up `key` (a raw probe value cast to `u64`) in the build side, + /// returning the 1-based build-side slot if the key maps to a non-empty + /// bucket, or `None` otherwise. #[inline] - fn key_present(&self, key: u64) -> bool { - let Some(idx) = Self::key_to_index(key, self.offset, self.data.len()) else { - return false; - }; - self.data[idx] != 0 + fn get_value(&self, key: u64) -> Option { + let idx = Self::key_to_index(key, self.offset, self.data.len())?; + let value = self.data[idx]; + (value != 0).then_some(value) } fn lookup_and_get_indices( @@ -313,16 +313,10 @@ impl ArrayMap { } // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_idx) }.as_(); - let Some(idx_in_build_side) = - Self::key_to_index(prob_val, self.offset, self.data.len()) - else { + let Some(build_value) = self.get_value(prob_val) else { continue; }; - - if self.data[idx_in_build_side] == 0 { - continue; - } - build_indices.push((self.data[idx_in_build_side] - 1) as u64); + build_indices.push((build_value - 1) as u64); probe_indices.push(prob_idx as u32); } Ok(None) @@ -366,16 +360,9 @@ impl ArrayMap { // SAFETY: prob_idx is guaranteed to be within bounds by the loop range. let prob_val: u64 = unsafe { arr.value_unchecked(prob_side_idx) }.as_(); - let Some(idx_in_build_side) = - Self::key_to_index(prob_val, self.offset, self.data.len()) - else { + let Some(build_idx) = self.get_value(prob_val) else { continue; }; - if self.data[idx_in_build_side] == 0 { - continue; - } - - let build_idx = self.data[idx_in_build_side]; if let Some(offset) = traverse_chain( &self.next, @@ -425,7 +412,7 @@ impl ArrayMap { } // SAFETY: i is within bounds [0, arr.len()) let key: u64 = unsafe { arr.value_unchecked(i) }.as_(); - self.key_present(key) + self.get_value(key).is_some() }); Ok(BooleanArray::new(buffer, None)) } From d6b11c19ad929710e0789b7bbd408ef14a79c434 Mon Sep 17 00:00:00 2001 From: Neil Conway Date: Sun, 14 Jun 2026 10:07:20 -0400 Subject: [PATCH 3/3] Minor refactorings --- datafusion/physical-plan/src/joins/array_map.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/joins/array_map.rs b/datafusion/physical-plan/src/joins/array_map.rs index 811cf51bcf0d1..4e56cf013c8f7 100644 --- a/datafusion/physical-plan/src/joins/array_map.rs +++ b/datafusion/physical-plan/src/joins/array_map.rs @@ -89,7 +89,7 @@ macro_rules! downcast_supported_integer { /// ``` /// The resulting `range` (10) correctly represents the size of the interval `[-5, 5]`. /// -/// **2. Index Lookup (in `get_matched_indices`)** +/// **2. Index Lookup (in `get_matched_indices_with_limit_offset`)** /// /// For a probe value of `0` (which is stored as `0u64`): /// ```text @@ -173,7 +173,7 @@ impl ArrayMap { /// ignoring any rows where the key is `NULL`. /// pub(crate) fn try_new(array: &ArrayRef, min_val: u64, max_val: u64) -> Result { - let range = max_val.wrapping_sub(min_val); + let range = Self::calculate_range(min_val, max_val); if range >= usize::MAX as u64 { return internal_err!("ArrayMap key range is too large to be allocated."); } @@ -352,7 +352,7 @@ impl ArrayMap { return Ok(Some((prob_side_idx, None))); } - if arr.is_null(prob_side_idx) { + if have_null && arr.is_null(prob_side_idx) { continue; } @@ -391,14 +391,14 @@ impl ArrayMap { downcast_supported_integer!( array.data_type() => ( - contain_hashes_helper, + contain_keys_helper, self, array ) ) } - fn contain_hashes_helper( + fn contain_keys_helper( &self, array: &ArrayRef, ) -> Result