Skip to content

Commit a329a3b

Browse files
authored
feat: Implement shared delete file loading and caching for ArrowReader (#1941)
## Which issue does this PR close?

- Closes #.

## What changes are included in this PR?

Currently, `ArrowReader` instantiates a new `CachingDeleteFileLoader` (and consequently a new `DeleteFilter`) for each `FileScanTask` when calling `load_deletes`. This results in the `DeleteFilter` state being isolated per task. If multiple tasks reference the same delete file (common in positional deletes), that delete file is re-read and re-parsed for every task, leading to significant performance overhead and redundant I/O.

Changes

* Shared State: Moved the `DeleteFilter` instance into the `CachingDeleteFileLoader` struct. Since `ArrowReader` holds a single `CachingDeleteFileLoader` instance across its lifetime, the `DeleteFilter` state is now effectively shared across all file scan tasks processed by that reader.
* Positional Delete Caching: Implemented a state machine for loading positional delete files (`PosDelState`) in `DeleteFilter`.
  * Added `try_start_pos_del_load`: Coordinates concurrent access to the same positional delete file.
  * Added `finish_pos_del_load`: Signals completion of loading.
* Synchronization: Introduced a `WaitFor` state. Unlike equality deletes (which are accessed asynchronously), positional deletes are accessed synchronously by `ArrowReader`. Therefore, if a task encounters a file that is currently being loaded by another task, it must asynchronously wait (`notify.notified().await`) during the loading phase to ensure the data is fully populated before `ArrowReader` proceeds.
* Refactoring: Updated `load_file_for_task` and related types in `CachingDeleteFileLoader` to support the new caching logic and carry file paths through the loading context.

## Are these changes tested?

Added `test_caching_delete_file_loader_caches_results` to verify that repeated loads of the same delete file return shared memory objects.
1 parent b7ba2e8 commit a329a3b

File tree

2 files changed

+155
-22
lines changed

2 files changed

+155
-22
lines changed

crates/iceberg/src/arrow/caching_delete_file_loader.rs

Lines changed: 89 additions & 22 deletions
Original file line number | Diff line number | Diff line change
@@ -23,7 +23,7 @@ use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
2323
use futures::{StreamExt, TryStreamExt};
2424
use tokio::sync::oneshot::{Receiver, channel};
2525

26-
use super::delete_filter::DeleteFilter;
26+
use super::delete_filter::{DeleteFilter, PosDelLoadAction};
2727
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
2828
use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
2929
use crate::delete_vector::DeleteVector;
@@ -42,13 +42,20 @@ use crate::{Error, ErrorKind, Result};
4242
pub(crate) struct CachingDeleteFileLoader {
4343
basic_delete_file_loader: BasicDeleteFileLoader,
4444
concurrency_limit_data_files: usize,
45+
/// Shared filter state to allow caching loaded deletes across multiple
46+
/// calls to `load_deletes` (e.g., across multiple file scan tasks).
47+
delete_filter: DeleteFilter,
4548
}
4649

4750
// Intermediate context during processing of a delete file task.
4851
enum DeleteFileContext {
4952
// TODO: Delete Vector loader from Puffin files
5053
ExistingEqDel,
51-
PosDels(ArrowRecordBatchStream),
54+
ExistingPosDel,
55+
PosDels {
56+
file_path: String,
57+
stream: ArrowRecordBatchStream,
58+
},
5259
FreshEqDel {
5360
batch_stream: ArrowRecordBatchStream,
5461
equality_ids: HashSet<i32>,
@@ -59,8 +66,12 @@ enum DeleteFileContext {
5966
// Final result of the processing of a delete file task before
6067
// results are fully merged into the DeleteFileManager's state
6168
enum ParsedDeleteFileContext {
62-
DelVecs(HashMap<String, DeleteVector>),
69+
DelVecs {
70+
file_path: String,
71+
results: HashMap<String, DeleteVector>,
72+
},
6373
EqDel,
74+
ExistingPosDel,
6475
}
6576

6677
#[allow(unused_variables)]
@@ -69,6 +80,7 @@ impl CachingDeleteFileLoader {
6980
CachingDeleteFileLoader {
7081
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
7182
concurrency_limit_data_files,
83+
delete_filter: DeleteFilter::default(),
7284
}
7385
}
7486

@@ -142,30 +154,29 @@ impl CachingDeleteFileLoader {
142154
schema: SchemaRef,
143155
) -> Receiver<Result<DeleteFilter>> {
144156
let (tx, rx) = channel();
145-
let del_filter = DeleteFilter::default();
146157

147158
let stream_items = delete_file_entries
148159
.iter()
149160
.map(|t| {
150161
(
151162
t.clone(),
152163
self.basic_delete_file_loader.clone(),
153-
del_filter.clone(),
164+
self.delete_filter.clone(),
154165
schema.clone(),
155166
)
156167
})
157168
.collect::<Vec<_>>();
158169
let task_stream = futures::stream::iter(stream_items);
159170

160-
let del_filter = del_filter.clone();
171+
let del_filter = self.delete_filter.clone();
161172
let concurrency_limit_data_files = self.concurrency_limit_data_files;
162173
let basic_delete_file_loader = self.basic_delete_file_loader.clone();
163174
crate::runtime::spawn(async move {
164175
let result = async move {
165176
let mut del_filter = del_filter;
166177
let basic_delete_file_loader = basic_delete_file_loader.clone();
167178

168-
let results: Vec<ParsedDeleteFileContext> = task_stream
179+
let mut results_stream = task_stream
169180
.map(move |(task, file_io, del_filter, schema)| {
170181
let basic_delete_file_loader = basic_delete_file_loader.clone();
171182
async move {
@@ -181,15 +192,16 @@ impl CachingDeleteFileLoader {
181192
.map(move |ctx| {
182193
Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
183194
})
184-
.try_buffer_unordered(concurrency_limit_data_files)
185-
.try_collect::<Vec<_>>()
186-
.await?;
195+
.try_buffer_unordered(concurrency_limit_data_files);
187196

188-
for item in results {
189-
if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
190-
for (data_file_path, delete_vector) in hash_map.into_iter() {
197+
while let Some(item) = results_stream.next().await {
198+
let item = item?;
199+
if let ParsedDeleteFileContext::DelVecs { file_path, results } = item {
200+
for (data_file_path, delete_vector) in results.into_iter() {
191201
del_filter.upsert_delete_vector(data_file_path, delete_vector);
192202
}
203+
// Mark the positional delete file as fully loaded so waiters can proceed
204+
del_filter.finish_pos_del_load(&file_path);
193205
}
194206
}
195207

@@ -210,11 +222,24 @@ impl CachingDeleteFileLoader {
210222
schema: SchemaRef,
211223
) -> Result<DeleteFileContext> {
212224
match task.file_type {
213-
DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
214-
basic_delete_file_loader
215-
.parquet_to_batch_stream(&task.file_path)
216-
.await?,
217-
)),
225+
DataContentType::PositionDeletes => {
226+
match del_filter.try_start_pos_del_load(&task.file_path) {
227+
PosDelLoadAction::AlreadyLoaded => Ok(DeleteFileContext::ExistingPosDel),
228+
PosDelLoadAction::WaitFor(notify) => {
229+
// Positional deletes are accessed synchronously by ArrowReader.
230+
// We must wait here to ensure the data is ready before returning,
231+
// otherwise ArrowReader might get an empty/partial result.
232+
notify.notified().await;
233+
Ok(DeleteFileContext::ExistingPosDel)
234+
}
235+
PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
236+
file_path: task.file_path.clone(),
237+
stream: basic_delete_file_loader
238+
.parquet_to_batch_stream(&task.file_path)
239+
.await?,
240+
}),
241+
}
242+
}
218243

219244
DataContentType::EqualityDeletes => {
220245
let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
@@ -255,10 +280,13 @@ impl CachingDeleteFileLoader {
255280
) -> Result<ParsedDeleteFileContext> {
256281
match ctx {
257282
DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
258-
DeleteFileContext::PosDels(batch_stream) => {
259-
let del_vecs =
260-
Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
261-
Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
283+
DeleteFileContext::ExistingPosDel => Ok(ParsedDeleteFileContext::ExistingPosDel),
284+
DeleteFileContext::PosDels { file_path, stream } => {
285+
let del_vecs = Self::parse_positional_deletes_record_batch_stream(stream).await?;
286+
Ok(ParsedDeleteFileContext::DelVecs {
287+
file_path,
288+
results: del_vecs,
289+
})
262290
}
263291
DeleteFileContext::FreshEqDel {
264292
sender,
@@ -979,4 +1007,43 @@ mod tests {
9791007

9801008
assert!(result.is_ok());
9811009
}
1010+
1011+
#[tokio::test]
1012+
async fn test_caching_delete_file_loader_caches_results() {
1013+
let tmp_dir = TempDir::new().unwrap();
1014+
let table_location = tmp_dir.path();
1015+
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
1016+
.unwrap()
1017+
.build()
1018+
.unwrap();
1019+
1020+
let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);
1021+
1022+
let file_scan_tasks = setup(table_location);
1023+
1024+
// Load deletes for the first time
1025+
let delete_filter_1 = delete_file_loader
1026+
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
1027+
.await
1028+
.unwrap()
1029+
.unwrap();
1030+
1031+
// Load deletes for the second time (same task/files)
1032+
let delete_filter_2 = delete_file_loader
1033+
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
1034+
.await
1035+
.unwrap()
1036+
.unwrap();
1037+
1038+
let dv1 = delete_filter_1
1039+
.get_delete_vector(&file_scan_tasks[0])
1040+
.unwrap();
1041+
let dv2 = delete_filter_2
1042+
.get_delete_vector(&file_scan_tasks[0])
1043+
.unwrap();
1044+
1045+
// Verify that the delete vectors point to the same memory location,
1046+
// confirming that the second load reused the result from the first.
1047+
assert!(Arc::ptr_eq(&dv1, &dv2));
1048+
}
9821049
}

crates/iceberg/src/arrow/delete_filter.rs

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -34,17 +34,42 @@ enum EqDelState {
3434
Loaded(Predicate),
3535
}
3636

37+
/// State tracking for positional delete files.
38+
/// Unlike equality deletes, positional deletes must be fully loaded before
39+
/// the ArrowReader proceeds because retrieval is synchronous and non-blocking.
40+
#[derive(Debug)]
41+
enum PosDelState {
42+
/// The file is currently being loaded by a task.
43+
/// The notifier allows other tasks to wait for completion.
44+
Loading(Arc<Notify>),
45+
/// The file has been fully loaded and merged into the delete vector map.
46+
Loaded,
47+
}
48+
3749
#[derive(Debug, Default)]
3850
struct DeleteFileFilterState {
3951
delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
4052
equality_deletes: HashMap<String, EqDelState>,
53+
positional_deletes: HashMap<String, PosDelState>,
4154
}
4255

4356
#[derive(Clone, Debug, Default)]
4457
pub(crate) struct DeleteFilter {
4558
state: Arc<RwLock<DeleteFileFilterState>>,
4659
}
4760

61+
/// Action to take when trying to start loading a positional delete file
62+
pub(crate) enum PosDelLoadAction {
63+
/// The file is not loaded, the caller should load it.
64+
Load,
65+
/// The file is already loaded, nothing to do.
66+
AlreadyLoaded,
67+
/// The file is currently being loaded by another task.
68+
/// The caller *must* wait for this notifier to ensure data availability
69+
/// before returning, as subsequent access (get_delete_vector) is synchronous.
70+
WaitFor(Arc<Notify>),
71+
}
72+
4873
impl DeleteFilter {
4974
/// Retrieve a delete vector for the data file associated with a given file scan task
5075
pub(crate) fn get_delete_vector(
@@ -82,6 +107,47 @@ impl DeleteFilter {
82107
Some(notifier)
83108
}
84109

110+
/// Attempts to mark a positional delete file as "loading".
111+
///
112+
/// Returns an action dictating whether the caller should load the file,
113+
/// wait for another task to load it, or do nothing.
114+
pub(crate) fn try_start_pos_del_load(&self, file_path: &str) -> PosDelLoadAction {
115+
let mut state = self.state.write().unwrap();
116+
117+
if let Some(state) = state.positional_deletes.get(file_path) {
118+
match state {
119+
PosDelState::Loaded => return PosDelLoadAction::AlreadyLoaded,
120+
PosDelState::Loading(notify) => return PosDelLoadAction::WaitFor(notify.clone()),
121+
}
122+
}
123+
124+
let notifier = Arc::new(Notify::new());
125+
state
126+
.positional_deletes
127+
.insert(file_path.to_string(), PosDelState::Loading(notifier));
128+
129+
PosDelLoadAction::Load
130+
}
131+
132+
/// Marks a positional delete file as successfully loaded and notifies any waiting tasks.
133+
pub(crate) fn finish_pos_del_load(&self, file_path: &str) {
134+
let notify = {
135+
let mut state = self.state.write().unwrap();
136+
if let Some(PosDelState::Loading(notify)) = state
137+
.positional_deletes
138+
.insert(file_path.to_string(), PosDelState::Loaded)
139+
{
140+
Some(notify)
141+
} else {
142+
None
143+
}
144+
};
145+
146+
if let Some(notify) = notify {
147+
notify.notify_waiters();
148+
}
149+
}
150+
85151
/// Retrieve the equality delete predicate for a given eq delete file path
86152
pub(crate) async fn get_equality_delete_predicate_for_delete_file_path(
87153
&self,

0 commit comments

Comments
 (0)