Route FileIO through Iceberg runtime#22
Conversation
|
8e8f82a to
878a9a9
Compare
c96a871 to
4c1d3b1
Compare
|
@codex review |
|
Codex Review: Didn't find any major issues. Chef's kiss. ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
9a14913 to
6e0b315
Compare
|
Any chance to contribute something like this upstream? I can imagine how this is a problem that anyone with CPU/IO runtime separation will have in the community |
@gabotechs That's the goal for sure. But as discussed with @toutane about contributing #20 upstream, the runtime changes in our fork are currently depending on apache#2298 , which isn't merged upstream yet. |
6e0b315 to
a434203
Compare
|
@codex review |
|
Codex Review: Didn't find any major issues. Keep them coming! ℹ️ About Codex in GitHubYour team has set up Codex to review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. Codex can also answer questions or update the PR. Try commenting "@codex address that feedback". |
toutane
left a comment
There was a problem hiding this comment.
Looks very good in deed!
I'm not very familiar with the Storage trait but I think your wrapper looks very neat.
I think a refactor is now possible in DataFusion's table/mod.rs to more precisely separate what needs to run on the IO vs. the CPU runtime. Wdyt?
| .run_on_io(async move { | ||
| catalog | ||
| let table = catalog | ||
| .load_table(&table_ident) |
There was a problem hiding this comment.
Same thing here, we could use load_table_on_io now and get rid of the run_on_io call.
There was a problem hiding this comment.
Codex here: agreed. I updated insert_into to use load_table_on_io directly and removed run_on_io entirely.
The new shape is:
let table = Self::load_table_on_io(
self.catalog.clone(),
self.table_ident.clone(),
self.runtime.as_ref(),
)
.await
.map_err(to_datafusion_error)?;There was a problem hiding this comment.
I think the run_on_io wrapping here is now redundant, and maybe even counter-productive given that load_table_on_io is available.
The only operations in this closure that genuinely benefit from being routed to the IO runtime are the load_table call and get_manifest_list inside plan_files.
Either we modify plan_files to spawn get_manifest_list on the IO runtime itself, or we narrow run_on_io down to just wrapping the plan_files call.
Happy to address it in a follow-up PR if it sounds good to you!
There was a problem hiding this comment.
That's a great point! Looks like we can indeed get rid of run_on_io completely, which will simplify things quite a bit.
There was a problem hiding this comment.
Codex here: agreed, and I took the narrower version now instead of leaving it for a follow-up.
scan() now routes only the catalog refresh through load_table_on_io; scan building and plan_files() collection stay on the caller runtime. The loaded Table still carries the Iceberg runtime, so manifest/FileIO operations dispatch through their own runtime-aware paths.
let table = Self::load_table_on_io(...).await.map_err(to_datafusion_error)?;
let tasks: Vec<FileScanTask> = builder
.build()
.map_err(to_datafusion_error)?
.plan_files()
.await
.map_err(to_datafusion_error)?
.try_collect::<Vec<_>>()
.await
.map_err(to_datafusion_error)?;
@geoffreyclaude I think what we're lacking upstream is the One possible solution would be to include |
a434203 to
7ae0d92
Compare
|
Codex update, replying to #22 (comment): @toutane @gabotechs we now have a dedicated upstream path for this, independent of apache#2298:
The draft PR includes the DataFusion Could you both take an initial look when you have a chance, especially at whether the API shape matches the direction you had in mind? |
Summary
This adds runtime-aware
FileIOconstruction for callers that keep storage IO on a dedicated Tokio runtime.When an IO runtime is configured,
FileIOstill caches the raw backend storage, but exposes operations through a privateRuntimeStorageadapter. Direct storage calls, reader creation, byte-range reads throughFileRead::read(range), writer creation, writer chunks, and writer close are dispatched throughruntime.io().The public API has two entry points:
FileIO::with_runtime(runtime)/FileIOBuilder::with_runtime(runtime)for callers that already have a full IcebergRuntime.FileIO::with_io_runtime(handle)/FileIOBuilder::with_io_runtime(handle)for callers that only need to route storage IO.RestCatalogBuilder::with_file_io_runtime(handle)uses the IO-only path for long-lived REST catalogs without assigning a full table runtime. ExistingCatalogBuilder::with_runtime(runtime)behavior remains the full-runtime path for loaded tables.Concrete storage backends remain runtime-agnostic.
Runtime Impact
This PR intentionally routes storage operations, not every piece of Iceberg metadata processing.
Callers with separate runtimes can pass explicit handles with
Runtime::new_with_handles(io, cpu). The storage adapter uses the IO half for storage scheduling.Data-file Parquet scan CPU stays where the returned
RecordBatchStreamis polled. For DataFusion callers, decode, decompression, row filtering, projection, and Iceberg batch transformation remain on the query runtime polling the stream, while byte-range reads run through the IO runtime.Catalog-backed DataFusion provider paths now use the IO runtime only for catalog reloads. Scan building and
plan_files()collection stay on the caller runtime; loaded tables still carry the Iceberg runtime, so manifest and FileIO operations dispatch through their own runtime-aware paths.Manifest planning and delete metadata processing keep their existing scheduling behavior. Existing Iceberg tasks that already use
runtime.cpu()continue to use the supplied CPU handle, but this PR does not add a custom CPU spawn/accounting hook and does not broaden metadata offloading.Shape
Before, callers that wanted storage IO off the query runtime had to move the whole scan stream onto IO:
After, only storage work crosses into the IO runtime; scan CPU remains on the caller runtime:
Why This Layer
Parquet readers perform byte-range reads through Iceberg
FileReadobjects after scan planning. Routing at the storage adapter layer covers those IO operations without moving the whole DataFusion scan stream onto the IO runtime and without making concrete storage backends runtime-aware.The REST catalog IO-only hook is for catalog construction paths that should route FileIO storage work to IO without assigning a long-lived CPU runtime to every loaded table. Query-created tables can still be rebound later with a full runtime.
Validation
cargo fmt --checkcargo check -p iceberg -p iceberg-catalog-rest -p iceberg-storage-opendal -p iceberg-datafusion --lockedcargo test -p iceberg file_io --lockedcargo test -p iceberg test_runtime_with_handles_uses_explicit_cpu_handle --lockedcargo test -p iceberg-catalog-rest test_load_table_with_file_io_runtime_routes_storage_to_io --lockedcargo test -p iceberg-datafusion test_catalog_backed_provider --lockedcargo test -p iceberg test_plan_files --locked