Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions crates/catalog/glue/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ pub fn iceberg_catalog_glue::GlueCatalogBuilder::fmt(&self, f: &mut core::fmt::F
impl iceberg::catalog::CatalogBuilder for iceberg_catalog_glue::GlueCatalogBuilder
pub type iceberg_catalog_glue::GlueCatalogBuilder::C = iceberg_catalog_glue::GlueCatalog
pub fn iceberg_catalog_glue::GlueCatalogBuilder::load(self, name: impl core::convert::Into<alloc::string::String>, props: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>) -> impl core::future::future::Future<Output = iceberg::error::Result<Self::C>> + core::marker::Send
pub fn iceberg_catalog_glue::GlueCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc<dyn iceberg::encryption::kms::factory::KmsClientFactory>) -> Self
pub fn iceberg_catalog_glue::GlueCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self
pub fn iceberg_catalog_glue::GlueCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc<dyn iceberg::io::storage::StorageFactory>) -> Self
pub const iceberg_catalog_glue::AWS_ACCESS_KEY_ID: &str
Expand Down
20 changes: 19 additions & 1 deletion crates/catalog/glue/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use async_trait::async_trait;
use aws_sdk_glue::operation::create_table::CreateTableError;
use aws_sdk_glue::operation::update_table::UpdateTableError;
use aws_sdk_glue::types::TableInput;
use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory};
use iceberg::io::{
FileIO, FileIOBuilder, S3_ACCESS_KEY_ID, S3_ENDPOINT, S3_REGION, S3_SECRET_ACCESS_KEY,
S3_SESSION_TOKEN, StorageFactory,
Expand Down Expand Up @@ -58,6 +59,7 @@ pub const GLUE_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
pub struct GlueCatalogBuilder {
config: GlueCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
kms_client_factory: Option<Arc<dyn KmsClientFactory>>,
runtime: Option<Runtime>,
}

Expand All @@ -72,6 +74,7 @@ impl Default for GlueCatalogBuilder {
props: HashMap::new(),
},
storage_factory: None,
kms_client_factory: None,
runtime: None,
}
}
Expand All @@ -85,6 +88,11 @@ impl CatalogBuilder for GlueCatalogBuilder {
self
}

fn with_kms_client_factory(mut self, kms_client_factory: Arc<dyn KmsClientFactory>) -> Self {
self.kms_client_factory = Some(kms_client_factory);
self
}

fn with_runtime(mut self, runtime: Runtime) -> Self {
self.runtime = Some(runtime);
self
Expand Down Expand Up @@ -140,7 +148,11 @@ impl CatalogBuilder for GlueCatalogBuilder {
Some(rt) => rt,
None => Runtime::try_current()?,
};
GlueCatalog::new(self.config, self.storage_factory, runtime).await
let kms_client = match self.kms_client_factory {
Some(factory) => Some(factory.create_kms_client(&self.config.props).await?),
None => None,
};
GlueCatalog::new(self.config, self.storage_factory, runtime, kms_client).await
}
}
}
Expand All @@ -163,6 +175,7 @@ pub struct GlueCatalog {
client: GlueClient,
file_io: FileIO,
runtime: Runtime,
kms_client: Option<Arc<dyn KeyManagementClient>>,
}

impl Debug for GlueCatalog {
Expand All @@ -179,6 +192,7 @@ impl GlueCatalog {
config: GlueCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
runtime: Runtime,
kms_client: Option<Arc<dyn KeyManagementClient>>,
) -> Result<Self> {
let sdk_config = create_sdk_config(&config.props, config.uri.as_ref()).await;
let mut file_io_props = config.props.clone();
Expand Down Expand Up @@ -228,6 +242,7 @@ impl GlueCatalog {
client: GlueClient(client),
file_io,
runtime,
kms_client,
})
}
/// Get the catalogs `FileIO`
Expand Down Expand Up @@ -287,6 +302,7 @@ impl GlueCatalog {
table_name.to_owned(),
))
.runtime(self.runtime.clone())
.kms_client(self.kms_client.clone())
.build()?;

Ok((table, version_id))
Expand Down Expand Up @@ -644,6 +660,7 @@ impl Catalog for GlueCatalog {
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.runtime(self.runtime.clone())
.kms_client(self.kms_client.clone())
.build()
}

Expand Down Expand Up @@ -874,6 +891,7 @@ impl Catalog for GlueCatalog {
.metadata(metadata)
.file_io(self.file_io())
.runtime(self.runtime.clone())
.kms_client(self.kms_client.clone())
.build()?)
}

Expand Down
1 change: 1 addition & 0 deletions crates/catalog/hms/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ pub fn iceberg_catalog_hms::HmsCatalogBuilder::fmt(&self, f: &mut core::fmt::For
impl iceberg::catalog::CatalogBuilder for iceberg_catalog_hms::HmsCatalogBuilder
pub type iceberg_catalog_hms::HmsCatalogBuilder::C = iceberg_catalog_hms::HmsCatalog
pub fn iceberg_catalog_hms::HmsCatalogBuilder::load(self, name: impl core::convert::Into<alloc::string::String>, props: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>) -> impl core::future::future::Future<Output = iceberg::error::Result<Self::C>> + core::marker::Send
pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc<dyn iceberg::encryption::kms::factory::KmsClientFactory>) -> Self
pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self
pub fn iceberg_catalog_hms::HmsCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc<dyn iceberg::io::storage::StorageFactory>) -> Self
pub const iceberg_catalog_hms::HMS_CATALOG_PROP_THRIFT_TRANSPORT: &str
Expand Down
26 changes: 21 additions & 5 deletions crates/catalog/hms/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use hive_metastore::{
ThriftHiveMetastoreClient, ThriftHiveMetastoreClientBuilder,
ThriftHiveMetastoreGetDatabaseException, ThriftHiveMetastoreGetTableException,
};
use iceberg::encryption::kms::{KeyManagementClient, KmsClientFactory};
use iceberg::io::{FileIO, FileIOBuilder, StorageFactory};
use iceberg::spec::{TableMetadata, TableMetadataBuilder};
use iceberg::table::Table;
Expand Down Expand Up @@ -56,6 +57,7 @@ pub const HMS_CATALOG_PROP_WAREHOUSE: &str = "warehouse";
pub struct HmsCatalogBuilder {
config: HmsCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
kms_client_factory: Option<Arc<dyn KmsClientFactory>>,
runtime: Option<Runtime>,
}

Expand All @@ -70,6 +72,7 @@ impl Default for HmsCatalogBuilder {
props: HashMap::new(),
},
storage_factory: None,
kms_client_factory: None,
runtime: None,
}
}
Expand All @@ -83,6 +86,11 @@ impl CatalogBuilder for HmsCatalogBuilder {
self
}

fn with_kms_client_factory(mut self, kms_client_factory: Arc<dyn KmsClientFactory>) -> Self {
self.kms_client_factory = Some(kms_client_factory);
self
}

fn with_runtime(mut self, runtime: Runtime) -> Self {
self.runtime = Some(runtime);
self
Expand Down Expand Up @@ -123,7 +131,12 @@ impl CatalogBuilder for HmsCatalogBuilder {
})
.collect();

let result = (|| -> Result<HmsCatalog> {
async move {
let kms_client = match self.kms_client_factory {
Some(factory) => Some(factory.create_kms_client(&self.config.props).await?),
None => None,
};

if self.config.name.is_none() {
return Err(Error::new(
ErrorKind::DataInvalid,
Expand All @@ -146,10 +159,8 @@ impl CatalogBuilder for HmsCatalogBuilder {
Some(rt) => rt,
None => Runtime::try_current()?,
};
HmsCatalog::new(self.config, self.storage_factory, runtime)
})();

std::future::ready(result)
HmsCatalog::new(self.config, self.storage_factory, runtime, kms_client)
}
}
}

Expand Down Expand Up @@ -182,6 +193,7 @@ pub struct HmsCatalog {
client: HmsClient,
file_io: FileIO,
runtime: Runtime,
kms_client: Option<Arc<dyn KeyManagementClient>>,
}

impl Debug for HmsCatalog {
Expand All @@ -198,6 +210,7 @@ impl HmsCatalog {
config: HmsCatalogConfig,
storage_factory: Option<Arc<dyn StorageFactory>>,
runtime: Runtime,
kms_client: Option<Arc<dyn KeyManagementClient>>,
) -> Result<Self> {
let address = config
.address
Expand Down Expand Up @@ -238,6 +251,7 @@ impl HmsCatalog {
client: HmsClient(client),
file_io,
runtime,
kms_client,
})
}
/// Get the catalogs `FileIO`
Expand Down Expand Up @@ -545,6 +559,7 @@ impl Catalog for HmsCatalog {
.metadata(metadata)
.identifier(TableIdent::new(NamespaceIdent::new(db_name), table_name))
.runtime(self.runtime.clone())
.kms_client(self.kms_client.clone())
.build()
}

Expand Down Expand Up @@ -584,6 +599,7 @@ impl Catalog for HmsCatalog {
table.name.clone(),
))
.runtime(self.runtime.clone())
.kms_client(self.kms_client.clone())
.build()
}

Expand Down
2 changes: 2 additions & 0 deletions crates/catalog/loader/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,11 @@ impl<'a> core::convert::From<&'a str> for iceberg_catalog_loader::CatalogLoader<
pub fn iceberg_catalog_loader::CatalogLoader<'a>::from(s: &'a str) -> Self
pub trait iceberg_catalog_loader::BoxedCatalogBuilder: core::marker::Send
pub fn iceberg_catalog_loader::BoxedCatalogBuilder::load<'async_trait>(self: alloc::boxed::Box<Self>, name: alloc::string::String, props: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>) -> core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output = iceberg::error::Result<alloc::sync::Arc<dyn iceberg::catalog::Catalog>>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait
pub fn iceberg_catalog_loader::BoxedCatalogBuilder::with_kms_client_factory(self: alloc::boxed::Box<Self>, kms_client_factory: alloc::sync::Arc<dyn iceberg::encryption::kms::factory::KmsClientFactory>) -> alloc::boxed::Box<dyn iceberg_catalog_loader::BoxedCatalogBuilder>
pub fn iceberg_catalog_loader::BoxedCatalogBuilder::with_storage_factory(self: alloc::boxed::Box<Self>, storage_factory: alloc::sync::Arc<dyn iceberg::io::storage::StorageFactory>) -> alloc::boxed::Box<dyn iceberg_catalog_loader::BoxedCatalogBuilder>
impl<T: iceberg::catalog::CatalogBuilder + 'static> iceberg_catalog_loader::BoxedCatalogBuilder for T
pub fn T::load<'async_trait>(self: alloc::boxed::Box<Self>, name: alloc::string::String, props: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>) -> core::pin::Pin<alloc::boxed::Box<(dyn core::future::future::Future<Output = iceberg::error::Result<alloc::sync::Arc<dyn iceberg::catalog::Catalog>>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait
pub fn T::with_kms_client_factory(self: alloc::boxed::Box<Self>, kms_client_factory: alloc::sync::Arc<dyn iceberg::encryption::kms::factory::KmsClientFactory>) -> alloc::boxed::Box<dyn iceberg_catalog_loader::BoxedCatalogBuilder>
pub fn T::with_storage_factory(self: alloc::boxed::Box<Self>, storage_factory: alloc::sync::Arc<dyn iceberg::io::storage::StorageFactory>) -> alloc::boxed::Box<dyn iceberg_catalog_loader::BoxedCatalogBuilder>
pub fn iceberg_catalog_loader::load(type: &str) -> iceberg::error::Result<alloc::boxed::Box<dyn iceberg_catalog_loader::BoxedCatalogBuilder>>
pub fn iceberg_catalog_loader::supported_types() -> alloc::vec::Vec<&'static str>
16 changes: 16 additions & 0 deletions crates/catalog/loader/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ use std::collections::HashMap;
use std::sync::Arc;

use async_trait::async_trait;
use iceberg::encryption::kms::KmsClientFactory;
use iceberg::io::StorageFactory;
use iceberg::{Catalog, CatalogBuilder, Error, ErrorKind, Result};
use iceberg_catalog_glue::GlueCatalogBuilder;
Expand Down Expand Up @@ -51,6 +52,11 @@ pub trait BoxedCatalogBuilder: Send {
storage_factory: Arc<dyn StorageFactory>,
) -> Box<dyn BoxedCatalogBuilder>;

fn with_kms_client_factory(
self: Box<Self>,
kms_client_factory: Arc<dyn KmsClientFactory>,
) -> Box<dyn BoxedCatalogBuilder>;

async fn load(
self: Box<Self>,
name: String,
Expand All @@ -67,6 +73,16 @@ impl<T: CatalogBuilder + 'static> BoxedCatalogBuilder for T {
Box::new(CatalogBuilder::with_storage_factory(*self, storage_factory))
}

fn with_kms_client_factory(
self: Box<Self>,
kms_client_factory: Arc<dyn KmsClientFactory>,
) -> Box<dyn BoxedCatalogBuilder> {
Box::new(CatalogBuilder::with_kms_client_factory(
*self,
kms_client_factory,
))
}

async fn load(
self: Box<Self>,
name: String,
Expand Down
1 change: 1 addition & 0 deletions crates/catalog/rest/public-api.txt
Original file line number Diff line number Diff line change
Expand Up @@ -215,6 +215,7 @@ pub fn iceberg_catalog_rest::RestCatalogBuilder::fmt(&self, f: &mut core::fmt::F
impl iceberg::catalog::CatalogBuilder for iceberg_catalog_rest::RestCatalogBuilder
pub type iceberg_catalog_rest::RestCatalogBuilder::C = iceberg_catalog_rest::RestCatalog
pub fn iceberg_catalog_rest::RestCatalogBuilder::load(self, name: impl core::convert::Into<alloc::string::String>, props: std::collections::hash::map::HashMap<alloc::string::String, alloc::string::String>) -> impl core::future::future::Future<Output = iceberg::error::Result<Self::C>> + core::marker::Send
pub fn iceberg_catalog_rest::RestCatalogBuilder::with_kms_client_factory(self, kms_client_factory: alloc::sync::Arc<dyn iceberg::encryption::kms::factory::KmsClientFactory>) -> Self
pub fn iceberg_catalog_rest::RestCatalogBuilder::with_runtime(self, runtime: iceberg::runtime::Runtime) -> Self
pub fn iceberg_catalog_rest::RestCatalogBuilder::with_storage_factory(self, storage_factory: alloc::sync::Arc<dyn iceberg::io::storage::StorageFactory>) -> Self
pub struct iceberg_catalog_rest::StorageCredential
Expand Down
Loading
Loading