111 changes: 89 additions & 22 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
@@ -23,7 +23,7 @@ use arrow_array::{Array, ArrayRef, Int64Array, StringArray, StructArray};
use futures::{StreamExt, TryStreamExt};
use tokio::sync::oneshot::{Receiver, channel};

-use super::delete_filter::DeleteFilter;
use super::delete_filter::{DeleteFilter, PosDelLoadAction};
use crate::arrow::delete_file_loader::BasicDeleteFileLoader;
use crate::arrow::{arrow_primitive_to_literal, arrow_schema_to_schema};
use crate::delete_vector::DeleteVector;
@@ -42,13 +42,20 @@ use crate::{Error, ErrorKind, Result};
pub(crate) struct CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader,
concurrency_limit_data_files: usize,
/// Shared filter state to allow caching loaded deletes across multiple
/// calls to `load_deletes` (e.g., across multiple file scan tasks).
delete_filter: DeleteFilter,
}

// Intermediate context during processing of a delete file task.
enum DeleteFileContext {
// TODO: Delete Vector loader from Puffin files
ExistingEqDel,
-PosDels(ArrowRecordBatchStream),
ExistingPosDel,
PosDels {
file_path: String,
stream: ArrowRecordBatchStream,
},
FreshEqDel {
batch_stream: ArrowRecordBatchStream,
equality_ids: HashSet<i32>,
@@ -59,8 +66,12 @@ enum DeleteFileContext {
// Final result of the processing of a delete file task before
// results are fully merged into the DeleteFileManager's state
enum ParsedDeleteFileContext {
-DelVecs(HashMap<String, DeleteVector>),
DelVecs {
file_path: String,
results: HashMap<String, DeleteVector>,
},
EqDel,
ExistingPosDel,
}

#[allow(unused_variables)]
@@ -69,6 +80,7 @@ impl CachingDeleteFileLoader {
CachingDeleteFileLoader {
basic_delete_file_loader: BasicDeleteFileLoader::new(file_io),
concurrency_limit_data_files,
delete_filter: DeleteFilter::default(),
}
}

@@ -142,30 +154,29 @@ impl CachingDeleteFileLoader {
schema: SchemaRef,
) -> Receiver<Result<DeleteFilter>> {
let (tx, rx) = channel();
-let del_filter = DeleteFilter::default();

let stream_items = delete_file_entries
.iter()
.map(|t| {
(
t.clone(),
self.basic_delete_file_loader.clone(),
-del_filter.clone(),
self.delete_filter.clone(),
schema.clone(),
)
})
.collect::<Vec<_>>();
let task_stream = futures::stream::iter(stream_items);

-let del_filter = del_filter.clone();
let del_filter = self.delete_filter.clone();
let concurrency_limit_data_files = self.concurrency_limit_data_files;
let basic_delete_file_loader = self.basic_delete_file_loader.clone();
crate::runtime::spawn(async move {
let result = async move {
let mut del_filter = del_filter;
let basic_delete_file_loader = basic_delete_file_loader.clone();

-let results: Vec<ParsedDeleteFileContext> = task_stream
let mut results_stream = task_stream
.map(move |(task, file_io, del_filter, schema)| {
let basic_delete_file_loader = basic_delete_file_loader.clone();
async move {
@@ -181,15 +192,16 @@
.map(move |ctx| {
Ok(async { Self::parse_file_content_for_task(ctx.await?).await })
})
-.try_buffer_unordered(concurrency_limit_data_files)
-.try_collect::<Vec<_>>()
-.await?;
.try_buffer_unordered(concurrency_limit_data_files);

-for item in results {
-if let ParsedDeleteFileContext::DelVecs(hash_map) = item {
-for (data_file_path, delete_vector) in hash_map.into_iter() {
while let Some(item) = results_stream.next().await {
let item = item?;
if let ParsedDeleteFileContext::DelVecs { file_path, results } = item {
for (data_file_path, delete_vector) in results.into_iter() {
del_filter.upsert_delete_vector(data_file_path, delete_vector);
}
// Mark the positional delete file as fully loaded so waiters can proceed
del_filter.finish_pos_del_load(&file_path);
}
}

@@ -210,11 +222,24 @@
schema: SchemaRef,
) -> Result<DeleteFileContext> {
match task.file_type {
-DataContentType::PositionDeletes => Ok(DeleteFileContext::PosDels(
-basic_delete_file_loader
-.parquet_to_batch_stream(&task.file_path)
-.await?,
-)),
DataContentType::PositionDeletes => {
match del_filter.try_start_pos_del_load(&task.file_path) {
PosDelLoadAction::AlreadyLoaded => Ok(DeleteFileContext::ExistingPosDel),
PosDelLoadAction::WaitFor(notify) => {
// Positional deletes are accessed synchronously by ArrowReader.
// We must wait here to ensure the data is ready before returning,
// otherwise ArrowReader might get an empty/partial result.
notify.notified().await;
Ok(DeleteFileContext::ExistingPosDel)
}
PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
file_path: task.file_path.clone(),
stream: basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path)
.await?,
}),
}
}

DataContentType::EqualityDeletes => {
let Some(notify) = del_filter.try_start_eq_del_load(&task.file_path) else {
@@ -255,10 +280,13 @@
) -> Result<ParsedDeleteFileContext> {
match ctx {
DeleteFileContext::ExistingEqDel => Ok(ParsedDeleteFileContext::EqDel),
-DeleteFileContext::PosDels(batch_stream) => {
-let del_vecs =
-Self::parse_positional_deletes_record_batch_stream(batch_stream).await?;
-Ok(ParsedDeleteFileContext::DelVecs(del_vecs))
DeleteFileContext::ExistingPosDel => Ok(ParsedDeleteFileContext::ExistingPosDel),
DeleteFileContext::PosDels { file_path, stream } => {
let del_vecs = Self::parse_positional_deletes_record_batch_stream(stream).await?;
Ok(ParsedDeleteFileContext::DelVecs {
file_path,
results: del_vecs,
})
}
DeleteFileContext::FreshEqDel {
sender,
@@ -979,4 +1007,43 @@ mod tests {

assert!(result.is_ok());
}

#[tokio::test]
async fn test_caching_delete_file_loader_caches_results() {
let tmp_dir = TempDir::new().unwrap();
let table_location = tmp_dir.path();
let file_io = FileIO::from_path(table_location.as_os_str().to_str().unwrap())
.unwrap()
.build()
.unwrap();

let delete_file_loader = CachingDeleteFileLoader::new(file_io.clone(), 10);

let file_scan_tasks = setup(table_location);

// Load deletes for the first time
let delete_filter_1 = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.await
.unwrap()
.unwrap();

// Load deletes for the second time (same task/files)
let delete_filter_2 = delete_file_loader
.load_deletes(&file_scan_tasks[0].deletes, file_scan_tasks[0].schema_ref())
.await
.unwrap()
.unwrap();

let dv1 = delete_filter_1
.get_delete_vector(&file_scan_tasks[0])
.unwrap();
let dv2 = delete_filter_2
.get_delete_vector(&file_scan_tasks[0])
.unwrap();

// Verify that the delete vectors point to the same memory location,
// confirming that the second load reused the result from the first.
assert!(Arc::ptr_eq(&dv1, &dv2));
}
}
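
The switch above from `try_collect` to incremental consumption is what lets `load_deletes` notify waiters per delete file instead of only after the whole batch finishes. Below is a minimal, self-contained sketch of that pattern, assuming only the `futures` and `tokio` crates; the file names, delete counts, and `String` error type are made up for illustration and are not part of the PR.

use futures::{StreamExt, TryStreamExt};

#[tokio::main]
async fn main() -> Result<(), String> {
    let paths = vec!["pos-del-1.parquet", "pos-del-2.parquet"];

    // Turn each path into a fallible "parse" future and run up to 10 at
    // once, yielding results in completion order, like the loader's task stream.
    let mut results_stream = futures::stream::iter(paths)
        .map(|path| Ok::<_, String>(async move { Ok::<_, String>((path, 42u64)) }))
        .try_buffer_unordered(10);

    // Consume each result as soon as it completes instead of collecting
    // them all first; in the real loader this is where delete vectors are
    // merged and `finish_pos_del_load` wakes any waiting tasks.
    while let Some(item) = results_stream.next().await {
        let (path, n_deletes) = item?;
        println!("merged {n_deletes} positional deletes from {path}");
    }
    Ok(())
}
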
66 changes: 66 additions & 0 deletions crates/iceberg/src/arrow/delete_filter.rs
@@ -34,17 +34,42 @@ enum EqDelState {
Loaded(Predicate),
}

/// State tracking for positional delete files.
/// Unlike equality deletes, positional deletes must be fully loaded before
/// the ArrowReader proceeds, because retrieval via `get_delete_vector` is a
/// synchronous, non-blocking lookup: the vector must already be present when read.
#[derive(Debug)]
enum PosDelState {
/// The file is currently being loaded by a task.
/// The notifier allows other tasks to wait for completion.
Loading(Arc<Notify>),
/// The file has been fully loaded and merged into the delete vector map.
Loaded,
}

#[derive(Debug, Default)]
struct DeleteFileFilterState {
delete_vectors: HashMap<String, Arc<Mutex<DeleteVector>>>,
equality_deletes: HashMap<String, EqDelState>,
positional_deletes: HashMap<String, PosDelState>,
}

#[derive(Clone, Debug, Default)]
pub(crate) struct DeleteFilter {
state: Arc<RwLock<DeleteFileFilterState>>,
}

/// Action to take when trying to start loading a positional delete file
pub(crate) enum PosDelLoadAction {
/// The file is not loaded, the caller should load it.
Load,
/// The file is already loaded, nothing to do.
AlreadyLoaded,
/// The file is currently being loaded by another task.
/// The caller *must* wait for this notifier to ensure data availability
/// before returning, as subsequent access (get_delete_vector) is synchronous.
WaitFor(Arc<Notify>),
}

impl DeleteFilter {
/// Retrieve a delete vector for the data file associated with a given file scan task
pub(crate) fn get_delete_vector(
@@ -82,6 +107,47 @@ impl DeleteFilter {
Some(notifier)
}

/// Attempts to mark a positional delete file as "loading".
///
/// Returns an action dictating whether the caller should load the file,
/// wait for another task to load it, or do nothing.
pub(crate) fn try_start_pos_del_load(&self, file_path: &str) -> PosDelLoadAction {
let mut state = self.state.write().unwrap();

if let Some(state) = state.positional_deletes.get(file_path) {
match state {
PosDelState::Loaded => return PosDelLoadAction::AlreadyLoaded,
PosDelState::Loading(notify) => return PosDelLoadAction::WaitFor(notify.clone()),
}
}

let notifier = Arc::new(Notify::new());
state
.positional_deletes
.insert(file_path.to_string(), PosDelState::Loading(notifier));

PosDelLoadAction::Load
}

/// Marks a positional delete file as successfully loaded and notifies any waiting tasks.
pub(crate) fn finish_pos_del_load(&self, file_path: &str) {
let notify = {
let mut state = self.state.write().unwrap();
if let Some(PosDelState::Loading(notify)) = state
.positional_deletes
.insert(file_path.to_string(), PosDelState::Loaded)
{
Some(notify)
} else {
None
}
};

if let Some(notify) = notify {
notify.notify_waiters();
}
}

/// Retrieve the equality delete predicate for a given eq delete file path
pub(crate) async fn get_equality_delete_predicate_for_delete_file_path(
&self,
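
Taken together, `try_start_pos_del_load` and `finish_pos_del_load` implement a single-flight, load-once protocol keyed by file path. The sketch below restates that protocol in isolation so it can be read without the surrounding loader. `Registry`, `State`, and `LoadAction` are illustrative names (the PR's equivalents are `DeleteFilter`, `PosDelState`, and `PosDelLoadAction`), and the PR keeps its map inside `DeleteFileFilterState` behind an `Arc<RwLock<...>>` rather than a bare `Mutex`.

use std::collections::HashMap;
use std::sync::{Arc, Mutex};
use tokio::sync::Notify;

enum State {
    Loading(Arc<Notify>), // in flight; clone the notifier to wait on it
    Loaded,               // merged into the shared delete-vector map
}

enum LoadAction {
    Load,                 // caller won the race and must perform the load
    AlreadyLoaded,        // nothing to do
    WaitFor(Arc<Notify>), // another task is loading; await before reading
}

#[derive(Default)]
struct Registry {
    state: Mutex<HashMap<String, State>>,
}

impl Registry {
    // The first caller for a path wins the right to load it; later callers
    // either wait on the same notifier or see `AlreadyLoaded`.
    fn try_start_load(&self, path: &str) -> LoadAction {
        let mut state = self.state.lock().unwrap();
        match state.get(path) {
            Some(State::Loaded) => return LoadAction::AlreadyLoaded,
            Some(State::Loading(notify)) => return LoadAction::WaitFor(notify.clone()),
            None => {}
        }
        state.insert(path.to_string(), State::Loading(Arc::new(Notify::new())));
        LoadAction::Load
    }

    // Flip the entry to `Loaded` and wake every task parked on the old
    // notifier. Note that `notify_waiters` only wakes tasks already inside
    // `notified().await`, so waiters should start waiting promptly.
    fn finish_load(&self, path: &str) {
        let prev = self.state.lock().unwrap().insert(path.to_string(), State::Loaded);
        if let Some(State::Loading(notify)) = prev {
            notify.notify_waiters();
        }
    }
}

A caller that receives `LoadAction::WaitFor(n)` does `n.notified().await` and then reads the shared state, which is exactly what the `DataContentType::PositionDeletes` arm in the first file does before returning `DeleteFileContext::ExistingPosDel`.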