@@ -23,7 +23,7 @@ use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
2323use futures:: { StreamExt , TryStreamExt } ;
2424use tokio:: sync:: oneshot:: { Receiver , channel} ;
2525
26- use super :: delete_filter:: DeleteFilter ;
26+ use super :: delete_filter:: { DeleteFilter , PosDelLoadAction } ;
2727use crate :: arrow:: delete_file_loader:: BasicDeleteFileLoader ;
2828use crate :: arrow:: { arrow_primitive_to_literal, arrow_schema_to_schema} ;
2929use crate :: delete_vector:: DeleteVector ;
@@ -42,13 +42,20 @@ use crate::{Error, ErrorKind, Result};
4242pub ( crate ) struct CachingDeleteFileLoader {
4343 basic_delete_file_loader : BasicDeleteFileLoader , // built from the `file_io` given to `new`; cloned into each delete-file task
4444 concurrency_limit_data_files : usize , // passed to `try_buffer_unordered` in `load_deletes` to cap concurrently running delete-file tasks
45+ /// Shared filter state that lets already-loaded deletes be cached and reused
46+ /// across multiple calls to `load_deletes` (e.g., across multiple file scan tasks).
47+ delete_filter : DeleteFilter , // cloned for every task and for the spawned merge loop in `load_deletes`
4548}
4649
4750// Intermediate context during processing of a delete file task.
4851enum DeleteFileContext {
4952 // TODO: Delete Vector loader from Puffin files
5053 ExistingEqDel ,
51- PosDels ( ArrowRecordBatchStream ) ,
54+ ExistingPosDel ,
55+ PosDels {
56+ file_path : String ,
57+ stream : ArrowRecordBatchStream ,
58+ } ,
5259 FreshEqDel {
5360 batch_stream : ArrowRecordBatchStream ,
5461 equality_ids : HashSet < i32 > ,
@@ -59,8 +66,12 @@ enum DeleteFileContext {
5966// Final result of processing a single delete file task, before the
6067// results are merged into the shared `DeleteFilter` state by `load_deletes`
6168enum ParsedDeleteFileContext {
62- DelVecs ( HashMap < String , DeleteVector > ) ,
69+ DelVecs { // freshly parsed positional deletes
70+ file_path : String , // path of the positional delete file; handed to `finish_pos_del_load` once its vectors are merged
71+ results : HashMap < String , DeleteVector > , // delete vectors; `load_deletes` upserts each entry using the key as the data file path
72+ } ,
6373 EqDel , // equality delete file; carries no payload for the merge loop
74+ ExistingPosDel , // positional delete file already loaded (or awaited) elsewhere; the merge loop has nothing to do for it
6475}
6576
6677#[ allow( unused_variables) ]
@@ -69,6 +80,7 @@ impl CachingDeleteFileLoader {
6980 CachingDeleteFileLoader {
7081 basic_delete_file_loader : BasicDeleteFileLoader :: new ( file_io) ,
7182 concurrency_limit_data_files,
83+ delete_filter : DeleteFilter :: default ( ) ,
7284 }
7385 }
7486
@@ -142,30 +154,29 @@ impl CachingDeleteFileLoader {
142154 schema : SchemaRef ,
143155 ) -> Receiver < Result < DeleteFilter > > {
144156 let ( tx, rx) = channel ( ) ;
145- let del_filter = DeleteFilter :: default ( ) ;
146157
147158 let stream_items = delete_file_entries
148159 . iter ( )
149160 . map ( |t| {
150161 (
151162 t. clone ( ) ,
152163 self . basic_delete_file_loader . clone ( ) ,
153- del_filter . clone ( ) ,
164+ self . delete_filter . clone ( ) ,
154165 schema. clone ( ) ,
155166 )
156167 } )
157168 . collect :: < Vec < _ > > ( ) ;
158169 let task_stream = futures:: stream:: iter ( stream_items) ;
159170
160- let del_filter = del_filter . clone ( ) ;
171+ let del_filter = self . delete_filter . clone ( ) ;
161172 let concurrency_limit_data_files = self . concurrency_limit_data_files ;
162173 let basic_delete_file_loader = self . basic_delete_file_loader . clone ( ) ;
163174 crate :: runtime:: spawn ( async move {
164175 let result = async move {
165176 let mut del_filter = del_filter;
166177 let basic_delete_file_loader = basic_delete_file_loader. clone ( ) ;
167178
168- let results : Vec < ParsedDeleteFileContext > = task_stream
179+ let mut results_stream = task_stream
169180 . map ( move |( task, file_io, del_filter, schema) | {
170181 let basic_delete_file_loader = basic_delete_file_loader. clone ( ) ;
171182 async move {
@@ -181,15 +192,16 @@ impl CachingDeleteFileLoader {
181192 . map ( move |ctx| {
182193 Ok ( async { Self :: parse_file_content_for_task ( ctx. await ?) . await } )
183194 } )
184- . try_buffer_unordered ( concurrency_limit_data_files)
185- . try_collect :: < Vec < _ > > ( )
186- . await ?;
195+ . try_buffer_unordered ( concurrency_limit_data_files) ;
187196
188- for item in results {
189- if let ParsedDeleteFileContext :: DelVecs ( hash_map) = item {
190- for ( data_file_path, delete_vector) in hash_map. into_iter ( ) {
197+ while let Some ( item) = results_stream. next ( ) . await {
198+ let item = item?;
199+ if let ParsedDeleteFileContext :: DelVecs { file_path, results } = item {
200+ for ( data_file_path, delete_vector) in results. into_iter ( ) {
191201 del_filter. upsert_delete_vector ( data_file_path, delete_vector) ;
192202 }
203+ // Mark the positional delete file as fully loaded so waiters can proceed
204+ del_filter. finish_pos_del_load ( & file_path) ;
193205 }
194206 }
195207
@@ -210,11 +222,24 @@ impl CachingDeleteFileLoader {
210222 schema : SchemaRef ,
211223 ) -> Result < DeleteFileContext > {
212224 match task. file_type {
213- DataContentType :: PositionDeletes => Ok ( DeleteFileContext :: PosDels (
214- basic_delete_file_loader
215- . parquet_to_batch_stream ( & task. file_path )
216- . await ?,
217- ) ) ,
225+ DataContentType :: PositionDeletes => {
226+ match del_filter. try_start_pos_del_load ( & task. file_path ) {
227+ PosDelLoadAction :: AlreadyLoaded => Ok ( DeleteFileContext :: ExistingPosDel ) ,
228+ PosDelLoadAction :: WaitFor ( notify) => {
229+ // Positional deletes are accessed synchronously by ArrowReader.
230+ // We must wait here to ensure the data is ready before returning,
231+ // otherwise ArrowReader might get an empty/partial result.
232+ notify. notified ( ) . await ;
233+ Ok ( DeleteFileContext :: ExistingPosDel )
234+ }
235+ PosDelLoadAction :: Load => Ok ( DeleteFileContext :: PosDels {
236+ file_path : task. file_path . clone ( ) ,
237+ stream : basic_delete_file_loader
238+ . parquet_to_batch_stream ( & task. file_path )
239+ . await ?,
240+ } ) ,
241+ }
242+ }
218243
219244 DataContentType :: EqualityDeletes => {
220245 let Some ( notify) = del_filter. try_start_eq_del_load ( & task. file_path ) else {
@@ -255,10 +280,13 @@ impl CachingDeleteFileLoader {
255280 ) -> Result < ParsedDeleteFileContext > {
256281 match ctx {
257282 DeleteFileContext :: ExistingEqDel => Ok ( ParsedDeleteFileContext :: EqDel ) ,
258- DeleteFileContext :: PosDels ( batch_stream) => {
259- let del_vecs =
260- Self :: parse_positional_deletes_record_batch_stream ( batch_stream) . await ?;
261- Ok ( ParsedDeleteFileContext :: DelVecs ( del_vecs) )
283+ DeleteFileContext :: ExistingPosDel => Ok ( ParsedDeleteFileContext :: ExistingPosDel ) ,
284+ DeleteFileContext :: PosDels { file_path, stream } => {
285+ let del_vecs = Self :: parse_positional_deletes_record_batch_stream ( stream) . await ?;
286+ Ok ( ParsedDeleteFileContext :: DelVecs {
287+ file_path,
288+ results : del_vecs,
289+ } )
262290 }
263291 DeleteFileContext :: FreshEqDel {
264292 sender,
@@ -979,4 +1007,43 @@ mod tests {
9791007
9801008 assert ! ( result. is_ok( ) ) ;
9811009 }
1010+
1011+ #[ tokio:: test]
1012+ async fn test_caching_delete_file_loader_caches_results ( ) { // two `load_deletes` calls on one loader must hand back the same delete vector
1013+ let tmp_dir = TempDir :: new ( ) . unwrap ( ) ;
1014+ let table_location = tmp_dir. path ( ) ;
1015+ let file_io = FileIO :: from_path ( table_location. as_os_str ( ) . to_str ( ) . unwrap ( ) )
1016+ . unwrap ( )
1017+ . build ( )
1018+ . unwrap ( ) ;
1019+
1020+ let delete_file_loader = CachingDeleteFileLoader :: new ( file_io. clone ( ) , 10 ) ;
1021+
1022+ let file_scan_tasks = setup ( table_location) ; // NOTE(review): `setup` is defined elsewhere in this test module; presumably it writes the delete files under the temp dir — confirm
1023+
1024+ // Load deletes for the first time
1025+ let delete_filter_1 = delete_file_loader
1026+ . load_deletes ( & file_scan_tasks[ 0 ] . deletes , file_scan_tasks[ 0 ] . schema_ref ( ) )
1027+ . await
1028+ . unwrap ( ) // outer layer: the oneshot receiver returned by `load_deletes`
1029+ . unwrap ( ) ; // inner layer: the loader's own `Result<DeleteFilter>`
1030+
1031+ // Load deletes for the second time (same loader, same task/files)
1032+ let delete_filter_2 = delete_file_loader
1033+ . load_deletes ( & file_scan_tasks[ 0 ] . deletes , file_scan_tasks[ 0 ] . schema_ref ( ) )
1034+ . await
1035+ . unwrap ( ) // outer layer: the oneshot receiver returned by `load_deletes`
1036+ . unwrap ( ) ; // inner layer: the loader's own `Result<DeleteFilter>`
1037+
1038+ let dv1 = delete_filter_1
1039+ . get_delete_vector ( & file_scan_tasks[ 0 ] )
1040+ . unwrap ( ) ;
1041+ let dv2 = delete_filter_2
1042+ . get_delete_vector ( & file_scan_tasks[ 0 ] )
1043+ . unwrap ( ) ;
1044+
1045+ // Verify that both delete vectors are the same `Arc` allocation,
1046+ // confirming that the second load reused the result from the first.
1047+ assert ! ( Arc :: ptr_eq( & dv1, & dv2) ) ;
1048+ }
9821049}
0 commit comments