@@ -157,12 +157,17 @@ impl ArrayMap {
157157 max_val. wrapping_sub ( min_val)
158158 }
159159
160- /// Creates a new [`ArrayMap`] from the given array of join keys.
160+ /// Creates a new [`ArrayMap`] from per-batch arrays of join keys.
161161 ///
162- /// Note: This function processes only the non-null values in the input `array` ,
162+ /// Note: This function processes only the non-null values in the input arrays ,
163163 /// ignoring any rows where the key is `NULL`.
164164 ///
165- pub ( crate ) fn try_new ( array : & ArrayRef , min_val : u64 , max_val : u64 ) -> Result < Self > {
165+ pub ( crate ) fn try_new (
166+ arrays : & [ & ArrayRef ] ,
167+ total_num_rows : usize ,
168+ min_val : u64 ,
169+ max_val : u64 ,
170+ ) -> Result < Self > {
166171 let range = max_val. wrapping_sub ( min_val) ;
167172 if range >= usize:: MAX as u64 {
168173 return internal_err ! ( "ArrayMap key range is too large to be allocated." ) ;
@@ -173,10 +178,16 @@ impl ArrayMap {
173178 let mut next: Vec < u32 > = vec ! [ ] ;
174179 let mut num_of_distinct_key = 0 ;
175180
181+ let data_type = arrays
182+ . first ( )
183+ . map ( |a| a. data_type ( ) . clone ( ) )
184+ . unwrap_or ( DataType :: Int32 ) ;
185+
176186 downcast_supported_integer ! (
177- array. data_type( ) => (
178- fill_data,
179- array,
187+ & data_type => (
188+ fill_data_batched,
189+ arrays,
190+ total_num_rows,
180191 min_val,
181192 & mut data,
182193 & mut next,
@@ -192,8 +203,9 @@ impl ArrayMap {
192203 } )
193204 }
194205
195- fn fill_data < T : ArrowNumericType > (
196- array : & ArrayRef ,
206+ fn fill_data_batched < T : ArrowNumericType > (
207+ arrays : & [ & ArrayRef ] ,
208+ total_num_rows : usize ,
197209 offset_val : u64 ,
198210 data : & mut [ u32 ] ,
199211 next : & mut Vec < u32 > ,
@@ -202,25 +214,32 @@ impl ArrayMap {
202214 where
203215 T :: Native : AsPrimitive < u64 > ,
204216 {
205- let arr = array. as_primitive :: < T > ( ) ;
206217 // Iterate in reverse to maintain FIFO order when there are duplicate keys.
207- for ( i, val) in arr. iter ( ) . enumerate ( ) . rev ( ) {
208- if let Some ( val) = val {
209- let key: u64 = val. as_ ( ) ;
210- let idx = key. wrapping_sub ( offset_val) as usize ;
211- if idx >= data. len ( ) {
212- return internal_err ! ( "failed build Array idx >= data.len()" ) ;
213- }
214-
215- if data[ idx] != 0 {
216- if next. is_empty ( ) {
217- * next = vec ! [ 0 ; array. len( ) ]
218+ // We iterate batches in reverse, and within each batch iterate rows in reverse,
219+ // using a flat index that spans all batches.
220+ let mut flat_offset = total_num_rows;
221+ for array in arrays. iter ( ) . rev ( ) {
222+ let arr = array. as_primitive :: < T > ( ) ;
223+ flat_offset -= arr. len ( ) ;
224+ for ( row_idx, val) in arr. iter ( ) . enumerate ( ) . rev ( ) {
225+ if let Some ( val) = val {
226+ let key: u64 = val. as_ ( ) ;
227+ let idx = key. wrapping_sub ( offset_val) as usize ;
228+ if idx >= data. len ( ) {
229+ return internal_err ! ( "failed build Array idx >= data.len()" ) ;
218230 }
219- next[ i] = data[ idx]
220- } else {
221- * num_of_distinct_key += 1 ;
231+ let flat_idx = flat_offset + row_idx;
232+
233+ if data[ idx] != 0 {
234+ if next. is_empty ( ) {
235+ * next = vec ! [ 0 ; total_num_rows]
236+ }
237+ next[ flat_idx] = data[ idx]
238+ } else {
239+ * num_of_distinct_key += 1 ;
240+ }
241+ data[ idx] = flat_idx as u32 + 1 ;
222242 }
223- data[ idx] = ( i) as u32 + 1 ;
224243 }
225244 }
226245 Ok ( ( ) )
@@ -419,7 +438,7 @@ mod tests {
419438 #[ test]
420439 fn test_array_map_limit_offset_duplicate_elements ( ) -> Result < ( ) > {
421440 let build: ArrayRef = Arc :: new ( Int32Array :: from ( vec ! [ 1 , 1 , 2 ] ) ) ;
422- let map = ArrayMap :: try_new ( & build, 1 , 2 ) ?;
441+ let map = ArrayMap :: try_new ( & [ & build] , build . len ( ) , 1 , 2 ) ?;
423442 let probe = [ Arc :: new ( Int32Array :: from ( vec ! [ 1 , 2 ] ) ) as ArrayRef ] ;
424443
425444 let mut prob_idx = Vec :: new ( ) ;
@@ -450,7 +469,7 @@ mod tests {
450469 #[ test]
451470 fn test_array_map_with_limit_and_misses ( ) -> Result < ( ) > {
452471 let build: ArrayRef = Arc :: new ( Int32Array :: from ( vec ! [ 1 , 2 ] ) ) ;
453- let map = ArrayMap :: try_new ( & build, 1 , 2 ) ?;
472+ let map = ArrayMap :: try_new ( & [ & build] , build . len ( ) , 1 , 2 ) ?;
454473 let probe = [ Arc :: new ( Int32Array :: from ( vec ! [ 10 , 1 , 2 ] ) ) as ArrayRef ] ;
455474
456475 let ( mut p_idx, mut b_idx) = ( vec ! [ ] , vec ! [ ] ) ;
@@ -483,7 +502,7 @@ mod tests {
483502 #[ test]
484503 fn test_array_map_with_build_duplicates_and_misses ( ) -> Result < ( ) > {
485504 let build_array: ArrayRef = Arc :: new ( Int32Array :: from ( vec ! [ 1 , 1 ] ) ) ;
486- let array_map = ArrayMap :: try_new ( & build_array, 1 , 1 ) ?;
505+ let array_map = ArrayMap :: try_new ( & [ & build_array] , build_array . len ( ) , 1 , 1 ) ?;
487506 // prob: 10(m), 1(h1, h2), 20(m), 1(h1, h2)
488507 let probe_array: ArrayRef = Arc :: new ( Int32Array :: from ( vec ! [ 10 , 1 , 20 , 1 ] ) ) ;
489508 let prob_side_keys = [ probe_array] ;
@@ -513,7 +532,12 @@ mod tests {
513532 let min_val = -5_i128 ;
514533 let max_val = 10_i128 ;
515534
516- let array_map = ArrayMap :: try_new ( & build_array, min_val as u64 , max_val as u64 ) ?;
535+ let array_map = ArrayMap :: try_new (
536+ & [ & build_array] ,
537+ build_array. len ( ) ,
538+ min_val as u64 ,
539+ max_val as u64 ,
540+ ) ?;
517541
518542 // Probe array
519543 let probe_array: ArrayRef = Arc :: new ( Int64Array :: from ( vec ! [ 0 , -5 , 10 , -1 ] ) ) ;
0 commit comments