Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 4 additions & 2 deletions crates/iceberg/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -1225,6 +1225,7 @@ pub iceberg::scan::FileScanTask::data_file_format: iceberg::spec::DataFileFormat
pub iceberg::scan::FileScanTask::data_file_path: alloc::string::String
pub iceberg::scan::FileScanTask::deletes: alloc::vec::Vec<iceberg::scan::FileScanTaskDeleteFile>
pub iceberg::scan::FileScanTask::file_size_in_bytes: u64
pub iceberg::scan::FileScanTask::key_metadata: core::option::Option<alloc::boxed::Box<[u8]>>
pub iceberg::scan::FileScanTask::length: u64
pub iceberg::scan::FileScanTask::name_mapping: core::option::Option<alloc::sync::Arc<iceberg::spec::NameMapping>>
pub iceberg::scan::FileScanTask::partition: core::option::Option<iceberg::spec::Struct>
Expand All @@ -1248,7 +1249,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTask
impl iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), ())>
pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), (), ())>
impl serde_core::ser::Serialize for iceberg::scan::FileScanTask
pub fn iceberg::scan::FileScanTask::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTask
Expand All @@ -1258,6 +1259,7 @@ pub iceberg::scan::FileScanTaskDeleteFile::equality_ids: core::option::Option<al
pub iceberg::scan::FileScanTaskDeleteFile::file_path: alloc::string::String
pub iceberg::scan::FileScanTaskDeleteFile::file_size_in_bytes: u64
pub iceberg::scan::FileScanTaskDeleteFile::file_type: iceberg::spec::DataContentType
pub iceberg::scan::FileScanTaskDeleteFile::key_metadata: core::option::Option<alloc::boxed::Box<[u8]>>
pub iceberg::scan::FileScanTaskDeleteFile::partition_spec_id: i32
impl core::clone::Clone for iceberg::scan::FileScanTaskDeleteFile
pub fn iceberg::scan::FileScanTaskDeleteFile::clone(&self) -> iceberg::scan::FileScanTaskDeleteFile
Expand All @@ -1267,7 +1269,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTaskDeleteFile
pub fn iceberg::scan::FileScanTaskDeleteFile::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTaskDeleteFile
impl iceberg::scan::FileScanTaskDeleteFile
pub fn iceberg::scan::FileScanTaskDeleteFile::builder() -> FileScanTaskDeleteFileBuilder<((), (), (), (), ())>
pub fn iceberg::scan::FileScanTaskDeleteFile::builder() -> FileScanTaskDeleteFileBuilder<((), (), (), (), (), ())>
impl serde_core::ser::Serialize for iceberg::scan::FileScanTaskDeleteFile
pub fn iceberg::scan::FileScanTaskDeleteFile::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer
impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTaskDeleteFile
Expand Down
16 changes: 13 additions & 3 deletions crates/iceberg/src/arrow/caching_delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -252,7 +252,11 @@ impl CachingDeleteFileLoader {
PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels {
file_path: task.file_path.clone(),
stream: basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
.parquet_to_batch_stream(
&task.file_path,
task.file_size_in_bytes,
task.key_metadata.as_deref(),
)
.await?,
}),
}
Expand All @@ -271,7 +275,11 @@ impl CachingDeleteFileLoader {
let equality_ids_vec = task.equality_ids.clone().unwrap();
let evolved_stream = BasicDeleteFileLoader::evolve_schema(
basic_delete_file_loader
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
.parquet_to_batch_stream(
&task.file_path,
task.file_size_in_bytes,
task.key_metadata.as_deref(),
)
.await?,
schema,
&equality_ids_vec,
Expand Down Expand Up @@ -635,6 +643,7 @@ mod tests {
.parquet_to_batch_stream(
&eq_delete_file_path,
std::fs::metadata(&eq_delete_file_path).unwrap().len(),
None,
)
.await
.expect("could not get batch stream");
Expand Down Expand Up @@ -834,6 +843,7 @@ mod tests {
.parquet_to_batch_stream(
&delete_file_path,
std::fs::metadata(&delete_file_path).unwrap().len(),
None,
)
.await
.unwrap();
Expand Down Expand Up @@ -1015,7 +1025,7 @@ mod tests {
let basic_delete_file_loader =
BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new());
let record_batch_stream = basic_delete_file_loader
.parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len())
.parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len(), None)
.await
.expect("could not get batch stream");

Expand Down
173 changes: 172 additions & 1 deletion crates/iceberg/src/arrow/delete_file_loader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@ impl BasicDeleteFileLoader {
&self,
data_file_path: &str,
file_size_in_bytes: u64,
key_metadata: Option<&[u8]>,
) -> Result<ArrowRecordBatchStream> {
/*
Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly
Expand All @@ -80,6 +81,7 @@ impl BasicDeleteFileLoader {
file_size_in_bytes,
parquet_read_options,
self.scan_metrics.bytes_read_counter(),
key_metadata,
)
.await?;

Expand Down Expand Up @@ -121,7 +123,11 @@ impl DeleteFileLoader for BasicDeleteFileLoader {
schema: SchemaRef,
) -> Result<ArrowRecordBatchStream> {
let raw_batch_stream = self
.parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes)
.parquet_to_batch_stream(
&task.file_path,
task.file_size_in_bytes,
task.key_metadata.as_deref(),
)
.await?;

// For equality deletes, only evolve the equality_ids columns.
Expand Down Expand Up @@ -165,4 +171,169 @@ mod tests {

assert_eq!(result.len(), 1);
}

/// Test helper: writes `batch` to a newly created Parquet file at `path`,
/// SNAPPY-compressed and encrypted with file encryption properties built
/// from `key`.
///
/// Only the key is supplied to the encryption builder; no AAD prefix or other
/// encryption option is configured. Every fallible step is `unwrap`ped, so any
/// failure panics the calling test.
fn write_encrypted_parquet(path: &str, batch: &arrow_array::RecordBatch, key: &[u8]) {

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

write_encrypted_parquet is copy-pasted, and the delete-loader copy drops the aad_prefix param. Share one helper so the delete path also gets AAD coverage?

use std::fs::File;

use parquet::arrow::ArrowWriter;
use parquet::basic::Compression;
use parquet::encryption::encrypt::FileEncryptionProperties;
use parquet::file::properties::WriterProperties;

// The builder takes an owned key, hence the copy of `key` into a Vec.
let encryption_properties = FileEncryptionProperties::builder(key.to_vec())
.build()
.unwrap();
// Writer properties carry both the compression codec and the encryption settings.
let props = WriterProperties::builder()
.set_compression(Compression::SNAPPY)
.with_file_encryption_properties(encryption_properties)
.build();

// Write the single batch using the batch's own Arrow schema, then close the writer.
let file = File::create(path).unwrap();
let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap();
writer.write(batch).unwrap();
writer.close().unwrap();
}

/// Writes a positional delete file with Parquet encryption, then reads it back
/// through `BasicDeleteFileLoader::read_delete_file` using a task that carries
/// the encoded key metadata, and checks that one batch of four rows comes out.
#[tokio::test]
async fn test_read_encrypted_positional_delete_file() {
    use std::sync::Arc;

    use arrow_array::{Int64Array, RecordBatch, StringArray};

    use crate::arrow::delete_filter::tests::create_pos_del_schema;
    use crate::encryption::StandardKeyMetadata;
    use crate::scan::FileScanTaskDeleteFile;
    use crate::spec::{DataContentType, NestedField, PrimitiveType, Type};

    let encryption_key = b"0123456789abcdef";

    // Scratch directory that holds the encrypted delete file for this test.
    let tmp_dir = TempDir::new().unwrap();
    let table_location = tmp_dir.path().to_str().unwrap();
    let del_path = format!("{table_location}/encrypted-pos-del.parquet");

    // Four positional deletes, all pointing at the same data file.
    let arrow_pos_del_schema = create_pos_del_schema();
    let paths = Arc::new(StringArray::from_iter_values(vec!["data.parquet"; 4]));
    let positions = Arc::new(Int64Array::from(vec![0i64, 1, 5, 10]));
    let delete_rows =
        RecordBatch::try_new(arrow_pos_del_schema.clone(), vec![paths, positions]).unwrap();

    write_encrypted_parquet(&del_path, &delete_rows, encryption_key);

    // Key metadata the reader is expected to use to decrypt the file.
    let encoded_key = StandardKeyMetadata::new(encryption_key).encode().unwrap();

    // Iceberg-side schema for the two positional delete columns.
    let file_path_field =
        NestedField::required(2147483546, "file_path", Type::Primitive(PrimitiveType::String));
    let pos_field = NestedField::required(2147483545, "pos", Type::Primitive(PrimitiveType::Long));
    let iceberg_schema = Arc::new(
        crate::spec::Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![file_path_field.into(), pos_field.into()])
            .build()
            .unwrap(),
    );

    let on_disk_len = std::fs::metadata(&del_path).unwrap().len();
    let delete_task = FileScanTaskDeleteFile {
        file_path: del_path.clone(),
        file_size_in_bytes: on_disk_len,
        file_type: DataContentType::PositionDeletes,
        partition_spec_id: 0,
        equality_ids: None,
        key_metadata: Some(Box::from(encoded_key.as_ref())),
    };

    let loader = BasicDeleteFileLoader::new(FileIO::new_with_fs(), ScanMetrics::new());
    let stream = loader
        .read_delete_file(&delete_task, iceberg_schema)
        .await
        .unwrap();

    let decoded: Vec<_> = stream.try_collect().await.unwrap();
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0].num_rows(), 4);
}

/// Writes an equality delete file with Parquet encryption, then reads it back
/// through `BasicDeleteFileLoader::read_delete_file` using a task that carries
/// the encoded key metadata, and checks that one batch of three rows comes out.
#[tokio::test]
async fn test_read_encrypted_equality_delete_file() {
    use std::collections::HashMap;
    use std::sync::Arc;

    use arrow_array::{Int64Array, RecordBatch};
    use parquet::arrow::PARQUET_FIELD_ID_META_KEY;

    use crate::encryption::StandardKeyMetadata;
    use crate::scan::FileScanTaskDeleteFile;
    use crate::spec::{DataContentType, NestedField, PrimitiveType, Type};

    let encryption_key = b"0123456789abcdef";

    // Scratch directory that holds the encrypted delete file for this test.
    let tmp_dir = TempDir::new().unwrap();
    let table_location = tmp_dir.path().to_str().unwrap();
    let del_path = format!("{table_location}/encrypted-eq-del.parquet");

    // Single non-nullable Int64 column "id", tagged with Parquet field id "1".
    let field_id_metadata =
        HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]);
    let id_field = arrow_schema::Field::new("id", arrow_schema::DataType::Int64, false)
        .with_metadata(field_id_metadata);
    let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![id_field]));

    let ids = Arc::new(Int64Array::from(vec![100i64, 200, 300]));
    let delete_rows = RecordBatch::try_new(arrow_schema.clone(), vec![ids]).unwrap();

    write_encrypted_parquet(&del_path, &delete_rows, encryption_key);

    // Key metadata the reader is expected to use to decrypt the file.
    let encoded_key = StandardKeyMetadata::new(encryption_key).encode().unwrap();

    // Iceberg-side schema: field id 1 matches the id written into the Arrow metadata.
    let iceberg_id_field = NestedField::required(1, "id", Type::Primitive(PrimitiveType::Long));
    let iceberg_schema = Arc::new(
        crate::spec::Schema::builder()
            .with_schema_id(1)
            .with_fields(vec![iceberg_id_field.into()])
            .build()
            .unwrap(),
    );

    let on_disk_len = std::fs::metadata(&del_path).unwrap().len();
    let delete_task = FileScanTaskDeleteFile {
        file_path: del_path.clone(),
        file_size_in_bytes: on_disk_len,
        file_type: DataContentType::EqualityDeletes,
        partition_spec_id: 0,
        equality_ids: Some(vec![1]),
        key_metadata: Some(Box::from(encoded_key.as_ref())),
    };

    let loader = BasicDeleteFileLoader::new(FileIO::new_with_fs(), ScanMetrics::new());
    let stream = loader
        .read_delete_file(&delete_task, iceberg_schema)
        .await
        .unwrap();

    let decoded: Vec<_> = stream.try_collect().await.unwrap();
    assert_eq!(decoded.len(), 1);
    assert_eq!(decoded[0].num_rows(), 3);
}
}
Loading
Loading