diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index aceeae49f7..5d0b1da712 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -23,7 +23,7 @@ use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray}; use futures::{StreamExt, TryStreamExt}; use tokio::sync::oneshot::{Receiver, channel}; -use super::delete_filter::DeleteFilter; +use super::delete_filter::{DeleteFilter, PosDelLoadAction}; use crate::arrow::delete_file_loader::BasicDeleteFileLoader; use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema}; use crate::delete_vector::DeleteVector; @@ -42,13 +42,20 @@ use crate::{Error, ErrorKind, Result}; pub(crate) struct CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader, concurrency_limit_data_files: usize, + /// Shared filter state to allow caching loaded deletes across multiple + /// calls to `load_deletes` (e.g., across multiple file scan tasks). + delete_filter: DeleteFilter, } // Intermediate context during processing of a delete file task. enum DeleteFileContext { // TODO: Delete Vector loader from Puffin files ExistingEqDel, - PosDels(ArrowRecordBatchStream), + ExistingPosDel, + PosDels { + file_path: String, + stream: ArrowRecordBatchStream, + }, FreshEqDel { batch_stream: ArrowRecordBatchStream, equality_ids: HashSet, @@ -59,8 +66,12 @@ enum DeleteFileContext { // Final result of the processing of a delete file task before // results are fully merged into the DeleteFileManager's state enum ParsedDeleteFileContext { - DelVecs(HashMap), + DelVecs { + file_path: String, + results: HashMap, + }, EqDel, + ExistingPosDel, } #[allow(unused_variables)] @@ -69,6 +80,7 @@ impl CachingDeleteFileLoader { CachingDeleteFileLoader { basic_delete_file_loader: BasicDeleteFileLoader::new(file_io), concurrency_limit_data_files, + delete_filter: DeleteFilter::default(), } } @@ -142,7 +154,6 @@ impl CachingDeleteFileLoader { schema: SchemaRef, ) -> Receiver> { let (tx, rx) = channel(); - let del_filter = DeleteFilter::default(); let stream_items = delete_file_entries .iter() @@ -150,14 +161,14 @@ impl CachingDeleteFileLoader { ( t.clone(), self.basic_delete_file_loader.clone(), - del_filter.clone(), + self.delete_filter.clone(), schema.clone(), ) }) .collect::>(); let task_stream = futures::stream::iter(stream_items); - let del_filter = del_filter.clone(); + let del_filter = self.delete_filter.clone(); let concurrency_limit_data_files = self.concurrency_limit_data_files; let basic_delete_file_loader = self.basic_delete_file_loader.clone(); crate::runtime::spawn(async move { @@ -165,7 +176,7 @@ impl CachingDeleteFileLoader { let mut del_filter = del_filter; let basic_delete_file_loader = basic_delete_file_loader.clone(); - let results: Vec = task_stream + let mut results_stream = task_stream .map(move |(task, file_io, del_filter, schema)| { let basic_delete_file_loader = basic_delete_file_loader.clone(); async move { @@ -181,15 +192,16 @@ impl CachingDeleteFileLoader { .map(move |ctx| { Ok(async { Self::parse_file_content_for_task(ctx.await?).await }) }) - .try_buffer_unordered(concurrency_limit_data_files) - .try_collect::>() - .await?; + .try_buffer_unordered(concurrency_limit_data_files); - for item in results { - if let ParsedDeleteFileContext::DelVecs(hash_map) = item { - for (data_file_path, delete_vector) in hash_map.into_iter() { + while let Some(item) = results_stream.next().await { + let item = item?; + if let ParsedDeleteFileContext::DelVecs { file_path, results } = item { + for (data_file_path, delete_vector) in results.into_iter() { del_filter.upsert_delete_vector(data_file_path, delete_vector); } + // Mark the positional delete file as fully loaded so waiters can proceed + del_filter.finish_pos_del_load(&file_path); } } @@ -210,11 +222,24 @@ impl CachingDeleteFileLoader { schema: SchemaRef, ) -> Result { match task.file_type { - DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels( - basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path) - .await?, - )), + DataContentType::PositionDeletes => { + match del_filter.try_start_pos_del_load(&task.file_path) { + PosDelLoadAction::AlreadyLoaded => Ok(DeleteFileContext::ExistingPosDel), + PosDelLoadAction::WaitFor(notify) => { + // Positional deletes are accessed synchronously by ArrowReader. + // We must wait here to ensure the data is ready before returning, + // otherwise ArrowReader might get an empty/partial result. + notify.notified().await; + Ok(DeleteFileContext::ExistingPosDel) + } + PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels { + file_path: task.file_path.clone(), + stream: basic_delete_file_loader + .parquet_to_batch_stream(&task.file_path) + .await?, + }), + } + } DataContentType::EqualityDeletes => { let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else { @@ -255,10 +280,13 @@ impl CachingDeleteFileLoader { ) -> Result { match ctx { DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel), - DeleteFileContext::PosDels(batch_stream) => { - let del_vecs = - Self::parse_positional_deletes_record_batch_stream(batch_stream).await?; - Ok(ParsedDeleteFileContext::DelVecs(del_vecs)) + DeleteFileContext::ExistingPosDel => Ok(ParsedDeleteFileContext::ExistingPosDel), + DeleteFileContext::PosDels { file_path, stream } => { + let del_vecs = Self::parse_positional_deletes_record_batch_stream(stream).await?; + Ok(ParsedDeleteFileContext::DelVecs { + file_path, + results: del_vecs, + }) } DeleteFileContext::FreshEqDel { sender, @@ -979,4 +1007,43 @@ mod tests { assert!(result.is_ok()); } + + #[tokio::test] + async fn test_caching_delete_file_loader_caches_results() { + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path(); + let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap()) + .unwrap() + .build() + .unwrap(); + + let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10); + + let file_scan_tasks = setup(table_location); + + // Load deletes for the first time + let delete_filter_1 = delete_file_loader + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap() + .unwrap(); + + // Load deletes for the second time (same task/files) + let delete_filter_2 = delete_file_loader + .load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref()) + .await + .unwrap() + .unwrap(); + + let dv1 = delete_filter_1 + .get_delete_vector(&file_scan_tasks[0]) + .unwrap(); + let dv2 = delete_filter_2 + .get_delete_vector(&file_scan_tasks[0]) + .unwrap(); + + // Verify that the delete vectors point to the same memory location, + // confirming that the second load reused the result from the first. + assert!(Arc::ptr_eq(&dv1, &dv2)); + } } diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index d05e028997..e6434e72ec 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -34,10 +34,23 @@ enum EqDelState { Loaded(Predicate), } +/// State tracking for positional delete files. +/// Unlike equality deletes, positional deletes must be fully loaded before +/// the ArrowReader proceeds because retrieval is synchronous and non-blocking. +#[derive(Debug)] +enum PosDelState { + /// The file is currently being loaded by a task. + /// The notifier allows other tasks to wait for completion. + Loading(Arc), + /// The file has been fully loaded and merged into the delete vector map. + Loaded, +} + #[derive(Debug, Default)] struct DeleteFileFilterState { delete_vectors: HashMap>>, equality_deletes: HashMap, + positional_deletes: HashMap, } #[derive(Clone, Debug, Default)] @@ -45,6 +58,18 @@ pub(crate) struct DeleteFilter { state: Arc>, } +/// Action to take when trying to start loading a positional delete file +pub(crate) enum PosDelLoadAction { + /// The file is not loaded, the caller should load it. + Load, + /// The file is already loaded, nothing to do. + AlreadyLoaded, + /// The file is currently being loaded by another task. + /// The caller *must* wait for this notifier to ensure data availability + /// before returning, as subsequent access (get_delete_vector) is synchronous. + WaitFor(Arc), +} + impl DeleteFilter { /// Retrieve a delete vector for the data file associated with a given file scan task pub(crate) fn get_delete_vector( @@ -82,6 +107,47 @@ impl DeleteFilter { Some(notifier) } + /// Attempts to mark a positional delete file as "loading". + /// + /// Returns an action dictating whether the caller should load the file, + /// wait for another task to load it, or do nothing. + pub(crate) fn try_start_pos_del_load(&self, file_path: &str) -> PosDelLoadAction { + let mut state = self.state.write().unwrap(); + + if let Some(state) = state.positional_deletes.get(file_path) { + match state { + PosDelState::Loaded => return PosDelLoadAction::AlreadyLoaded, + PosDelState::Loading(notify) => return PosDelLoadAction::WaitFor(notify.clone()), + } + } + + let notifier = Arc::new(Notify::new()); + state + .positional_deletes + .insert(file_path.to_string(), PosDelState::Loading(notifier)); + + PosDelLoadAction::Load + } + + /// Marks a positional delete file as successfully loaded and notifies any waiting tasks. + pub(crate) fn finish_pos_del_load(&self, file_path: &str) { + let notify = { + let mut state = self.state.write().unwrap(); + if let Some(PosDelState::Loading(notify)) = state + .positional_deletes + .insert(file_path.to_string(), PosDelState::Loaded) + { + Some(notify) + } else { + None + } + }; + + if let Some(notify) = notify { + notify.notify_waiters(); + } + } + /// Retrieve the equality delete predicate for a given eq delete file path pub(crate) async fn get_equality_delete_predicate_for_delete_file_path( &self,