diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index 2d92c9aa43..29fc49ac10 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -1225,6 +1225,7 @@ pub iceberg::scan::FileScanTask::data_file_format: iceberg::spec::DataFileFormat pub iceberg::scan::FileScanTask::data_file_path: alloc::string::String pub iceberg::scan::FileScanTask::deletes: alloc::vec::Vec pub iceberg::scan::FileScanTask::file_size_in_bytes: u64 +pub iceberg::scan::FileScanTask::key_metadata: core::option::Option> pub iceberg::scan::FileScanTask::length: u64 pub iceberg::scan::FileScanTask::name_mapping: core::option::Option> pub iceberg::scan::FileScanTask::partition: core::option::Option @@ -1248,7 +1249,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTask impl iceberg::scan::FileScanTask -pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), ())> +pub fn iceberg::scan::FileScanTask::builder() -> FileScanTaskBuilder<((), (), (), (), (), (), (), (), (), (), (), (), (), (), ())> impl serde_core::ser::Serialize for iceberg::scan::FileScanTask pub fn iceberg::scan::FileScanTask::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTask @@ -1258,6 +1259,7 @@ pub iceberg::scan::FileScanTaskDeleteFile::equality_ids: core::option::Option> pub iceberg::scan::FileScanTaskDeleteFile::partition_spec_id: i32 impl core::clone::Clone for iceberg::scan::FileScanTaskDeleteFile pub fn iceberg::scan::FileScanTaskDeleteFile::clone(&self) -> iceberg::scan::FileScanTaskDeleteFile @@ -1267,7 +1269,7 @@ impl core::fmt::Debug for iceberg::scan::FileScanTaskDeleteFile pub fn iceberg::scan::FileScanTaskDeleteFile::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl core::marker::StructuralPartialEq for iceberg::scan::FileScanTaskDeleteFile impl iceberg::scan::FileScanTaskDeleteFile -pub fn iceberg::scan::FileScanTaskDeleteFile::builder() -> FileScanTaskDeleteFileBuilder<((), (), (), (), ())> +pub fn iceberg::scan::FileScanTaskDeleteFile::builder() -> FileScanTaskDeleteFileBuilder<((), (), (), (), (), ())> impl serde_core::ser::Serialize for iceberg::scan::FileScanTaskDeleteFile pub fn iceberg::scan::FileScanTaskDeleteFile::serialize<__S>(&self, __serializer: __S) -> core::result::Result<<__S as serde_core::ser::Serializer>::Ok, <__S as serde_core::ser::Serializer>::Error> where __S: serde_core::ser::Serializer impl<'de> serde_core::de::Deserialize<'de> for iceberg::scan::FileScanTaskDeleteFile diff --git a/crates/iceberg/src/arrow/caching_delete_file_loader.rs b/crates/iceberg/src/arrow/caching_delete_file_loader.rs index 1925964f53..a7e7507927 100644 --- a/crates/iceberg/src/arrow/caching_delete_file_loader.rs +++ b/crates/iceberg/src/arrow/caching_delete_file_loader.rs @@ -252,7 +252,11 @@ impl CachingDeleteFileLoader { PosDelLoadAction::Load => Ok(DeleteFileContext::PosDels { file_path: task.file_path.clone(), stream: basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .parquet_to_batch_stream( + &task.file_path, + task.file_size_in_bytes, + task.key_metadata.as_deref(), + ) .await?, }), } @@ -271,7 +275,11 @@ impl CachingDeleteFileLoader { let equality_ids_vec = task.equality_ids.clone().unwrap(); let evolved_stream = BasicDeleteFileLoader::evolve_schema( basic_delete_file_loader - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .parquet_to_batch_stream( + &task.file_path, + task.file_size_in_bytes, + task.key_metadata.as_deref(), + ) .await?, schema, &equality_ids_vec, @@ -635,6 +643,7 @@ mod tests { .parquet_to_batch_stream( &eq_delete_file_path, std::fs::metadata(&eq_delete_file_path).unwrap().len(), + None, ) .await .expect("could not get batch stream"); @@ -834,6 +843,7 @@ mod tests { .parquet_to_batch_stream( &delete_file_path, std::fs::metadata(&delete_file_path).unwrap().len(), + None, ) .await .unwrap(); @@ -1015,7 +1025,7 @@ mod tests { let basic_delete_file_loader = BasicDeleteFileLoader::new(file_io.clone(), ScanMetrics::new()); let record_batch_stream = basic_delete_file_loader - .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len()) + .parquet_to_batch_stream(&path, std::fs::metadata(&path).unwrap().len(), None) .await .expect("could not get batch stream"); diff --git a/crates/iceberg/src/arrow/delete_file_loader.rs b/crates/iceberg/src/arrow/delete_file_loader.rs index 134b029613..31dc1beab5 100644 --- a/crates/iceberg/src/arrow/delete_file_loader.rs +++ b/crates/iceberg/src/arrow/delete_file_loader.rs @@ -67,6 +67,7 @@ impl BasicDeleteFileLoader { &self, data_file_path: &str, file_size_in_bytes: u64, + key_metadata: Option<&[u8]>, ) -> Result { /* Essentially a super-cut-down ArrowReader. We can't use ArrowReader directly @@ -80,6 +81,7 @@ impl BasicDeleteFileLoader { file_size_in_bytes, parquet_read_options, self.scan_metrics.bytes_read_counter(), + key_metadata, ) .await?; @@ -121,7 +123,11 @@ impl DeleteFileLoader for BasicDeleteFileLoader { schema: SchemaRef, ) -> Result { let raw_batch_stream = self - .parquet_to_batch_stream(&task.file_path, task.file_size_in_bytes) + .parquet_to_batch_stream( + &task.file_path, + task.file_size_in_bytes, + task.key_metadata.as_deref(), + ) .await?; // For equality deletes, only evolve the equality_ids columns. @@ -165,4 +171,169 @@ mod tests { assert_eq!(result.len(), 1); } + + fn write_encrypted_parquet(path: &str, batch: &arrow_array::RecordBatch, key: &[u8]) { + use std::fs::File; + + use parquet::arrow::ArrowWriter; + use parquet::basic::Compression; + use parquet::encryption::encrypt::FileEncryptionProperties; + use parquet::file::properties::WriterProperties; + + let encryption_properties = FileEncryptionProperties::builder(key.to_vec()) + .build() + .unwrap(); + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .with_file_encryption_properties(encryption_properties) + .build(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); + writer.write(batch).unwrap(); + writer.close().unwrap(); + } + + #[tokio::test] + async fn test_read_encrypted_positional_delete_file() { + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch, StringArray}; + + use crate::arrow::delete_filter::tests::create_pos_del_schema; + use crate::encryption::StandardKeyMetadata; + use crate::scan::FileScanTaskDeleteFile; + use crate::spec::DataContentType; + + let encryption_key = b"0123456789abcdef"; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap(); + let file_io = FileIO::new_with_fs(); + + let positional_delete_schema = create_pos_del_schema(); + let file_path_col = Arc::new(StringArray::from_iter_values(vec!["data.parquet"; 4])); + let pos_col = Arc::new(Int64Array::from(vec![0i64, 1, 5, 10])); + let batch = RecordBatch::try_new(positional_delete_schema.clone(), vec![ + file_path_col, + pos_col, + ]) + .unwrap(); + + let del_path = format!("{table_location}/encrypted-pos-del.parquet"); + write_encrypted_parquet(&del_path, &batch, encryption_key); + + let key_metadata = StandardKeyMetadata::new(encryption_key).encode().unwrap(); + + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + crate::spec::NestedField::required( + 2147483546, + "file_path", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::String), + ) + .into(), + crate::spec::NestedField::required( + 2147483545, + "pos", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let task = FileScanTaskDeleteFile { + file_path: del_path.clone(), + file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(), + file_type: DataContentType::PositionDeletes, + partition_spec_id: 0, + equality_ids: None, + key_metadata: Some(Box::from(key_metadata.as_ref())), + }; + + let scan_metrics = ScanMetrics::new(); + let delete_file_loader = BasicDeleteFileLoader::new(file_io, scan_metrics); + + let result = delete_file_loader + .read_delete_file(&task, schema) + .await + .unwrap(); + + let batches: Vec<_> = result.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 4); + } + + #[tokio::test] + async fn test_read_encrypted_equality_delete_file() { + use std::collections::HashMap; + use std::sync::Arc; + + use arrow_array::{Int64Array, RecordBatch}; + use parquet::arrow::PARQUET_FIELD_ID_META_KEY; + + use crate::encryption::StandardKeyMetadata; + use crate::scan::FileScanTaskDeleteFile; + use crate::spec::DataContentType; + + let encryption_key = b"0123456789abcdef"; + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap(); + let file_io = FileIO::new_with_fs(); + + let arrow_schema = Arc::new(arrow_schema::Schema::new(vec![ + arrow_schema::Field::new("id", arrow_schema::DataType::Int64, false).with_metadata( + HashMap::from([(PARQUET_FIELD_ID_META_KEY.to_string(), "1".to_string())]), + ), + ])); + + let id_col = Arc::new(Int64Array::from(vec![100i64, 200, 300])); + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_col]).unwrap(); + + let del_path = format!("{table_location}/encrypted-eq-del.parquet"); + write_encrypted_parquet(&del_path, &batch, encryption_key); + + let key_metadata = StandardKeyMetadata::new(encryption_key).encode().unwrap(); + + let schema = Arc::new( + crate::spec::Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + crate::spec::NestedField::required( + 1, + "id", + crate::spec::Type::Primitive(crate::spec::PrimitiveType::Long), + ) + .into(), + ]) + .build() + .unwrap(), + ); + + let task = FileScanTaskDeleteFile { + file_path: del_path.clone(), + file_size_in_bytes: std::fs::metadata(&del_path).unwrap().len(), + file_type: DataContentType::EqualityDeletes, + partition_spec_id: 0, + equality_ids: Some(vec![1]), + key_metadata: Some(Box::from(key_metadata.as_ref())), + }; + + let scan_metrics = ScanMetrics::new(); + let delete_file_loader = BasicDeleteFileLoader::new(file_io, scan_metrics); + + let result = delete_file_loader + .read_delete_file(&task, schema) + .await + .unwrap(); + + let batches: Vec<_> = result.try_collect().await.unwrap(); + assert_eq!(batches.len(), 1); + assert_eq!(batches[0].num_rows(), 3); + } } diff --git a/crates/iceberg/src/arrow/reader/pipeline.rs b/crates/iceberg/src/arrow/reader/pipeline.rs index 57c614575a..1e6fa4cd26 100644 --- a/crates/iceberg/src/arrow/reader/pipeline.rs +++ b/crates/iceberg/src/arrow/reader/pipeline.rs @@ -26,6 +26,7 @@ use std::sync::atomic::AtomicU64; use futures::{StreamExt, TryStreamExt}; use parquet::arrow::arrow_reader::{ArrowReaderMetadata, ArrowReaderOptions}; use parquet::arrow::{PARQUET_FIELD_ID_META_KEY, ParquetRecordBatchStreamBuilder}; +use parquet::encryption::decrypt::FileDecryptionProperties; use super::{ ArrowFileReader, ArrowReader, ParquetReadOptions, add_fallback_field_ids_to_arrow_schema, @@ -35,6 +36,7 @@ use crate::arrow::caching_delete_file_loader::CachingDeleteFileLoader; use crate::arrow::int96::coerce_int96_timestamps; use crate::arrow::record_batch_transformer::RecordBatchTransformerBuilder; use crate::arrow::scan_metrics::{CountingFileRead, ScanMetrics, ScanResult}; +use crate::encryption::StandardKeyMetadata; use crate::error::Result; use crate::io::{FileIO, FileMetadata, FileRead}; use crate::metadata_columns::{RESERVED_FIELD_ID_FILE, is_metadata_field}; @@ -120,6 +122,7 @@ impl FileScanTaskReader { task.file_size_in_bytes, parquet_read_options, self.scan_metrics.bytes_read_counter(), + task.key_metadata.as_deref(), ) .await?; @@ -406,6 +409,7 @@ impl ArrowReader { file_size_in_bytes: u64, parquet_read_options: ParquetReadOptions, bytes_read: &Arc, + key_metadata: Option<&[u8]>, ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let parquet_file = file_io.new_input(data_file_path)?; let counting_reader = @@ -414,6 +418,7 @@ impl ArrowReader { Box::new(counting_reader), file_size_in_bytes, parquet_read_options, + key_metadata, ) .await } @@ -422,6 +427,7 @@ impl ArrowReader { parquet_reader: Box, file_size_in_bytes: u64, parquet_read_options: ParquetReadOptions, + key_metadata: Option<&[u8]>, ) -> Result<(ArrowFileReader, ArrowReaderMetadata)> { let mut reader = ArrowFileReader::new( FileMetadata { @@ -431,7 +437,9 @@ impl ArrowReader { ) .with_parquet_read_options(parquet_read_options); - let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, Default::default()) + let arrow_reader_options = Self::build_arrow_reader_options(key_metadata)?; + + let arrow_metadata = ArrowReaderMetadata::load_async(&mut reader, arrow_reader_options) .await .map_err(|e| { Error::new(ErrorKind::Unexpected, "Failed to load Parquet metadata").with_source(e) @@ -439,6 +447,34 @@ impl ArrowReader { Ok((reader, arrow_metadata)) } + + /// Builds `ArrowReaderOptions`, adding `FileDecryptionProperties` when + /// key metadata is present for Parquet Modular Encryption. + fn build_arrow_reader_options(key_metadata: Option<&[u8]>) -> Result { + match key_metadata { + Some(km) => { + let standard_key_metadata = StandardKeyMetadata::decode(km)?; + let mut builder = FileDecryptionProperties::builder( + standard_key_metadata.encryption_key().as_bytes().to_vec(), + ); + if let Some(aad) = standard_key_metadata.aad_prefix() { + builder = builder.with_aad_prefix(aad.to_vec()); + } + let decryption_properties = builder.build().map_err(|e| { + Error::new( + ErrorKind::Unexpected, + "Failed to build Parquet file decryption properties", + ) + .with_source(e) + })?; + Ok( + ArrowReaderOptions::new() + .with_file_decryption_properties(decryption_properties), + ) + } + None => Ok(ArrowReaderOptions::default()), + } + } } #[cfg(test)] @@ -448,11 +484,12 @@ mod tests { use std::sync::Arc; use arrow_array::cast::AsArray; - use arrow_array::{Array, ArrayRef, RecordBatch}; + use arrow_array::{Array, ArrayRef, Int32Array, RecordBatch}; use arrow_schema::{DataType, Field, Schema as ArrowSchema}; use futures::TryStreamExt; use parquet::arrow::{ArrowWriter, PARQUET_FIELD_ID_META_KEY}; use parquet::basic::Compression; + use parquet::encryption::encrypt::FileEncryptionProperties; use parquet::file::properties::WriterProperties; use tempfile::TempDir; @@ -638,6 +675,222 @@ mod tests { } } + fn write_encrypted_parquet( + path: &str, + batch: &RecordBatch, + key: &[u8], + aad_prefix: Option<&[u8]>, + ) { + let mut builder = FileEncryptionProperties::builder(key.to_vec()); + if let Some(aad) = aad_prefix { + builder = builder.with_aad_prefix(aad.to_vec()); + } + let encryption_properties = builder.build().unwrap(); + + let props = WriterProperties::builder() + .set_compression(Compression::SNAPPY) + .with_file_encryption_properties(encryption_properties) + .build(); + + let file = File::create(path).unwrap(); + let mut writer = ArrowWriter::try_new(file, batch.schema(), Some(props)).unwrap(); + writer.write(batch).expect("Writing batch"); + writer.close().unwrap(); + } + + #[tokio::test] + async fn test_read_encrypted_parquet() { + let encryption_key = b"0123456789abcdef"; + let aad_prefix = b"my-table-uuid!!"; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::new_with_fs(); + + let id_data = Arc::new(Int32Array::from(vec![10, 20, 30])) as ArrayRef; + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_data]).unwrap(); + + let file_path = format!("{table_location}/encrypted.parquet"); + write_encrypted_parquet(&file_path, &batch, encryption_key, Some(aad_prefix)); + + let key_metadata = crate::encryption::StandardKeyMetadata::new(encryption_key) + .with_aad_prefix(aad_prefix) + .encode() + .unwrap(); + + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + let task = FileScanTask::builder() + .with_file_size_in_bytes(std::fs::metadata(&file_path).unwrap().len()) + .with_start(0) + .with_length(0) + .with_data_file_path(file_path) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .with_key_metadata(Some(key_metadata)) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let batches: Vec = reader + .read(tasks) + .unwrap() + .stream() + .try_collect() + .await + .unwrap(); + + assert_eq!(batches.len(), 1); + let ids = batches[0] + .column(0) + .as_any() + .downcast_ref::() + .unwrap(); + assert_eq!(ids.values(), &[10, 20, 30]); + } + + #[tokio::test] + async fn test_read_encrypted_parquet_without_key_metadata_fails() { + let encryption_key = b"0123456789abcdef"; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::new_with_fs(); + + let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_data]).unwrap(); + + let file_path = format!("{table_location}/encrypted_no_key.parquet"); + write_encrypted_parquet(&file_path, &batch, encryption_key, None); + + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + let task = FileScanTask::builder() + .with_file_size_in_bytes(std::fs::metadata(&file_path).unwrap().len()) + .with_start(0) + .with_length(0) + .with_data_file_path(file_path) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result: Result, _> = + reader.read(tasks).unwrap().stream().try_collect().await; + + let err = result.unwrap_err(); + assert_eq!(err.kind(), crate::ErrorKind::Unexpected); + let err_str = format!("{err}"); + assert!( + err_str.contains("encrypted footer"), + "Expected error about encrypted footer, got: {err_str}" + ); + assert!( + err_str.contains("decryption properties were not provided"), + "Expected error about missing decryption properties, got: {err_str}" + ); + } + + #[tokio::test] + async fn test_read_encrypted_parquet_with_wrong_key_fails() { + let encryption_key = b"0123456789abcdef"; + let wrong_key = b"fedcba9876543210"; + + let schema = Arc::new( + Schema::builder() + .with_schema_id(1) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(), + ); + + let arrow_schema = Arc::new(ArrowSchema::new(vec![ + Field::new("id", DataType::Int32, false).with_metadata(HashMap::from([( + PARQUET_FIELD_ID_META_KEY.to_string(), + "1".to_string(), + )])), + ])); + + let tmp_dir = TempDir::new().unwrap(); + let table_location = tmp_dir.path().to_str().unwrap().to_string(); + let file_io = FileIO::new_with_fs(); + + let id_data = Arc::new(Int32Array::from(vec![1, 2, 3])) as ArrayRef; + let batch = RecordBatch::try_new(arrow_schema.clone(), vec![id_data]).unwrap(); + + let file_path = format!("{table_location}/encrypted_wrong_key.parquet"); + write_encrypted_parquet(&file_path, &batch, encryption_key, None); + + let wrong_key_metadata = crate::encryption::StandardKeyMetadata::new(wrong_key) + .encode() + .unwrap(); + + let reader = ArrowReaderBuilder::new(file_io, Runtime::current()).build(); + + let task = FileScanTask::builder() + .with_file_size_in_bytes(std::fs::metadata(&file_path).unwrap().len()) + .with_start(0) + .with_length(0) + .with_data_file_path(file_path) + .with_data_file_format(DataFileFormat::Parquet) + .with_schema(schema) + .with_project_field_ids(vec![1]) + .with_case_sensitive(false) + .with_key_metadata(Some(wrong_key_metadata)) + .build(); + + let tasks = Box::pin(futures::stream::iter(vec![Ok(task)])) as FileScanTaskStream; + let result: Result, _> = + reader.read(tasks).unwrap().stream().try_collect().await; + + let err = result.unwrap_err(); + assert_eq!(err.kind(), crate::ErrorKind::Unexpected); + let err_str = format!("{err}"); + assert!( + err_str.contains("unable to decrypt parquet footer"), + "Expected error about decryption failure, got: {err_str}" + ); + } + /// Test that concurrency=1 reads all files correctly and in deterministic order. /// This verifies the fast-path optimization for single concurrency. #[tokio::test] diff --git a/crates/iceberg/src/encryption/key_metadata.rs b/crates/iceberg/src/encryption/key_metadata.rs index 4ef66ce394..9e536e4d83 100644 --- a/crates/iceberg/src/encryption/key_metadata.rs +++ b/crates/iceberg/src/encryption/key_metadata.rs @@ -20,7 +20,7 @@ use std::fmt; -use super::SensitiveBytes; +use super::{SecureKey, SensitiveBytes}; use crate::{Error, ErrorKind, Result}; /// Standard key metadata for Iceberg table encryption. @@ -97,7 +97,17 @@ impl StandardKeyMetadata { /// Decodes from Java-compatible format. pub fn decode(bytes: &[u8]) -> Result { - _serde::StandardKeyMetadataV1::decode(bytes).map(Self::from) + let metadata = _serde::StandardKeyMetadataV1::decode(bytes).map(Self::from)?; + // Validate the DEK is a usable AES key (16/24/32 bytes) up front, so a + // malformed key surfaces a clear error. + SecureKey::new(metadata.encryption_key.as_bytes()).map_err(|e| { + Error::new( + ErrorKind::DataInvalid, + "Invalid encryption key in key metadata", + ) + .with_source(e) + })?; + Ok(metadata) } } @@ -276,11 +286,33 @@ mod tests { #[test] fn test_roundtrip_without_aad() { - let metadata = StandardKeyMetadata::new(&[1, 2, 3, 4]); + let key = b"0123456789012345"; + let metadata = StandardKeyMetadata::new(key); let serialized = metadata.encode().unwrap(); let parsed = StandardKeyMetadata::decode(&serialized).unwrap(); - assert_eq!(parsed.encryption_key().as_bytes(), &[1, 2, 3, 4]); + assert_eq!(parsed.encryption_key().as_bytes(), key); assert_eq!(parsed.aad_prefix(), None); } + + #[test] + fn test_decode_rejects_invalid_key_length() { + // 24-byte (AES-192) and 32-byte (AES-256) keys are accepted. + for len in [16usize, 24, 32] { + let metadata = StandardKeyMetadata::new(&vec![0u8; len]); + let serialized = metadata.encode().unwrap(); + assert!(StandardKeyMetadata::decode(&serialized).is_ok()); + } + + for len in [0usize, 4, 15, 20, 33] { + let metadata = StandardKeyMetadata::new(&vec![0u8; len]); + let serialized = metadata.encode().unwrap(); + let err = StandardKeyMetadata::decode(&serialized).unwrap_err(); + assert_eq!(err.kind(), ErrorKind::DataInvalid); + assert!( + err.to_string() + .contains("Invalid encryption key in key metadata") + ); + } + } } diff --git a/crates/iceberg/src/scan/context.rs b/crates/iceberg/src/scan/context.rs index bb8baea616..83ff3676ae 100644 --- a/crates/iceberg/src/scan/context.rs +++ b/crates/iceberg/src/scan/context.rs @@ -136,6 +136,7 @@ impl ManifestEntryContext { // TODO: Extract name_mapping from table metadata property "schema.name-mapping.default" .with_name_mapping(None) .with_case_sensitive(self.case_sensitive) + .with_key_metadata(self.manifest_entry.data_file.key_metadata().map(Box::from)) .build()) } } diff --git a/crates/iceberg/src/scan/task.rs b/crates/iceberg/src/scan/task.rs index f3b556bcbf..cd119072a1 100644 --- a/crates/iceberg/src/scan/task.rs +++ b/crates/iceberg/src/scan/task.rs @@ -118,6 +118,13 @@ pub struct FileScanTask { /// Whether this scan task should treat column names as case-sensitive when binding predicates. pub case_sensitive: bool, + + /// Key metadata for encrypted data files (Parquet Modular Encryption). + /// When present, the reader uses this to build `FileDecryptionProperties`. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub key_metadata: Option>, } impl FileScanTask { @@ -161,6 +168,13 @@ impl From<&DeleteFileContext> for FileScanTaskDeleteFile { .with_file_type(ctx.manifest_entry.content_type()) .with_partition_spec_id(ctx.partition_spec_id) .with_equality_ids(ctx.manifest_entry.data_file.equality_ids.clone()) + .with_key_metadata( + ctx.manifest_entry + .data_file + .key_metadata + .as_deref() + .map(Box::from), + ) .build() } } @@ -184,4 +198,11 @@ pub struct FileScanTaskDeleteFile { /// equality ids for equality deletes (null for anything other than equality-deletes) #[builder(default)] pub equality_ids: Option>, + + /// Key metadata for encrypted data files (Parquet Modular Encryption). + /// When present, the reader uses this to build `FileDecryptionProperties`. + #[serde(default)] + #[serde(skip_serializing_if = "Option::is_none")] + #[builder(default)] + pub key_metadata: Option>, }