diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 0626ce5061..2a92dd02e2 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -43,7 +43,7 @@ use crate::client::{ use crate::types::{ CatalogConfig, CommitTableRequest, CommitTableResponse, CreateNamespaceRequest, CreateTableRequest, ListNamespaceResponse, ListTablesResponse, LoadTableResult, - NamespaceResponse, RegisterTableRequest, RenameTableRequest, + NamespaceResponse, RegisterTableRequest, RenameTableRequest, StorageCredential, }; /// REST catalog URI @@ -52,8 +52,13 @@ pub const REST_CATALOG_PROP_URI: &str = "uri"; pub const REST_CATALOG_PROP_WAREHOUSE: &str = "warehouse"; /// Disable header redaction in error logs (defaults to false for security) pub const REST_CATALOG_PROP_DISABLE_HEADER_REDACTION: &str = "disable-header-redaction"; +/// Access delegation sent as the `X-Iceberg-Access-Delegation` header on load +/// and create. Defaults to `vended-credentials`; empty omits the header. +pub const REST_CATALOG_PROP_ACCESS_DELEGATION: &str = "rest.access-delegation"; const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; +const HEADER_ACCESS_DELEGATION: &str = "X-Iceberg-Access-Delegation"; +const DEFAULT_ACCESS_DELEGATION: &str = "vended-credentials"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); const PATH_V1: &str = "v1"; @@ -215,6 +220,16 @@ impl RestCatalogConfig { ]) } + /// Value for the `X-Iceberg-Access-Delegation` header, from the + /// `rest.access-delegation` prop (default `vended-credentials`; empty = none). + pub(crate) fn access_delegation(&self) -> Option<&str> { + match self.props.get(REST_CATALOG_PROP_ACCESS_DELEGATION) { + Some(v) if v.is_empty() => None, + Some(v) => Some(v.as_str()), + None => Some(DEFAULT_ACCESS_DELEGATION), + } + } + /// Get the client from the config. pub(crate) fn client(&self) -> Option { self.client.clone() @@ -451,6 +466,7 @@ impl RestCatalog { &self, metadata_location: Option<&str>, extra_config: Option>, + storage_credentials: Option<&[StorageCredential]>, ) -> Result { let mut props = self.context().await?.config.props.clone(); if let Some(config) = extra_config { @@ -483,9 +499,19 @@ impl RestCatalog { ) })?; - let file_io = FileIOBuilder::new(factory).with_props(props).build(); + let mut builder = FileIOBuilder::new(factory).with_props(props.clone()); + + // Vended credentials are scoped per location prefix: give each its own + // storage so reads/writes use the matching credentials. + if let Some(creds) = storage_credentials { + for cred in creds { + let mut prefixed = props.clone(); + prefixed.extend(cred.config.clone()); + builder = builder.with_prefixed_props(cred.prefix.clone(), prefixed); + } + } - Ok(file_io) + Ok(builder.build()) } /// Invalidate the current token without generating a new one. On the next request, the client @@ -745,9 +771,13 @@ impl Catalog for RestCatalog { let table_ident = TableIdent::new(namespace.clone(), creation.name.clone()); - let request = context + let mut builder = context .client - .request(Method::POST, context.config.tables_endpoint(namespace)) + .request(Method::POST, context.config.tables_endpoint(namespace)); + if let Some(delegation) = context.config.access_delegation() { + builder = builder.header(HEADER_ACCESS_DELEGATION, delegation); + } + let request = builder .json(&CreateTableRequest { name: creation.name, location: creation.location, @@ -791,14 +821,15 @@ impl Catalog for RestCatalog { "Metadata location missing in `create_table` response!", ))?; - let config = response - .config - .into_iter() - .chain(self.user_config.props.clone()) - .collect(); + let mut base_config = response.config.clone(); + base_config.extend(self.user_config.props.clone()); let file_io = self - .load_file_io(Some(metadata_location), Some(config)) + .load_file_io( + Some(metadata_location), + Some(base_config), + response.storage_credentials.as_deref(), + ) .await?; let table_builder = Table::builder() @@ -822,10 +853,14 @@ impl Catalog for RestCatalog { async fn load_table(&self, table_ident: &TableIdent) -> Result { let context = self.context().await?; - let request = context + let mut builder = context .client - .request(Method::GET, context.config.table_endpoint(table_ident)) - .build()?; + .request(Method::GET, context.config.table_endpoint(table_ident)); + // Opt in to vended storage credentials. + if let Some(delegation) = context.config.access_delegation() { + builder = builder.header(HEADER_ACCESS_DELEGATION, delegation); + } + let request = builder.build()?; let http_response = context.client.query_catalog(request).await?; @@ -848,14 +883,15 @@ impl Catalog for RestCatalog { } }; - let config = response - .config - .into_iter() - .chain(self.user_config.props.clone()) - .collect(); + let mut base_config = response.config.clone(); + base_config.extend(self.user_config.props.clone()); let file_io = self - .load_file_io(response.metadata_location.as_deref(), Some(config)) + .load_file_io( + response.metadata_location.as_deref(), + Some(base_config), + response.storage_credentials.as_deref(), + ) .await?; let table_builder = Table::builder() @@ -991,7 +1027,9 @@ impl Catalog for RestCatalog { "Metadata location missing in `register_table` response!", ))?; - let file_io = self.load_file_io(Some(metadata_location), None).await?; + let file_io = self + .load_file_io(Some(metadata_location), None, None) + .await?; Table::builder() .identifier(table_ident.clone()) @@ -1062,8 +1100,10 @@ impl Catalog for RestCatalog { } }; + // The commit response carries no credentials, so build a plain FileIO; + // the transaction layer reuses the credentialed one it already holds. let file_io = self - .load_file_io(Some(&response.metadata_location), None) + .load_file_io(Some(&response.metadata_location), None, None) .await?; Table::builder() @@ -2387,6 +2427,87 @@ mod tests { rename_table_mock.assert_async().await; } + #[tokio::test] + async fn test_load_table_requests_vended_credentials() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + // The request must carry the access-delegation header, and the response's + // vended `storage-credentials` must be accepted (the FileIO builds). + let load_table_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .match_header("x-iceberg-access-delegation", "vended-credentials") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response_with_credentials.json" + )) + .create_async() + .await; + + let catalog = RestCatalog::new( + RestCatalogConfig::builder().uri(server.url()).build(), + Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), + ); + + let table = catalog + .load_table(&TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .await + .unwrap(); + + assert_eq!( + "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + table.metadata_location().unwrap() + ); + + config_mock.assert_async().await; + load_table_mock.assert_async().await; + } + + #[tokio::test] + async fn test_load_table_access_delegation_can_be_disabled() { + let mut server = Server::new_async().await; + + let config_mock = create_config_mock(&mut server).await; + + // With `rest.access-delegation` set empty, the header must be omitted. + let load_table_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .match_header("x-iceberg-access-delegation", mockito::Matcher::Missing) + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response.json" + )) + .create_async() + .await; + + let props = HashMap::from([( + REST_CATALOG_PROP_ACCESS_DELEGATION.to_string(), + "".to_string(), + )]); + let catalog = RestCatalog::new( + RestCatalogConfig::builder() + .uri(server.url()) + .props(props) + .build(), + Some(Arc::new(LocalFsStorageFactory)), + Runtime::current(), + ); + + catalog + .load_table(&TableIdent::from_strs(["ns1", "test1"]).unwrap()) + .await + .unwrap(); + + config_mock.assert_async().await; + load_table_mock.assert_async().await; + } + #[tokio::test] async fn test_create_table() { let mut server = Server::new_async().await; @@ -2602,6 +2723,7 @@ mod tests { let config_mock = create_config_mock(&mut server).await; + // GET hit once: the transaction refreshes the table before committing. let load_table_mock = server .mock("GET", "/v1/namespaces/ns1/tables/test1") .with_status(200) @@ -2610,6 +2732,7 @@ mod tests { env!("CARGO_MANIFEST_DIR"), "load_table_response.json" )) + .expect(1) .create_async() .await; diff --git a/crates/catalog/rest/testdata/load_table_response_with_credentials.json b/crates/catalog/rest/testdata/load_table_response_with_credentials.json new file mode 100644 index 0000000000..c0a540fbac --- /dev/null +++ b/crates/catalog/rest/testdata/load_table_response_with_credentials.json @@ -0,0 +1,78 @@ +{ + "metadata-location": "s3://warehouse/database/table/metadata/00001-5f2f8166-244c-4eae-ac36-384ecdec81fc.gz.metadata.json", + "metadata": { + "format-version": 1, + "table-uuid": "b55d9dda-6561-423a-8bfc-787980ce421f", + "location": "s3://warehouse/database/table", + "last-updated-ms": 1646787054459, + "last-column-id": 2, + "schema": { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": false, "type": "int"}, + {"id": 2, "name": "data", "required": false, "type": "string"} + ] + }, + "current-schema-id": 0, + "schemas": [ + { + "type": "struct", + "schema-id": 0, + "fields": [ + {"id": 1, "name": "id", "required": false, "type": "int"}, + {"id": 2, "name": "data", "required": false, "type": "string"} + ] + } + ], + "partition-spec": [], + "default-spec-id": 0, + "partition-specs": [{"spec-id": 0, "fields": []}], + "last-partition-id": 999, + "default-sort-order-id": 0, + "sort-orders": [{"order-id": 0, "fields": []}], + "properties": {"owner": "bryan", "write.metadata.compression-codec": "gzip"}, + "current-snapshot-id": 3497810964824022504, + "refs": {"main": {"snapshot-id": 3497810964824022504, "type": "branch"}}, + "snapshots": [ + { + "snapshot-id": 3497810964824022504, + "timestamp-ms": 1646787054459, + "summary": { + "operation": "append", + "spark.app.id": "local-1646787004168", + "added-data-files": "1", + "added-records": "1", + "added-files-size": "697", + "changed-partition-count": "1", + "total-records": "1", + "total-files-size": "697", + "total-data-files": "1", + "total-delete-files": "0", + "total-position-deletes": "0", + "total-equality-deletes": "0" + }, + "manifest-list": "s3://warehouse/database/table/metadata/snap-3497810964824022504-1-c4f68204-666b-4e50-a9df-b10c34bf6b82.avro", + "schema-id": 0 + } + ], + "snapshot-log": [{"timestamp-ms": 1646787054459, "snapshot-id": 3497810964824022504}], + "metadata-log": [ + { + "timestamp-ms": 1646787031514, + "metadata-file": "s3://warehouse/database/table/metadata/00000-88484a1c-00e5-4a07-a787-c0e7aeffa805.gz.metadata.json" + } + ] + }, + "config": {"client.factory": "io.tabular.iceberg.catalog.TabularAwsClientFactory", "region": "us-west-2"}, + "storage-credentials": [ + { + "prefix": "s3://warehouse/database/table", + "config": { + "s3.access-key-id": "vended-key-id", + "s3.secret-access-key": "vended-secret", + "s3.session-token": "vended-token" + } + } + ] +} diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 227d8f4d5b..0338bca566 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -15,11 +15,12 @@ // specific language governing permissions and limitations // under the License. +use std::collections::HashMap; use std::ops::Range; use std::sync::{Arc, OnceLock}; use bytes::Bytes; -use futures::{Stream, StreamExt}; +use futures::{Stream, StreamExt, stream}; use super::storage::{ LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory, @@ -67,6 +68,17 @@ pub struct FileIO { factory: Arc, /// Cached storage instance (lazily initialized) storage: Arc>>, + /// Per-prefix storages (longest prefix first) for tables that vend distinct + /// credentials per location prefix. Paths matching none use `storage` above. + prefixed: Arc>, +} + +/// A storage scoped to a location `prefix`, lazily built from its own config. +#[derive(Debug)] +struct PrefixedStorage { + prefix: String, + config: StorageConfig, + storage: OnceLock>, } impl FileIO { @@ -78,6 +90,7 @@ impl FileIO { config: StorageConfig::new(), factory: Arc::new(MemoryStorageFactory), storage: Arc::new(OnceLock::new()), + prefixed: Arc::new(Vec::new()), } } @@ -89,6 +102,7 @@ impl FileIO { config: StorageConfig::new(), factory: Arc::new(LocalFsStorageFactory), storage: Arc::new(OnceLock::new()), + prefixed: Arc::new(Vec::new()), } } @@ -97,24 +111,31 @@ impl FileIO { &self.config } - /// Get or create the storage instance. - /// - /// The factory is invoked on first access and the result is cached - /// for all subsequent operations. - fn get_storage(&self) -> Result> { - // Check if already initialized - if let Some(storage) = self.storage.get() { - return Ok(storage.clone()); + /// Get or create the storage for `path`, routing to the longest-matching + /// prefix storage if any, else the default. Built once, then cached. + fn get_storage(&self, path: &str) -> Result> { + // `prefixed` is sorted longest-first, so the first match is most specific. + for ps in self.prefixed.iter() { + if path.starts_with(&ps.prefix) { + return Self::get_or_build(&ps.storage, &self.factory, &ps.config); + } } + Self::get_or_build(&self.storage, &self.factory, &self.config) + } - // Build the storage - let storage = self.factory.build(&self.config)?; - - // Try to set it (another thread might have set it first) - let _ = self.storage.set(storage.clone()); - - // Return whatever is in the cell (either ours or another thread's) - Ok(self.storage.get().unwrap().clone()) + /// Get a cached storage from `cell`, building it from `config` on first use. + fn get_or_build( + cell: &OnceLock>, + factory: &Arc, + config: &StorageConfig, + ) -> Result> { + if let Some(storage) = cell.get() { + return Ok(storage.clone()); + } + let storage = factory.build(config)?; + // Another thread might have set it first; keep whatever ends up in the cell. + let _ = cell.set(storage); + Ok(cell.get().unwrap().clone()) } /// Deletes file. @@ -123,7 +144,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn delete(&self, path: impl AsRef) -> Result<()> { - self.get_storage()?.delete(path.as_ref()).await + self.get_storage(path.as_ref())?.delete(path.as_ref()).await } /// Remove the path and all nested dirs and files recursively. @@ -138,7 +159,9 @@ impl FileIO { /// - If the path is a empty directory, this function will remove the directory itself. /// - If the path is a non-empty directory, this function will remove the directory and all nested files and directories. pub async fn delete_prefix(&self, path: impl AsRef) -> Result<()> { - self.get_storage()?.delete_prefix(path.as_ref()).await + self.get_storage(path.as_ref())? + .delete_prefix(path.as_ref()) + .await } /// Delete multiple files from a stream of paths. @@ -150,7 +173,29 @@ impl FileIO { &self, paths: impl Stream + Send + 'static, ) -> Result<()> { - self.get_storage()?.delete_stream(paths.boxed()).await + // No per-prefix storages: delete the whole batch on the default storage. + if self.prefixed.is_empty() { + return self.get_storage("")?.delete_stream(paths.boxed()).await; + } + + // Otherwise group paths by routed storage, then batch-delete each group. + let mut groups: HashMap> = HashMap::new(); + let mut paths = paths.boxed(); + while let Some(path) = paths.next().await { + let key = self + .prefixed + .iter() + .find(|ps| path.starts_with(&ps.prefix)) + .map(|ps| ps.prefix.clone()) + .unwrap_or_default(); + groups.entry(key).or_default().push(path); + } + + for batch in groups.into_values() { + let storage = self.get_storage(&batch[0])?; + storage.delete_stream(stream::iter(batch).boxed()).await?; + } + Ok(()) } /// Check file exists. @@ -159,7 +204,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub async fn exists(&self, path: impl AsRef) -> Result { - self.get_storage()?.exists(path.as_ref()).await + self.get_storage(path.as_ref())?.exists(path.as_ref()).await } /// Creates input file. @@ -168,7 +213,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_input(&self, path: impl AsRef) -> Result { - self.get_storage()?.new_input(path.as_ref()) + self.get_storage(path.as_ref())?.new_input(path.as_ref()) } /// Creates output file. @@ -177,7 +222,7 @@ impl FileIO { /// /// * path: It should be *absolute* path starting with scheme string used to construct [`FileIO`]. pub fn new_output(&self, path: impl AsRef) -> Result { - self.get_storage()?.new_output(path.as_ref()) + self.get_storage(path.as_ref())?.new_output(path.as_ref()) } } @@ -191,6 +236,8 @@ pub struct FileIOBuilder { factory: Arc, /// Storage configuration config: StorageConfig, + /// Per-location-prefix configs (prefix, config). + prefixed: Vec<(String, StorageConfig)>, } impl FileIOBuilder { @@ -199,6 +246,7 @@ impl FileIOBuilder { Self { factory, config: StorageConfig::new(), + prefixed: Vec::new(), } } @@ -219,6 +267,23 @@ impl FileIOBuilder { self } + /// Add a per-prefix storage config. Paths starting with `prefix` (longest + /// match wins) use these props instead of the default config. + pub fn with_prefixed_props( + mut self, + prefix: impl Into, + props: impl IntoIterator, + ) -> Self { + let config = StorageConfig::from_props( + props + .into_iter() + .map(|(k, v)| (k.to_string(), v.to_string())) + .collect(), + ); + self.prefixed.push((prefix.into(), config)); + self + } + /// Get the storage configuration. pub fn config(&self) -> &StorageConfig { &self.config @@ -226,10 +291,22 @@ impl FileIOBuilder { /// Builds [`FileIO`]. pub fn build(self) -> FileIO { + let mut prefixed: Vec = self + .prefixed + .into_iter() + .map(|(prefix, config)| PrefixedStorage { + prefix, + config, + storage: OnceLock::new(), + }) + .collect(); + // Longest prefix first so routing picks the most specific match. + prefixed.sort_by(|a, b| b.prefix.len().cmp(&a.prefix.len())); FileIO { config: self.config, factory: self.factory, storage: Arc::new(OnceLock::new()), + prefixed: Arc::new(prefixed), } } } @@ -544,4 +621,70 @@ mod tests { assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string())); assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string())); } + + #[tokio::test] + async fn test_prefixed_props_sorted_by_descending_prefix_length() { + let factory = Arc::new(MemoryStorageFactory); + let file_io = FileIOBuilder::new(factory) + .with_prefixed_props("memory://a/", [("k", "short")]) + .with_prefixed_props("memory://a/longer/", [("k", "long")]) + .build(); + + // Longest prefix first so the most specific match wins at routing time. + let prefixes: Vec<&str> = file_io.prefixed.iter().map(|p| p.prefix.as_str()).collect(); + assert_eq!(prefixes, vec!["memory://a/longer/", "memory://a/"]); + } + + #[tokio::test] + async fn test_get_storage_routes_by_prefix() { + let factory = Arc::new(MemoryStorageFactory); + let file_io = FileIOBuilder::new(factory) + .with_prop("scope", "default") + .with_prefixed_props("memory://creds/", [("scope", "prefixed")]) + .build(); + + let default_a = file_io.get_storage("memory://other/x").unwrap(); + let default_b = file_io.get_storage("memory://other/y").unwrap(); + let prefixed_a = file_io.get_storage("memory://creds/x").unwrap(); + let prefixed_b = file_io.get_storage("memory://creds/y").unwrap(); + + // Repeated routing to the same bucket returns the memoized storage... + assert!(Arc::ptr_eq(&default_a, &default_b)); + assert!(Arc::ptr_eq(&prefixed_a, &prefixed_b)); + // ...and a prefix-matching path resolves to a distinct storage from the default. + assert!(!Arc::ptr_eq(&default_a, &prefixed_a)); + } + + #[tokio::test] + async fn test_delete_stream_routes_by_prefix() { + let factory = Arc::new(MemoryStorageFactory); + let file_io = FileIOBuilder::new(factory) + .with_prefixed_props("memory:/creds/", [("k", "v")]) + .build(); + + // One file under each routing bucket (default vs prefixed storage). + let default_path = "memory:/other/a.txt"; + let prefixed_path = "memory:/creds/b.txt"; + for path in [default_path, prefixed_path] { + file_io + .new_output(path) + .unwrap() + .write("x".into()) + .await + .unwrap(); + assert!(file_io.exists(path).await.unwrap()); + } + + // delete_stream must route each path to the storage that holds it. + file_io + .delete_stream(futures::stream::iter(vec![ + default_path.to_string(), + prefixed_path.to_string(), + ])) + .await + .unwrap(); + + assert!(!file_io.exists(default_path).await.unwrap()); + assert!(!file_io.exists(prefixed_path).await.unwrap()); + } } diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 31feade038..2885e33644 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -220,6 +220,12 @@ impl Table { self } + /// Sets the [`Table`] `FileIO` and returns an updated instance. + pub(crate) fn with_file_io(mut self, file_io: FileIO) -> Self { + self.file_io = file_io; + self + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/iceberg/src/transaction/mod.rs b/crates/iceberg/src/transaction/mod.rs index f87c274637..12d83c6692 100644 --- a/crates/iceberg/src/transaction/mod.rs +++ b/crates/iceberg/src/transaction/mod.rs @@ -239,7 +239,9 @@ impl Transaction { .requirements(existing_requirements) .build(); - catalog.update_table(table_commit).await + let committed = catalog.update_table(table_commit).await?; + // Reuse the credentialed FileIO we already hold; the commit response has none. + Ok(committed.with_file_io(self.table.file_io().clone())) } }