diff --git a/crates/catalog/glue/src/catalog.rs b/crates/catalog/glue/src/catalog.rs index 0a69797ef9..44f03f37a5 100644 --- a/crates/catalog/glue/src/catalog.rs +++ b/crates/catalog/glue/src/catalog.rs @@ -221,6 +221,7 @@ impl GlueCatalog { }); let file_io = FileIOBuilder::new(factory) .with_props(file_io_props) + .with_runtime(runtime.clone()) .build(); Ok(GlueCatalog { diff --git a/crates/catalog/hms/src/catalog.rs b/crates/catalog/hms/src/catalog.rs index 0f15890c77..eaba7ee325 100644 --- a/crates/catalog/hms/src/catalog.rs +++ b/crates/catalog/hms/src/catalog.rs @@ -231,6 +231,7 @@ impl HmsCatalog { })?; let file_io = FileIOBuilder::new(factory) .with_props(&config.props) + .with_runtime(runtime.clone()) .build(); Ok(Self { diff --git a/crates/catalog/rest/src/catalog.rs b/crates/catalog/rest/src/catalog.rs index 0626ce5061..df72770de0 100644 --- a/crates/catalog/rest/src/catalog.rs +++ b/crates/catalog/rest/src/catalog.rs @@ -483,7 +483,10 @@ impl RestCatalog { ) })?; - let file_io = FileIOBuilder::new(factory).with_props(props).build(); + let file_io = FileIOBuilder::new(factory) + .with_props(props) + .with_runtime(self.runtime.clone()) + .build(); Ok(file_io) } diff --git a/crates/catalog/s3tables/src/catalog.rs b/crates/catalog/s3tables/src/catalog.rs index c7c92ab44d..5e4e7449f2 100644 --- a/crates/catalog/s3tables/src/catalog.rs +++ b/crates/catalog/s3tables/src/catalog.rs @@ -220,6 +220,7 @@ impl S3TablesCatalog { }); let file_io = FileIOBuilder::new(factory) .with_props(&config.props) + .with_runtime(runtime.clone()) .build(); Ok(Self { diff --git a/crates/catalog/sql/src/catalog.rs b/crates/catalog/sql/src/catalog.rs index 02e32c5f4a..a628e72326 100644 --- a/crates/catalog/sql/src/catalog.rs +++ b/crates/catalog/sql/src/catalog.rs @@ -261,6 +261,7 @@ impl SqlCatalog { // Unrecognized keys are ignored by backends. let fileio = FileIOBuilder::new(factory) .with_props(config.props.clone()) + .with_runtime(runtime.clone()) .build(); install_default_drivers(); diff --git a/crates/iceberg/public-api.txt b/crates/iceberg/public-api.txt index c731321906..821558647b 100644 --- a/crates/iceberg/public-api.txt +++ b/crates/iceberg/public-api.txt @@ -692,6 +692,7 @@ pub fn iceberg::io::FileIO::new_input(&self, path: impl core::convert::AsRef) -> iceberg::Result pub fn iceberg::io::FileIO::new_with_fs() -> Self pub fn iceberg::io::FileIO::new_with_memory() -> Self +pub fn iceberg::io::FileIO::with_runtime(self, runtime: iceberg::Runtime) -> Self impl core::clone::Clone for iceberg::io::FileIO pub fn iceberg::io::FileIO::clone(&self) -> iceberg::io::FileIO impl core::fmt::Debug for iceberg::io::FileIO @@ -703,6 +704,7 @@ pub fn iceberg::io::FileIOBuilder::config(&self) -> &iceberg::io::StorageConfig pub fn iceberg::io::FileIOBuilder::new(factory: alloc::sync::Arc) -> Self pub fn iceberg::io::FileIOBuilder::with_prop(self, key: impl alloc::string::ToString, value: impl alloc::string::ToString) -> Self pub fn iceberg::io::FileIOBuilder::with_props(self, args: impl core::iter::traits::collect::IntoIterator) -> Self +pub fn iceberg::io::FileIOBuilder::with_runtime(self, runtime: iceberg::Runtime) -> Self impl core::clone::Clone for iceberg::io::FileIOBuilder pub fn iceberg::io::FileIOBuilder::clone(&self) -> iceberg::io::FileIOBuilder impl core::fmt::Debug for iceberg::io::FileIOBuilder @@ -3029,6 +3031,7 @@ pub fn iceberg::table::Table::metadata_ref(&self) -> iceberg::spec::TableMetadat pub fn iceberg::table::Table::reader_builder(&self) -> iceberg::arrow::ArrowReaderBuilder pub fn iceberg::table::Table::readonly(&self) -> bool pub fn iceberg::table::Table::scan(&self) -> iceberg::scan::TableScanBuilder<'_> +pub fn iceberg::table::Table::with_runtime(self, runtime: iceberg::Runtime) -> Self impl core::clone::Clone for iceberg::table::Table pub fn iceberg::table::Table::clone(&self) -> iceberg::table::Table impl core::fmt::Debug for iceberg::table::Table diff --git a/crates/iceberg/src/catalog/memory/catalog.rs b/crates/iceberg/src/catalog/memory/catalog.rs index 67f8ab8dd1..9beb37a7e2 100644 --- a/crates/iceberg/src/catalog/memory/catalog.rs +++ b/crates/iceberg/src/catalog/memory/catalog.rs @@ -145,7 +145,10 @@ impl MemoryCatalog { Ok(Self { root_namespace_state: Mutex::new(NamespaceState::default()), - file_io: FileIOBuilder::new(factory).with_props(config.props).build(), + file_io: FileIOBuilder::new(factory) + .with_props(config.props) + .with_runtime(runtime.clone()) + .build(), warehouse_location: config.warehouse, runtime, }) diff --git a/crates/iceberg/src/io/file_io.rs b/crates/iceberg/src/io/file_io.rs index 227d8f4d5b..3d3655f199 100644 --- a/crates/iceberg/src/io/file_io.rs +++ b/crates/iceberg/src/io/file_io.rs @@ -24,7 +24,7 @@ use futures::{Stream, StreamExt}; use super::storage::{ LocalFsStorageFactory, MemoryStorageFactory, Storage, StorageConfig, StorageFactory, }; -use crate::Result; +use crate::{Result, Runtime, RuntimeHandle}; /// FileIO implementation, used to manipulate files in underlying storage. /// @@ -65,8 +65,10 @@ pub struct FileIO { config: StorageConfig, /// Factory for creating storage instances factory: Arc, - /// Cached storage instance (lazily initialized) + /// Cached raw storage instance (lazily initialized) storage: Arc>>, + /// IO runtime handle used for storage operations. + io_runtime: Option, } impl FileIO { @@ -78,6 +80,7 @@ impl FileIO { config: StorageConfig::new(), factory: Arc::new(MemoryStorageFactory), storage: Arc::new(OnceLock::new()), + io_runtime: None, } } @@ -89,6 +92,7 @@ impl FileIO { config: StorageConfig::new(), factory: Arc::new(LocalFsStorageFactory), storage: Arc::new(OnceLock::new()), + io_runtime: None, } } @@ -97,11 +101,17 @@ impl FileIO { &self.config } - /// Get or create the storage instance. + /// Route IO-bound storage operations through the IO handle in `runtime`. + pub fn with_runtime(mut self, runtime: Runtime) -> Self { + self.io_runtime = Some(runtime.io().clone()); + self + } + + /// Get or create the raw storage instance. /// /// The factory is invoked on first access and the result is cached /// for all subsequent operations. - fn get_storage(&self) -> Result> { + fn get_raw_storage(&self) -> Result> { // Check if already initialized if let Some(storage) = self.storage.get() { return Ok(storage.clone()); @@ -117,6 +127,18 @@ impl FileIO { Ok(self.storage.get().unwrap().clone()) } + /// Get the storage instance, wrapped for IO-runtime dispatch when configured. + fn get_storage(&self) -> Result> { + let storage = self.get_raw_storage()?; + match &self.io_runtime { + Some(runtime) => Ok(Arc::new(super::storage::RuntimeStorage::new( + storage, + runtime.clone(), + ))), + None => Ok(storage), + } + } + /// Deletes file. /// /// # Arguments @@ -191,6 +213,8 @@ pub struct FileIOBuilder { factory: Arc, /// Storage configuration config: StorageConfig, + /// IO runtime handle used for storage operations. + io_runtime: Option, } impl FileIOBuilder { @@ -199,6 +223,7 @@ impl FileIOBuilder { Self { factory, config: StorageConfig::new(), + io_runtime: None, } } @@ -224,12 +249,19 @@ impl FileIOBuilder { &self.config } + /// Route IO-bound storage operations through the IO handle in `runtime`. + pub fn with_runtime(mut self, runtime: Runtime) -> Self { + self.io_runtime = Some(runtime.io().clone()); + self + } + /// Builds [`FileIO`]. pub fn build(self) -> FileIO { FileIO { config: self.config, factory: self.factory, storage: Arc::new(OnceLock::new()), + io_runtime: self.io_runtime, } } } @@ -276,6 +308,10 @@ impl InputFile { Self { storage, path } } + pub(crate) fn into_parts(self) -> (Arc, String) { + (self.storage, self.path) + } + /// Absolute path to root uri. pub fn location(&self) -> &str { &self.path @@ -339,6 +375,10 @@ impl OutputFile { Self { storage, path } } + pub(crate) fn into_parts(self) -> (Arc, String) { + (self.storage, self.path) + } + /// Relative path to root uri. pub fn location(&self) -> &str { &self.path @@ -390,14 +430,23 @@ mod tests { use std::io::Write; use std::path::Path; use std::sync::Arc; + use std::sync::atomic::{AtomicUsize, Ordering}; use bytes::Bytes; use futures::AsyncReadExt; use futures::io::AllowStdIo; + use futures::stream::BoxStream; use tempfile::TempDir; use super::{FileIO, FileIOBuilder}; - use crate::io::{LocalFsStorageFactory, MemoryStorageFactory}; + use crate::Runtime; + use crate::io::{ + FileMetadata, FileRead, FileWrite, InputFile, LocalFsStorageFactory, MemoryStorageFactory, + OutputFile, Storage, StorageFactory, + }; + + const CPU_THREAD_NAME: &str = "iceberg-file-io-cpu-test"; + const IO_THREAD_NAME: &str = "iceberg-file-io-io-test"; fn create_local_file_io() -> FileIO { FileIO::new_with_fs() @@ -544,4 +593,188 @@ mod tests { assert_eq!(file_io.config().get("key1"), Some(&"value1".to_string())); assert_eq!(file_io.config().get("key2"), Some(&"value2".to_string())); } + + #[test] + fn test_file_io_with_runtime_routes_storage_operations() { + let (cpu_rt, io_rt) = split_test_runtimes(); + let runtime = Runtime::new_with_split(&io_rt, &cpu_rt); + let factory = Arc::new(ThreadNameStorageFactory::default()); + + let (raw_exists, wrapped_exists, builds) = cpu_rt.block_on(async { + let file_io = FileIOBuilder::new(factory.clone()).build(); + let raw_exists = file_io.exists("memory://thread-name").await.unwrap(); + + let file_io = file_io.with_runtime(runtime); + let wrapped_exists = file_io.exists("memory://thread-name").await.unwrap(); + + let input = file_io.new_input("memory://thread-name").unwrap(); + input.metadata().await.unwrap(); + input.read().await.unwrap(); + input.reader().await.unwrap().read(0..1).await.unwrap(); + + let output = file_io.new_output("memory://thread-name").unwrap(); + output.write(Bytes::from_static(b"chunk")).await.unwrap(); + let mut writer = output.writer().await.unwrap(); + writer.write(Bytes::from_static(b"chunk")).await.unwrap(); + writer.close().await.unwrap(); + + file_io.delete("memory://thread-name").await.unwrap(); + file_io.delete_prefix("memory://thread-name").await.unwrap(); + file_io + .delete_stream(futures::stream::iter(["memory://thread-name".to_string()])) + .await + .unwrap(); + + ( + raw_exists, + wrapped_exists, + factory.builds.load(Ordering::SeqCst), + ) + }); + + assert!(!raw_exists); + assert!(wrapped_exists); + assert_eq!(builds, 1); + } + + fn split_test_runtimes() -> (tokio::runtime::Runtime, tokio::runtime::Runtime) { + let cpu_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name(CPU_THREAD_NAME) + .enable_all() + .build() + .unwrap(); + let io_rt = tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name(IO_THREAD_NAME) + .enable_all() + .build() + .unwrap(); + + (cpu_rt, io_rt) + } + + #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] + struct ThreadNameStorageFactory { + #[serde(skip)] + builds: Arc, + } + + #[typetag::serde] + impl StorageFactory for ThreadNameStorageFactory { + fn build(&self, _config: &crate::io::StorageConfig) -> crate::Result> { + self.builds.fetch_add(1, Ordering::SeqCst); + Ok(Arc::new(ThreadNameStorage::default())) + } + } + + #[derive(Clone, Debug, Default, serde::Serialize, serde::Deserialize)] + struct ThreadNameStorage { + input: bool, + output: bool, + } + + #[async_trait::async_trait] + #[typetag::serde] + impl Storage for ThreadNameStorage { + async fn exists(&self, _path: &str) -> crate::Result { + Ok(std::thread::current().name() == Some(IO_THREAD_NAME)) + } + + async fn metadata(&self, _path: &str) -> crate::Result { + assert_io_thread(); + assert!(self.input); + Ok(FileMetadata { size: 0 }) + } + + async fn read(&self, _path: &str) -> crate::Result { + assert_io_thread(); + assert!(self.input); + Ok(Bytes::new()) + } + + async fn reader(&self, path: &str) -> crate::Result> { + assert_io_thread(); + assert!(self.input); + assert!(path.ends_with("#input")); + Ok(Box::new(ThreadNameFileRead)) + } + + async fn write(&self, _path: &str, _bs: Bytes) -> crate::Result<()> { + assert_io_thread(); + assert!(self.output); + Ok(()) + } + + async fn writer(&self, path: &str) -> crate::Result> { + assert_io_thread(); + assert!(self.output); + assert!(path.ends_with("#output")); + Ok(Box::new(ThreadNameFileWrite)) + } + + async fn delete(&self, _path: &str) -> crate::Result<()> { + assert_io_thread(); + Ok(()) + } + + async fn delete_prefix(&self, _path: &str) -> crate::Result<()> { + assert_io_thread(); + Ok(()) + } + + async fn delete_stream(&self, _paths: BoxStream<'static, String>) -> crate::Result<()> { + assert_io_thread(); + Ok(()) + } + + fn new_input(&self, path: &str) -> crate::Result { + Ok(InputFile::new( + Arc::new(Self { + input: true, + output: false, + }), + format!("{path}#input"), + )) + } + + fn new_output(&self, path: &str) -> crate::Result { + Ok(OutputFile::new( + Arc::new(Self { + input: false, + output: true, + }), + format!("{path}#output"), + )) + } + } + + struct ThreadNameFileRead; + + #[async_trait::async_trait] + impl FileRead for ThreadNameFileRead { + async fn read(&self, _range: std::ops::Range) -> crate::Result { + assert_io_thread(); + Ok(Bytes::new()) + } + } + + struct ThreadNameFileWrite; + + #[async_trait::async_trait] + impl FileWrite for ThreadNameFileWrite { + async fn write(&mut self, _bs: Bytes) -> crate::Result<()> { + assert_io_thread(); + Ok(()) + } + + async fn close(&mut self) -> crate::Result<()> { + assert_io_thread(); + Ok(()) + } + } + + fn assert_io_thread() { + assert_eq!(std::thread::current().name(), Some(IO_THREAD_NAME)); + } } diff --git a/crates/iceberg/src/io/object_cache.rs b/crates/iceberg/src/io/object_cache.rs index cdcda2bac5..84b2b7b683 100644 --- a/crates/iceberg/src/io/object_cache.rs +++ b/crates/iceberg/src/io/object_cache.rs @@ -84,6 +84,14 @@ impl ObjectCache { } } + pub(crate) fn with_file_io(&self, file_io: FileIO) -> Self { + Self { + cache: self.cache.clone(), + file_io, + cache_disabled: self.cache_disabled, + } + } + /// Retrieves an Arc [`Manifest`] from the cache /// or retrieves one from FileIO and parses it if not present pub(crate) async fn get_manifest(&self, manifest_file: &ManifestFile) -> Result> { diff --git a/crates/iceberg/src/io/storage/mod.rs b/crates/iceberg/src/io/storage/mod.rs index 5276c7771f..8b8a934ea3 100644 --- a/crates/iceberg/src/io/storage/mod.rs +++ b/crates/iceberg/src/io/storage/mod.rs @@ -20,6 +20,7 @@ mod config; mod local_fs; mod memory; +mod runtime; use std::fmt::Debug; use std::sync::Arc; @@ -30,6 +31,7 @@ pub use config::*; use futures::stream::BoxStream; pub use local_fs::{LocalFsStorage, LocalFsStorageFactory}; pub use memory::{MemoryStorage, MemoryStorageFactory}; +pub(crate) use runtime::RuntimeStorage; use super::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; use crate::Result; diff --git a/crates/iceberg/src/io/storage/runtime.rs b/crates/iceberg/src/io/storage/runtime.rs new file mode 100644 index 0000000000..8a9d8ce0bb --- /dev/null +++ b/crates/iceberg/src/io/storage/runtime.rs @@ -0,0 +1,282 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Runtime-aware storage adapter. +//! +//! `RuntimeStorage` is a private scheduling adapter around a real `Storage` +//! backend. It is not a storage backend itself: callers never configure it, and +//! it is intentionally not registered with `typetag`. +//! +//! Async storage operations are spawned onto the configured IO runtime. The +//! synchronous `new_input` and `new_output` methods cannot do IO themselves, so +//! they preserve the backend-returned child storage and path, then wrap that +//! child storage with the same IO runtime. +//! +//! Reader and writer creation are not the final IO boundary. Returned +//! `FileRead` and `FileWrite` handles can perform later byte-range reads, +//! chunk writes, and close operations, so those handles are wrapped too. + +use std::future::Future; +use std::ops::Range; +use std::sync::Arc; + +use async_trait::async_trait; +use bytes::Bytes; +use futures::stream::BoxStream; +use serde::{Deserialize, Deserializer, Serialize, Serializer}; +use tokio::sync::Mutex; + +use super::Storage; +use crate::io::{FileMetadata, FileRead, FileWrite, InputFile, OutputFile}; +use crate::{Error, ErrorKind, Result, RuntimeHandle}; + +/// Storage adapter that dispatches IO-bound operations on a dedicated runtime. +#[derive(Clone, Debug)] +pub(crate) struct RuntimeStorage { + inner: Arc, + io_runtime: RuntimeHandle, +} + +impl RuntimeStorage { + pub(crate) fn new(inner: Arc, io_runtime: RuntimeHandle) -> Self { + Self { inner, io_runtime } + } + + async fn run( + &self, + operation: &'static str, + f: impl FnOnce(Arc) -> Fut + Send + 'static, + ) -> Result + where + Fut: Future> + Send + 'static, + T: Send + 'static, + { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, operation, f(inner)).await + } + + async fn run_path( + &self, + operation: &'static str, + path: &str, + f: impl FnOnce(Arc, String) -> Fut + Send + 'static, + ) -> Result + where + Fut: Future> + Send + 'static, + T: Send + 'static, + { + let path = path.to_owned(); + self.run(operation, move |inner| f(inner, path)).await + } + + fn wrap_storage(&self, storage: Arc) -> Arc { + Arc::new(Self::new(storage, self.io_runtime.clone())) + } + + fn wrap_reader(&self, reader: Box) -> Box { + Box::new(RuntimeFileRead::new( + Arc::from(reader), + self.io_runtime.clone(), + )) + } + + fn wrap_writer(&self, writer: Box) -> Box { + Box::new(RuntimeFileWrite::new(writer, self.io_runtime.clone())) + } +} + +impl Serialize for RuntimeStorage { + fn serialize(&self, _serializer: S) -> std::result::Result + where S: Serializer { + Err(serde::ser::Error::custom( + "RuntimeStorage is a transient scheduling adapter and cannot be serialized", + )) + } +} + +impl<'de> Deserialize<'de> for RuntimeStorage { + fn deserialize(_deserializer: D) -> std::result::Result + where D: Deserializer<'de> { + Err(serde::de::Error::custom( + "RuntimeStorage is a transient scheduling adapter and cannot be deserialized", + )) + } +} + +#[async_trait] +impl Storage for RuntimeStorage { + #[doc(hidden)] + fn typetag_name(&self) -> &'static str { + "RuntimeStorage" + } + + #[doc(hidden)] + fn typetag_deserialize(&self) {} + + async fn exists(&self, path: &str) -> Result { + self.run_path("checking file existence", path, |inner, path| async move { + inner.exists(&path).await + }) + .await + } + + async fn metadata(&self, path: &str) -> Result { + self.run_path("reading file metadata", path, |inner, path| async move { + inner.metadata(&path).await + }) + .await + } + + async fn read(&self, path: &str) -> Result { + self.run_path("reading file", path, |inner, path| async move { + inner.read(&path).await + }) + .await + } + + async fn reader(&self, path: &str) -> Result> { + let reader = self + .run_path("opening file reader", path, |inner, path| async move { + inner.reader(&path).await + }) + .await?; + + Ok(self.wrap_reader(reader)) + } + + async fn write(&self, path: &str, bs: Bytes) -> Result<()> { + self.run_path("writing file", path, |inner, path| async move { + inner.write(&path, bs).await + }) + .await + } + + async fn writer(&self, path: &str) -> Result> { + let writer = self + .run_path("opening file writer", path, |inner, path| async move { + inner.writer(&path).await + }) + .await?; + + Ok(self.wrap_writer(writer)) + } + + async fn delete(&self, path: &str) -> Result<()> { + self.run_path("deleting file", path, |inner, path| async move { + inner.delete(&path).await + }) + .await + } + + async fn delete_prefix(&self, path: &str) -> Result<()> { + self.run_path("deleting file prefix", path, |inner, path| async move { + inner.delete_prefix(&path).await + }) + .await + } + + async fn delete_stream(&self, paths: BoxStream<'static, String>) -> Result<()> { + self.run("deleting file stream", |inner| async move { + inner.delete_stream(paths).await + }) + .await + } + + fn new_input(&self, path: &str) -> Result { + let (storage, path) = self.inner.new_input(path)?.into_parts(); + Ok(InputFile::new(self.wrap_storage(storage), path)) + } + + fn new_output(&self, path: &str) -> Result { + let (storage, path) = self.inner.new_output(path)?.into_parts(); + Ok(OutputFile::new(self.wrap_storage(storage), path)) + } +} + +struct RuntimeFileRead { + inner: Arc, + io_runtime: RuntimeHandle, +} + +impl RuntimeFileRead { + fn new(inner: Arc, io_runtime: RuntimeHandle) -> Self { + Self { inner, io_runtime } + } +} + +#[async_trait] +impl FileRead for RuntimeFileRead { + async fn read(&self, range: Range) -> Result { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, "reading file range", async move { + inner.read(range).await + }) + .await + } +} + +struct RuntimeFileWrite { + inner: Arc>>, + io_runtime: RuntimeHandle, +} + +impl RuntimeFileWrite { + fn new(inner: Box, io_runtime: RuntimeHandle) -> Self { + Self { + inner: Arc::new(Mutex::new(inner)), + io_runtime, + } + } +} + +#[async_trait] +impl FileWrite for RuntimeFileWrite { + async fn write(&mut self, bs: Bytes) -> Result<()> { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, "writing file chunk", async move { + inner.lock().await.write(bs).await + }) + .await + } + + async fn close(&mut self) -> Result<()> { + let inner = Arc::clone(&self.inner); + spawn_on_io(&self.io_runtime, "closing file writer", async move { + inner.lock().await.close().await + }) + .await + } +} + +async fn spawn_on_io(runtime: &RuntimeHandle, operation: &'static str, future: F) -> Result +where + F: Future> + Send + 'static, + T: Send + 'static, +{ + runtime + .spawn_abort_on_drop(future) + .await + .map_err(|e| spawned_task_error(operation, e))? +} + +fn spawned_task_error(operation: &'static str, error: Error) -> Error { + Error::new( + ErrorKind::Unexpected, + format!("{operation} failed on the IO runtime"), + ) + .with_source(error) +} diff --git a/crates/iceberg/src/runtime/mod.rs b/crates/iceberg/src/runtime/mod.rs index c04aef330f..7c57a200f1 100644 --- a/crates/iceberg/src/runtime/mod.rs +++ b/crates/iceberg/src/runtime/mod.rs @@ -48,6 +48,26 @@ impl Future for JoinHandle { } } +pub(crate) struct AbortOnDropJoinHandle(task::JoinHandle); + +impl Unpin for AbortOnDropJoinHandle {} + +impl Drop for AbortOnDropJoinHandle { + fn drop(&mut self) { + self.0.abort(); + } +} + +impl Future for AbortOnDropJoinHandle { + type Output = crate::Result; + + fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + Pin::new(&mut self.get_mut().0).poll(cx).map(|r| { + r.map_err(|e| Error::new(ErrorKind::Unexpected, "spawned task failed").with_source(e)) + }) + } +} + /// Handle to a single tokio runtime. /// /// Wraps a [`tokio::runtime::Handle`], which is cheap to clone. The caller is @@ -79,6 +99,14 @@ impl RuntimeHandle { JoinHandle(self.handle.spawn(future)) } + pub(crate) fn spawn_abort_on_drop(&self, future: F) -> AbortOnDropJoinHandle + where + F: Future + Send + 'static, + F::Output: Send + 'static, + { + AbortOnDropJoinHandle(self.handle.spawn(future)) + } + /// Spawn a blocking task. pub fn spawn_blocking(&self, f: F) -> JoinHandle where diff --git a/crates/iceberg/src/table.rs b/crates/iceberg/src/table.rs index f4880509b3..d1da5fbc73 100644 --- a/crates/iceberg/src/table.rs +++ b/crates/iceberg/src/table.rs @@ -145,6 +145,8 @@ impl TableBuilder { )); }; + let file_io = file_io.with_runtime(runtime.clone()); + let object_cache = if disable_cache { Arc::new(ObjectCache::with_disabled_cache(file_io.clone())) } else if let Some(cache_size_bytes) = cache_size_bytes { @@ -193,6 +195,15 @@ impl Table { self } + /// Sets the runtime this table uses when spawning tasks. + pub fn with_runtime(mut self, runtime: Runtime) -> Self { + let file_io = self.file_io.with_runtime(runtime.clone()); + self.object_cache = Arc::new(self.object_cache.with_file_io(file_io.clone())); + self.file_io = file_io; + self.runtime = runtime; + self + } + /// Returns a TableBuilder to build a table pub fn builder() -> TableBuilder { TableBuilder::new() diff --git a/crates/integrations/datafusion/public-api.txt b/crates/integrations/datafusion/public-api.txt index d24bd9fc9e..3785b8a937 100644 --- a/crates/integrations/datafusion/public-api.txt +++ b/crates/integrations/datafusion/public-api.txt @@ -74,6 +74,8 @@ pub fn iceberg_datafusion::IcebergStaticTableProvider::schema(&self) -> arrow_sc pub fn iceberg_datafusion::IcebergStaticTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergStaticTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::table::IcebergTableProvider +impl iceberg_datafusion::IcebergTableProvider +pub async fn iceberg_datafusion::IcebergTableProvider::try_new_with_runtime(catalog: alloc::sync::Arc, namespace: iceberg::catalog::NamespaceIdent, name: impl core::convert::Into, runtime: iceberg::runtime::Runtime) -> iceberg::error::Result impl core::clone::Clone for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::clone(&self) -> iceberg_datafusion::IcebergTableProvider impl core::fmt::Debug for iceberg_datafusion::IcebergTableProvider @@ -99,6 +101,7 @@ pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory:: pub struct iceberg_datafusion::IcebergCatalogProvider impl iceberg_datafusion::IcebergCatalogProvider pub async fn iceberg_datafusion::IcebergCatalogProvider::try_new(client: alloc::sync::Arc) -> iceberg::error::Result +pub async fn iceberg_datafusion::IcebergCatalogProvider::try_new_with_runtime(client: alloc::sync::Arc, runtime: iceberg::runtime::Runtime) -> iceberg::error::Result impl core::fmt::Debug for iceberg_datafusion::IcebergCatalogProvider pub fn iceberg_datafusion::IcebergCatalogProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::catalog::CatalogProvider for iceberg_datafusion::IcebergCatalogProvider @@ -121,6 +124,8 @@ pub fn iceberg_datafusion::IcebergStaticTableProvider::schema(&self) -> arrow_sc pub fn iceberg_datafusion::IcebergStaticTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergStaticTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::IcebergTableProvider +impl iceberg_datafusion::IcebergTableProvider +pub async fn iceberg_datafusion::IcebergTableProvider::try_new_with_runtime(catalog: alloc::sync::Arc, namespace: iceberg::catalog::NamespaceIdent, name: impl core::convert::Into, runtime: iceberg::runtime::Runtime) -> iceberg::error::Result impl core::clone::Clone for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::clone(&self) -> iceberg_datafusion::IcebergTableProvider impl core::fmt::Debug for iceberg_datafusion::IcebergTableProvider diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index c3cbcc88b4..e0c094ba04 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -21,7 +21,7 @@ use std::sync::Arc; use datafusion::catalog::{CatalogProvider, SchemaProvider}; use futures::future::try_join_all; -use iceberg::{Catalog, NamespaceIdent, Result}; +use iceberg::{Catalog, NamespaceIdent, Result, Runtime}; use crate::schema::IcebergSchemaProvider; @@ -47,6 +47,18 @@ impl IcebergCatalogProvider { /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. pub async fn try_new(client: Arc) -> Result { + Self::try_new_optional_runtime(client, None).await + } + + /// Like [`Self::try_new`], propagating `runtime` to child table providers. + pub async fn try_new_with_runtime(client: Arc, runtime: Runtime) -> Result { + Self::try_new_optional_runtime(client, Some(runtime)).await + } + + async fn try_new_optional_runtime( + client: Arc, + runtime: Option, + ) -> Result { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. @@ -61,9 +73,10 @@ impl IcebergCatalogProvider { schema_names .iter() .map(|name| { - IcebergSchemaProvider::try_new( + IcebergSchemaProvider::try_new_optional_runtime( client.clone(), NamespaceIdent::new(name.clone()), + runtime.clone(), ) }) .collect::>(), diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 835c804908..52e329cc7d 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -590,10 +590,11 @@ mod tests { let source_table = Arc::new(MemTable::try_new(Arc::clone(&arrow_schema), partitions)?); ctx.register_table("source_table", source_table)?; - let iceberg_table_provider = IcebergTableProvider::try_new( + let iceberg_table_provider = IcebergTableProvider::try_new_optional_runtime( catalog.clone(), namespace.clone(), table_name.to_string(), + None, ) .await?; ctx.register_table("iceberg_table", Arc::new(iceberg_table_provider))?; diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..e8a5e29522 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -29,7 +29,9 @@ use futures::StreamExt; use futures::future::try_join_all; use iceberg::arrow::arrow_schema_to_schema_auto_assign_ids; use iceberg::inspect::MetadataTableType; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; +use iceberg::{ + Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableCreation, TableIdent, +}; use crate::table::IcebergTableProvider; use crate::to_datafusion_error; @@ -47,19 +49,17 @@ pub(crate) struct IcebergSchemaProvider { /// [`TableProvider`] trait. /// Wrapped in Arc to allow sharing across async boundaries in register_table. tables: Arc>>, + /// Propagated to every [`IcebergTableProvider`] created by this provider. + runtime: Option, } impl IcebergSchemaProvider { - /// Asynchronously tries to construct a new [`IcebergSchemaProvider`] - /// using the given client to fetch and initialize table providers for - /// the provided namespace in the Iceberg [`Catalog`]. - /// - /// This method retrieves a list of table names - /// attempts to create a table provider for each table name, and - /// collects these providers into a `HashMap`. - pub(crate) async fn try_new( + /// Asynchronously tries to construct a new [`IcebergSchemaProvider`], + /// propagating `runtime` to every table provider. + pub(crate) async fn try_new_optional_runtime( client: Arc, namespace: NamespaceIdent, + runtime: Option, ) -> Result { // TODO: // Tables and providers should be cached based on table_name @@ -75,7 +75,14 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + IcebergTableProvider::try_new_optional_runtime( + client.clone(), + namespace.clone(), + name, + runtime.clone(), + ) + }) .collect::>(), ) .await?; @@ -89,6 +96,7 @@ impl IcebergSchemaProvider { catalog: client, namespace, tables, + runtime, }) } } @@ -173,6 +181,7 @@ impl SchemaProvider for IcebergSchemaProvider { let namespace = self.namespace.clone(); let tables = self.tables.clone(); let name_clone = name.clone(); + let runtime = self.runtime.clone(); // Use tokio's spawn_blocking to handle the async work on a blocking thread pool let result = tokio::task::spawn_blocking(move || { @@ -190,10 +199,11 @@ impl SchemaProvider for IcebergSchemaProvider { .map_err(to_datafusion_error)?; // Create a new table provider using the catalog reference - let table_provider = IcebergTableProvider::try_new( + let table_provider = IcebergTableProvider::try_new_optional_runtime( catalog.clone(), namespace.clone(), name_clone.clone(), + runtime, ) .await .map_err(to_datafusion_error)?; @@ -315,9 +325,10 @@ mod tests { .await .unwrap(); - let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), namespace) - .await - .unwrap(); + let provider = + IcebergSchemaProvider::try_new_optional_runtime(Arc::new(catalog), namespace, None) + .await + .unwrap(); (provider, temp_dir) } diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..a92f23f69c 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -46,7 +46,7 @@ use iceberg::arrow::schema_to_arrow_schema; use iceberg::inspect::MetadataTableType; use iceberg::spec::TableProperties; use iceberg::table::Table; -use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; +use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, Runtime, TableIdent}; use metadata_table::IcebergMetadataTableProvider; use crate::error::to_datafusion_error; @@ -65,6 +65,10 @@ use crate::physical_plan::write::IcebergWriteExec; /// /// For read-only access to a specific snapshot without catalog overhead, use /// [`IcebergStaticTableProvider`] instead. +/// +/// When using a CPU/IO split runtime, pass a [`Runtime`] via +/// [`Self::try_new_with_runtime`] so that table loads use the IO runtime and +/// loaded tables use the configured Iceberg runtime. #[derive(Debug, Clone)] pub struct IcebergTableProvider { /// The catalog that manages this table @@ -73,37 +77,77 @@ pub struct IcebergTableProvider { table_ident: TableIdent, /// A reference-counted arrow `Schema` (cached at construction) schema: ArrowSchemaRef, + /// When `Some`, catalog reloads use `runtime.io()` and loaded tables use + /// this runtime for Iceberg-internal IO and CPU task scheduling. + runtime: Option, } impl IcebergTableProvider { - /// Creates a new catalog-backed table provider. - /// - /// Loads the table once to get the initial schema, then stores the catalog - /// reference for future metadata refreshes on each operation. - pub(crate) async fn try_new( + /// Creates a catalog-backed table provider and routes table loads through + /// `runtime.io()` instead of running inline on the caller's runtime. + pub async fn try_new_with_runtime( catalog: Arc, namespace: NamespaceIdent, name: impl Into, + runtime: Runtime, + ) -> Result { + Self::try_new_optional_runtime(catalog, namespace, name, Some(runtime)).await + } + + pub(crate) async fn try_new_optional_runtime( + catalog: Arc, + namespace: NamespaceIdent, + name: impl Into, + runtime: Option, ) -> Result { let table_ident = TableIdent::new(namespace, name.into()); - // Load table once to get initial schema - let table = catalog.load_table(&table_ident).await?; + let table = + Self::load_table_on_io(catalog.clone(), table_ident.clone(), runtime.as_ref()).await?; let schema = Arc::new(schema_to_arrow_schema(table.metadata().current_schema())?); Ok(IcebergTableProvider { catalog, table_ident, schema, + runtime, }) } + async fn load_table_on_io( + catalog: Arc, + table_ident: TableIdent, + runtime: Option<&Runtime>, + ) -> Result { + let table = match runtime { + Some(runtime) => { + runtime + .io() + .spawn(async move { catalog.load_table(&table_ident).await }) + .await?? + } + None => catalog.load_table(&table_ident).await?, + }; + Ok(Self::table_with_runtime(table, runtime)) + } + + fn table_with_runtime(table: Table, runtime: Option<&Runtime>) -> Table { + match runtime { + Some(runtime) => table.with_runtime(runtime.clone()), + None => table, + } + } + pub(crate) async fn metadata_table( &self, r#type: MetadataTableType, ) -> Result { - // Load fresh table metadata for metadata table access - let table = self.catalog.load_table(&self.table_ident).await?; + let table = Self::load_table_on_io( + self.catalog.clone(), + self.table_ident.clone(), + self.runtime.as_ref(), + ) + .await?; Ok(IcebergMetadataTableProvider { table, r#type }) } } @@ -129,12 +173,13 @@ impl TableProvider for IcebergTableProvider { filters: &[Expr], limit: Option, ) -> DFResult> { - // Load fresh table metadata from catalog - let table = self - .catalog - .load_table(&self.table_ident) - .await - .map_err(to_datafusion_error)?; + let table = Self::load_table_on_io( + self.catalog.clone(), + self.table_ident.clone(), + self.runtime.as_ref(), + ) + .await + .map_err(to_datafusion_error)?; // Create scan with fresh metadata (always use current snapshot) Ok(Arc::new(IcebergTableScan::new( @@ -161,12 +206,13 @@ impl TableProvider for IcebergTableProvider { input: Arc, _insert_op: InsertOp, ) -> DFResult> { - // Load fresh table metadata from catalog - let table = self - .catalog - .load_table(&self.table_ident) - .await - .map_err(to_datafusion_error)?; + let table = Self::load_table_on_io( + self.catalog.clone(), + self.table_ident.clone(), + self.runtime.as_ref(), + ) + .await + .map_err(to_datafusion_error)?; let partition_spec = table.metadata().default_partition_spec(); @@ -428,6 +474,21 @@ mod tests { ) } + async fn new_catalog_backed_provider( + catalog: &Arc, + namespace: &NamespaceIdent, + table_name: &str, + ) -> IcebergTableProvider { + IcebergTableProvider::try_new_optional_runtime( + catalog.clone(), + namespace.clone(), + table_name, + None, + ) + .await + .unwrap() + } + // Tests for IcebergStaticTableProvider #[tokio::test] @@ -526,10 +587,7 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; // Test creating a catalog-backed provider - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; // Verify the schema is loaded correctly let schema = provider.schema(); @@ -542,10 +600,7 @@ mod tests { async fn test_catalog_backed_provider_scan() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -568,10 +623,7 @@ mod tests { async fn test_catalog_backed_provider_insert() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -595,10 +647,7 @@ mod tests { async fn test_physical_input_schema_consistent_with_logical_input_schema() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -720,10 +769,7 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_partitioned_test_catalog_and_table(Some(true)).await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; let ctx = SessionContext::new(); let input_schema = provider.schema(); @@ -752,10 +798,7 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_partitioned_test_catalog_and_table(Some(false)).await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; let ctx = SessionContext::new(); let input_schema = provider.schema(); @@ -812,10 +855,7 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = new_catalog_backed_provider(&catalog, &namespace, &table_name).await; let ctx = SessionContext::new(); let state = ctx.state();