Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
571 changes: 517 additions & 54 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,12 @@ aws-sdk-s3tables = { version = "1.28", default-features = false, features = [
"rt-tokio",
] }
backon = "1.5.1"
ballista = "53"
ballista-core = "53"
ballista-executor = { version = "53", default-features = false, features = [
"arrow-ipc-optimizations",
] }
ballista-scheduler = { version = "53", default-features = false }
base64 = "0.22.1"
bimap = "0.6"
bytes = "1.11"
Expand All @@ -77,6 +83,7 @@ dashmap = "6"
datafusion = "53.1.0"
datafusion-cli = "53.0.0"
datafusion-ffi = "53.0.0"
datafusion-proto = "53.0.0"
datafusion-sqllogictest = "53.0.0"
derive_builder = "0.20"
dirs = "6"
Expand All @@ -98,6 +105,7 @@ http = "1.2"
iceberg = { version = "0.9.0", path = "./crates/iceberg" }
iceberg-catalog-glue = { version = "0.9.0", path = "./crates/catalog/glue" }
iceberg-catalog-hms = { version = "0.9.0", path = "./crates/catalog/hms" }
iceberg-catalog-loader = { version = "0.9.0", path = "./crates/catalog/loader" }
iceberg-catalog-rest = { version = "0.9.0", path = "./crates/catalog/rest" }
iceberg-catalog-s3tables = { version = "0.9.0", path = "./crates/catalog/s3tables" }
iceberg-catalog-sql = { version = "0.9.0", path = "./crates/catalog/sql" }
Expand Down
56 changes: 56 additions & 0 deletions crates/integrations/ballista/Cargo.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

[package]
edition = { workspace = true }
homepage = { workspace = true }
name = "iceberg-ballista"
publish = true
rust-version = { workspace = true }
version = { workspace = true }

categories = ["database"]
description = "Apache Iceberg distributed read/write driver for Ballista"
keywords = ["iceberg", "ballista", "datafusion", "distributed"]
license = { workspace = true }
repository = { workspace = true }

[dependencies]
ballista-core = { workspace = true }
datafusion = { workspace = true }
datafusion-proto = { workspace = true }
iceberg = { workspace = true }
iceberg-catalog-loader = { workspace = true }
iceberg-datafusion = { workspace = true }
iceberg-storage-opendal = { workspace = true }
serde = { workspace = true, features = ["derive"] }
serde_json = { workspace = true }
tokio = { workspace = true }

[dev-dependencies]
arrow = { workspace = true }
ballista = { workspace = true, features = ["standalone"] }
# Used by the multi-executor integration test to start a scheduler + several
# in-process executors directly. Feature flags mirror what ballista's
# `standalone` feature already enables, so these reuse the same compiled
# artifacts rather than triggering a separate build.
ballista-executor = { workspace = true }
ballista-scheduler = { workspace = true }
env_logger = { workspace = true }
iceberg-catalog-rest = { workspace = true }
log = { workspace = true }
tokio = { workspace = true, features = ["macros", "rt-multi-thread"] }
158 changes: 158 additions & 0 deletions crates/integrations/ballista/examples/standalone-iceberg-write.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,158 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! # Standalone Iceberg write example
//!
//! Demonstrates distributed reads and writes against an Apache Iceberg table
//! from a standalone Ballista cluster.
//!
//! This example requires a running Iceberg REST catalog and MinIO. The easiest
//! way is to use the docker fixture shipped with `iceberg-rust`. From the
//! `iceberg-rust` workspace root:
//!
//! ```bash
//! make docker-up
//! cargo run -p iceberg-ballista --example standalone-iceberg-write
//! ```
//!
//! Endpoints can be overridden with the `ICEBERG_REST_URI` and
//! `ICEBERG_S3_ENDPOINT` environment variables.

use std::collections::HashMap;
use std::sync::Arc;

use ballista::datafusion::common::Result;
use ballista::datafusion::execution::SessionStateBuilder;
use ballista::datafusion::prelude::{SessionConfig, SessionContext};
use ballista::prelude::{SessionConfigExt, SessionContextExt};
use iceberg::spec::{NestedField, PrimitiveType, Schema, Type};
use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent};
use iceberg_ballista::{IcebergCatalogConfig, register_iceberg_codecs, register_iceberg_table};
use iceberg_catalog_rest::RestCatalogBuilder;
use iceberg_storage_opendal::OpenDalStorageFactory;

/// Catalog + storage properties for the REST catalog and its MinIO storage.
fn catalog_props() -> HashMap<String, String> {
let rest_uri =
std::env::var("ICEBERG_REST_URI").unwrap_or_else(|_| "http://localhost:8181".to_string());
let s3_endpoint = std::env::var("ICEBERG_S3_ENDPOINT")
.unwrap_or_else(|_| "http://localhost:9000".to_string());
HashMap::from([
("uri".to_string(), rest_uri),
("s3.endpoint".to_string(), s3_endpoint),
("s3.access-key-id".to_string(), "admin".to_string()),
("s3.secret-access-key".to_string(), "password".to_string()),
("s3.region".to_string(), "us-east-1".to_string()),
("s3.path-style-access".to_string(), "true".to_string()),
])
}

/// Creates the demo namespace and table in the catalog if they do not exist.
async fn ensure_table(props: &HashMap<String, String>) -> Result<(NamespaceIdent, String)> {
let catalog = RestCatalogBuilder::default()
.with_storage_factory(Arc::new(OpenDalStorageFactory::S3 {
customized_credential_load: None,
}))
.load("rest", props.clone())
.await
.expect("build rest catalog");

let namespace = NamespaceIdent::new("ballista_demo".to_string());
if !catalog
.namespace_exists(&namespace)
.await
.expect("ns exists")
{
catalog
.create_namespace(&namespace, HashMap::new())
.await
.expect("create namespace");
}

let table_ident = TableIdent::new(namespace.clone(), "events".to_string());
if !catalog
.table_exists(&table_ident)
.await
.expect("table exists")
{
let schema = Schema::builder()
.with_schema_id(0)
.with_fields(vec![
NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(),
NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(),
])
.build()
.expect("build schema");
let creation = TableCreation::builder()
.name("events".to_string())
.schema(schema)
.properties(HashMap::new())
.build();
catalog
.create_table(&namespace, creation)
.await
.expect("create table");
}

Ok((namespace, "events".to_string()))
}

#[tokio::main]
async fn main() -> Result<()> {
let _ = env_logger::builder()
.filter_level(log::LevelFilter::Info)
.try_init();

let props = catalog_props();

// Make sure the target table exists in the catalog.
let (namespace, table) = ensure_table(&props).await?;

// Build a Ballista session config with the Iceberg codecs installed, so the
// standalone scheduler and executor can serialize the Iceberg plan nodes.
let config = register_iceberg_codecs(
SessionConfig::new_with_ballista()
.with_target_partitions(2)
.with_ballista_standalone_parallelism(2),
);
let state = SessionStateBuilder::new()
.with_config(config)
.with_default_features()
.build();
let ctx = SessionContext::standalone_with_state(state).await?;

// Register the catalog-backed Iceberg table for distributed reads and writes.
let catalog_config = IcebergCatalogConfig::new("rest", "rest", props);
register_iceberg_table(&ctx, "events", catalog_config, namespace, table).await?;

// Distributed INSERT: IcebergWriteExec runs across the cluster and
// IcebergCommitExec atomically appends the data files to the table.
println!("== INSERT ==");
ctx.sql("INSERT INTO events VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')")
.await?
.show()
.await?;

// Read it back through the distributed scan.
println!("== SELECT ==");
ctx.sql("SELECT id, name FROM events ORDER BY id")
.await?
.show()
.await?;

Ok(())
}
31 changes: 31 additions & 0 deletions crates/integrations/ballista/public-api.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
pub mod iceberg_ballista
pub use iceberg_ballista::IcebergCatalogConfig
pub struct iceberg_ballista::IcebergLogicalCodec
impl iceberg_ballista::IcebergLogicalCodec
pub fn iceberg_ballista::IcebergLogicalCodec::new(inner: alloc::sync::Arc<dyn datafusion_proto::logical_plan::LogicalExtensionCodec>) -> Self
impl core::default::Default for iceberg_ballista::IcebergLogicalCodec
pub fn iceberg_ballista::IcebergLogicalCodec::default() -> Self
impl core::fmt::Debug for iceberg_ballista::IcebergLogicalCodec
pub fn iceberg_ballista::IcebergLogicalCodec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl datafusion_proto::logical_plan::LogicalExtensionCodec for iceberg_ballista::IcebergLogicalCodec
pub fn iceberg_ballista::IcebergLogicalCodec::try_decode(&self, buf: &[u8], inputs: &[datafusion_expr::logical_plan::plan::LogicalPlan], ctx: &datafusion_execution::task::TaskContext) -> core::result::Result<datafusion_expr::logical_plan::plan::Extension, datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergLogicalCodec::try_decode_file_format(&self, buf: &[u8], ctx: &datafusion_execution::task::TaskContext) -> core::result::Result<alloc::sync::Arc<dyn datafusion_datasource::file_format::FileFormatFactory>, datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergLogicalCodec::try_decode_table_provider(&self, buf: &[u8], table_ref: &datafusion_common::table_reference::TableReference, schema: arrow_schema::schema::SchemaRef, ctx: &datafusion_execution::task::TaskContext) -> core::result::Result<alloc::sync::Arc<dyn datafusion_catalog::table::TableProvider>, datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergLogicalCodec::try_encode(&self, node: &datafusion_expr::logical_plan::plan::Extension, buf: &mut alloc::vec::Vec<u8>) -> core::result::Result<(), datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergLogicalCodec::try_encode_file_format(&self, buf: &mut alloc::vec::Vec<u8>, node: alloc::sync::Arc<dyn datafusion_datasource::file_format::FileFormatFactory>) -> core::result::Result<(), datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergLogicalCodec::try_encode_table_provider(&self, table_ref: &datafusion_common::table_reference::TableReference, node: alloc::sync::Arc<dyn datafusion_catalog::table::TableProvider>, buf: &mut alloc::vec::Vec<u8>) -> core::result::Result<(), datafusion_common::error::DataFusionError>
pub struct iceberg_ballista::IcebergPhysicalCodec
impl iceberg_ballista::IcebergPhysicalCodec
pub fn iceberg_ballista::IcebergPhysicalCodec::new(inner: alloc::sync::Arc<dyn datafusion_proto::physical_plan::PhysicalExtensionCodec>) -> Self
impl core::default::Default for iceberg_ballista::IcebergPhysicalCodec
pub fn iceberg_ballista::IcebergPhysicalCodec::default() -> Self
impl core::fmt::Debug for iceberg_ballista::IcebergPhysicalCodec
pub fn iceberg_ballista::IcebergPhysicalCodec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result
impl datafusion_proto::physical_plan::PhysicalExtensionCodec for iceberg_ballista::IcebergPhysicalCodec
pub fn iceberg_ballista::IcebergPhysicalCodec::try_decode(&self, buf: &[u8], inputs: &[alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>], ctx: &datafusion_execution::task::TaskContext) -> core::result::Result<alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>, datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergPhysicalCodec::try_decode_expr(&self, buf: &[u8], inputs: &[alloc::sync::Arc<dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr>]) -> core::result::Result<alloc::sync::Arc<dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr>, datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergPhysicalCodec::try_encode(&self, node: alloc::sync::Arc<dyn datafusion_physical_plan::execution_plan::ExecutionPlan>, buf: &mut alloc::vec::Vec<u8>) -> core::result::Result<(), datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::IcebergPhysicalCodec::try_encode_expr(&self, node: &alloc::sync::Arc<dyn datafusion_physical_expr_common::physical_expr::PhysicalExpr>, buf: &mut alloc::vec::Vec<u8>) -> core::result::Result<(), datafusion_common::error::DataFusionError>
pub async fn iceberg_ballista::register_iceberg_catalog(ctx: &datafusion::execution::context::SessionContext, register_name: &str, config: iceberg_datafusion::catalog_config::IcebergCatalogConfig) -> core::result::Result<(), datafusion_common::error::DataFusionError>
pub fn iceberg_ballista::register_iceberg_codecs(config: datafusion_execution::config::SessionConfig) -> datafusion_execution::config::SessionConfig
pub async fn iceberg_ballista::register_iceberg_table(ctx: &datafusion::execution::context::SessionContext, register_name: &str, config: iceberg_datafusion::catalog_config::IcebergCatalogConfig, namespace: iceberg::catalog::NamespaceIdent, table: impl core::convert::Into<alloc::string::String>) -> core::result::Result<(), datafusion_common::error::DataFusionError>
Loading
Loading