Skip to content

feat(datafusion-ballista) Add Initial Integration for Ballisa Datafusion#2613

Open
NoahKusaba wants to merge 7 commits into
apache:mainfrom
NoahKusaba:feature/datafusion-ballista
Open

feat(datafusion-ballista) Add Initial Integration for Ballisa Datafusion#2613
NoahKusaba wants to merge 7 commits into
apache:mainfrom
NoahKusaba:feature/datafusion-ballista

Conversation

@NoahKusaba

@NoahKusaba NoahKusaba commented Jun 10, 2026

Copy link
Copy Markdown
Contributor

Which issue does this PR close?

What changes are included in this PR?

Adds a new iceberg-ballista crate, which provides a distributed-query driver for Apache Iceberg for a distributed datafusion engine Apache Datafusion-Ballista + the targeted changes to iceberg-datafusion that make Iceberg's existing plan nodes serializable so they can cross node boundaries.

The core problem it solves

Iceberg's DataFusion integration already produces complete physical read and write plans, but every Iceberg plan node holds live, non-serializable state (Arc, an open Table/FileIO). Ballista ships logical and physical plans to remote schedulers/executors, so those nodes couldn't travel. This branch closes that gap with one consistent idea: serialize a minimal self-contained recipe (IcebergCatalogConfig + identifiers), rebuild the live objects on the receiving node.

  • IcebergLogicalCodec: serializes the catalog-backed table provider (config + table ident, plus snapshot/metadata variants) so the scheduler can rebuild it and do physical planning, including INSERT.

  • IcebergPhysicalCodec: serializes the four Iceberg execution nodes (IcebergTableScan, IcebergWriteExec, IcebergCommitExec, IcebergMetadataScan) and the PartitionExpr physical expression.

  • Tagged-envelope wire framing (TAG_ICEBERG / TAG_DELEGATED):every blob carries a leading tag; non-Iceberg nodes are delegated to Ballista's own codec, so shuffles/sorts/etc. keep working and an unknown tag is a hard error--> Based off comments from https://github.com/milenkovicm/ballista_delta

  • bridge.rs runtime bridge: Each executor node needs to build an HTTP client with the iceberg catalog which requires an async-call, but PhysicalExtensionCodec from datafusion_proto try_decode is synchronous. The block_on function is a workaround to make this async function call blocking. Each try_decode also performs a load_table catalog round-trip per plan node to resolve the table's current metadata pointer

-Public API: register_iceberg_codecs(SessionConfig) and register_iceberg_table

  • Snapshot pinning at encode time

Are these changes tested?

Ballista tests:

  • Distributed reads / writes tested against dockerized minio iceberg catalog ( Standalone + multi-executor cluster). Also tests partitioned files writes + iceberg table registration
  • roundtrips testing that configurations for nodes are maintained through serialization -> deserialization

Datafusion tests:

  • Unit tests for config propagation and snapshot pinning.

@NoahKusaba

Copy link
Copy Markdown
Contributor Author

Still need to put more time into this, but it's in a state where it's ready for feedback.

@NoahKusaba NoahKusaba marked this pull request as draft June 12, 2026 11:26
@NoahKusaba NoahKusaba marked this pull request as ready for review June 13, 2026 04:25

@blackmwk blackmwk left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @NoahKusaba for this pr, but I don't think it's the right direction to maintain such a huge integration in this repo. Currently the review resources is quite limited, and most committers are not familiar with ballista. I think it would be better to put it in ballista repo

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

[DISCUSS] Add open table format support.

2 participants