Summary
Add support for configuring a custom Tokio runtime handle for OpenDAL operations, enabling proper runtime segregation in DataFusion applications that separate CPU-bound and I/O-bound workloads across different Tokio runtimes.
Motivation
Applications currently must wrap Iceberg table providers with manual runtime switching:
```rust
// Current workaround: manually spawn all operations on the I/O runtime
let wrapped_table = IOTableProviderWrapper::new(
    iceberg_table_provider,
    io_runtime_handle,
);
```
This approach has a couple of downsides:
- Poor separation: Parquet fetching, decoding, and filter pushdown all happen on the I/O pool
- Complex code: requires wrapper abstractions around every table provider
Existing Solutions for gRPC
Similar runtime segregation requirements have been solved elegantly in other contexts. For example, Tonic's gRPC client allows configuring a custom executor:
```rust
endpoint.executor(MaybeHandleExecutor(io_handle))
```
This approach allows HTTP/network I/O to run on the I/O runtime while application logic runs on the CPU runtime.
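The idea behind such an executor can be illustrated without any gRPC or Tokio dependency. The std-only sketch below (all names are hypothetical, not Tonic's API) dispatches work to a dedicated thread when a pool is configured and runs it inline otherwise:

```rust
use std::thread;

/// Hypothetical analogue of a "maybe handle" executor: dispatch to a
/// dedicated pool when one is configured, fall back to the caller otherwise.
struct MaybePoolExecutor {
    /// None means no dedicated pool was configured.
    pool_name: Option<String>,
}

impl MaybePoolExecutor {
    /// Run `work` and report the name of the thread it executed on.
    fn run(&self, work: impl FnOnce() -> String + Send + 'static) -> String {
        match &self.pool_name {
            // Dispatch to a dedicated thread standing in for the I/O runtime.
            Some(name) => thread::Builder::new()
                .name(name.clone())
                .spawn(work)
                .expect("failed to spawn worker")
                .join()
                .expect("worker panicked"),
            // No pool configured: run inline on the calling thread.
            None => work(),
        }
    }
}

fn main() {
    let with_pool = MaybePoolExecutor {
        pool_name: Some("io-pool".into()),
    };
    let ran_on = with_pool.run(|| thread::current().name().unwrap_or("?").to_string());
    assert_eq!(ran_on, "io-pool");
    println!("work ran on: {ran_on}");
}
```

The fallback branch is what makes the configuration optional: callers that never set a pool keep today's behavior.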
Proposed Solution
API Design
Leverage iceberg-rust's existing Extensions mechanism to allow users to configure a Tokio runtime handle:
```rust
// In iceberg-rust: crates/iceberg/src/io/file_io.rs

/// Runtime handle for executing async I/O operations.
/// When provided, OpenDAL operations will use this runtime for spawning tasks.
#[derive(Clone, Debug)]
pub struct RuntimeHandle(pub tokio::runtime::Handle);

impl RuntimeHandle {
    /// Create a new RuntimeHandle from a Tokio runtime handle
    pub fn new(handle: tokio::runtime::Handle) -> Self {
        Self(handle)
    }

    /// Get the current runtime handle
    pub fn current() -> Self {
        Self(tokio::runtime::Handle::current())
    }
}
```
Usage Example
```rust
// Create a dedicated I/O runtime
let io_runtime = tokio::runtime::Builder::new_multi_thread()
    .worker_threads(8)
    .thread_name("io-pool")
    .enable_io()
    .enable_time()
    .build()?;

// Configure FileIO with the runtime handle
let file_io = FileIOBuilder::new("s3")
    .with_extension(RuntimeHandle::new(io_runtime.handle().clone()))
    .with_props(s3_config)
    .build()?;

// Or configure via the catalog
let catalog = RestCatalogBuilder::new()
    .with_file_io_extension(RuntimeHandle::new(io_runtime.handle().clone()))
    .with_props(catalog_config)
    .build()?;
```
Implementation Approach
- Create Custom OpenDAL Executor

Implement OpenDAL's Execute trait with a custom Tokio executor:

```rust
// In crates/iceberg/src/io/storage.rs
use std::future::Future;
use std::pin::Pin;

#[derive(Clone)]
struct CustomTokioExecutor {
    handle: tokio::runtime::Handle,
}

impl opendal::Execute for CustomTokioExecutor {
    fn execute(&self, f: Pin<Box<dyn Future<Output = ()> + Send>>) {
        // Spawn the future onto the configured runtime
        self.handle.spawn(f);
    }
}
```
- Extract RuntimeHandle from Extensions
Modify storage backend builders to check for RuntimeHandle in extensions:
```rust
// In crates/iceberg/src/io/storage.rs
pub(crate) fn build(file_io_builder: FileIOBuilder) -> Result<Self> {
    let (scheme_str, props, extensions) = file_io_builder.into_parts();

    // Extract the runtime handle if one was provided
    let executor = if let Some(runtime_handle) = extensions.get::<RuntimeHandle>() {
        let exec = CustomTokioExecutor {
            handle: Arc::unwrap_or_clone(runtime_handle).0,
        };
        Some(opendal::Executor::with(exec))
    } else {
        None // Use the OpenDAL default
    };

    // ... storage initialization
}
```
- Apply Executor to Operators
Configure OpenDAL operators with the custom executor:
```rust
let mut operator = Operator::new(builder)?.finish();
if let Some(executor) = executor {
    operator = operator.with_executor(executor);
}
operator = operator.layer(RetryLayer::new());
```
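Step 2 above relies on a typed, heterogeneous extensions map. The std-only sketch below shows how a `get::<RuntimeHandle>()`-style lookup can be keyed by `TypeId`; it is an illustration of the underlying idea, not the actual iceberg-rust Extensions type, and the `RuntimeHandle` here is a string stand-in for `tokio::runtime::Handle`:

```rust
use std::any::{Any, TypeId};
use std::collections::HashMap;
use std::sync::Arc;

/// Minimal sketch of a typed extensions map: values are stored as
/// `Arc<dyn Any>` and retrieved by downcasting on their TypeId key.
#[derive(Default)]
struct Extensions {
    map: HashMap<TypeId, Arc<dyn Any + Send + Sync>>,
}

impl Extensions {
    fn add<T: Any + Send + Sync>(&mut self, value: T) {
        self.map.insert(TypeId::of::<T>(), Arc::new(value));
    }

    fn get<T: Any + Send + Sync>(&self) -> Option<Arc<T>> {
        self.map
            .get(&TypeId::of::<T>())
            .cloned()
            .and_then(|any| any.downcast::<T>().ok())
    }
}

/// Stand-in for the proposed RuntimeHandle wrapper.
#[derive(Clone, Debug, PartialEq)]
struct RuntimeHandle(String);

fn main() {
    let mut ext = Extensions::default();
    ext.add(RuntimeHandle("io-runtime".into()));

    // The storage builder would extract the handle like this:
    let handle = ext.get::<RuntimeHandle>();
    assert_eq!(handle.as_deref(), Some(&RuntimeHandle("io-runtime".into())));

    // A type that was never registered simply yields None.
    assert!(ext.get::<String>().is_none());
    println!("extracted: {:?}", handle);
}
```

Because absent types yield `None`, the executor stays optional, which is what lets the `else` branch in step 2 fall back to OpenDAL's default behavior.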