diff --git a/crates/catalog/glue/public-api.txt b/crates/catalog/glue/public-api.txt index 8b7a38da50..902daf635f 100644 --- a/crates/catalog/glue/public-api.txt +++ b/crates/catalog/glue/public-api.txt @@ -29,6 +29,7 @@ pub fn iceberg_catalog_glue::GlueCatalogBuilder::fmt(&self, f: &mut core::fmt::F impl iceberg::catalog::CatalogBuilder for iceberg_catalog_glue::GlueCatalogBuilder pub type iceberg_catalog_glue::GlueCatalogBuilder::C = iceberg_catalog_glue::GlueCatalog pub fn iceberg_catalog_glue::GlueCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg_catalog_glue::GlueCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg_catalog_glue::GlueCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self pub fn iceberg_catalog_glue::GlueCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub const iceberg_catalog_glue::AWS_ACCESS_KEY_ID: &str diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 0a69797ef9..9123a53237 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -25,6 +25,7 @@ use async_trait::async_trait; use aws_sdk_glue::operation::create_table::CreateTableError; use aws_sdk_glue::operation::update_table::UpdateTableError; use aws_sdk_glue::types::TableInput; +use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory}; use iceberg::io::{ FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY, S3_SESSION_TOKEN, StorageFactory, @@ -58,6 +59,7 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub struct GlueCatalogBuilder { config: GlueCatalogConfig, storage_factory: Option>, + kms_client_factory: Option>, runtime: Option, } @@ -72,6 +74,7 @@ impl Default for GlueCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + kms_client_factory: None, runtime: None, } } @@ -85,6 +88,11 @@ impl CatalogBuilder for GlueCatalogBuilder { self } + fn with_kms_client_factory(mut self, kms_client_factory: Arc) -> Self { + self.kms_client_factory = Some(kms_client_factory); + self + } + fn with_runtime(mut self, runtime: Runtime) -> Self { self.runtime = Some(runtime); self @@ -140,7 +148,11 @@ impl CatalogBuilder for GlueCatalogBuilder { Some(rt) => rt, None => Runtime::try_current()?, }; - GlueCatalog::new(self.config, self.storage_factory, runtime).await + let kms_client = match self.kms_client_factory { + Some(factory) => Some(factory.create_kms_client(&self.config.props).await?), + None => None, + }; + GlueCatalog::new(self.config, self.storage_factory, runtime, kms_client).await } } } @@ -163,6 +175,7 @@ pub struct GlueCatalog { client: GlueClient, file_io: FileIO, runtime: Runtime, + kms_client: Option>, } impl Debug for GlueCatalog { @@ -179,6 +192,7 @@ impl GlueCatalog { config: GlueCatalogConfig, storage_factory: Option>, runtime: Runtime, + kms_client: Option>, ) -> Result { let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await; let mut file_io_props = config.props.clone(); @@ -228,6 +242,7 @@ impl GlueCatalog { client: GlueClient(client), file_io, runtime, + kms_client, }) } /// Get the catalogs `FileIO` @@ -287,6 +302,7 @@ impl GlueCatalog { table_name.to_owned(), )) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?; Ok((table, version_id)) @@ -644,6 +660,7 @@ impl Catalog for GlueCatalog { .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } @@ -874,6 +891,7 @@ impl Catalog for GlueCatalog { .metadata(metadata) .file_io(self.file_io()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?) } diff --git a/crates/catalog/hms/public-api.txt b/crates/catalog/hms/public-api.txt index 744819c437..9ea5f553b3 100644 --- a/crates/catalog/hms/public-api.txt +++ b/crates/catalog/hms/public-api.txt @@ -35,6 +35,7 @@ pub fn iceberg_catalog_hms::HmsCatalogBuilder::fmt(&self, f: &mut core::fmt::For impl iceberg::catalog::CatalogBuilder for iceberg_catalog_hms::HmsCatalogBuilder pub type iceberg_catalog_hms::HmsCatalogBuilder::C = iceberg_catalog_hms::HmsCatalog pub fn iceberg_catalog_hms::HmsCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub const iceberg_catalog_hms::HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 0f15890c77..a4ced06ea4 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -26,6 +26,7 @@ use hive_metastore::{ ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder, ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException, }; +use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory}; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; @@ -56,6 +57,7 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; pub struct HmsCatalogBuilder { config: HmsCatalogConfig, storage_factory: Option>, + kms_client_factory: Option>, runtime: Option, } @@ -70,6 +72,7 @@ impl Default for HmsCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + kms_client_factory: None, runtime: None, } } @@ -83,6 +86,11 @@ impl CatalogBuilder for HmsCatalogBuilder { self } + fn with_kms_client_factory(mut self, kms_client_factory: Arc) -> Self { + self.kms_client_factory = Some(kms_client_factory); + self + } + fn with_runtime(mut self, runtime: Runtime) -> Self { self.runtime = Some(runtime); self @@ -123,7 +131,12 @@ impl CatalogBuilder for HmsCatalogBuilder { }) .collect(); - let result = (|| -> Result { + async move { + let kms_client = match self.kms_client_factory { + Some(factory) => Some(factory.create_kms_client(&self.config.props).await?), + None => None, + }; + if self.config.name.is_none() { return Err(Error::new( ErrorKind::DataInvalid, @@ -146,10 +159,8 @@ impl CatalogBuilder for HmsCatalogBuilder { Some(rt) => rt, None => Runtime::try_current()?, }; - HmsCatalog::new(self.config, self.storage_factory, runtime) - })(); - - std::future::ready(result) + HmsCatalog::new(self.config, self.storage_factory, runtime, kms_client) + } } } @@ -182,6 +193,7 @@ pub struct HmsCatalog { client: HmsClient, file_io: FileIO, runtime: Runtime, + kms_client: Option>, } impl Debug for HmsCatalog { @@ -198,6 +210,7 @@ impl HmsCatalog { config: HmsCatalogConfig, storage_factory: Option>, runtime: Runtime, + kms_client: Option>, ) -> Result { let address = config .address @@ -238,6 +251,7 @@ impl HmsCatalog { client: HmsClient(client), file_io, runtime, + kms_client, }) } /// Get the catalogs `FileIO` @@ -545,6 +559,7 @@ impl Catalog for HmsCatalog { .metadata(metadata) .identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name)) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } @@ -584,6 +599,7 @@ impl Catalog for HmsCatalog { table.name.clone(), )) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } diff --git a/crates/catalog/loader/public-api.txt b/crates/catalog/loader/public-api.txt index c9e50207f4..1efaf831df 100644 --- a/crates/catalog/loader/public-api.txt +++ b/crates/catalog/loader/public-api.txt @@ -6,9 +6,11 @@ impl<'a> core::convert::From<&'a str> for iceberg_catalog_loader::CatalogLoader< pub fn iceberg_catalog_loader::CatalogLoader<'a>::from(s: &'a str) -> Self pub trait iceberg_catalog_loader::BoxedCatalogBuilder: core::marker::Send pub fn iceberg_catalog_loader::BoxedCatalogBuilder::load<'async_trait>(self: alloc::boxed::Box, name: alloc::string::String, props: std::collections::hash::map::HashMap) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait +pub fn iceberg_catalog_loader::BoxedCatalogBuilder::with_kms_client_factory(self: alloc::boxed::Box, kms_client_factory: alloc::sync::Arc) -> alloc::boxed::Box pub fn iceberg_catalog_loader::BoxedCatalogBuilder::with_storage_factory(self: alloc::boxed::Box, storage_factory: alloc::sync::Arc) -> alloc::boxed::Box impl iceberg_catalog_loader::BoxedCatalogBuilder for T pub fn T::load<'async_trait>(self: alloc::boxed::Box, name: alloc::string::String, props: std::collections::hash::map::HashMap) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait +pub fn T::with_kms_client_factory(self: alloc::boxed::Box, kms_client_factory: alloc::sync::Arc) -> alloc::boxed::Box pub fn T::with_storage_factory(self: alloc::boxed::Box, storage_factory: alloc::sync::Arc) -> alloc::boxed::Box pub fn iceberg_catalog_loader::load(type: &str) -> iceberg::error::Result> pub fn iceberg_catalog_loader::supported_types() -> alloc::vec::Vec<&'static str> diff --git a/crates/catalog/loader/src/lib.rs b/crates/catalog/loader/src/lib.rs index f681337736..d45decc40e 100644 --- a/crates/catalog/loader/src/lib.rs +++ b/crates/catalog/loader/src/lib.rs @@ -19,6 +19,7 @@ use std::collections::HashMap; use std::sync::Arc; use async_trait::async_trait; +use iceberg::encryption::kms::KmsClientFactory; use iceberg::io::StorageFactory; use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result}; use iceberg_catalog_glue::GlueCatalogBuilder; @@ -51,6 +52,11 @@ pub trait BoxedCatalogBuilder: Send { storage_factory: Arc, ) -> Box; + fn with_kms_client_factory( + self: Box, + kms_client_factory: Arc, + ) -> Box; + async fn load( self: Box, name: String, @@ -67,6 +73,16 @@ impl BoxedCatalogBuilder for T { Box::new(CatalogBuilder::with_storage_factory(*self, storage_factory)) } + fn with_kms_client_factory( + self: Box, + kms_client_factory: Arc, + ) -> Box { + Box::new(CatalogBuilder::with_kms_client_factory( + *self, + kms_client_factory, + )) + } + async fn load( self: Box, name: String, diff --git a/crates/catalog/rest/public-api.txt b/crates/catalog/rest/public-api.txt index 027df29b24..db7f906017 100644 --- a/crates/catalog/rest/public-api.txt +++ b/crates/catalog/rest/public-api.txt @@ -215,6 +215,7 @@ pub fn iceberg_catalog_rest::RestCatalogBuilder::fmt(&self, f: &mut core::fmt::F impl iceberg::catalog::CatalogBuilder for iceberg_catalog_rest::RestCatalogBuilder pub type iceberg_catalog_rest::RestCatalogBuilder::C = iceberg_catalog_rest::RestCatalog pub fn iceberg_catalog_rest::RestCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg_catalog_rest::RestCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg_catalog_rest::RestCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self pub fn iceberg_catalog_rest::RestCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub struct iceberg_catalog_rest::StorageCredential diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 0626ce5061..499939625d 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -23,6 +23,7 @@ use std::str::FromStr; use std::sync::Arc; use async_trait::async_trait; +use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory}; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::table::Table; use iceberg::{ @@ -62,6 +63,7 @@ const PATH_V1: &str = "v1"; pub struct RestCatalogBuilder { config: RestCatalogConfig, storage_factory: Option>, + kms_client_factory: Option>, runtime: Option, } @@ -76,6 +78,7 @@ impl Default for RestCatalogBuilder { client: None, }, storage_factory: None, + kms_client_factory: None, runtime: None, } } @@ -89,6 +92,11 @@ impl CatalogBuilder for RestCatalogBuilder { self } + fn with_kms_client_factory(mut self, kms_client_factory: Arc) -> Self { + self.kms_client_factory = Some(kms_client_factory); + self + } + fn with_runtime(mut self, runtime: Runtime) -> Self { self.runtime = Some(runtime); self @@ -118,7 +126,7 @@ impl CatalogBuilder for RestCatalogBuilder { .filter(|(k, _)| k != REST_CATALOG_PROP_URI && k != REST_CATALOG_PROP_WAREHOUSE) .collect(); - let result = { + async move { if self.config.name.is_none() { Err(Error::new( ErrorKind::DataInvalid, @@ -131,11 +139,18 @@ impl CatalogBuilder for RestCatalogBuilder { )) } else { let runtime = self.runtime.unwrap_or_else(Runtime::current); - Ok(RestCatalog::new(self.config, self.storage_factory, runtime)) + let kms_client = match self.kms_client_factory { + Some(factory) => Some(factory.create_kms_client(&self.config.props).await?), + None => None, + }; + Ok(RestCatalog::new( + self.config, + self.storage_factory, + runtime, + kms_client, + )) } - }; - - std::future::ready(result) + } } } @@ -360,6 +375,8 @@ pub struct RestCatalog { /// Storage factory for creating FileIO instances. storage_factory: Option>, runtime: Runtime, + /// Optional KMS client for encrypted tables. + kms_client: Option>, } impl RestCatalog { @@ -368,12 +385,14 @@ impl RestCatalog { config: RestCatalogConfig, storage_factory: Option>, runtime: Runtime, + kms_client: Option>, ) -> Self { Self { user_config: config, ctx: OnceCell::new(), storage_factory, runtime, + kms_client, } } @@ -805,7 +824,8 @@ impl Catalog for RestCatalog { .identifier(table_ident.clone()) .file_io(file_io) .metadata(response.metadata) - .runtime(self.runtime.clone()); + .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -862,7 +882,8 @@ impl Catalog for RestCatalog { .identifier(table_ident.clone()) .file_io(file_io) .metadata(response.metadata) - .runtime(self.runtime.clone()); + .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -999,6 +1020,7 @@ impl Catalog for RestCatalog { .metadata(response.metadata) .metadata_location(metadata_location.clone()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } @@ -1072,6 +1094,7 @@ impl Catalog for RestCatalog { .metadata(response.metadata) .metadata_location(response.metadata_location) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } } @@ -1119,6 +1142,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); assert_eq!( @@ -1194,6 +1218,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1242,6 +1267,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1267,6 +1293,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1299,6 +1326,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1331,6 +1359,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1363,6 +1392,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1477,6 +1507,7 @@ mod tests { .build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let token = catalog.context().await.unwrap().client.token().await; @@ -1525,6 +1556,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let _namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1556,6 +1588,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1608,6 +1641,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1708,6 +1742,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let namespaces = catalog.list_namespaces(None).await.unwrap(); @@ -1762,6 +1797,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let namespaces = catalog @@ -1806,6 +1842,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let namespaces = catalog @@ -1840,6 +1877,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); assert!( @@ -1869,6 +1907,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); catalog @@ -1910,6 +1949,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let tables = catalog @@ -1979,6 +2019,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let tables = catalog @@ -2111,6 +2152,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let tables = catalog @@ -2156,6 +2198,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); catalog @@ -2186,6 +2229,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); assert!( @@ -2218,6 +2262,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); catalog @@ -2253,6 +2298,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table = catalog @@ -2371,6 +2417,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table = catalog @@ -2408,6 +2455,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table_creation = TableCreation::builder() @@ -2558,6 +2606,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table_creation = TableCreation::builder() @@ -2628,6 +2677,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table1 = { @@ -2773,6 +2823,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table1 = { @@ -2839,6 +2890,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table_ident = TableIdent::new(NamespaceIdent::new("ns1".to_string()), "test1".to_string()); @@ -2891,6 +2943,7 @@ mod tests { RestCatalogConfig::builder().uri(server.url()).build(), Some(Arc::new(LocalFsStorageFactory)), Runtime::current(), + None, ); let table_ident = diff --git a/crates/catalog/s3tables/public-api.txt b/crates/catalog/s3tables/public-api.txt index b6b704e3dd..6d1b089e12 100644 --- a/crates/catalog/s3tables/public-api.txt +++ b/crates/catalog/s3tables/public-api.txt @@ -30,6 +30,7 @@ pub fn iceberg_catalog_s3tables::S3TablesCatalogBuilder::fmt(&self, f: &mut core impl iceberg::catalog::CatalogBuilder for iceberg_catalog_s3tables::S3TablesCatalogBuilder pub type iceberg_catalog_s3tables::S3TablesCatalogBuilder::C = iceberg_catalog_s3tables::S3TablesCatalog pub fn iceberg_catalog_s3tables::S3TablesCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg_catalog_s3tables::S3TablesCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg_catalog_s3tables::S3TablesCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self pub fn iceberg_catalog_s3tables::S3TablesCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub const iceberg_catalog_s3tables::S3TABLES_CATALOG_PROP_ENDPOINT_URL: &str diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index e7f4174261..3bda98dfa3 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -27,6 +27,7 @@ use aws_sdk_s3tables::operation::get_table::{GetTableError, GetTableOutput}; use aws_sdk_s3tables::operation::list_tables::ListTablesOutput; use aws_sdk_s3tables::operation::update_table_metadata_location::UpdateTableMetadataLocationError; use aws_sdk_s3tables::types::OpenTableFormat; +use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory}; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; @@ -71,6 +72,7 @@ struct S3TablesCatalogConfig { pub struct S3TablesCatalogBuilder { config: S3TablesCatalogConfig, storage_factory: Option>, + kms_client_factory: Option>, runtime: Option, } @@ -86,6 +88,7 @@ impl Default for S3TablesCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + kms_client_factory: None, runtime: None, } } @@ -134,6 +137,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder { self } + fn with_kms_client_factory(mut self, kms_client_factory: Arc) -> Self { + self.kms_client_factory = Some(kms_client_factory); + self + } + fn with_runtime(mut self, runtime: Runtime) -> Self { self.runtime = Some(runtime); self @@ -183,7 +191,11 @@ impl CatalogBuilder for S3TablesCatalogBuilder { Some(rt) => rt, None => Runtime::try_current()?, }; - S3TablesCatalog::new(self.config, self.storage_factory, runtime).await + let kms_client = match self.kms_client_factory { + Some(factory) => Some(factory.create_kms_client(&self.config.props).await?), + None => None, + }; + S3TablesCatalog::new(self.config, self.storage_factory, runtime, kms_client).await } } } @@ -196,6 +208,7 @@ pub struct S3TablesCatalog { s3tables_client: aws_sdk_s3tables::Client, file_io: FileIO, runtime: Runtime, + kms_client: Option>, } impl S3TablesCatalog { @@ -204,6 +217,7 @@ impl S3TablesCatalog { config: S3TablesCatalogConfig, storage_factory: Option>, runtime: Runtime, + kms_client: Option>, ) -> Result { let s3tables_client = if let Some(client) = config.client.clone() { client @@ -227,6 +241,7 @@ impl S3TablesCatalog { s3tables_client, file_io, runtime, + kms_client, }) } @@ -276,6 +291,7 @@ impl S3TablesCatalog { .metadata_location(metadata_location) .file_io(self.file_io.clone()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?; Ok((table, resp.version_token)) } @@ -575,6 +591,7 @@ impl Catalog for S3TablesCatalog { .metadata(metadata) .file_io(self.file_io.clone()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?; Ok(table) } @@ -759,7 +776,7 @@ mod tests { }; Ok(Some( - S3TablesCatalog::new(config, None, Runtime::current()).await?, + S3TablesCatalog::new(config, None, Runtime::current(), None).await?, )) } diff --git a/crates/catalog/sql/public-api.txt b/crates/catalog/sql/public-api.txt index 9402ccde4e..7f3c638942 100644 --- a/crates/catalog/sql/public-api.txt +++ b/crates/catalog/sql/public-api.txt @@ -48,6 +48,7 @@ pub fn iceberg_catalog_sql::SqlCatalogBuilder::fmt(&self, f: &mut core::fmt::For impl iceberg::catalog::CatalogBuilder for iceberg_catalog_sql::SqlCatalogBuilder pub type iceberg_catalog_sql::SqlCatalogBuilder::C = iceberg_catalog_sql::SqlCatalog pub fn iceberg_catalog_sql::SqlCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg_catalog_sql::SqlCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg_catalog_sql::SqlCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self pub fn iceberg_catalog_sql::SqlCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub const iceberg_catalog_sql::SQL_CATALOG_PROP_BIND_STYLE: &str diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 02e32c5f4a..89910f801a 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -21,6 +21,7 @@ use std::sync::Arc; use std::time::Duration; use async_trait::async_trait; +use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory}; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; use iceberg::spec::{TableMetadata, TableMetadataBuilder}; use iceberg::table::Table; @@ -67,6 +68,7 @@ static TEST_BEFORE_ACQUIRE: bool = true; // Default the health-check of each con pub struct SqlCatalogBuilder { config: SqlCatalogConfig, storage_factory: Option>, + kms_client_factory: Option>, runtime: Option, } @@ -81,6 +83,7 @@ impl Default for SqlCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + kms_client_factory: None, runtime: None, } } @@ -145,6 +148,11 @@ impl CatalogBuilder for SqlCatalogBuilder { self } + fn with_kms_client_factory(mut self, kms_client_factory: Arc) -> Self { + self.kms_client_factory = Some(kms_client_factory); + self + } + fn with_runtime(mut self, runtime: Runtime) -> Self { self.runtime = Some(runtime); self @@ -201,7 +209,11 @@ impl CatalogBuilder for SqlCatalogBuilder { Some(rt) => rt, None => Runtime::try_current()?, }; - SqlCatalog::new(self.config, self.storage_factory, runtime).await + let kms_client = match self.kms_client_factory { + Some(factory) => Some(factory.create_kms_client(&self.config.props).await?), + None => None, + }; + SqlCatalog::new(self.config, self.storage_factory, runtime, kms_client).await } } } @@ -233,6 +245,7 @@ pub struct SqlCatalog { fileio: FileIO, sql_bind_style: SqlBindStyle, runtime: Runtime, + kms_client: Option>, } #[derive(Debug, PartialEq, strum::EnumString, strum::Display)] @@ -250,6 +263,7 @@ impl SqlCatalog { config: SqlCatalogConfig, storage_factory: Option>, runtime: Runtime, + kms_client: Option>, ) -> Result { let factory = storage_factory.ok_or_else(|| { Error::new( @@ -321,6 +335,7 @@ impl SqlCatalog { fileio, sql_bind_style: config.sql_bind_style, runtime, + kms_client, }) } @@ -824,6 +839,7 @@ impl Catalog for SqlCatalog { .metadata_location(tbl_metadata_location) .metadata(metadata) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?) } @@ -895,6 +911,7 @@ impl Catalog for SqlCatalog { .identifier(tbl_ident) .metadata(tbl_metadata) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?) } @@ -967,6 +984,7 @@ impl Catalog for SqlCatalog { .metadata(metadata) .file_io(self.fileio.clone()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build()?) } diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index eb2d4f932b..9349c7a250 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -195,6 +195,15 @@ pub fn iceberg::encryption::kms::MemoryKeyManagementClient::generate_key<'life0, pub fn iceberg::encryption::kms::MemoryKeyManagementClient::supports_key_generation(&self) -> bool pub fn iceberg::encryption::kms::MemoryKeyManagementClient::unwrap_key<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, wrapped_key: &'life1 [u8], wrapping_key_id: &'life2 str) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait pub fn iceberg::encryption::kms::MemoryKeyManagementClient::wrap_key<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, key: &'life1 [u8], wrapping_key_id: &'life2 str) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait +pub struct iceberg::encryption::kms::MemoryKmsClientFactory +impl iceberg::encryption::kms::MemoryKmsClientFactory +pub fn iceberg::encryption::kms::MemoryKmsClientFactory::new(kms: &iceberg::encryption::kms::MemoryKeyManagementClient) -> Self +impl core::clone::Clone for iceberg::encryption::kms::MemoryKmsClientFactory +pub fn iceberg::encryption::kms::MemoryKmsClientFactory::clone(&self) -> iceberg::encryption::kms::MemoryKmsClientFactory +impl core::fmt::Debug for iceberg::encryption::kms::MemoryKmsClientFactory +pub fn iceberg::encryption::kms::MemoryKmsClientFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl iceberg::encryption::kms::KmsClientFactory for iceberg::encryption::kms::MemoryKmsClientFactory +pub fn iceberg::encryption::kms::MemoryKmsClientFactory::create_kms_client<'life0, 'life1, 'async_trait>(&'life0 self, _properties: &'life1 std::collections::hash::map::HashMap) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait pub trait iceberg::encryption::kms::KeyManagementClient: core::marker::Send + core::marker::Sync + core::fmt::Debug pub fn iceberg::encryption::kms::KeyManagementClient::generate_key<'life0, 'life1, 'async_trait>(&'life0 self, wrapping_key_id: &'life1 str) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait pub fn iceberg::encryption::kms::KeyManagementClient::supports_key_generation(&self) -> bool @@ -210,6 +219,10 @@ pub fn T::generate_key<'life0, 'life1, 'async_trait>(&'life0 self, wrapping_key_ pub fn T::supports_key_generation(&self) -> bool pub fn T::unwrap_key<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, wrapped_key: &'life1 [u8], wrapping_key_id: &'life2 str) -> core::pin::Pin> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait pub fn T::wrap_key<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, key: &'life1 [u8], wrapping_key_id: &'life2 str) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait +pub trait iceberg::encryption::kms::KmsClientFactory: core::fmt::Debug + core::marker::Send + core::marker::Sync +pub fn iceberg::encryption::kms::KmsClientFactory::create_kms_client<'life0, 'life1, 'async_trait>(&'life0 self, properties: &'life1 std::collections::hash::map::HashMap) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait +impl iceberg::encryption::kms::KmsClientFactory for iceberg::encryption::kms::MemoryKmsClientFactory +pub fn iceberg::encryption::kms::MemoryKmsClientFactory::create_kms_client<'life0, 'life1, 'async_trait>(&'life0 self, _properties: &'life1 std::collections::hash::map::HashMap) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait pub enum iceberg::encryption::AesKeySize pub iceberg::encryption::AesKeySize::Bits128 = 128 pub iceberg::encryption::AesKeySize::Bits192 = 192 @@ -1075,6 +1088,7 @@ pub fn iceberg::memory::MemoryCatalogBuilder::fmt(&self, f: &mut core::fmt::Form impl iceberg::CatalogBuilder for iceberg::memory::MemoryCatalogBuilder pub type iceberg::memory::MemoryCatalogBuilder::C = iceberg::memory::MemoryCatalog pub fn iceberg::memory::MemoryCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg::memory::MemoryCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg::memory::MemoryCatalogBuilder::with_runtime(self, runtime: iceberg::Runtime) -> Self pub fn iceberg::memory::MemoryCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub const iceberg::memory::MEMORY_CATALOG_WAREHOUSE: &str @@ -3046,7 +3060,7 @@ pub fn iceberg::table::TableBuilder::cache_size_bytes(self, cache_size_bytes: u6 pub fn iceberg::table::TableBuilder::disable_cache(self) -> Self pub fn iceberg::table::TableBuilder::file_io(self, file_io: iceberg::io::FileIO) -> Self pub fn iceberg::table::TableBuilder::identifier(self, identifier: iceberg::TableIdent) -> Self -pub fn iceberg::table::TableBuilder::kms_client(self, kms_client: alloc::sync::Arc) -> Self +pub fn iceberg::table::TableBuilder::kms_client(self, kms_client: impl core::convert::Into>>) -> Self pub fn iceberg::table::TableBuilder::metadata>(self, metadata: T) -> Self pub fn iceberg::table::TableBuilder::metadata_location>(self, metadata_location: T) -> Self pub fn iceberg::table::TableBuilder::readonly(self, readonly: bool) -> Self @@ -3673,11 +3687,13 @@ pub fn iceberg::memory::MemoryCatalog::update_table<'life0, 'async_trait>(&'life pub trait iceberg::CatalogBuilder: core::default::Default + core::fmt::Debug + core::marker::Send + core::marker::Sync pub type iceberg::CatalogBuilder::C: iceberg::Catalog pub fn iceberg::CatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg::CatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg::CatalogBuilder::with_runtime(self, runtime: iceberg::Runtime) -> Self pub fn iceberg::CatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self impl iceberg::CatalogBuilder for iceberg::memory::MemoryCatalogBuilder pub type iceberg::memory::MemoryCatalogBuilder::C = iceberg::memory::MemoryCatalog pub fn iceberg::memory::MemoryCatalogBuilder::load(self, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> impl core::future::future::Future> + core::marker::Send +pub fn iceberg::memory::MemoryCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc) -> Self pub fn iceberg::memory::MemoryCatalogBuilder::with_runtime(self, runtime: iceberg::Runtime) -> Self pub fn iceberg::memory::MemoryCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc) -> Self pub async fn iceberg::drop_table_data(table_info: &iceberg::table::Table) -> iceberg::Result<()> diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 67f8ab8dd1..f0007b8738 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -26,6 +26,7 @@ use futures::lock::{Mutex, MutexGuard}; use itertools::Itertools; use super::namespace_state::NamespaceState; +use crate::encryption::kms::{KeyManagementClient, KmsClientFactory}; use crate::io::{FileIO, FileIOBuilder, MemoryStorageFactory, StorageFactory}; use crate::runtime::Runtime; use crate::spec::{TableMetadata, TableMetadataBuilder}; @@ -46,6 +47,7 @@ const LOCATION: &str = "location"; pub struct MemoryCatalogBuilder { config: MemoryCatalogConfig, storage_factory: Option>, + kms_client_factory: Option>, runtime: Option, } @@ -58,6 +60,7 @@ impl Default for MemoryCatalogBuilder { props: HashMap::new(), }, storage_factory: None, + kms_client_factory: None, runtime: None, } } @@ -71,6 +74,11 @@ impl CatalogBuilder for MemoryCatalogBuilder { self } + fn with_kms_client_factory(mut self, kms_client_factory: Arc) -> Self { + self.kms_client_factory = Some(kms_client_factory); + self + } + fn with_runtime(mut self, runtime: Runtime) -> Self { self.runtime = Some(runtime); self @@ -96,7 +104,7 @@ impl CatalogBuilder for MemoryCatalogBuilder { .filter(|(k, _)| k != MEMORY_CATALOG_WAREHOUSE) .collect(); - let result = { + async move { if self.config.name.is_none() { Err(Error::new( ErrorKind::DataInvalid, @@ -109,11 +117,13 @@ impl CatalogBuilder for MemoryCatalogBuilder { )) } else { let runtime = self.runtime.unwrap_or_else(Runtime::current); - MemoryCatalog::new(self.config, self.storage_factory, runtime) + let kms_client = match self.kms_client_factory { + Some(factory) => Some(factory.create_kms_client(&self.config.props).await?), + None => None, + }; + MemoryCatalog::new(self.config, self.storage_factory, runtime, kms_client) } - }; - - std::future::ready(result) + } } } @@ -131,6 +141,7 @@ pub struct MemoryCatalog { file_io: FileIO, warehouse_location: String, runtime: Runtime, + kms_client: Option>, } impl MemoryCatalog { @@ -139,6 +150,7 @@ impl MemoryCatalog { config: MemoryCatalogConfig, storage_factory: Option>, runtime: Runtime, + kms_client: Option>, ) -> Result { // Use provided factory or default to MemoryStorageFactory let factory = storage_factory.unwrap_or_else(|| Arc::new(MemoryStorageFactory)); @@ -148,6 +160,7 @@ impl MemoryCatalog { file_io: FileIOBuilder::new(factory).with_props(config.props).build(), warehouse_location: config.warehouse, runtime, + kms_client, }) } @@ -166,6 +179,7 @@ impl MemoryCatalog { .metadata_location(metadata_location.to_string()) .file_io(self.file_io.clone()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } } @@ -321,6 +335,7 @@ impl Catalog for MemoryCatalog { .metadata(metadata) .identifier(table_ident) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } @@ -388,6 +403,7 @@ impl Catalog for MemoryCatalog { .metadata(metadata) .identifier(table_ident.clone()) .runtime(self.runtime.clone()) + .kms_client(self.kms_client.clone()) .build() } diff --git a/crates/iceberg/src/catalog/mod.rs b/crates/iceberg/src/catalog/mod.rs index 3ab0f2886b..d2dc1ac1a0 100644 --- a/crates/iceberg/src/catalog/mod.rs +++ b/crates/iceberg/src/catalog/mod.rs @@ -39,6 +39,7 @@ use serde_derive::{Deserialize, Serialize}; use typed_builder::TypedBuilder; use uuid::Uuid; +use crate::encryption::kms::KmsClientFactory; use crate::io::StorageFactory; use crate::runtime::Runtime; use crate::spec::{ @@ -152,6 +153,28 @@ pub trait CatalogBuilder: Default + Debug + Send + Sync { /// ``` fn with_storage_factory(self, storage_factory: Arc) -> Self; + /// Set a [`KmsClientFactory`] to enable table encryption. + /// + /// When provided, the catalog calls the factory once during + /// [`load`](Self::load) with the catalog properties to create a shared + /// [`KeyManagementClient`](crate::encryption::KeyManagementClient). + /// That client is then passed to each table's `TableBuilder` so tables + /// with `encryption.key-id` set can construct an `EncryptionManager`. + /// + /// # Example + /// + /// ```rust,ignore + /// use iceberg::CatalogBuilder; + /// use iceberg::encryption::kms::KmsClientFactory; + /// use std::sync::Arc; + /// + /// let catalog = MyCatalogBuilder::default() + /// .with_kms_client_factory(Arc::new(MyKmsClientFactory)) + /// .load("my_catalog", props) + /// .await?; + /// ``` + fn with_kms_client_factory(self, kms_client_factory: Arc) -> Self; + /// Set a custom tokio Runtime to use for spawning async tasks. /// /// When a Runtime is provided, the catalog will propagate it to all tables diff --git a/crates/iceberg/src/encryption/kms/factory.rs b/crates/iceberg/src/encryption/kms/factory.rs new file mode 100644 index 0000000000..b957b5a5f6 --- /dev/null +++ b/crates/iceberg/src/encryption/kms/factory.rs @@ -0,0 +1,51 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Factory trait for creating [`KeyManagementClient`] instances. + +use std::collections::HashMap; +use std::fmt::Debug; +use std::sync::Arc; + +use async_trait::async_trait; + +use super::KeyManagementClient; +use crate::Result; + +/// Factory for creating a [`KeyManagementClient`] from catalog properties. +/// +/// Replaces Java's reflection-based `encryption.kms-impl` + `initialize(properties)` +/// pattern. Users provide an implementation of this trait to the catalog builder via +/// [`CatalogBuilder::with_kms_client_factory`](crate::CatalogBuilder::with_kms_client_factory). +/// +/// The catalog calls [`create_kms_client`](Self::create_kms_client) **once** during +/// catalog initialization with the catalog's properties. The resulting client is +/// shared across all tables in the catalog and passed to each table's +/// [`EncryptionManager`](crate::encryption::EncryptionManager) via +/// `TableBuilder::kms_client(...)`. +#[async_trait] +pub trait KmsClientFactory: Debug + Send + Sync { + /// Create a [`KeyManagementClient`] from catalog properties. + /// + /// Called once during catalog initialization. Properties may include + /// KMS endpoint, region, credentials, or any backend-specific + /// configuration needed to construct the client. + async fn create_kms_client( + &self, + properties: &HashMap, + ) -> Result>; +} diff --git a/crates/iceberg/src/encryption/kms/memory.rs b/crates/iceberg/src/encryption/kms/memory.rs index 65319831dd..4222846129 100644 --- a/crates/iceberg/src/encryption/kms/memory.rs +++ b/crates/iceberg/src/encryption/kms/memory.rs @@ -27,6 +27,7 @@ use std::sync::{Arc, RwLock}; use async_trait::async_trait; use super::KeyManagementClient; +use super::factory::KmsClientFactory; use crate::encryption::{AesGcmCipher, AesKeySize, SecureKey, SensitiveBytes}; use crate::error::lock_error; use crate::{Error, ErrorKind, Result}; @@ -142,6 +143,39 @@ impl MemoryKeyManagementClient { } } +/// Factory for creating [`MemoryKeyManagementClient`] instances. +/// +/// This factory creates a fresh in-memory KMS client for each call. +/// Useful for testing encryption flows without a real KMS backend. +#[derive(Debug, Clone)] +pub struct MemoryKmsClientFactory { + master_keys: Arc>>, + master_key_size: AesKeySize, +} + +impl MemoryKmsClientFactory { + /// Creates a new factory that will produce clients sharing the given master keys. + pub fn new(kms: &MemoryKeyManagementClient) -> Self { + Self { + master_keys: Arc::clone(&kms.master_keys), + master_key_size: kms.master_key_size, + } + } +} + +#[async_trait] +impl KmsClientFactory for MemoryKmsClientFactory { + async fn create_kms_client( + &self, + _properties: &HashMap, + ) -> Result> { + Ok(Arc::new(MemoryKeyManagementClient { + master_keys: Arc::clone(&self.master_keys), + master_key_size: self.master_key_size, + })) + } +} + #[async_trait] impl KeyManagementClient for MemoryKeyManagementClient { async fn wrap_key(&self, key: &[u8], wrapping_key_id: &str) -> Result> { diff --git a/crates/iceberg/src/encryption/kms/mod.rs b/crates/iceberg/src/encryption/kms/mod.rs index 160e692550..0b4f331d13 100644 --- a/crates/iceberg/src/encryption/kms/mod.rs +++ b/crates/iceberg/src/encryption/kms/mod.rs @@ -21,7 +21,9 @@ //! integration and implementations for different key management systems. mod client; +mod factory; mod memory; pub use client::{GeneratedKey, KeyManagementClient}; -pub use memory::MemoryKeyManagementClient; +pub use factory::KmsClientFactory; +pub use memory::{MemoryKeyManagementClient, MemoryKmsClientFactory}; diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 31feade038..7f01d9d622 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -113,8 +113,15 @@ impl TableBuilder { /// If the table metadata has the `encryption.key-id` property set, a /// [`KeyManagementClient`] must be provided here so the table can build /// an [`EncryptionManager`]; otherwise [`Self::build`] will return an error. - pub fn kms_client(mut self, kms_client: Arc) -> Self { - self.kms_client = Some(kms_client); + /// + /// Accepts either an `Arc` or an + /// `Option>`, so catalogs that hold an + /// optional client can forward it directly. Passing `None` is a no-op. + pub fn kms_client( + mut self, + kms_client: impl Into>>, + ) -> Self { + self.kms_client = kms_client.into(); self }