diff --git a/Cargo.lock b/Cargo.lock index 98bdd58fc0..3d7bba07fc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3464,7 +3464,9 @@ name = "iceberg-catalog-rest" version = "0.9.0" dependencies = [ "async-trait", + "bytes", "chrono", + "futures", "http 1.4.0", "iceberg", "iceberg_test_utils", @@ -3476,6 +3478,7 @@ dependencies = [ "serde_json", "tokio", "typed-builder", + "typetag", "uuid", ] diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index c51f6a6a89..46ef487f84 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -221,6 +221,7 @@ impl GlueCatalog { }); let file_io = FileIOBuilder::new(factory) .with_props(file_io_props) + .with_runtime(runtime.clone()) .build(); Ok(GlueCatalog { diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index d778a3d5fc..d64e73ceb2 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -231,6 +231,7 @@ impl HmsCatalog { })?; let file_io = FileIOBuilder::new(factory) .with_props(&config.props) + .with_runtime(runtime.clone()) .build(); Ok(Self { diff --git a/crates/catalog/rest/Cargo.toml b/crates/catalog/rest/Cargo.toml index 40dd70a952..61c448eac7 100644 --- a/crates/catalog/rest/Cargo.toml +++ b/crates/catalog/rest/Cargo.toml @@ -43,6 +43,9 @@ typed-builder = { workspace = true } uuid = { workspace = true, features = ["v4"] } [dev-dependencies] +bytes = { workspace = true } +futures = { workspace = true } iceberg_test_utils = { path = "../../test_utils", features = ["tests"] } mockito = { workspace = true } tokio = { workspace = true } +typetag = { workspace = true } diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 72780343d3..a0274b475b 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -24,7 +24,7 @@ use std::sync::Arc; use async_trait::async_trait; use iceberg::io::{FileIO, FileIOBuilder, StorageFactory}; -use iceberg::table::Table; +use iceberg::table::{Table, TableBuilder}; use iceberg::{ Catalog, CatalogBuilder, Error, ErrorKind, Namespace, NamespaceIdent, Result, Runtime, TableCommit, TableCreation, TableIdent, @@ -57,12 +57,44 @@ const ICEBERG_REST_SPEC_VERSION: &str = "0.14.1"; const CARGO_PKG_VERSION: &str = env!("CARGO_PKG_VERSION"); const PATH_V1: &str = "v1"; +#[derive(Clone, Debug, Default)] +struct RestCatalogRuntime { + table_runtime: Option, + file_io_runtime: Option, +} + +impl RestCatalogRuntime { + fn with_table_runtime(runtime: Runtime) -> Self { + Self { + table_runtime: Some(runtime), + file_io_runtime: None, + } + } + + fn with_file_io_runtime(io_handle: tokio::runtime::Handle) -> Self { + Self { + table_runtime: None, + file_io_runtime: Some(io_handle), + } + } + + fn is_empty(&self) -> bool { + self.table_runtime.is_none() && self.file_io_runtime.is_none() + } +} + +impl From for RestCatalogRuntime { + fn from(runtime: Runtime) -> Self { + Self::with_table_runtime(runtime) + } +} + /// Builder for [`RestCatalog`]. #[derive(Debug)] pub struct RestCatalogBuilder { config: RestCatalogConfig, storage_factory: Option>, - runtime: Option, + runtime: RestCatalogRuntime, } impl Default for RestCatalogBuilder { @@ -76,11 +108,23 @@ impl Default for RestCatalogBuilder { client: None, }, storage_factory: None, - runtime: None, + runtime: RestCatalogRuntime::default(), } } } +impl RestCatalogBuilder { + /// Route storage operations through `io_handle` without assigning a full + /// runtime to tables loaded by this catalog. + /// + /// This is mutually exclusive with [`CatalogBuilder::with_runtime`]. If both + /// are called, the last call wins. + pub fn with_file_io_runtime(mut self, io_handle: tokio::runtime::Handle) -> Self { + self.runtime = RestCatalogRuntime::with_file_io_runtime(io_handle); + self + } +} + impl CatalogBuilder for RestCatalogBuilder { type C = RestCatalog; @@ -90,7 +134,7 @@ impl CatalogBuilder for RestCatalogBuilder { } fn with_runtime(mut self, runtime: Runtime) -> Self { - self.runtime = Some(runtime); + self.runtime = RestCatalogRuntime::with_table_runtime(runtime); self } @@ -130,7 +174,11 @@ impl CatalogBuilder for RestCatalogBuilder { "Catalog uri is required", )) } else { - let runtime = self.runtime.unwrap_or_else(Runtime::current); + let runtime = if self.runtime.is_empty() { + RestCatalogRuntime::with_table_runtime(Runtime::current()) + } else { + self.runtime + }; Ok(RestCatalog::new(self.config, self.storage_factory, runtime)) } }; @@ -359,7 +407,7 @@ pub struct RestCatalog { ctx: OnceCell, /// Storage factory for creating FileIO instances. storage_factory: Option>, - runtime: Runtime, + runtime: RestCatalogRuntime, } impl RestCatalog { @@ -367,13 +415,20 @@ impl RestCatalog { fn new( config: RestCatalogConfig, storage_factory: Option>, - runtime: Runtime, + runtime: impl Into, ) -> Self { Self { user_config: config, ctx: OnceCell::new(), storage_factory, - runtime, + runtime: runtime.into(), + } + } + + fn with_table_runtime(&self, builder: TableBuilder) -> TableBuilder { + match &self.runtime.table_runtime { + Some(runtime) => builder.runtime(runtime.clone()), + None => builder, } } @@ -483,7 +538,13 @@ impl RestCatalog { ) })?; - let file_io = FileIOBuilder::new(factory).with_props(props).build(); + let mut builder = FileIOBuilder::new(factory).with_props(props); + if let Some(runtime) = &self.runtime.table_runtime { + builder = builder.with_runtime(runtime.clone()); + } else if let Some(io_handle) = &self.runtime.file_io_runtime { + builder = builder.with_io_runtime(io_handle.clone()); + } + let file_io = builder.build(); Ok(file_io) } @@ -804,8 +865,8 @@ impl Catalog for RestCatalog { let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) - .metadata(response.metadata) - .runtime(self.runtime.clone()); + .metadata(response.metadata); + let table_builder = self.with_table_runtime(table_builder); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -861,8 +922,8 @@ impl Catalog for RestCatalog { let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) - .metadata(response.metadata) - .runtime(self.runtime.clone()); + .metadata(response.metadata); + let table_builder = self.with_table_runtime(table_builder); if let Some(metadata_location) = response.metadata_location { table_builder.metadata_location(metadata_location).build() @@ -993,13 +1054,12 @@ impl Catalog for RestCatalog { let file_io = self.load_file_io(Some(metadata_location), None).await?; - Table::builder() + let table_builder = Table::builder() .identifier(table_ident.clone()) .file_io(file_io) .metadata(response.metadata) - .metadata_location(metadata_location.clone()) - .runtime(self.runtime.clone()) - .build() + .metadata_location(metadata_location.clone()); + self.with_table_runtime(table_builder).build() } async fn update_table(&self, mut commit: TableCommit) -> Result { @@ -1066,13 +1126,12 @@ impl Catalog for RestCatalog { .load_file_io(Some(&response.metadata_location), None) .await?; - Table::builder() + let table_builder = Table::builder() .identifier(commit.identifier().clone()) .file_io(file_io) .metadata(response.metadata) - .metadata_location(response.metadata_location) - .runtime(self.runtime.clone()) - .build() + .metadata_location(response.metadata_location); + self.with_table_runtime(table_builder).build() } } @@ -1082,8 +1141,13 @@ mod tests { use std::io::BufReader; use std::sync::Arc; + use bytes::Bytes; use chrono::{TimeZone, Utc}; - use iceberg::io::LocalFsStorageFactory; + use futures::stream::BoxStream; + use iceberg::io::{ + FileMetadata, FileRead, FileWrite, InputFile, LocalFsStorageFactory, OutputFile, Storage, + StorageConfig, StorageFactory, + }; use iceberg::spec::{ FormatVersion, NestedField, NullOrder, Operation, PrimitiveType, Schema, Snapshot, SnapshotLog, SortDirection, SortField, SortOrder, Summary, Transform, Type, @@ -1096,6 +1160,8 @@ mod tests { use super::*; + const REST_IO_THREAD_NAME: &str = "iceberg-rest-file-io-runtime-test"; + #[tokio::test] async fn test_update_config() { let mut server = Server::new_async().await; @@ -2345,6 +2411,63 @@ mod tests { rename_table_mock.assert_async().await; } + #[test] + fn test_load_table_with_file_io_runtime_routes_storage_to_io() { + let test_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .enable_all() + .build() + .unwrap(); + let io_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name(REST_IO_THREAD_NAME) + .enable_all() + .build() + .unwrap(); + + test_rt.block_on(async { + let mut server = Server::new_async().await; + let config_mock = create_config_mock(&mut server).await; + let load_table_mock = server + .mock("GET", "/v1/namespaces/ns1/tables/test1") + .with_status(200) + .with_body_from_file(format!( + "{}/testdata/{}", + env!("CARGO_MANIFEST_DIR"), + "load_table_response.json" + )) + .create_async() + .await; + + let props = HashMap::from([(REST_CATALOG_PROP_URI.to_string(), server.url())]); + let catalog = RestCatalogBuilder::default() + .with_storage_factory(Arc::new(ThreadNameStorageFactory)) + .with_file_io_runtime(io_rt.handle().clone()) + .load("rest", props) + .await + .unwrap(); + + let table = catalog + .load_table(&TableIdent::new( + NamespaceIdent::new("ns1".to_string()), + "test1".to_string(), + )) + .await + .unwrap(); + + assert!( + table + .file_io() + .exists("s3://warehouse/database/table/data") + .await + .unwrap() + ); + + config_mock.assert_async().await; + load_table_mock.assert_async().await; + }); + } + #[tokio::test] async fn test_load_table_404() { let mut server = Server::new_async().await; @@ -2955,4 +3078,65 @@ mod tests { assert_eq!(err.message(), "Catalog uri is required"); } } + + #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] + struct ThreadNameStorageFactory; + + #[typetag::serde] + impl StorageFactory for ThreadNameStorageFactory { + fn build(&self, _config: &StorageConfig) -> Result> { + Ok(Arc::new(ThreadNameStorage)) + } + } + + #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] + struct ThreadNameStorage; + + #[typetag::serde] + #[async_trait] + impl Storage for ThreadNameStorage { + async fn exists(&self, _path: &str) -> Result { + Ok(std::thread::current().name() == Some(REST_IO_THREAD_NAME)) + } + + async fn metadata(&self, _path: &str) -> Result { + unimplemented!("not used") + } + + async fn read(&self, _path: &str) -> Result { + unimplemented!("not used") + } + + async fn reader(&self, _path: &str) -> Result> { + unimplemented!("not used") + } + + async fn write(&self, _path: &str, _bs: Bytes) -> Result<()> { + unimplemented!("not used") + } + + async fn writer(&self, _path: &str) -> Result> { + unimplemented!("not used") + } + + async fn delete(&self, _path: &str) -> Result<()> { + unimplemented!("not used") + } + + async fn delete_prefix(&self, _path: &str) -> Result<()> { + unimplemented!("not used") + } + + async fn delete_stream(&self, _paths: BoxStream<'static, String>) -> Result<()> { + unimplemented!("not used") + } + + fn new_input(&self, _path: &str) -> Result { + unimplemented!("not used") + } + + fn new_output(&self, _path: &str) -> Result { + unimplemented!("not used") + } + } } diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index 03f4a28de3..d747ceca3c 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -220,6 +220,7 @@ impl S3TablesCatalog { }); let file_io = FileIOBuilder::new(factory) .with_props(&config.props) + .with_runtime(runtime.clone()) .build(); Ok(Self { diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index c7bf9d0cfd..351c97f30d 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -257,7 +257,9 @@ impl SqlCatalog { "StorageFactory must be provided for SqlCatalog. Use `with_storage_factory` to configure it.", ) })?; - let fileio = FileIOBuilder::new(factory).build(); + let fileio = FileIOBuilder::new(factory) + .with_runtime(runtime.clone()) + .build(); install_default_drivers(); let max_connections: u32 = config diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 12895b45ff..6a4ca96066 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -145,7 +145,10 @@ impl MemoryCatalog { Ok(Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io: FileIOBuilder::new(factory).with_props(config.props).build(), + file_io: FileIOBuilder::new(factory) + .with_props(config.props) + .with_runtime(runtime.clone()) + .build(), warehouse_location: config.warehouse, runtime, }) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 227d8f4d5b..14caafa209 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -24,7 +24,7 @@ use futures::{Stream, StreamExt}; use super::storage::{ LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory, }; -use crate::Result; +use crate::{Result, Runtime, RuntimeHandle}; /// FileIO implementation, used to manipulate files in underlying storage. /// @@ -65,8 +65,10 @@ pub struct FileIO { config: StorageConfig, /// Factory for creating storage instances factory: Arc, - /// Cached storage instance (lazily initialized) + /// Cached raw storage instance (lazily initialized) storage: Arc>>, + /// IO runtime handle used for storage operations. + io_runtime: Option, } impl FileIO { @@ -78,6 +80,7 @@ impl FileIO { config: StorageConfig::new(), factory: Arc::new(MemoryStorageFactory), storage: Arc::new(OnceLock::new()), + io_runtime: None, } } @@ -89,6 +92,7 @@ impl FileIO { config: StorageConfig::new(), factory: Arc::new(LocalFsStorageFactory), storage: Arc::new(OnceLock::new()), + io_runtime: None, } } @@ -97,11 +101,27 @@ impl FileIO { &self.config } - /// Get or create the storage instance. + /// Route IO-bound storage operations through the IO handle in `runtime`. + /// + /// If called with [`Self::with_io_runtime`], the last call wins. + pub fn with_runtime(mut self, runtime: Runtime) -> Self { + self.io_runtime = Some(runtime.io().clone()); + self + } + + /// Route IO-bound storage operations through `io_handle`. + /// + /// If called with [`Self::with_runtime`], the last call wins. + pub fn with_io_runtime(mut self, io_handle: tokio::runtime::Handle) -> Self { + self.io_runtime = Some(RuntimeHandle::from_tokio_handle(io_handle)); + self + } + + /// Get or create the raw storage instance. /// /// The factory is invoked on first access and the result is cached /// for all subsequent operations. - fn get_storage(&self) -> Result> { + fn get_raw_storage(&self) -> Result> { // Check if already initialized if let Some(storage) = self.storage.get() { return Ok(storage.clone()); @@ -117,6 +137,18 @@ impl FileIO { Ok(self.storage.get().unwrap().clone()) } + /// Get the storage instance, wrapped for IO-runtime dispatch when configured. + fn get_storage(&self) -> Result> { + let storage = self.get_raw_storage()?; + match &self.io_runtime { + Some(runtime) => Ok(Arc::new(super::storage::RuntimeStorage::new( + storage, + runtime.clone(), + ))), + None => Ok(storage), + } + } + /// Deletes file. /// /// # Arguments @@ -191,6 +223,8 @@ pub struct FileIOBuilder { factory: Arc, /// Storage configuration config: StorageConfig, + /// IO runtime handle used for storage operations. + io_runtime: Option, } impl FileIOBuilder { @@ -199,6 +233,7 @@ impl FileIOBuilder { Self { factory, config: StorageConfig::new(), + io_runtime: None, } } @@ -224,12 +259,29 @@ impl FileIOBuilder { &self.config } + /// Route IO-bound storage operations through the IO handle in `runtime`. + /// + /// If called with [`Self::with_io_runtime`], the last call wins. + pub fn with_runtime(mut self, runtime: Runtime) -> Self { + self.io_runtime = Some(runtime.io().clone()); + self + } + + /// Route IO-bound storage operations through `io_handle`. + /// + /// If called with [`Self::with_runtime`], the last call wins. + pub fn with_io_runtime(mut self, io_handle: tokio::runtime::Handle) -> Self { + self.io_runtime = Some(RuntimeHandle::from_tokio_handle(io_handle)); + self + } + /// Builds [`FileIO`]. pub fn build(self) -> FileIO { FileIO { config: self.config, factory: self.factory, storage: Arc::new(OnceLock::new()), + io_runtime: self.io_runtime, } } } @@ -276,6 +328,10 @@ impl InputFile { Self { storage, path } } + pub(crate) fn into_parts(self) -> (Arc, String) { + (self.storage, self.path) + } + /// Absolute path to root uri. pub fn location(&self) -> &str { &self.path @@ -339,6 +395,10 @@ impl OutputFile { Self { storage, path } } + pub(crate) fn into_parts(self) -> (Arc, String) { + (self.storage, self.path) + } + /// Relative path to root uri. pub fn location(&self) -> &str { &self.path @@ -390,14 +450,22 @@ mod tests { use std::io::Write; use std::path::Path; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; use bytes::Bytes; use futures::AsyncReadExt; use futures::io::AllowStdIo; + use futures::stream::BoxStream; use tempfile::TempDir; use super::{FileIO, FileIOBuilder}; - use crate::io::{LocalFsStorageFactory, MemoryStorageFactory}; + use crate::io::{ + FileRead, FileWrite, InputFile, LocalFsStorageFactory, MemoryStorageFactory, OutputFile, + Storage, StorageFactory, + }; + + const CPU_THREAD_NAME: &str = "iceberg-file-io-cpu-test"; + const IO_THREAD_NAME: &str = "iceberg-file-io-io-test"; fn create_local_file_io() -> FileIO { FileIO::new_with_fs() @@ -544,4 +612,142 @@ mod tests { assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string())); assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string())); } + + #[test] + fn test_file_io_with_runtime_wraps_builder_and_cached_storage() { + let (cpu_rt, io_rt) = split_test_runtimes(); + + let factory = Arc::new(ThreadNameStorageFactory::default()); + let (raw_exists, late_wrapped_exists, cached_builds) = cpu_rt.block_on(async { + let file_io = FileIOBuilder::new(factory.clone()).build(); + let raw_exists = file_io.exists("memory://thread-name").await.unwrap(); + + let file_io = file_io.with_io_runtime(io_rt.handle().clone()); + let late_wrapped_exists = file_io.exists("memory://thread-name").await.unwrap(); + + let reader = FileIOBuilder::new(Arc::new(ThreadNameStorageFactory::default())) + .with_io_runtime(io_rt.handle().clone()) + .build() + .new_input("memory://thread-name") + .unwrap() + .reader() + .await + .unwrap(); + reader.read(0..1).await.unwrap(); + + ( + raw_exists, + late_wrapped_exists, + factory.builds.load(Ordering::SeqCst), + ) + }); + + assert!(!raw_exists); + assert!(late_wrapped_exists); + assert_eq!(cached_builds, 1); + } + + fn split_test_runtimes() -> (tokio::runtime::Runtime, tokio::runtime::Runtime) { + let cpu_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name(CPU_THREAD_NAME) + .enable_all() + .build() + .unwrap(); + let io_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name(IO_THREAD_NAME) + .enable_all() + .build() + .unwrap(); + + (cpu_rt, io_rt) + } + + #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] + struct ThreadNameStorageFactory { + #[serde(skip)] + builds: Arc, + } + + #[typetag::serde] + impl StorageFactory for ThreadNameStorageFactory { + fn build(&self, _config: &crate::io::StorageConfig) -> crate::Result> { + self.builds.fetch_add(1, Ordering::SeqCst); + Ok(Arc::new(ThreadNameStorage::default())) + } + } + + #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] + struct ThreadNameStorage { + input: bool, + } + + #[async_trait::async_trait] + #[typetag::serde] + impl Storage for ThreadNameStorage { + async fn exists(&self, _path: &str) -> crate::Result { + Ok(std::thread::current().name() == Some(IO_THREAD_NAME)) + } + + async fn metadata(&self, _path: &str) -> crate::Result { + unimplemented!("not used") + } + + async fn read(&self, _path: &str) -> crate::Result { + unimplemented!("not used") + } + + async fn reader(&self, path: &str) -> crate::Result> { + assert_io_thread(); + assert!(self.input); + assert!(path.ends_with("#input"), "{path} should end with #input"); + Ok(Box::new(ThreadNameFileRead)) + } + + async fn write(&self, _path: &str, _bs: Bytes) -> crate::Result<()> { + unimplemented!("not used") + } + + async fn writer(&self, _path: &str) -> crate::Result> { + unimplemented!("not used") + } + + async fn delete(&self, _path: &str) -> crate::Result<()> { + unimplemented!("not used") + } + + async fn delete_prefix(&self, _path: &str) -> crate::Result<()> { + unimplemented!("not used") + } + + async fn delete_stream(&self, _paths: BoxStream<'static, String>) -> crate::Result<()> { + unimplemented!("not used") + } + + fn new_input(&self, path: &str) -> crate::Result { + Ok(InputFile::new( + Arc::new(Self { input: true }), + format!("{path}#input"), + )) + } + + fn new_output(&self, _path: &str) -> crate::Result { + unimplemented!("not used") + } + } + + struct ThreadNameFileRead; + + #[async_trait::async_trait] + impl FileRead for ThreadNameFileRead { + async fn read(&self, _range: std::ops::Range) -> crate::Result { + assert_io_thread(); + Ok(Bytes::new()) + } + } + + fn assert_io_thread() { + assert_eq!(std::thread::current().name(), Some(IO_THREAD_NAME)); + } } diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index 8881471ae8..5cd4b7bf1e 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -83,6 +83,14 @@ impl ObjectCache { } } + pub(crate) fn with_file_io(&self, file_io: FileIO) -> Self { + Self { + cache: self.cache.clone(), + file_io, + cache_disabled: self.cache_disabled, + } + } + /// Retrieves an Arc [`Manifest`] from the cache /// or retrieves one from FileIO and parses it if not present pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result> { diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 5276c7771f..8b8a934ea3 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -20,6 +20,7 @@ mod config; mod local_fs; mod memory; +mod runtime; use std::fmt::Debug; use std::sync::Arc; @@ -30,6 +31,7 @@ pub use config::*; use futures::stream::BoxStream; pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; pub use memory::{MemoryStorage, MemoryStorageFactory}; +pub(crate) use runtime::RuntimeStorage; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; diff --git a/crates/iceberg/src/io/storage/runtime.rs b/crates/iceberg/src/io/storage/runtime.rs new file mode 100644 index 0000000000..b8cd735ff3 --- /dev/null +++ b/crates/iceberg/src/io/storage/runtime.rs @@ -0,0 +1,288 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Runtime-aware storage adapter. +//! +//! `RuntimeStorage` is a private scheduling adapter around a real `Storage` +//! backend. It is not a storage backend itself: callers never configure it, and +//! it is intentionally not registered with `typetag`. +//! +//! Async storage operations are spawned onto the configured IO runtime. The +//! synchronous `new_input` and `new_output` methods cannot do IO themselves, so +//! they preserve the backend-returned child storage and path, then wrap that +//! child storage with the same IO runtime. +//! +//! Reader and writer creation are not the final IO boundary. Returned +//! `FileRead` and `FileWrite` handles can perform later byte-range reads, +//! chunk writes, and close operations, so those handles are wrapped too. + +use std::future::Future; +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use tokio::sync::Mutex; + +use super::Storage; +use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; +use crate::{Error, ErrorKind, Result, RuntimeHandle}; + +/// Storage adapter that dispatches IO-bound operations on a dedicated runtime. +#[derive(Clone, Debug)] +pub(crate) struct RuntimeStorage { + inner: Arc, + io_runtime: RuntimeHandle, +} + +impl RuntimeStorage { + pub(crate) fn new(inner: Arc, io_runtime: RuntimeHandle) -> Self { + Self { inner, io_runtime } + } + + async fn run( + &self, + operation: &'static str, + f: impl FnOnce(Arc) -> Fut + Send + 'static, + ) -> Result + where + Fut: Future> + Send + 'static, + T: Send + 'static, + { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, operation, f(inner)).await + } + + async fn run_path( + &self, + operation: &'static str, + path: &str, + f: impl FnOnce(Arc, String) -> Fut + Send + 'static, + ) -> Result + where + Fut: Future> + Send + 'static, + T: Send + 'static, + { + // Spawned tasks must be 'static, so borrowed paths are owned at the runtime boundary. + let path = path.to_owned(); + self.run(operation, move |inner| f(inner, path)).await + } + + fn wrap_storage(&self, storage: Arc) -> Arc { + Arc::new(Self::new(storage, self.io_runtime.clone())) + } + + fn wrap_reader(&self, reader: Box) -> Box { + Box::new(RuntimeFileRead::new( + Arc::from(reader), + self.io_runtime.clone(), + )) + } + + fn wrap_writer(&self, writer: Box) -> Box { + Box::new(RuntimeFileWrite::new(writer, self.io_runtime.clone())) + } +} + +impl Serialize for RuntimeStorage { + fn serialize(&self, _serializer: S) -> std::result::Result + where S: Serializer { + Err(serde::ser::Error::custom( + "RuntimeStorage is a transient scheduling adapter and cannot be serialized", + )) + } +} + +impl<'de> Deserialize<'de> for RuntimeStorage { + fn deserialize(_deserializer: D) -> std::result::Result + where D: Deserializer<'de> { + Err(serde::de::Error::custom( + "RuntimeStorage is a transient scheduling adapter and cannot be deserialized", + )) + } +} + +#[async_trait] +impl Storage for RuntimeStorage { + // `Storage` is a typetag trait; provide typetag's required bookkeeping + // methods without registering this transient adapter for deserialization. + #[doc(hidden)] + fn typetag_name(&self) -> &'static str { + "RuntimeStorage" + } + + #[doc(hidden)] + fn typetag_deserialize(&self) {} + + async fn exists(&self, path: &str) -> Result { + self.run_path("checking file existence", path, |inner, path| async move { + inner.exists(&path).await + }) + .await + } + + async fn metadata(&self, path: &str) -> Result { + self.run_path("reading file metadata", path, |inner, path| async move { + inner.metadata(&path).await + }) + .await + } + + async fn read(&self, path: &str) -> Result { + self.run_path("reading file", path, |inner, path| async move { + inner.read(&path).await + }) + .await + } + + async fn reader(&self, path: &str) -> Result> { + let reader = self + .run_path("opening file reader", path, |inner, path| async move { + inner.reader(&path).await + }) + .await?; + + Ok(self.wrap_reader(reader)) + } + + async fn write(&self, path: &str, bs: Bytes) -> Result<()> { + self.run_path("writing file", path, |inner, path| async move { + inner.write(&path, bs).await + }) + .await + } + + async fn writer(&self, path: &str) -> Result> { + let writer = self + .run_path("opening file writer", path, |inner, path| async move { + inner.writer(&path).await + }) + .await?; + + Ok(self.wrap_writer(writer)) + } + + async fn delete(&self, path: &str) -> Result<()> { + self.run_path("deleting file", path, |inner, path| async move { + inner.delete(&path).await + }) + .await + } + + async fn delete_prefix(&self, path: &str) -> Result<()> { + self.run_path("deleting file prefix", path, |inner, path| async move { + inner.delete_prefix(&path).await + }) + .await + } + + async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()> { + self.run("deleting file stream", |inner| async move { + inner.delete_stream(paths).await + }) + .await + } + + fn new_input(&self, path: &str) -> Result { + let (storage, path) = self.inner.new_input(path)?.into_parts(); + Ok(InputFile::new(self.wrap_storage(storage), path)) + } + + fn new_output(&self, path: &str) -> Result { + let (storage, path) = self.inner.new_output(path)?.into_parts(); + Ok(OutputFile::new(self.wrap_storage(storage), path)) + } +} + +struct RuntimeFileRead { + inner: Arc, + io_runtime: RuntimeHandle, +} + +impl RuntimeFileRead { + fn new(inner: Arc, io_runtime: RuntimeHandle) -> Self { + Self { inner, io_runtime } + } +} + +#[async_trait] +impl FileRead for RuntimeFileRead { + async fn read(&self, range: Range) -> Result { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, "reading file range", async move { + inner.read(range).await + }) + .await + } +} + +struct RuntimeFileWrite { + // FileWrite methods take &mut self, but spawned IO tasks must own 'static values. + // The mutex lets each spawned operation borrow the underlying writer after + // it has moved into the wrapper. + inner: Arc>>, + io_runtime: RuntimeHandle, +} + +impl RuntimeFileWrite { + fn new(inner: Box, io_runtime: RuntimeHandle) -> Self { + Self { + inner: Arc::new(Mutex::new(inner)), + io_runtime, + } + } +} + +#[async_trait] +impl FileWrite for RuntimeFileWrite { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, "writing file chunk", async move { + inner.lock().await.write(bs).await + }) + .await + } + + async fn close(&mut self) -> Result<()> { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, "closing file writer", async move { + inner.lock().await.close().await + }) + .await + } +} + +async fn spawn_on_io(runtime: &RuntimeHandle, operation: &'static str, future: F) -> Result +where + F: Future> + Send + 'static, + T: Send + 'static, +{ + runtime + .spawn_abort_on_drop(future) + .await + .map_err(|e| spawned_task_error(operation, e))? +} + +fn spawned_task_error(operation: &'static str, error: Error) -> Error { + Error::new( + ErrorKind::Unexpected, + format!("{operation} failed on the IO runtime"), + ) + .with_source(error) +} diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs index 2202995766..e52fee9f0e 100644 --- a/crates/iceberg/src/runtime/mod.rs +++ b/crates/iceberg/src/runtime/mod.rs @@ -49,6 +49,26 @@ impl Future for JoinHandle { } } +pub(crate) struct AbortOnDropJoinHandle(task::JoinHandle); + +impl Unpin for AbortOnDropJoinHandle {} + +impl Drop for AbortOnDropJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +impl Future for AbortOnDropJoinHandle { + type Output = crate::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.get_mut().0).poll(cx).map(|r| { + r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task failed").with_source(e)) + }) + } +} + /// Handle to a single tokio runtime. /// /// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is @@ -67,7 +87,7 @@ impl fmt::Debug for RuntimeHandle { } impl RuntimeHandle { - fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self { + pub(crate) fn from_tokio_handle(handle: tokio::runtime::Handle) -> Self { Self { handle } } @@ -80,6 +100,14 @@ impl RuntimeHandle { JoinHandle(self.handle.spawn(future)) } + pub(crate) fn spawn_abort_on_drop(&self, future: F) -> AbortOnDropJoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + AbortOnDropJoinHandle(self.handle.spawn(future)) + } + /// Spawn a blocking task. pub fn spawn_blocking(&self, f: F) -> JoinHandle where @@ -138,6 +166,17 @@ impl Runtime { } } + /// Create a Runtime from explicit tokio handles for IO and CPU work. + pub fn new_with_handles( + io_handle: tokio::runtime::Handle, + cpu_handle: tokio::runtime::Handle, + ) -> Self { + Self { + io: RuntimeHandle::from_tokio_handle(io_handle), + cpu: RuntimeHandle::from_tokio_handle(cpu_handle), + } + } + /// Borrows the tokio runtime the caller is currently running in. /// /// Panics if called outside a tokio runtime context. Use @@ -264,6 +303,52 @@ mod tests { assert_eq!(cpu_result, "cpu"); } + #[test] + fn test_runtime_with_handles_uses_explicit_cpu_handle() { + let current_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name("iceberg-current-runtime-test") + .enable_all() + .build() + .unwrap(); + let cpu_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name("iceberg-explicit-cpu-runtime-test") + .enable_all() + .build() + .unwrap(); + let io_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name("iceberg-explicit-io-runtime-test") + .enable_all() + .build() + .unwrap(); + + let (io_thread, cpu_thread) = current_rt.block_on(async { + let rt = Runtime::new_with_handles(io_rt.handle().clone(), cpu_rt.handle().clone()); + let io_thread = rt + .io() + .spawn(async { std::thread::current().name().map(str::to_owned) }) + .await + .unwrap(); + let cpu_thread = rt + .cpu() + .spawn(async { std::thread::current().name().map(str::to_owned) }) + .await + .unwrap(); + (io_thread, cpu_thread) + }); + + assert_eq!( + io_thread.as_deref(), + Some("iceberg-explicit-io-runtime-test") + ); + assert_eq!( + cpu_thread.as_deref(), + Some("iceberg-explicit-cpu-runtime-test") + ); + } + #[test] fn test_runtime_clone() { let h = TestRuntime::new(); diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index 40755ec005..db6db27898 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -138,6 +138,11 @@ impl TableBuilder { )); }; + let file_io = match runtime.as_ref() { + Some(runtime) => file_io.with_runtime(runtime.clone()), + None => file_io, + }; + let object_cache = if disable_cache { Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) } else if let Some(cache_size_bytes) = cache_size_bytes { @@ -188,6 +193,15 @@ impl Table { self } + /// Sets the runtime this table uses when spawning tasks. + pub fn with_runtime(mut self, runtime: Runtime) -> Self { + let file_io = self.file_io.with_runtime(runtime.clone()); + self.object_cache = Arc::new(self.object_cache.with_file_io(file_io.clone())); + self.file_io = file_io; + self.runtime = Some(runtime); + self + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index fb24af3eef..ee1b9a0ce4 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -329,9 +329,10 @@ mod tests { .await .unwrap(); - let provider = IcebergSchemaProvider::try_new_with_runtime(Arc::new(catalog), namespace, None) - .await - .unwrap(); + let provider = + IcebergSchemaProvider::try_new_with_runtime(Arc::new(catalog), namespace, None) + .await + .unwrap(); (provider, temp_dir) } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index f02b2e3b94..56faab8075 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -82,7 +82,8 @@ pub struct IcebergTableProvider { table_ident: TableIdent, /// A reference-counted arrow `Schema` (cached at construction) schema: ArrowSchemaRef, - /// When `Some`, IO in `scan` and `insert_into` is spawned on `runtime.io()`. + /// When `Some`, catalog reloads use `runtime.io()` and loaded tables use + /// this runtime for Iceberg-internal IO and CPU task scheduling. runtime: Option, } @@ -99,8 +100,8 @@ impl IcebergTableProvider { Self::try_new_with_runtime(catalog, namespace, name, None).await } - /// Like [`Self::try_new`], but routes catalog IO in `scan` and `insert_into` - /// through `runtime.io()` instead of running inline on the caller's runtime. + /// Like [`Self::try_new`], but routes catalog IO through `runtime.io()` + /// instead of running inline on the caller's runtime. pub async fn try_new_with_runtime( catalog: Arc, namespace: NamespaceIdent, @@ -109,7 +110,8 @@ impl IcebergTableProvider { ) -> Result { let table_ident = TableIdent::new(namespace, name.into()); - let table = catalog.load_table(&table_ident).await?; + let table = + Self::load_table_on_io(catalog.clone(), table_ident.clone(), runtime.as_ref()).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { @@ -120,18 +122,27 @@ impl IcebergTableProvider { }) } - /// Runs `fut` on `self.runtime.io()` if set, otherwise runs it inline. - async fn run_on_io(&self, fut: F) -> DFResult - where - F: std::future::Future> + Send + 'static, - T: Send + 'static, - { - match &self.runtime { - Some(rt) => match rt.io().spawn(fut).await { - Ok(inner) => inner, - Err(e) => Err(to_datafusion_error(e)), - }, - None => fut.await, + async fn load_table_on_io( + catalog: Arc, + table_ident: TableIdent, + runtime: Option<&Runtime>, + ) -> Result
{ + let table = match runtime { + Some(runtime) => { + runtime + .io() + .spawn(async move { catalog.load_table(&table_ident).await }) + .await?? + } + None => catalog.load_table(&table_ident).await?, + }; + Ok(Self::table_with_runtime(table, runtime)) + } + + fn table_with_runtime(table: Table, runtime: Option<&Runtime>) -> Table { + match runtime { + Some(runtime) => table.with_runtime(runtime.clone()), + None => table, } } @@ -139,7 +150,12 @@ impl IcebergTableProvider { &self, r#type: MetadataTableType, ) -> Result { - let table = self.catalog.load_table(&self.table_ident).await?; + let table = Self::load_table_on_io( + self.catalog.clone(), + self.table_ident.clone(), + self.runtime.as_ref(), + ) + .await?; Ok(IcebergMetadataTableProvider { table, r#type }) } } @@ -168,53 +184,43 @@ impl TableProvider for IcebergTableProvider { // Compute the predicate on the caller's runtime (pure CPU, no IO). let predicate = convert_filters_to_predicate(filters); - // Capture everything the IO closure needs; Session is not Send. let target_partitions = state.config().target_partitions(); - let projection_owned: Option> = projection.cloned(); - let catalog = self.catalog.clone(); - let table_ident = self.table_ident.clone(); - let arrow_schema = self.schema.clone(); - - // ── IO-bound: reload table + plan files ────────────────────────────── - // Spawned on `self.runtime.io()` when configured, otherwise runs inline. - let (table, tasks) = self - .run_on_io(async move { - // Second load: fetch the latest snapshot so scans always reflect - // current table state. - let table = catalog - .load_table(&table_ident) - .await - .map_err(to_datafusion_error)?; - - let col_names = projection_owned.as_ref().map(|indices| { - indices - .iter() - .map(|&i| arrow_schema.field(i).name().clone()) - .collect::>() - }); - - let mut builder = table.scan(); - builder = match col_names { - Some(names) => builder.select(names), - None => builder.select_all(), - }; - if let Some(pred) = predicate { - builder = builder.with_filter(pred); - } - - let tasks: Vec = builder - .build() - .map_err(to_datafusion_error)? - .plan_files() - .await - .map_err(to_datafusion_error)? - .try_collect::>() - .await - .map_err(to_datafusion_error)?; - - DFResult::Ok((table, tasks)) - }) - .await?; + + // Second load: fetch the latest snapshot so scans always reflect current + // table state. Catalog IO is routed through the configured IO runtime. + let table = Self::load_table_on_io( + self.catalog.clone(), + self.table_ident.clone(), + self.runtime.as_ref(), + ) + .await + .map_err(to_datafusion_error)?; + + let col_names = projection.map(|indices| { + indices + .iter() + .map(|&i| self.schema.field(i).name().clone()) + .collect::>() + }); + + let mut builder = table.scan(); + builder = match col_names { + Some(names) => builder.select(names), + None => builder.select_all(), + }; + if let Some(pred) = predicate { + builder = builder.with_filter(pred); + } + + let tasks: Vec = builder + .build() + .map_err(to_datafusion_error)? + .plan_files() + .await + .map_err(to_datafusion_error)? + .try_collect::>() + .await + .map_err(to_datafusion_error)?; // ── CPU-bound: schema projection + bucketing ────────────────────────── // Output schema after projection: column indices in `Hash` exprs and any @@ -280,16 +286,13 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { - let catalog = self.catalog.clone(); - let table_ident = self.table_ident.clone(); - let table = self - .run_on_io(async move { - catalog - .load_table(&table_ident) - .await - .map_err(to_datafusion_error) - }) - .await?; + let table = Self::load_table_on_io( + self.catalog.clone(), + self.table_ident.clone(), + self.runtime.as_ref(), + ) + .await + .map_err(to_datafusion_error)?; let partition_spec = table.metadata().default_partition_spec();