From fd32b7a4856032aedbcc860daafbe2641bd134ca Mon Sep 17 00:00:00 2001 From: Minh Vu Date: Fri, 12 Jun 2026 21:30:30 +0200 Subject: [PATCH] Fix equality delete load notification --- .../src/arrow/caching_delete_file_loader.rs | 4 +- crates/iceberg/src/arrow/delete_filter.rs | 46 ++++++++++++++++--- 2 files changed, 41 insertions(+), 9 deletions(-) diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 1925964f53..ffa2752213 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -259,9 +259,9 @@ impl CachingDeleteFileLoader { } DataContentType::EqualityDeletes => { - let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else { + if del_filter.try_start_eq_del_load(&task.file_path).is_none() { return Ok(DeleteFileContext::ExistingEqDel); - }; + } let (sender, receiver) = channel(); del_filter.insert_equality_delete(&task.file_path, receiver); diff --git a/crates/iceberg/src/arrow/delete_filter.rs b/crates/iceberg/src/arrow/delete_filter.rs index b627b15863..db50b37f86 100644 --- a/crates/iceberg/src/arrow/delete_filter.rs +++ b/crates/iceberg/src/arrow/delete_filter.rs @@ -244,14 +244,20 @@ impl DeleteFilter { delete_file_path: &str, eq_del: Receiver<Predicate>, ) { - let notify = Arc::new(Notify::new()); - { + let notify = { let mut state = self.state.write().unwrap(); - state.equality_deletes.insert( - delete_file_path.to_string(), - EqDelState::Loading(notify.clone()), - ); - } + match state.equality_deletes.get(delete_file_path) { + Some(EqDelState::Loading(notify)) => notify.clone(), + _ => { + let notify = Arc::new(Notify::new()); + state.equality_deletes.insert( + delete_file_path.to_string(), + EqDelState::Loading(notify.clone()), + ); + notify + } + } + }; let state = self.state.clone(); let delete_file_path = delete_file_path.to_string(); @@ -277,6 +283,8 @@ pub(crate) mod tests { use std::fs::File; use 
std::path::Path; use std::sync::Arc; + use std::task::Poll; + use std::time::Duration; use arrow_array::{Int64Array, RecordBatch, StringArray}; use arrow_schema::Schema as ArrowSchema; @@ -530,4 +538,28 @@ pub(crate) mod tests { "case_sensitive=true should fail when column case mismatches" ); } + + #[tokio::test] + async fn test_equality_delete_waiter_is_notified_after_load_started() { + let filter = DeleteFilter::new(Runtime::current()); + let delete_file_path = "eq-del.parquet"; + + assert!(filter.try_start_eq_del_load(delete_file_path).is_some()); + + let waiter = filter.get_equality_delete_predicate_for_delete_file_path(delete_file_path); + tokio::pin!(waiter); + assert!(matches!(futures::poll!(&mut waiter), Poll::Pending)); + + let (tx, rx) = tokio::sync::oneshot::channel(); + filter.insert_equality_delete(delete_file_path, rx); + tx.send(Reference::new("id").equal_to(Datum::long(10))) + .unwrap(); + + let predicate = tokio::time::timeout(Duration::from_secs(1), waiter) + .await + .expect("equality delete waiter should be notified") + .expect("equality delete predicate should be loaded"); + + assert_eq!(predicate.to_string(), "id = 10"); + } }