diff --git a/Cargo.lock b/Cargo.lock index 5bb728a838..94486634c3 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -272,6 +272,15 @@ dependencies = [ "object", ] +[[package]] +name = "arc-swap" +version = "1.9.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a3a1fd6f75306b68087b831f025c712524bcb19aad54e557b1129cfa0a2b207" +dependencies = [ + "rustversion", +] + [[package]] name = "array-init" version = "2.1.0" @@ -407,6 +416,34 @@ dependencies = [ "num-traits", ] +[[package]] +name = "arrow-flight" +version = "58.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "302b2e036335f3f04d65dad3f74ff1f2aae6dc671d6aa04dc6b61193761e16fb" +dependencies = [ + "arrow-arith", + "arrow-array", + "arrow-buffer", + "arrow-cast", + "arrow-data", + "arrow-ipc", + "arrow-ord", + "arrow-row", + "arrow-schema", + "arrow-select", + "arrow-string", + "base64", + "bytes", + "futures", + "once_cell", + "paste", + "prost", + "prost-types", + "tonic", + "tonic-prost", +] + [[package]] name = "arrow-ipc" version = "58.1.0" @@ -644,9 +681,9 @@ checksum = "c08606f8c3cbf4ce6ec8e28fb0014a2c086708fe954eaa885384a6165172e7e8" [[package]] name = "aws-config" -version = "1.8.15" +version = "1.8.18" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "11493b0bad143270fb8ad284a096dd529ba91924c5409adeac856cc1bf047dbc" +checksum = "e33f815b73a3899c03b380d543532e5865f230dce9678d108dc10732a8682275" dependencies = [ "aws-credential-types", "aws-runtime", @@ -658,6 +695,7 @@ dependencies = [ "aws-smithy-json", "aws-smithy-runtime", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "aws-types", "bytes", @@ -709,9 +747,9 @@ dependencies = [ [[package]] name = "aws-runtime" -version = "1.7.2" +version = "1.7.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5fc0651c57e384202e47153c1260b84a9936e19803d747615edf199dc3b98d17" +checksum = 
"6c9b9de216a988dd54b754a82a7660cfe14cee4f6782ae4524470972fa0ccb39" dependencies = [ "aws-credential-types", "aws-sigv4", @@ -782,10 +820,11 @@ dependencies = [ [[package]] name = "aws-sdk-sso" -version = "1.97.0" +version = "1.102.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9aadc669e184501caaa6beafb28c6267fc1baef0810fb58f9b205485ca3f2567" +checksum = "8c82b3ac19f1431854f7ace3a7531674633e286bfdde21976893bfee36fd493b" dependencies = [ + "arc-swap", "aws-credential-types", "aws-runtime", "aws-smithy-async", @@ -806,10 +845,11 @@ dependencies = [ [[package]] name = "aws-sdk-ssooidc" -version = "1.99.0" +version = "1.104.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1342a7db8f358d3de0aed2007a0b54e875458e39848d54cc1d46700b2bfcb0a8" +checksum = "321000d2b4c5519ee573f73167f612efd7329322d9b26969ad1979f0427f1913" dependencies = [ + "arc-swap", "aws-credential-types", "aws-runtime", "aws-smithy-async", @@ -830,10 +870,11 @@ dependencies = [ [[package]] name = "aws-sdk-sts" -version = "1.101.0" +version = "1.107.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab41ad64e4051ecabeea802d6a17845a91e83287e1dd249e6963ea1ba78c428a" +checksum = "3d0d328ba962af23ecfa3c9f23b98d3d35e325fa218d7f13d17a6bf522f8a560" dependencies = [ + "arc-swap", "aws-credential-types", "aws-runtime", "aws-smithy-async", @@ -855,9 +896,9 @@ dependencies = [ [[package]] name = "aws-sigv4" -version = "1.4.2" +version = "1.4.5" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b0b660013a6683ab23797778e21f1f854744fdf05f68204b4cca4c8c04b5d1f4" +checksum = "bae38512beae0ffee7010fc24e7a8a123c53efdfef42a61e80fda4882418dc71" dependencies = [ "aws-credential-types", "aws-smithy-http", @@ -866,11 +907,11 @@ dependencies = [ "bytes", "form_urlencoded", "hex", - "hmac", + "hmac 0.13.0", "http 0.2.12", "http 1.4.0", "percent-encoding", - "sha2", + "sha2 0.11.0", "time", "tracing", ] @@ 
-933,10 +974,12 @@ dependencies = [ [[package]] name = "aws-smithy-json" -version = "0.62.5" +version = "0.62.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9648b0bb82a2eedd844052c6ad2a1a822d1f8e3adee5fbf668366717e428856a" +checksum = "701a947f4797e52a911e114a898667c746c39feea467bbd1abd7b3721f702ffa" dependencies = [ + "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", ] @@ -961,15 +1004,16 @@ dependencies = [ [[package]] name = "aws-smithy-runtime" -version = "1.10.3" +version = "1.11.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "028999056d2d2fd58a697232f9eec4a643cf73a71cf327690a7edad1d2af2110" +checksum = "b8e6f5caf6fea86f8c2206541ab5857cfcda9013426cdbe8fa0098b9e2d32182" dependencies = [ "aws-smithy-async", "aws-smithy-http", "aws-smithy-http-client", "aws-smithy-observability", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "bytes", "fastrand", @@ -986,11 +1030,12 @@ dependencies = [ [[package]] name = "aws-smithy-runtime-api" -version = "1.11.6" +version = "1.12.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "876ab3c9c29791ba4ba02b780a3049e21ec63dabda09268b175272c3733a79e6" +checksum = "9db177daa6ba8afb9ee1aefcf548c907abcf52065e394ee11a92780057fe0e8c" dependencies = [ "aws-smithy-async", + "aws-smithy-runtime-api-macros", "aws-smithy-types", "bytes", "http 0.2.12", @@ -1001,11 +1046,33 @@ dependencies = [ "zeroize", ] +[[package]] +name = "aws-smithy-runtime-api-macros" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d7396fd9500589e62e460e987ecb671bad374934e55ec3b5f498cc7a8a8a7b7" +dependencies = [ + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "aws-smithy-schema" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7442cb268338f0eb8278140a107c046756aa01093d8ef5e99628d34ae09c94f5" +dependencies = [ + 
"aws-smithy-runtime-api", + "aws-smithy-types", + "http 1.4.0", +] + [[package]] name = "aws-smithy-types" -version = "1.4.7" +version = "1.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d73dbfbaa8e4bc57b9045137680b958d274823509a360abfd8e1d514d40c95c" +checksum = "32b42fcf341259d85ca10fac9a2f6448a8ec691c6955a18e45bc3b71a85fab85" dependencies = [ "base64-simd", "bytes", @@ -1038,18 +1105,71 @@ dependencies = [ [[package]] name = "aws-types" -version = "1.3.14" +version = "1.3.16" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "47c8323699dd9b3c8d5b3c13051ae9cdef58fd179957c882f8374dd8725962d9" +checksum = "d16bf10b03a3c01e6b3b7d47cd964e873ffe9e7d4e80fad16bd4c077cb068531" dependencies = [ "aws-credential-types", "aws-smithy-async", "aws-smithy-runtime-api", + "aws-smithy-schema", "aws-smithy-types", "rustc_version", "tracing", ] +[[package]] +name = "axum" +version = "0.8.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "31b698c5f9a010f6573133b09e0de5408834d0c82f8d7475a89fc1867a71cd90" +dependencies = [ + "axum-core", + "bytes", + "form_urlencoded", + "futures-util", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper", + "hyper-util", + "itoa", + "matchit", + "memchr", + "mime", + "percent-encoding", + "pin-project-lite", + "serde_core", + "serde_json", + "serde_path_to_error", + "serde_urlencoded", + "sync_wrapper", + "tokio", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "axum-core" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08c78f31d7b1291f7ee735c1c6780ccde7785daae9a9206026862dab7d8792d1" +dependencies = [ + "bytes", + "futures-core", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "mime", + "pin-project-lite", + "sync_wrapper", + "tower-layer", + "tower-service", + "tracing", +] + [[package]] name = "backon" version = "1.6.0" @@ -1061,6 +1181,126 @@ 
dependencies = [ "tokio", ] +[[package]] +name = "ballista" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "35a651c6cca5ef37d0add06087bfcfd65b7d670f024849073aa50e8aae519fbb" +dependencies = [ + "async-trait", + "ballista-core", + "ballista-executor", + "ballista-scheduler", + "datafusion", + "log", + "tokio", + "url", +] + +[[package]] +name = "ballista-core" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "140cbfc86a7ee22a1d7ce2beadbf546bb2f84382c667e9fc087ac89695c9d73d" +dependencies = [ + "arrow-flight", + "async-trait", + "aws-config", + "aws-credential-types", + "chrono", + "clap", + "datafusion", + "datafusion-proto", + "datafusion-proto-common", + "futures", + "itertools 0.14.0", + "log", + "md-5 0.11.0", + "object_store", + "parking_lot", + "prost", + "prost-types", + "rand 0.10.1", + "rustc_version", + "serde", + "tokio", + "tokio-stream", + "tonic", + "tonic-build", + "tonic-prost", + "tonic-prost-build", + "url", + "uuid", +] + +[[package]] +name = "ballista-executor" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "81cd251d7dc9169cdc821a8ba4fbe793fb5aaca01847ef5d3d8c0aea378a459e" +dependencies = [ + "arrow", + "arrow-flight", + "async-trait", + "ballista-core", + "bytesize", + "clap", + "dashmap", + "datafusion", + "datafusion-proto", + "futures", + "libc", + "log", + "memory-stats", + "mimalloc", + "parking_lot", + "serde", + "sysinfo", + "tempfile", + "tokio", + "tokio-stream", + "tokio-util", + "tonic", + "tracing", + "tracing-appender", + "tracing-subscriber", + "uuid", +] + +[[package]] +name = "ballista-scheduler" +version = "53.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4e4921eda69899d12d868e9337ce989f4f11f81ca296d1484c422a0491d9b4e" +dependencies = [ + "arrow-flight", + "async-trait", + "axum", + "ballista-core", + "clap", + "dashmap", + "datafusion", + 
"datafusion-proto", + "datafusion-substrait", + "futures", + "http 1.4.0", + "insta", + "log", + "object_store", + "parking_lot", + "prost", + "prost-types", + "rand 0.10.1", + "serde", + "tokio", + "tokio-stream", + "tonic", + "tracing", + "tracing-appender", + "tracing-subscriber", + "uuid", +] + [[package]] name = "base64" version = "0.22.1" @@ -1289,6 +1529,12 @@ dependencies = [ "either", ] +[[package]] +name = "bytesize" +version = "2.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6bd91ee7b2422bcb158d90ef4d14f75ef67f340943fc4149891dcce8f8b972a3" + [[package]] name = "bzip2" version = "0.6.1" @@ -1440,6 +1686,12 @@ dependencies = [ "cc", ] +[[package]] +name = "cmov" +version = "0.5.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0c9ea0ac24bc397ab3c98583a3c9ba74fa56b09a4449bbe172b9b1ddb016027a" + [[package]] name = "colorchoice" version = "1.0.5" @@ -1452,7 +1704,7 @@ version = "3.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "faf9468729b8cbcea668e36183cb69d317348c2e08e994829fb56ebfdfbaac34" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -1786,6 +2038,15 @@ dependencies = [ "cipher", ] +[[package]] +name = "ctutils" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7d5515a3834141de9eafb9717ad39eea8247b5674e6066c404e8c4b365d2a29e" +dependencies = [ + "cmov", +] + [[package]] name = "darling" version = "0.20.11" @@ -2318,7 +2579,7 @@ dependencies = [ "num-traits", "rand 0.9.4", "regex", - "sha2", + "sha2 0.10.9", "unicode-segmentation", "uuid", ] @@ -2658,7 +2919,7 @@ dependencies = [ "rand 0.9.4", "serde_json", "sha1", - "sha2", + "sha2 0.10.9", "url", ] @@ -2806,6 +3067,7 @@ dependencies = [ "block-buffer 0.12.0", "const-oid 0.10.2", "crypto-common 0.2.2", + "ctutils", ] [[package]] @@ -2826,7 +3088,7 @@ dependencies = [ "libc", "option-ext", "redox_users", - "windows-sys 
0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -2994,7 +3256,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" dependencies = [ "libc", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -3547,7 +3809,7 @@ version = "0.12.4" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7b5f8eb2ad728638ea2c7d47a21db23b7b58a72ed6a38256b8a1849f15fbbdf7" dependencies = [ - "hmac", + "hmac 0.12.1", ] [[package]] @@ -3559,6 +3821,15 @@ dependencies = [ "digest 0.10.7", ] +[[package]] +name = "hmac" +version = "0.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6303bc9732ae41b04cb554b844a762b4115a61bfaa81e3e83050991eeb56863f" +dependencies = [ + "digest 0.11.3", +] + [[package]] name = "home" version = "0.5.11" @@ -3690,6 +3961,19 @@ dependencies = [ "tower-service", ] +[[package]] +name = "hyper-timeout" +version = "0.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b90d566bffbce6a75bd8b09a05aa8c2cb1fabb6cb348f8840c9e4c90a0d83b0" +dependencies = [ + "hyper", + "hyper-util", + "pin-project-lite", + "tokio", + "tower-service", +] + [[package]] name = "hyper-util" version = "0.1.20" @@ -3707,7 +3991,7 @@ dependencies = [ "libc", "percent-encoding", "pin-project-lite", - "socket2 0.5.10", + "socket2 0.6.3", "system-configuration", "tokio", "tower-service", @@ -3801,6 +4085,29 @@ dependencies = [ "zstd", ] +[[package]] +name = "iceberg-ballista" +version = "0.9.0" +dependencies = [ + "arrow", + "ballista", + "ballista-core", + "ballista-executor", + "ballista-scheduler", + "datafusion", + "datafusion-proto", + "env_logger", + "iceberg", + "iceberg-catalog-loader", + "iceberg-catalog-rest", + "iceberg-datafusion", + "iceberg-storage-opendal", + "log", + "serde", + "serde_json", + "tokio", +] + [[package]] name = "iceberg-cache-moka" version = "0.9.0" @@ -4199,6 +4506,18 @@ 
dependencies = [ "generic-array", ] +[[package]] +name = "insta" +version = "1.48.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86f0f8fee8c926415c58d6ae43a08523a26faccb2323f5e6b644fe7dd4ef6b82" +dependencies = [ + "console", + "once_cell", + "similar", + "tempfile", +] + [[package]] name = "integer-encoding" version = "3.0.4" @@ -4284,7 +4603,7 @@ dependencies = [ "portable-atomic-util", "serde_core", "wasm-bindgen", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -4676,6 +4995,12 @@ dependencies = [ "regex-automata", ] +[[package]] +name = "matchit" +version = "0.8.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "47e1ffaa40ddd1f3ed91f717a33c8c0ee23fff369e3aa8772b9605cc1d22f4c3" + [[package]] name = "md-5" version = "0.10.6" @@ -4726,6 +5051,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "memory-stats" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c73f5c649995a115e1a0220b35e4df0a1294500477f97a91d0660fb5abeb574a" +dependencies = [ + "libc", + "windows-sys 0.52.0", +] + [[package]] name = "metainfo" version = "0.7.14" @@ -4748,6 +5083,12 @@ dependencies = [ "libmimalloc-sys", ] +[[package]] +name = "mime" +version = "0.3.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6877bb514081ee2a7ff5ef9de3281f14a4dd4bceac4c09388074a6b5df8a139a" + [[package]] name = "minijinja" version = "2.20.0" @@ -4966,7 +5307,7 @@ version = "0.50.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "7957b9740744892f114936ab4a57b3f487491bbeafaf8083688b16841a4240e5" dependencies = [ - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -5553,7 +5894,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "f8ed6a7761f76e3b9f92dfb0a60a6a6477c61024b775147ff0973a02653abaf2" dependencies = [ "digest 0.10.7", - "hmac", + "hmac 0.12.1", ] [[package]] @@ -5687,7 
+6028,7 @@ dependencies = [ "der", "pbkdf2", "scrypt", - "sha2", + "sha2 0.10.9", "spki", ] @@ -5854,6 +6195,8 @@ dependencies = [ "prettyplease", "prost", "prost-types", + "pulldown-cmark", + "pulldown-cmark-to-cmark", "regex", "syn 2.0.117", "tempfile", @@ -5911,6 +6254,26 @@ dependencies = [ "syn 2.0.117", ] +[[package]] +name = "pulldown-cmark" +version = "0.13.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e9f068eba8e7071c5f9511831b44f32c740d5adf574e990f946ddb53db2f314e" +dependencies = [ + "bitflags", + "memchr", + "unicase", +] + +[[package]] +name = "pulldown-cmark-to-cmark" +version = "22.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50793def1b900256624a709439404384204a5dc3a6ec580281bfaac35e882e90" +dependencies = [ + "pulldown-cmark", +] + [[package]] name = "pyiceberg_core_rust" version = "0.9.0" @@ -6011,7 +6374,7 @@ dependencies = [ "quinn-udp", "rustc-hash", "rustls", - "socket2 0.5.10", + "socket2 0.6.3", "thiserror 2.0.18", "tokio", "tracing", @@ -6049,9 +6412,9 @@ dependencies = [ "cfg_aliases", "libc", "once_cell", - "socket2 0.5.10", + "socket2 0.6.3", "tracing", - "windows-sys 0.59.0", + "windows-sys 0.60.2", ] [[package]] @@ -6388,13 +6751,13 @@ dependencies = [ "form_urlencoded", "futures", "hex", - "hmac", + "hmac 0.12.1", "http 1.4.0", "jiff", "log", "percent-encoding", "sha1", - "sha2", + "sha2 0.10.9", "windows-sys 0.61.2", ] @@ -6425,7 +6788,7 @@ dependencies = [ "rsa", "serde", "serde_json", - "sha2", + "sha2 0.10.9", "tokio", ] @@ -6592,7 +6955,7 @@ dependencies = [ "pkcs1", "pkcs8", "rand_core 0.6.4", - "sha2", + "sha2 0.10.9", "signature", "spki", "subtle", @@ -6666,7 +7029,7 @@ dependencies = [ "errno", "libc", "linux-raw-sys", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -6724,7 +7087,7 @@ dependencies = [ "security-framework", "security-framework-sys", "webpki-root-certs", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ 
-6874,7 +7237,7 @@ checksum = "0516a385866c09368f0b5bcd1caff3366aace790fcd46e2bb032697bb172fd1f" dependencies = [ "pbkdf2", "salsa20", - "sha2", + "sha2 0.10.9", ] [[package]] @@ -7004,6 +7367,17 @@ dependencies = [ "zmij", ] +[[package]] +name = "serde_path_to_error" +version = "0.1.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "10a9ff822e371bb5403e391ecd83e182e0e77ba7f6fe0160b795797109d1b457" +dependencies = [ + "itoa", + "serde", + "serde_core", +] + [[package]] name = "serde_repr" version = "0.1.20" @@ -7116,6 +7490,17 @@ dependencies = [ "sha2-asm", ] +[[package]] +name = "sha2" +version = "0.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "446ba717509524cb3f22f17ecc096f10f4822d76ab5c0b9822c5f9c284e825f4" +dependencies = [ + "cfg-if 1.0.4", + "cpufeatures 0.3.0", + "digest 0.11.3", +] + [[package]] name = "sha2-asm" version = "0.6.4" @@ -7401,7 +7786,7 @@ dependencies = [ "rustls", "serde", "serde_json", - "sha2", + "sha2 0.10.9", "smallvec", "thiserror 2.0.18", "tokio", @@ -7439,7 +7824,7 @@ dependencies = [ "quote", "serde", "serde_json", - "sha2", + "sha2 0.10.9", "sqlx-core", "sqlx-sqlite", "syn 2.0.117", @@ -7469,7 +7854,7 @@ dependencies = [ "generic-array", "hex", "hkdf", - "hmac", + "hmac 0.12.1", "itoa", "log", "md-5 0.10.6", @@ -7479,7 +7864,7 @@ dependencies = [ "rand 0.8.5", "rsa", "sha1", - "sha2", + "sha2 0.10.9", "smallvec", "sqlx-core", "stringprep", @@ -7506,7 +7891,7 @@ dependencies = [ "futures-util", "hex", "hkdf", - "hmac", + "hmac 0.12.1", "home", "itoa", "log", @@ -7516,7 +7901,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", - "sha2", + "sha2 0.10.9", "smallvec", "sqlx-core", "stringprep", @@ -7768,7 +8153,7 @@ dependencies = [ "getrandom 0.4.2", "once_cell", "rustix", - "windows-sys 0.59.0", + "windows-sys 0.61.2", ] [[package]] @@ -8047,6 +8432,74 @@ version = "0.1.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"5d99f8c9a7727884afe522e9bd5edbfc91a3312b36a77b5fb8926e4c31a41801" +[[package]] +name = "tonic" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ac2a5518c70fa84342385732db33fb3f44bc4cc748936eb5833d2df34d6445ef" +dependencies = [ + "async-trait", + "axum", + "base64", + "bytes", + "h2", + "http 1.4.0", + "http-body 1.0.1", + "http-body-util", + "hyper", + "hyper-timeout", + "hyper-util", + "percent-encoding", + "pin-project", + "socket2 0.6.3", + "sync_wrapper", + "tokio", + "tokio-stream", + "tower", + "tower-layer", + "tower-service", + "tracing", +] + +[[package]] +name = "tonic-build" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c68f61875ac5293cf72e6c8cf0158086428c82c37229e98c840878f1706b0322" +dependencies = [ + "prettyplease", + "proc-macro2", + "quote", + "syn 2.0.117", +] + +[[package]] +name = "tonic-prost" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "50849f68853be452acf590cde0b146665b8d507b3b8af17261df47e02c209ea0" +dependencies = [ + "bytes", + "prost", + "tonic", +] + +[[package]] +name = "tonic-prost-build" +version = "0.14.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "654e5643eff75d7f8c99197ce1440ed19a3474eada74c12bbac488b2cafdae27" +dependencies = [ + "prettyplease", + "proc-macro2", + "prost-build", + "prost-types", + "quote", + "syn 2.0.117", + "tempfile", + "tonic-build", +] + [[package]] name = "tower" version = "0.5.3" @@ -8055,11 +8508,15 @@ checksum = "ebe5ef63511595f1344e2d5cfa636d973292adc0eec1f0ad45fae9f0851ab1d4" dependencies = [ "futures-core", "futures-util", + "indexmap 2.13.0", "pin-project-lite", + "slab", "sync_wrapper", "tokio", + "tokio-util", "tower-layer", "tower-service", + "tracing", ] [[package]] @@ -8322,6 +8779,12 @@ dependencies = [ "typify-impl", ] +[[package]] +name = "unicase" +version = "2.9.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbc4bc3a9f746d862c45cb89d705aa10f187bb96c76001afab07a0d35ce60142" + [[package]] name = "unicode-bidi" version = "0.3.18" @@ -8807,7 +9270,7 @@ version = "0.1.11" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c2a7b1c03c876122aa43f3020e6c3c3ee5c05081c9a00739faf7503aeba10d22" dependencies = [ - "windows-sys 0.48.0", + "windows-sys 0.61.2", ] [[package]] @@ -9384,7 +9847,7 @@ dependencies = [ "rand 0.10.1", "serde", "serde_json", - "sha2", + "sha2 0.10.9", "tempfile", "thiserror 2.0.18", "tokio", diff --git a/Cargo.toml b/Cargo.toml index c4500b1a37..d9f5d35156 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -67,6 +67,12 @@ aws-sdk-s3tables = { version = "1.28", default-features = false, features = [ "rt-tokio", ] } backon = "1.5.1" +ballista = "53" +ballista-core = "53" +ballista-executor = { version = "53", default-features = false, features = [ + "arrow-ipc-optimizations", +] } +ballista-scheduler = { version = "53", default-features = false } base64 = "0.22.1" bimap = "0.6" bytes = "1.11" @@ -77,6 +83,7 @@ dashmap = "6" datafusion = "53.1.0" datafusion-cli = "53.0.0" datafusion-ffi = "53.0.0" +datafusion-proto = "53.0.0" datafusion-sqllogictest = "53.0.0" derive_builder = "0.20" dirs = "6" @@ -98,6 +105,7 @@ http = "1.2" iceberg = { version = "0.9.0", path = "./crates/iceberg" } iceberg-catalog-glue = { version = "0.9.0", path = "./crates/catalog/glue" } iceberg-catalog-hms = { version = "0.9.0", path = "./crates/catalog/hms" } +iceberg-catalog-loader = { version = "0.9.0", path = "./crates/catalog/loader" } iceberg-catalog-rest = { version = "0.9.0", path = "./crates/catalog/rest" } iceberg-catalog-s3tables = { version = "0.9.0", path = "./crates/catalog/s3tables" } iceberg-catalog-sql = { version = "0.9.0", path = "./crates/catalog/sql" } diff --git a/crates/integrations/ballista/Cargo.toml b/crates/integrations/ballista/Cargo.toml new file mode 100644 index 
0000000000..c1c581de48 --- /dev/null +++ b/crates/integrations/ballista/Cargo.toml @@ -0,0 +1,56 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +[package] +edition = { workspace = true } +homepage = { workspace = true } +name = "iceberg-ballista" +publish = true +rust-version = { workspace = true } +version = { workspace = true } + +categories = ["database"] +description = "Apache Iceberg distributed read/write driver for Ballista" +keywords = ["iceberg", "ballista", "datafusion", "distributed"] +license = { workspace = true } +repository = { workspace = true } + +[dependencies] +ballista-core = { workspace = true } +datafusion = { workspace = true } +datafusion-proto = { workspace = true } +iceberg = { workspace = true } +iceberg-catalog-loader = { workspace = true } +iceberg-datafusion = { workspace = true } +iceberg-storage-opendal = { workspace = true } +serde = { workspace = true, features = ["derive"] } +serde_json = { workspace = true } +tokio = { workspace = true } + +[dev-dependencies] +arrow = { workspace = true } +ballista = { workspace = true, features = ["standalone"] } +# Used by the multi-executor integration test to start a scheduler + several +# in-process executors directly. 
Feature flags mirror what ballista's +# `standalone` feature already enables, so these reuse the same compiled +# artifacts rather than triggering a separate build. +ballista-executor = { workspace = true } +ballista-scheduler = { workspace = true } +env_logger = { workspace = true } +iceberg-catalog-rest = { workspace = true } +log = { workspace = true } +tokio = { workspace = true, features = ["macros", "rt-multi-thread"] } diff --git a/crates/integrations/ballista/examples/standalone-iceberg-write.rs b/crates/integrations/ballista/examples/standalone-iceberg-write.rs new file mode 100644 index 0000000000..51132103eb --- /dev/null +++ b/crates/integrations/ballista/examples/standalone-iceberg-write.rs @@ -0,0 +1,158 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Standalone Iceberg write example +//! +//! Demonstrates distributed reads and writes against an Apache Iceberg table +//! from a standalone Ballista cluster. +//! +//! This example requires a running Iceberg REST catalog and MinIO. The easiest +//! way is to use the docker fixture shipped with `iceberg-rust`. From the +//! `iceberg-rust` workspace root: +//! +//! ```bash +//! make docker-up +//! 
cargo run -p iceberg-ballista --example standalone-iceberg-write +//! ``` +//! +//! Endpoints can be overridden with the `ICEBERG_REST_URI` and +//! `ICEBERG_S3_ENDPOINT` environment variables. + +use std::collections::HashMap; +use std::sync::Arc; + +use ballista::datafusion::common::Result; +use ballista::datafusion::execution::SessionStateBuilder; +use ballista::datafusion::prelude::{SessionConfig, SessionContext}; +use ballista::prelude::{SessionConfigExt, SessionContextExt}; +use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; +use iceberg_ballista::{IcebergCatalogConfig, register_iceberg_codecs, register_iceberg_table}; +use iceberg_catalog_rest::RestCatalogBuilder; +use iceberg_storage_opendal::OpenDalStorageFactory; + +/// Catalog + storage properties for the REST catalog and its MinIO storage. +fn catalog_props() -> HashMap<String, String> { + let rest_uri = + std::env::var("ICEBERG_REST_URI").unwrap_or_else(|_| "http://localhost:8181".to_string()); + let s3_endpoint = std::env::var("ICEBERG_S3_ENDPOINT") + .unwrap_or_else(|_| "http://localhost:9000".to_string()); + HashMap::from([ + ("uri".to_string(), rest_uri), + ("s3.endpoint".to_string(), s3_endpoint), + ("s3.access-key-id".to_string(), "admin".to_string()), + ("s3.secret-access-key".to_string(), "password".to_string()), + ("s3.region".to_string(), "us-east-1".to_string()), + ("s3.path-style-access".to_string(), "true".to_string()), + ]) +} + +/// Creates the demo namespace and table in the catalog if they do not exist. 
+async fn ensure_table(props: &HashMap<String, String>) -> Result<(NamespaceIdent, String)> { + let catalog = RestCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load: None, + })) + .load("rest", props.clone()) + .await + .expect("build rest catalog"); + + let namespace = NamespaceIdent::new("ballista_demo".to_string()); + if !catalog + .namespace_exists(&namespace) + .await + .expect("ns exists") + { + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .expect("create namespace"); + } + + let table_ident = TableIdent::new(namespace.clone(), "events".to_string()); + if !catalog + .table_exists(&table_ident) + .await + .expect("table exists") + { + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .expect("build schema"); + let creation = TableCreation::builder() + .name("events".to_string()) + .schema(schema) + .properties(HashMap::new()) + .build(); + catalog + .create_table(&namespace, creation) + .await + .expect("create table"); + } + + Ok((namespace, "events".to_string())) +} + +#[tokio::main] +async fn main() -> Result<()> { + let _ = env_logger::builder() + .filter_level(log::LevelFilter::Info) + .try_init(); + + let props = catalog_props(); + + // Make sure the target table exists in the catalog. + let (namespace, table) = ensure_table(&props).await?; + + // Build a Ballista session config with the Iceberg codecs installed, so the + // standalone scheduler and executor can serialize the Iceberg plan nodes. 
+ let config = register_iceberg_codecs( + SessionConfig::new_with_ballista() + .with_target_partitions(2) + .with_ballista_standalone_parallelism(2), + ); + let state = SessionStateBuilder::new() + .with_config(config) + .with_default_features() + .build(); + let ctx = SessionContext::standalone_with_state(state).await?; + + // Register the catalog-backed Iceberg table for distributed reads and writes. + let catalog_config = IcebergCatalogConfig::new("rest", "rest", props); + register_iceberg_table(&ctx, "events", catalog_config, namespace, table).await?; + + // Distributed INSERT: IcebergWriteExec runs across the cluster and + // IcebergCommitExec atomically appends the data files to the table. + println!("== INSERT =="); + ctx.sql("INSERT INTO events VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')") + .await? + .show() + .await?; + + // Read it back through the distributed scan. + println!("== SELECT =="); + ctx.sql("SELECT id, name FROM events ORDER BY id") + .await? + .show() + .await?; + + Ok(()) +} diff --git a/crates/integrations/ballista/public-api.txt b/crates/integrations/ballista/public-api.txt new file mode 100644 index 0000000000..886d31a7a6 --- /dev/null +++ b/crates/integrations/ballista/public-api.txt @@ -0,0 +1,31 @@ +pub mod iceberg_ballista +pub use iceberg_ballista::IcebergCatalogConfig +pub struct iceberg_ballista::IcebergLogicalCodec +impl iceberg_ballista::IcebergLogicalCodec +pub fn iceberg_ballista::IcebergLogicalCodec::new(inner: alloc::sync::Arc) -> Self +impl core::default::Default for iceberg_ballista::IcebergLogicalCodec +pub fn iceberg_ballista::IcebergLogicalCodec::default() -> Self +impl core::fmt::Debug for iceberg_ballista::IcebergLogicalCodec +pub fn iceberg_ballista::IcebergLogicalCodec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_proto::logical_plan::LogicalExtensionCodec for iceberg_ballista::IcebergLogicalCodec +pub fn iceberg_ballista::IcebergLogicalCodec::try_decode(&self, buf: &[u8], 
inputs: &[datafusion_expr::logical_plan::plan::LogicalPlan], ctx: &datafusion_execution::task::TaskContext) -> core::result::Result +pub fn iceberg_ballista::IcebergLogicalCodec::try_decode_file_format(&self, buf: &[u8], ctx: &datafusion_execution::task::TaskContext) -> core::result::Result, datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergLogicalCodec::try_decode_table_provider(&self, buf: &[u8], table_ref: &datafusion_common::table_reference::TableReference, schema: arrow_schema::schema::SchemaRef, ctx: &datafusion_execution::task::TaskContext) -> core::result::Result, datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergLogicalCodec::try_encode(&self, node: &datafusion_expr::logical_plan::plan::Extension, buf: &mut alloc::vec::Vec) -> core::result::Result<(), datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergLogicalCodec::try_encode_file_format(&self, buf: &mut alloc::vec::Vec, node: alloc::sync::Arc) -> core::result::Result<(), datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergLogicalCodec::try_encode_table_provider(&self, table_ref: &datafusion_common::table_reference::TableReference, node: alloc::sync::Arc, buf: &mut alloc::vec::Vec) -> core::result::Result<(), datafusion_common::error::DataFusionError> +pub struct iceberg_ballista::IcebergPhysicalCodec +impl iceberg_ballista::IcebergPhysicalCodec +pub fn iceberg_ballista::IcebergPhysicalCodec::new(inner: alloc::sync::Arc) -> Self +impl core::default::Default for iceberg_ballista::IcebergPhysicalCodec +pub fn iceberg_ballista::IcebergPhysicalCodec::default() -> Self +impl core::fmt::Debug for iceberg_ballista::IcebergPhysicalCodec +pub fn iceberg_ballista::IcebergPhysicalCodec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_proto::physical_plan::PhysicalExtensionCodec for iceberg_ballista::IcebergPhysicalCodec +pub fn 
iceberg_ballista::IcebergPhysicalCodec::try_decode(&self, buf: &[u8], inputs: &[alloc::sync::Arc], ctx: &datafusion_execution::task::TaskContext) -> core::result::Result, datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergPhysicalCodec::try_decode_expr(&self, buf: &[u8], inputs: &[alloc::sync::Arc]) -> core::result::Result, datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergPhysicalCodec::try_encode(&self, node: alloc::sync::Arc, buf: &mut alloc::vec::Vec) -> core::result::Result<(), datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::IcebergPhysicalCodec::try_encode_expr(&self, node: &alloc::sync::Arc, buf: &mut alloc::vec::Vec) -> core::result::Result<(), datafusion_common::error::DataFusionError> +pub async fn iceberg_ballista::register_iceberg_catalog(ctx: &datafusion::execution::context::SessionContext, register_name: &str, config: iceberg_datafusion::catalog_config::IcebergCatalogConfig) -> core::result::Result<(), datafusion_common::error::DataFusionError> +pub fn iceberg_ballista::register_iceberg_codecs(config: datafusion_execution::config::SessionConfig) -> datafusion_execution::config::SessionConfig +pub async fn iceberg_ballista::register_iceberg_table(ctx: &datafusion::execution::context::SessionContext, register_name: &str, config: iceberg_datafusion::catalog_config::IcebergCatalogConfig, namespace: iceberg::catalog::NamespaceIdent, table: impl core::convert::Into) -> core::result::Result<(), datafusion_common::error::DataFusionError> diff --git a/crates/integrations/ballista/src/bridge.rs b/crates/integrations/ballista/src/bridge.rs new file mode 100644 index 0000000000..276efcd232 --- /dev/null +++ b/crates/integrations/ballista/src/bridge.rs @@ -0,0 +1,215 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Bridge between Ballista's synchronous, serialization-oriented codec API and +//! Iceberg's asynchronous, live-handle world, shared by the logical and +//! physical extension codecs. +//! +//! The central problem this module solves is that Ballista serializes physical +//! and logical plans to ship them to remote nodes, but the Iceberg plan nodes +//! hold live, non-serializable state (an `Arc` and a `Table` with +//! an open `FileIO`). We side-step this by serializing only the minimal, +//! self-contained [`IcebergCatalogConfig`] plus the [`TableIdent`], and then +//! *reconstructing* the catalog and table on the receiving node by building the +//! catalog from the config and loading the table from it. +//! +//! Reconstruction is asynchronous (catalog clients and table loads do I/O) but +//! the codec entry points are synchronous, so [`block_on`] bridges the two by +//! running every catalog future on [`CATALOG_RT`], a dedicated process-lived +//! runtime. + +use std::collections::{BTreeMap, HashMap}; +use std::future::Future; +use std::sync::{Arc, LazyLock, Mutex}; + +use datafusion::common::DataFusionError; +use iceberg::table::Table; +use iceberg::{Catalog, TableIdent}; +use iceberg_datafusion::IcebergCatalogConfig; +use iceberg_storage_opendal::OpenDalResolvingStorageFactory; +use serde::{Deserialize, Serialize}; + +/// Converts an arbitrary error into a [`DataFusionError`]. 
+pub(crate) fn to_df_err(e: E) -> DataFusionError { + DataFusionError::External(Box::new(e)) +} + +/// Dedicated process-lived runtime that drives all Iceberg catalog I/O. +/// +/// A catalog's HTTP/connection pool is bound to the runtime that drives it, so +/// running every catalog future here — instead of on whatever runtime the codec +/// caller happens to be on — means a cached catalog can never reference an +/// already-dropped runtime, no matter which thread or test asks for it later. +/// Catalog operations only happen at plan encode/decode time, so one worker is +/// plenty. +static CATALOG_RT: LazyLock = LazyLock::new(|| { + tokio::runtime::Builder::new_multi_thread() + .worker_threads(1) + .thread_name("iceberg-catalog") + .enable_all() + .build() + .expect("failed to build iceberg catalog runtime") +}); + +/// Runs an async future to completion on [`CATALOG_RT`] from a synchronous +/// context, whatever runtime (if any) the caller happens to be on. +/// +/// The future runs on a scoped helper thread (entering another runtime's +/// `block_on` is forbidden from inside a runtime context, and the helper thread +/// also lets `fut` borrow from the caller). If the caller is itself on a +/// multi-thread runtime worker, [`tokio::task::block_in_place`] tells that +/// scheduler the worker is parked so the rest of its runtime keeps making +/// progress. +pub(crate) fn block_on(fut: F) -> F::Output +where + F: Future + Send, + F::Output: Send, +{ + use tokio::runtime::{Handle, RuntimeFlavor}; + + let wait = move || { + std::thread::scope(|scope| { + scope + .spawn(|| CATALOG_RT.block_on(fut)) + .join() + .expect("iceberg catalog access thread panicked") + }) + }; + + match Handle::try_current().map(|h| h.runtime_flavor()) { + Ok(RuntimeFlavor::MultiThread) => tokio::task::block_in_place(wait), + _ => wait(), + } +} + +/// Process-wide cache of reconstructed catalogs, keyed by config. 
+/// +/// Building a catalog client (and its underlying HTTP/connection pool) is +/// relatively expensive, and the codec may decode many plan nodes that share +/// one catalog, so we cache by config. Every cached catalog lives on +/// [`CATALOG_RT`], which never shuts down, so entries stay valid for the life +/// of the process and can be served to any caller. +static CATALOGS: LazyLock>>> = + LazyLock::new(|| Mutex::new(HashMap::new())); + +/// Builds a catalog from its config. +/// +/// The catalog type is resolved through [`iceberg_catalog_loader`], so any +/// catalog it supports (`rest`, `sql`, `glue`, `hms`, `s3tables`) works here. +/// Storage is provided by [`OpenDalResolvingStorageFactory`], which picks the +/// object-store backend (S3, GCS, Azure, local fs, …) from each file's path +/// scheme, configured from the same `props`. So a single code path covers every +/// catalog/storage combination the workspace supports. +pub(crate) async fn build_catalog( + config: &IcebergCatalogConfig, +) -> Result, DataFusionError> { + iceberg_catalog_loader::load(&config.r#type) + .map_err(to_df_err)? + .with_storage_factory(Arc::new(OpenDalResolvingStorageFactory::new())) + .load(config.name.clone(), config.props.clone()) + .await + .map_err(to_df_err) +} + +/// Returns a catalog built from `config`, cached process-wide. +pub(crate) fn get_catalog( + config: &IcebergCatalogConfig, +) -> Result, DataFusionError> { + let key = CatalogConfigProto::from(config); + if let Some(catalog) = CATALOGS.lock().unwrap().get(&key) { + return Ok(catalog.clone()); + } + let catalog = block_on(build_catalog(config))?; + CATALOGS.lock().unwrap().insert(key, catalog.clone()); + Ok(catalog) +} + +/// Loads a fresh [`Table`] from the catalog described by `config`. 
+pub(crate) fn load_table( + config: &IcebergCatalogConfig, + ident: &TableIdent, +) -> Result { + let catalog = get_catalog(config)?; + block_on(catalog.load_table(ident)).map_err(to_df_err) +} + +// --------------------------------------------------------------------------- +// Wire format +// --------------------------------------------------------------------------- + +/// Leading tag byte that frames every blob this crate's codecs produce. +/// +/// Both the logical and physical codecs handle some nodes themselves and +/// delegate the rest to an inner Ballista codec. We write a tag for *both* branches. Decode +/// then dispatches on a value we always control, and the inner payload is nested +/// after the tag — never content-inspected. An unknown or missing tag is a hard +/// error instead of a silent misparse. +pub(crate) const TAG_DELEGATED: u8 = 0; +/// Tag for a payload owned by this crate's Iceberg codecs (JSON follows). +pub(crate) const TAG_ICEBERG: u8 = 1; + +/// Frames `payload` as an Iceberg-owned blob: [`TAG_ICEBERG`] then JSON. +pub(crate) fn encode_blob( + buf: &mut Vec, + payload: &T, +) -> Result<(), DataFusionError> { + buf.push(TAG_ICEBERG); + buf.extend_from_slice(&serde_json::to_vec(payload).map_err(to_df_err)?); + Ok(()) +} + +/// Splits a codec blob into its leading tag byte and payload. +pub(crate) fn split_tagged<'a>( + buf: &'a [u8], + context: &str, +) -> Result<(u8, &'a [u8]), DataFusionError> { + match buf.split_first() { + Some((&tag, rest)) => Ok((tag, rest)), + None => Err(DataFusionError::Internal(format!("empty {context} buffer"))), + } +} + +/// Serializable mirror of [`IcebergCatalogConfig`] (which is intentionally not +/// serde-aware in the iceberg crate to avoid a serde dependency there). 
+#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq, Hash)] +pub(crate) struct CatalogConfigProto { + pub r#type: String, + pub name: String, + // BTreeMap (not HashMap) so the struct can derive Hash for the catalog + // cache, and so encode→decode→encode round-trips to identical bytes. + pub props: BTreeMap, +} + +impl From<&IcebergCatalogConfig> for CatalogConfigProto { + fn from(c: &IcebergCatalogConfig) -> Self { + Self { + r#type: c.r#type.clone(), + name: c.name.clone(), + props: c + .props + .iter() + .map(|(k, v)| (k.clone(), v.clone())) + .collect(), + } + } +} + +impl From for IcebergCatalogConfig { + fn from(p: CatalogConfigProto) -> Self { + IcebergCatalogConfig::new(p.r#type, p.name, p.props.into_iter().collect()) + } +} diff --git a/crates/integrations/ballista/src/lib.rs b/crates/integrations/ballista/src/lib.rs new file mode 100644 index 0000000000..4f0ab384ec --- /dev/null +++ b/crates/integrations/ballista/src/lib.rs @@ -0,0 +1,127 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! # Ballista Iceberg driver +//! +//! Adds distributed Apache Iceberg reads and writes to Ballista. +//! +//! Iceberg's DataFusion integration already produces a complete physical write +//! 
plan (`IcebergCommitExec -> CoalescePartitionsExec -> IcebergWriteExec -> +//! input`). The only thing missing for Ballista is serialization: Ballista +//! ships logical and physical plans to remote nodes, and the Iceberg plan nodes +//! hold live catalog/storage handles that cannot be serialized. This crate +//! provides the logical and physical extension codecs that serialize the +//! minimal config needed to rebuild those handles on each node. +//! +//! ## Usage (standalone) +//! +//! ```ignore +//! use std::collections::HashMap; +//! use std::sync::Arc; +//! +//! use iceberg_ballista::{register_iceberg_codecs, register_iceberg_table, IcebergCatalogConfig}; +//! use ballista_core::extension::SessionConfigExt; +//! use datafusion::prelude::{SessionConfig, SessionContext}; +//! use iceberg::NamespaceIdent; +//! +//! # async fn run() -> datafusion::error::Result<()> { +//! // 1. Register the Iceberg codecs on the session config, then start standalone Ballista. +//! let config = register_iceberg_codecs(SessionConfig::new_with_ballista()); +//! let ctx = SessionContext::standalone_with_config(config).await?; +//! +//! // 2. Register a catalog-backed Iceberg table for reads and writes. +//! let props = HashMap::from([("uri".to_string(), "http://localhost:8181".to_string())]); +//! let cfg = IcebergCatalogConfig::new("rest", "rest", props); +//! register_iceberg_table(&ctx, "t", cfg, NamespaceIdent::new("ns".into()), "tbl").await?; +//! +//! // 3. INSERT runs distributed across the cluster. +//! ctx.sql("INSERT INTO t SELECT * FROM source").await?.collect().await?; +//! # Ok(()) +//! # } +//! 
``` + +mod bridge; +mod logical_codec; +mod physical_codec; + +use std::sync::Arc; + +use ballista_core::extension::SessionConfigExt; +use datafusion::common::DataFusionError; +use datafusion::prelude::{SessionConfig, SessionContext}; +use iceberg::NamespaceIdent; +pub use iceberg_datafusion::IcebergCatalogConfig; + +pub use crate::logical_codec::IcebergLogicalCodec; +pub use crate::physical_codec::IcebergPhysicalCodec; + +/// Installs the Iceberg logical and physical extension codecs onto a +/// [`SessionConfig`]. +/// +/// In a standalone Ballista cluster the scheduler and executor both derive +/// their codecs from this session config, so this single call is enough for an +/// end-to-end distributed Iceberg query. For a separately deployed scheduler and +/// executor, set the same codecs on their process configs +/// (`override_logical_codec` / `override_physical_codec`). +pub fn register_iceberg_codecs(config: SessionConfig) -> SessionConfig { + config + .with_ballista_logical_extension_codec(Arc::new(IcebergLogicalCodec::default())) + .with_ballista_physical_extension_codec(Arc::new(IcebergPhysicalCodec::default())) +} + +/// Builds a catalog-backed [`IcebergTableProvider`](iceberg_datafusion::IcebergTableProvider) +/// from `config` and registers it on `ctx` under `register_name`. +/// +/// The provider carries the `config` so that the plan nodes it produces can be +/// serialized and reconstructed on remote Ballista nodes. 
+pub async fn register_iceberg_table( + ctx: &SessionContext, + register_name: &str, + config: IcebergCatalogConfig, + namespace: NamespaceIdent, + table: impl Into, +) -> Result<(), DataFusionError> { + let catalog = bridge::build_catalog(&config).await?; + let provider = iceberg_datafusion::IcebergTableProvider::try_new_with_config( + catalog, config, namespace, table, + ) + .await + .map_err(bridge::to_df_err)?; + ctx.register_table(register_name, Arc::new(provider))?; + Ok(()) +} + +/// Builds an [`IcebergCatalogProvider`](iceberg_datafusion::IcebergCatalogProvider) +/// from `config` and registers it on `ctx` under `register_name`, mounting the +/// whole Iceberg catalog at once. +/// +/// Every table then resolves as `..` in SQL, +/// and every provider carries the `config` so the plan nodes it produces can be +/// serialized and reconstructed on remote Ballista nodes — including metadata +/// tables such as `
$snapshots`. +pub async fn register_iceberg_catalog( + ctx: &SessionContext, + register_name: &str, + config: IcebergCatalogConfig, +) -> Result<(), DataFusionError> { + let catalog = bridge::build_catalog(&config).await?; + let provider = iceberg_datafusion::IcebergCatalogProvider::try_new_with_config(catalog, config) + .await + .map_err(bridge::to_df_err)?; + ctx.register_catalog(register_name, Arc::new(provider)); + Ok(()) +} diff --git a/crates/integrations/ballista/src/logical_codec.rs b/crates/integrations/ballista/src/logical_codec.rs new file mode 100644 index 0000000000..72e3c3f504 --- /dev/null +++ b/crates/integrations/ballista/src/logical_codec.rs @@ -0,0 +1,217 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Logical extension codec that serializes the catalog-backed +//! [`IcebergTableProvider`] (its [`IcebergCatalogConfig`] + table identifier) so +//! that the Ballista scheduler can rebuild the provider from a logical plan and +//! perform physical planning (including `insert_into`) for Iceberg tables. +//! +//! All other logical-plan serialization (extension nodes, file formats, other +//! table providers) is delegated to an inner codec (by default Ballista's +//! 
[`BallistaLogicalExtensionCodec`]). + +use std::fmt::Debug; +use std::sync::Arc; + +use ballista_core::serde::BallistaLogicalExtensionCodec; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::catalog::TableProvider; +use datafusion::common::DataFusionError; +use datafusion::datasource::file_format::FileFormatFactory; +use datafusion::execution::TaskContext; +use datafusion::logical_expr::{Extension, LogicalPlan}; +use datafusion::sql::TableReference; +use datafusion_proto::logical_plan::LogicalExtensionCodec; +use iceberg::TableIdent; +use iceberg::inspect::MetadataTableType; +use iceberg_datafusion::{IcebergMetadataTableProvider, IcebergTableProvider}; +use serde::{Deserialize, Serialize}; + +use crate::bridge::{ + CatalogConfigProto, TAG_DELEGATED, TAG_ICEBERG, block_on, encode_blob, get_catalog, load_table, + split_tagged, to_df_err, +}; + +/// Wire representation of an Iceberg table provider. Carries enough to rebuild +/// either the catalog-backed data provider or a metadata-table provider on a +/// remote node. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)] +enum IcebergProviderProto { + /// The catalog-backed [`IcebergTableProvider`]. + Table { + catalog: CatalogConfigProto, + table: TableIdent, + /// Pinned snapshot for time-travel reads, if any. + #[serde(default)] + snapshot_id: Option, + }, + /// An [`IcebergMetadataTableProvider`] (e.g. `tbl$snapshots`). + Metadata { + catalog: CatalogConfigProto, + table: TableIdent, + /// The metadata table kind, as its lowercase string name. + metadata_type: String, + }, +} + +/// A [`LogicalExtensionCodec`] that understands the catalog-backed +/// [`IcebergTableProvider`] and delegates everything else to an inner codec. 
+#[derive(Debug)] +pub struct IcebergLogicalCodec { + inner: Arc, +} + +impl Default for IcebergLogicalCodec { + fn default() -> Self { + Self { + inner: Arc::new(BallistaLogicalExtensionCodec::default()), + } + } +} + +impl IcebergLogicalCodec { + /// Creates a codec that delegates non-Iceberg work to `inner`. + pub fn new(inner: Arc) -> Self { + Self { inner } + } +} + +impl LogicalExtensionCodec for IcebergLogicalCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[LogicalPlan], + ctx: &TaskContext, + ) -> Result { + self.inner.try_decode(buf, inputs, ctx) + } + + fn try_encode(&self, node: &Extension, buf: &mut Vec) -> Result<(), DataFusionError> { + self.inner.try_encode(node, buf) + } + + fn try_decode_table_provider( + &self, + buf: &[u8], + table_ref: &TableReference, + schema: SchemaRef, + ctx: &TaskContext, + ) -> Result, DataFusionError> { + let (tag, rest) = split_tagged(buf, "iceberg logical table-provider")?; + match tag { + TAG_DELEGATED => self + .inner + .try_decode_table_provider(rest, table_ref, schema, ctx), + TAG_ICEBERG => { + let proto: IcebergProviderProto = + serde_json::from_slice(rest).map_err(to_df_err)?; + match proto { + IcebergProviderProto::Table { + catalog, + table, + snapshot_id, + } => { + let config = catalog.into(); + let cat = get_catalog(&config)?; + let TableIdent { namespace, name } = table; + let provider = block_on(IcebergTableProvider::try_new_with_config( + cat, config, namespace, name, + )) + .map_err(to_df_err)? 
+ .with_snapshot_id(snapshot_id); + Ok(Arc::new(provider)) + } + IcebergProviderProto::Metadata { + catalog, + table, + metadata_type, + } => { + let config = catalog.into(); + let table_obj = load_table(&config, &table)?; + let kind = MetadataTableType::try_from(metadata_type.as_str()) + .map_err(DataFusionError::Internal)?; + let provider = IcebergMetadataTableProvider::new(table_obj, kind) + .with_catalog_config(Some(config)); + Ok(Arc::new(provider)) + } + } + } + other => Err(DataFusionError::Internal(format!( + "unknown iceberg logical table-provider tag {other}" + ))), + } + } + + fn try_encode_table_provider( + &self, + table_ref: &TableReference, + node: Arc, + buf: &mut Vec, + ) -> Result<(), DataFusionError> { + if let Some(provider) = node.as_any().downcast_ref::() { + let config = provider.config().ok_or_else(|| { + DataFusionError::Internal( + "IcebergTableProvider has no IcebergCatalogConfig and cannot be \ + distributed; register it with \ + IcebergTableProvider::try_new_with_config (see \ + iceberg_ballista::register_iceberg_table)." + .to_string(), + ) + })?; + let proto = IcebergProviderProto::Table { + catalog: config.into(), + table: provider.table_ident().clone(), + snapshot_id: provider.snapshot_id(), + }; + return encode_blob(buf, &proto); + } + if let Some(provider) = node.as_any().downcast_ref::() { + let config = provider.catalog_config().ok_or_else(|| { + DataFusionError::Internal( + "IcebergMetadataTableProvider has no IcebergCatalogConfig and cannot be \ + distributed; register the catalog with \ + IcebergCatalogProvider::try_new_with_config so its tables carry it." 
+ .to_string(), + ) + })?; + let proto = IcebergProviderProto::Metadata { + catalog: config.into(), + table: provider.table().identifier().clone(), + metadata_type: provider.metadata_type().as_str().to_string(), + }; + return encode_blob(buf, &proto); + } + buf.push(TAG_DELEGATED); + self.inner.try_encode_table_provider(table_ref, node, buf) + } + + fn try_decode_file_format( + &self, + buf: &[u8], + ctx: &TaskContext, + ) -> Result, DataFusionError> { + self.inner.try_decode_file_format(buf, ctx) + } + + fn try_encode_file_format( + &self, + buf: &mut Vec, + node: Arc, + ) -> Result<(), DataFusionError> { + self.inner.try_encode_file_format(buf, node) + } +} diff --git a/crates/integrations/ballista/src/physical_codec.rs b/crates/integrations/ballista/src/physical_codec.rs new file mode 100644 index 0000000000..1877a2545e --- /dev/null +++ b/crates/integrations/ballista/src/physical_codec.rs @@ -0,0 +1,477 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! Physical extension codec for the Iceberg execution plan nodes. +//! +//! Encodes/decodes [`IcebergTableScan`], [`IcebergWriteExec`], and +//! [`IcebergCommitExec`] so Ballista can ship them to remote executors. Any +//! 
node that is not an Iceberg node is delegated to an inner codec (by default +//! Ballista's own [`BallistaPhysicalExtensionCodec`]), so shuffle and other +//! Ballista plan nodes keep working. + +use std::fmt::Debug; +use std::sync::Arc; + +use ballista_core::serde::BallistaPhysicalExtensionCodec; +use datafusion::arrow::datatypes::SchemaRef; +use datafusion::common::DataFusionError; +use datafusion::execution::TaskContext; +use datafusion::physical_expr::PhysicalExpr; +use datafusion::physical_plan::ExecutionPlan; +use datafusion_proto::physical_plan::PhysicalExtensionCodec; +use iceberg::TableIdent; +use iceberg::arrow::schema_to_arrow_schema; +use iceberg::expr::Predicate; +use iceberg::inspect::MetadataTableType; +use iceberg::spec::{PartitionSpec, Schema}; +use iceberg::table::Table; +use iceberg_datafusion::IcebergMetadataTableProvider; +use iceberg_datafusion::physical_plan::{ + IcebergCommitExec, IcebergMetadataScan, IcebergTableScan, IcebergWriteExec, PartitionExpr, +}; +use serde::{Deserialize, Serialize}; + +use crate::bridge::{ + CatalogConfigProto, TAG_DELEGATED, TAG_ICEBERG, encode_blob, get_catalog, load_table, + split_tagged, to_df_err, +}; + +/// Wire representation of an Iceberg physical plan node. +// `Predicate` is not `Eq` (it can hold float literals), so this derives only +// `PartialEq`. +#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] +enum IcebergPhysicalNode { + Scan { + catalog: CatalogConfigProto, + table: TableIdent, + snapshot_id: Option, + projection: Option>, + limit: Option, + /// Pushed-down filter, restored on the remote node so Iceberg file + /// pruning is preserved (DataFusion still re-applies it above the scan). + #[serde(default)] + predicates: Option, + }, + Write { + catalog: CatalogConfigProto, + table: TableIdent, + }, + Commit { + catalog: CatalogConfigProto, + table: TableIdent, + }, + Metadata { + catalog: CatalogConfigProto, + table: TableIdent, + /// The metadata table kind, as its lowercase string name. 
+ metadata_type: String, + }, +} + +/// Wire representation of an [`IcebergDataFusion`](iceberg_datafusion) partition +/// expression. The live `PartitionValueCalculator` it wraps is not serializable, +/// but it can be rebuilt on the far node from the (self-contained) partition spec +/// and table schema, so those are all that travels on the wire. +#[derive(Debug, Clone, Serialize, Deserialize)] +struct PartitionExprProto { + partition_spec: PartitionSpec, + schema: Schema, +} + +/// A [`PhysicalExtensionCodec`] that understands the Iceberg plan nodes and +/// delegates everything else to an inner codec. +#[derive(Debug)] +pub struct IcebergPhysicalCodec { + inner: Arc, +} + +impl Default for IcebergPhysicalCodec { + fn default() -> Self { + Self { + inner: Arc::new(BallistaPhysicalExtensionCodec::default()), + } + } +} + +impl IcebergPhysicalCodec { + /// Creates a codec that delegates non-Iceberg nodes to `inner`. + pub fn new(inner: Arc) -> Self { + Self { inner } + } +} + +fn missing_config_err(node: &str) -> DataFusionError { + DataFusionError::Internal(format!( + "{node} has no IcebergCatalogConfig and cannot be distributed. Register the \ + table with IcebergTableProvider::try_new_with_config (see \ + iceberg_ballista::register_iceberg_table)." + )) +} + +/// Arrow schema of the table's current Iceberg schema. 
+fn arrow_schema_of(table: &Table) -> Result { + Ok(Arc::new( + schema_to_arrow_schema(table.metadata().current_schema()).map_err(to_df_err)?, + )) +} + +impl PhysicalExtensionCodec for IcebergPhysicalCodec { + fn try_decode( + &self, + buf: &[u8], + inputs: &[Arc], + ctx: &TaskContext, + ) -> Result, DataFusionError> { + let (tag, rest) = split_tagged(buf, "iceberg physical codec")?; + if tag == TAG_DELEGATED { + return self.inner.try_decode(rest, inputs, ctx); + } + if tag != TAG_ICEBERG { + return Err(DataFusionError::Internal(format!( + "unknown iceberg physical codec tag {tag}" + ))); + } + + let node: IcebergPhysicalNode = serde_json::from_slice(rest).map_err(to_df_err)?; + + match node { + IcebergPhysicalNode::Scan { + catalog, + table, + snapshot_id, + projection, + limit, + predicates, + } => { + let config = catalog.into(); + let table_obj = load_table(&config, &table)?; + let arrow_schema = arrow_schema_of(&table_obj)?; + // Map the projected column names back to indices in the table + // schema. A name that doesn't resolve is a hard error: the + // executor reloads table metadata independently of the scheduler, + // so silently dropping it would rebuild the scan with fewer + // columns than the plan expects and surface later as a confusing + // column-count mismatch instead of a clear failure here. 
+ let proj_indices: Option> = projection + .as_ref() + .map(|names| { + names + .iter() + .map(|n| { + arrow_schema.index_of(n).map_err(|_| { + DataFusionError::Internal(format!( + "projected column {n:?} not found in table {} schema; \ + scheduler and executor table metadata may be out of sync", + table + )) + }) + }) + .collect::, _>>() + }) + .transpose()?; + let scan = IcebergTableScan::new( + table_obj, + snapshot_id, + arrow_schema, + proj_indices.as_ref(), + &[], + limit, + ) + .with_predicates(predicates) + .with_catalog_config(Some(config)); + Ok(Arc::new(scan)) + } + IcebergPhysicalNode::Write { catalog, table } => { + let config = catalog.into(); + let table_obj = load_table(&config, &table)?; + let arrow_schema = arrow_schema_of(&table_obj)?; + let input = single_input(inputs, "IcebergWriteExec")?; + let write = IcebergWriteExec::new(table_obj, input, arrow_schema) + .with_catalog_config(Some(config)); + Ok(Arc::new(write)) + } + IcebergPhysicalNode::Commit { catalog, table } => { + let config = catalog.into(); + let cat = get_catalog(&config)?; + let table_obj = load_table(&config, &table)?; + let arrow_schema = arrow_schema_of(&table_obj)?; + let input = single_input(inputs, "IcebergCommitExec")?; + let commit = IcebergCommitExec::new(table_obj, cat, input, arrow_schema) + .with_catalog_config(Some(config)); + Ok(Arc::new(commit)) + } + IcebergPhysicalNode::Metadata { + catalog, + table, + metadata_type, + } => { + let config = catalog.into(); + let table_obj = load_table(&config, &table)?; + let kind = MetadataTableType::try_from(metadata_type.as_str()) + .map_err(DataFusionError::Internal)?; + let provider = IcebergMetadataTableProvider::new(table_obj, kind) + .with_catalog_config(Some(config)); + Ok(Arc::new(IcebergMetadataScan::new(provider))) + } + } + } + + fn try_encode( + &self, + node: Arc, + buf: &mut Vec, + ) -> Result<(), DataFusionError> { + if let Some(scan) = node.as_any().downcast_ref::() { + let config = scan + .catalog_config() + 
.ok_or_else(|| missing_config_err("IcebergTableScan"))?; + // Pin the snapshot at encode (planning) time. The executor reloads + // table metadata independently, so an unpinned scan would read + // whatever snapshot is current when each task decodes — concurrent + // commits could then give two tasks of one query different + // snapshots. `scan.table()` is the table as loaded at planning, so + // its current snapshot is the consistent choice for every task. + let snapshot_id = scan + .snapshot_id() + .or_else(|| scan.table().metadata().current_snapshot_id()); + let proto = IcebergPhysicalNode::Scan { + catalog: config.into(), + table: scan.table().identifier().clone(), + snapshot_id, + projection: scan.projection().map(|s| s.to_vec()), + limit: scan.limit(), + predicates: scan.predicates().cloned(), + }; + return encode_blob(buf, &proto); + } + + if let Some(write) = node.as_any().downcast_ref::() { + let config = write + .catalog_config() + .ok_or_else(|| missing_config_err("IcebergWriteExec"))?; + let proto = IcebergPhysicalNode::Write { + catalog: config.into(), + table: write.table().identifier().clone(), + }; + return encode_blob(buf, &proto); + } + + if let Some(commit) = node.as_any().downcast_ref::() { + let config = commit + .catalog_config() + .ok_or_else(|| missing_config_err("IcebergCommitExec"))?; + let proto = IcebergPhysicalNode::Commit { + catalog: config.into(), + table: commit.table().identifier().clone(), + }; + return encode_blob(buf, &proto); + } + + if let Some(meta) = node.as_any().downcast_ref::() { + let provider = meta.provider(); + let config = provider + .catalog_config() + .ok_or_else(|| missing_config_err("IcebergMetadataScan"))?; + let proto = IcebergPhysicalNode::Metadata { + catalog: config.into(), + table: provider.table().identifier().clone(), + metadata_type: provider.metadata_type().as_str().to_string(), + }; + return encode_blob(buf, &proto); + } + + buf.push(TAG_DELEGATED); + self.inner.try_encode(node, buf) + } + + fn 
try_encode_expr( + &self, + node: &Arc, + buf: &mut Vec, + ) -> Result<(), DataFusionError> { + // The partition-value expression a partitioned write injects holds a + // live calculator; serialize the spec + schema it can be rebuilt from. + if let Some(expr) = node.as_any().downcast_ref::() { + let proto = PartitionExprProto { + partition_spec: expr.partition_spec().as_ref().clone(), + schema: expr.table_schema().as_ref().clone(), + }; + return encode_blob(buf, &proto); + } + buf.push(TAG_DELEGATED); + self.inner.try_encode_expr(node, buf) + } + + fn try_decode_expr( + &self, + buf: &[u8], + inputs: &[Arc], + ) -> Result, DataFusionError> { + let (tag, rest) = split_tagged(buf, "iceberg physical expr")?; + match tag { + TAG_DELEGATED => self.inner.try_decode_expr(rest, inputs), + TAG_ICEBERG => { + let proto: PartitionExprProto = serde_json::from_slice(rest).map_err(to_df_err)?; + let expr = + PartitionExpr::try_new(Arc::new(proto.partition_spec), Arc::new(proto.schema))?; + Ok(Arc::new(expr)) + } + other => Err(DataFusionError::Internal(format!( + "unknown iceberg physical expr tag {other}" + ))), + } + } +} + +fn single_input( + inputs: &[Arc], + node: &str, +) -> Result, DataFusionError> { + if inputs.len() != 1 { + return Err(DataFusionError::Internal(format!( + "{node} expects exactly one input, got {}", + inputs.len() + ))); + } + Ok(inputs[0].clone()) +} + +#[cfg(test)] +mod tests { + use std::collections::BTreeMap; + + use super::*; + + fn sample_catalog() -> CatalogConfigProto { + CatalogConfigProto { + r#type: "rest".to_string(), + name: "rest".to_string(), + props: BTreeMap::from([ + ("uri".to_string(), "http://localhost:8181".to_string()), + ("warehouse".to_string(), "s3://bucket/wh".to_string()), + ]), + } + } + + fn roundtrip(node: &IcebergPhysicalNode) -> IcebergPhysicalNode { + let mut buf = Vec::new(); + encode_blob(&mut buf, node).expect("encode"); + assert_eq!(buf[0], TAG_ICEBERG, "blob must carry the iceberg tag"); + 
serde_json::from_slice(&buf[1..]).expect("decode") + } + + #[test] + fn scan_node_roundtrips() { + let node = IcebergPhysicalNode::Scan { + catalog: sample_catalog(), + table: TableIdent::from_strs(["ns", "tbl"]).unwrap(), + snapshot_id: Some(42), + projection: Some(vec!["a".to_string(), "b".to_string()]), + limit: Some(10), + predicates: None, + }; + assert_eq!(node, roundtrip(&node)); + } + + #[test] + fn scan_node_with_predicate_roundtrips() { + use iceberg::expr::Reference; + use iceberg::spec::Datum; + + let node = IcebergPhysicalNode::Scan { + catalog: sample_catalog(), + table: TableIdent::from_strs(["ns", "tbl"]).unwrap(), + snapshot_id: None, + projection: None, + limit: None, + predicates: Some(Reference::new("a").less_than(Datum::long(5))), + }; + assert_eq!(node, roundtrip(&node)); + } + + #[test] + fn metadata_node_roundtrips() { + let node = IcebergPhysicalNode::Metadata { + catalog: sample_catalog(), + table: TableIdent::from_strs(["ns", "tbl"]).unwrap(), + metadata_type: "snapshots".to_string(), + }; + assert_eq!(node, roundtrip(&node)); + } + + #[test] + fn write_node_roundtrips() { + let node = IcebergPhysicalNode::Write { + catalog: sample_catalog(), + table: TableIdent::from_strs(["ns", "tbl"]).unwrap(), + }; + assert_eq!(node, roundtrip(&node)); + } + + #[test] + fn commit_node_roundtrips() { + let node = IcebergPhysicalNode::Commit { + catalog: sample_catalog(), + table: TableIdent::from_strs(["a", "b", "tbl"]).unwrap(), + }; + assert_eq!(node, roundtrip(&node)); + } + + #[test] + fn non_iceberg_node_roundtrips_through_inner_codec() { + use ballista_core::execution_plans::ShuffleWriterExec; + use datafusion::arrow::datatypes::{DataType, Field, Schema as ArrowSchema}; + use datafusion::physical_plan::empty::EmptyExec; + use datafusion::prelude::SessionContext; + + // A Ballista shuffle node is not an Iceberg node, so the codec must + // frame it with TAG_DELEGATED and hand it to the inner Ballista codec — + // and decode must route it back 
there, reconstructing the same node. + let schema = Arc::new(ArrowSchema::new(vec![Field::new( + "a", + DataType::Int32, + false, + )])); + let input: Arc = Arc::new(EmptyExec::new(schema)); + let shuffle = ShuffleWriterExec::try_new( + "job-1".to_string(), + 7, + input.clone(), + "/tmp/work".to_string(), + None, + ) + .expect("build shuffle writer"); + + let codec = IcebergPhysicalCodec::default(); + let mut buf = Vec::new(); + codec + .try_encode(Arc::new(shuffle), &mut buf) + .expect("encode delegated node"); + assert_eq!(buf[0], TAG_DELEGATED, "non-Iceberg node must be delegated"); + + let ctx = SessionContext::new(); + let decoded = codec + .try_decode(&buf, &[input], &ctx.task_ctx()) + .expect("decode delegated node"); + let decoded = decoded + .as_any() + .downcast_ref::() + .expect("decoded plan should be a ShuffleWriterExec"); + assert_eq!(decoded.job_id(), "job-1"); + assert_eq!(decoded.stage_id(), 7); + } +} diff --git a/crates/integrations/ballista/tests/distributed_read_write.rs b/crates/integrations/ballista/tests/distributed_read_write.rs new file mode 100644 index 0000000000..6df010c853 --- /dev/null +++ b/crates/integrations/ballista/tests/distributed_read_write.rs @@ -0,0 +1,434 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. 
See the License for the +// specific language governing permissions and limitations +// under the License. + +//! End-to-end distributed Iceberg write/read test. +//! +//! Like the other iceberg-rust integration tests, this runs against the shared +//! docker fixture (an Iceberg REST catalog + MinIO). The `make test` target +//! brings the fixture up automatically; to run it on its own: +//! +//! ```bash +//! cd iceberg-rust && make docker-up +//! cargo test -p iceberg-ballista --test distributed_read_write +//! ``` +//! +//! The endpoints can be overridden with the `ICEBERG_REST_URI` and +//! `ICEBERG_S3_ENDPOINT` environment variables. + +use std::collections::HashMap; +use std::sync::{Arc, LazyLock}; +use std::time::{Duration, SystemTime, UNIX_EPOCH}; + +use arrow::array::{AsArray, Int64Array, RecordBatch}; +use arrow::datatypes::Int32Type; +use ballista::datafusion::execution::{SessionState, SessionStateBuilder}; +use ballista::datafusion::prelude::{SessionConfig, SessionContext}; +use ballista::prelude::{SessionConfigExt, SessionContextExt}; +use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient; +use ballista_executor::new_standalone_executor_from_state; +use ballista_scheduler::standalone::new_standalone_scheduler_from_state; +use iceberg::spec::{ + NestedField, PrimitiveType, Schema, Transform, Type, UnboundPartitionField, + UnboundPartitionSpec, +}; +use iceberg::{Catalog, CatalogBuilder, NamespaceIdent, TableCreation, TableIdent}; +use iceberg_ballista::{ + IcebergCatalogConfig, register_iceberg_catalog, register_iceberg_codecs, register_iceberg_table, +}; +use iceberg_catalog_rest::RestCatalogBuilder; +use iceberg_storage_opendal::OpenDalStorageFactory; +use tokio::sync::Mutex; + +/// Serializes the catalog-mutating tests in this binary. The REST fixture is +/// backed by in-memory SQLite, which rejects concurrent commits with +/// `SQLITE_BUSY`. 
Each test still exercises parallelism *internally* (multiple +/// write tasks / executors); this only stops the two test cases from committing +/// to the catalog at the same time, so the suite is robust under a parallel test +/// harness (`cargo test`, `nextest`) without relying on `--test-threads=1`. +static CATALOG_GUARD: LazyLock> = LazyLock::new(|| Mutex::new(())); + +/// Table name unique per run, so reruns don't collide in the shared catalog. +fn unique_table_name(prefix: &str) -> String { + let millis = SystemTime::now() + .duration_since(UNIX_EPOCH) + .unwrap() + .as_millis(); + format!("{prefix}_{millis}") +} + +/// Session state with the Iceberg codecs installed. +fn iceberg_session_state(config: SessionConfig) -> SessionState { + SessionStateBuilder::new() + .with_config(register_iceberg_codecs(config)) + .with_default_features() + .build() +} + +/// Runs a SQL statement to completion and returns its batches. +async fn run_sql(ctx: &SessionContext, sql: &str) -> Vec { + ctx.sql(sql) + .await + .expect("plan sql") + .collect() + .await + .expect("run sql") +} + +/// Extracts the single `i64` value of the first column (a COUNT result). +fn single_i64(batches: &[RecordBatch]) -> i64 { + batches + .iter() + .find_map(|b| { + b.column(0) + .as_any() + .downcast_ref::() + .map(|a| a.value(0)) + }) + .expect("i64 column") +} + +/// Flattens the first column of every batch into a `Vec`. 
+fn i32_values(batches: &[RecordBatch]) -> Vec { + batches + .iter() + .flat_map(|b| b.column(0).as_primitive::().values().to_vec()) + .collect() +} + +fn catalog_props() -> HashMap { + let rest_uri = + std::env::var("ICEBERG_REST_URI").unwrap_or_else(|_| "http://localhost:8181".to_string()); + let s3_endpoint = std::env::var("ICEBERG_S3_ENDPOINT") + .unwrap_or_else(|_| "http://localhost:9000".to_string()); + HashMap::from([ + ("uri".to_string(), rest_uri), + ("s3.endpoint".to_string(), s3_endpoint), + ("s3.access-key-id".to_string(), "admin".to_string()), + ("s3.secret-access-key".to_string(), "password".to_string()), + ("s3.region".to_string(), "us-east-1".to_string()), + ("s3.path-style-access".to_string(), "true".to_string()), + ]) +} + +async fn build_rest_catalog(props: &HashMap) -> impl Catalog + use<> { + RestCatalogBuilder::default() + .with_storage_factory(Arc::new(OpenDalStorageFactory::S3 { + customized_credential_load: None, + })) + .load("rest", props.clone()) + .await + .expect("build rest catalog") +} + +/// Ensures the shared test namespace exists, tolerating a concurrent creator +/// (tests run in parallel and share this namespace). 
+async fn ensure_namespace(catalog: &impl Catalog) -> NamespaceIdent { + let namespace = NamespaceIdent::new("ballista_it".to_string()); + if !catalog.namespace_exists(&namespace).await.unwrap() + && let Err(e) = catalog.create_namespace(&namespace, HashMap::new()).await + && !catalog.namespace_exists(&namespace).await.unwrap() + { + panic!("create namespace: {e}"); + } + namespace +} + +async fn create_table(props: &HashMap, table_name: &str) -> NamespaceIdent { + let catalog = build_rest_catalog(props).await; + let namespace = ensure_namespace(&catalog).await; + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::required(2, "name", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + let creation = TableCreation::builder() + .name(table_name.to_string()) + .schema(schema) + .properties(HashMap::new()) + .build(); + catalog + .create_table(&namespace, creation) + .await + .expect("create table"); + + namespace +} + +/// Creates a table partitioned by `region` (identity). A distributed INSERT then +/// fans the rows out to one writer per region — exercising the partition-value +/// expression (`PartitionExpr`) serialization across the cluster. +async fn create_partitioned_table( + props: &HashMap, + table_name: &str, +) -> NamespaceIdent { + let catalog = build_rest_catalog(props).await; + let namespace = ensure_namespace(&catalog).await; + + // Optional (nullable) fields so the schema matches the nullable columns a + // `VALUES` source produces; the partitioned-write path checks nullability. 
+ let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::optional(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + NestedField::optional(2, "region", Type::Primitive(PrimitiveType::String)).into(), + ]) + .build() + .unwrap(); + let partition_spec = UnboundPartitionSpec::builder() + .with_spec_id(0) + // The REST catalog requires an explicit partition field-id; the in-memory + // catalog used elsewhere assigns one automatically, but REST does not. + .add_partition_fields([UnboundPartitionField { + source_id: 2, + field_id: Some(1000), + name: "region".to_string(), + transform: Transform::Identity, + }]) + .unwrap() + .build(); + let creation = TableCreation::builder() + .name(table_name.to_string()) + .schema(schema) + .partition_spec(partition_spec) + .properties(HashMap::new()) + .build(); + catalog + .create_table(&namespace, creation) + .await + .expect("create partitioned table"); + + namespace +} + +#[tokio::test(flavor = "multi_thread", worker_threads = 4)] +async fn distributed_insert_and_read() { + let _ = env_logger::builder().is_test(true).try_init(); + let _catalog_guard = CATALOG_GUARD.lock().await; + + let props = catalog_props(); + let table_name = unique_table_name("events"); + let namespace = create_table(&props, &table_name).await; + + let state = iceberg_session_state( + SessionConfig::new_with_ballista() + .with_target_partitions(2) + .with_ballista_standalone_parallelism(2), + ); + let ctx = SessionContext::standalone_with_state(state) + .await + .expect("start standalone ballista"); + + let catalog_config = IcebergCatalogConfig::new("rest", "rest", props.clone()); + register_iceberg_table( + &ctx, + "events", + catalog_config, + namespace, + table_name.clone(), + ) + .await + .expect("register iceberg table"); + + run_sql( + &ctx, + "INSERT INTO events VALUES (1, 'alice'), (2, 'bob'), (3, 'carol')", + ) + .await; + + let count = single_i64(&run_sql(&ctx, "SELECT count(*) AS n FROM events").await); + 
assert_eq!(count, 3, "expected 3 rows after distributed insert"); + + let rows = run_sql(&ctx, "SELECT id, name FROM events ORDER BY id").await; + assert_eq!(i32_values(&rows), vec![1, 2, 3]); + + // Catalog-level registration: mount the whole Iceberg catalog and read the + // same table as `..
`. The providers built through + // the catalog carry the config too, so this distributed read exercises the + // catalog/schema config-threading path end to end. + register_iceberg_catalog( + &ctx, + "ice", + IcebergCatalogConfig::new("rest", "rest", props), + ) + .await + .expect("register iceberg catalog"); + let count = single_i64( + &run_sql( + &ctx, + &format!("SELECT count(*) AS n FROM ice.ballista_it.{table_name}"), + ) + .await, + ); + assert_eq!(count, 3, "catalog-qualified distributed read"); +} + +/// Distributed correctness on a real multi-executor cluster, writing a +/// **partitioned** table. +/// +/// Where [`distributed_insert_and_read`] uses standalone Ballista (one in-process +/// executor) and an unpartitioned table, this stands up a single scheduler with +/// **several in-process executors** and writes a table partitioned by `region`. A +/// partitioned write injects a partition-value expression (`PartitionExpr`) into +/// the physical plan, so this exercises that expression's serialization through +/// the codec on top of the plan-node serialization — and fans the rows out to one +/// writer per region across the executors. +/// +/// The assertions target the correctness properties of a distributed, multi-writer +/// write: +/// 1. the write commits exactly **one** atomic snapshot (not one per task), +/// 2. the parallel writers contributed multiple data files (one per region), and +/// 3. every input row lands exactly once (no loss, no duplication). +#[tokio::test(flavor = "multi_thread", worker_threads = 8)] +async fn parallel_multi_executor_insert_commits_all_rows() { + let _ = env_logger::builder().is_test(true).try_init(); + let _catalog_guard = CATALOG_GUARD.lock().await; + + const N_EXECUTORS: usize = 2; + const SLOTS_PER_EXECUTOR: usize = 2; + const WRITE_PARTITIONS: usize = 8; + const REGIONS: [&str; 4] = ["a", "b", "c", "d"]; + // REGIONS.len() * 3. 
+ const TOTAL_ROWS: i32 = 12; + + let props = catalog_props(); + let table_name = unique_table_name("parallel_events"); + let namespace = create_partitioned_table(&props, &table_name).await; + + // --- Bring up one scheduler + N executors in-process (real multi-executor) --- + let state = iceberg_session_state( + SessionConfig::new_with_ballista().with_target_partitions(WRITE_PARTITIONS), + ); + + let scheduler_addr = new_standalone_scheduler_from_state(&state) + .await + .expect("start scheduler"); + let scheduler_url = format!("http://localhost:{}", scheduler_addr.port()); + + let scheduler_client = loop { + match SchedulerGrpcClient::connect(scheduler_url.clone()).await { + Ok(client) => break client, + Err(_) => tokio::time::sleep(Duration::from_millis(100)).await, + } + }; + + // Each executor is a separate service; the scheduler load-balances across + // them via pull-based scheduling. + for _ in 0..N_EXECUTORS { + new_standalone_executor_from_state(scheduler_client.clone(), SLOTS_PER_EXECUTOR, &state) + .await + .expect("start executor"); + } + + let ctx = SessionContext::remote_with_state(&scheduler_url, state) + .await + .expect("connect to scheduler"); + + let catalog_config = IcebergCatalogConfig::new("rest", "rest", props.clone()); + register_iceberg_table( + &ctx, + "target", + catalog_config, + namespace.clone(), + table_name.clone(), + ) + .await + .expect("register iceberg table"); + + // --- Distributed, partitioned INSERT across the multi-executor cluster --- + // A serializable VALUES source whose rows span every region; the partitioned + // write fans them out to one writer per region across the executors, which is + // the path that injects PartitionExpr into the physical plan. (A registered + // MemTable would not work here — it's a custom TableProvider that Ballista + // cannot serialize into the logical plan.) 
+ let values = (0..TOTAL_ROWS) + .map(|i| { + let region = REGIONS[i as usize % REGIONS.len()]; + format!("({}, '{region}')", i + 1) + }) + .collect::>() + .join(", "); + run_sql(&ctx, &format!("INSERT INTO target VALUES {values}")).await; + + // (1) The distributed write committed exactly once (a single atomic + // snapshot), not one commit per task. Checked straight from the catalog so + // it isolates write correctness from the distributed read-back below. + let catalog = build_rest_catalog(&props).await; + let table = catalog + .load_table(&TableIdent::new(namespace.clone(), table_name.clone())) + .await + .expect("load committed table"); + assert_eq!( + table.metadata().snapshots().count(), + 1, + "the distributed write must produce exactly one atomic commit" + ); + + // (2) The parallel writers each produced a data file (one per region), all + // coalesced into that single commit. + let snapshot = table + .metadata() + .current_snapshot() + .expect("current snapshot"); + let mut data_files = 0usize; + let manifest_list = table + .manifest_list_reader(snapshot) + .load() + .await + .expect("load manifest list"); + for entry in manifest_list.entries() { + let manifest = entry + .load_manifest(table.file_io()) + .await + .expect("load manifest"); + data_files += manifest.entries().len(); + } + assert!( + data_files >= 2, + "partitioned write should produce multiple data files, got {data_files}" + ); + + // (3) Every row landed exactly once. 
+ let count = single_i64(&run_sql(&ctx, "SELECT count(*) AS n FROM target").await); + assert_eq!(count as i32, TOTAL_ROWS, "row count after parallel insert"); + + let ids = i32_values(&run_sql(&ctx, "SELECT id FROM target ORDER BY id").await); + assert_eq!( + ids, + (1..=TOTAL_ROWS).collect::>(), + "exact id set after parallel insert (no lost or duplicated rows)" + ); + + // (4) Predicate pushdown survives serialization: a WHERE clause is pushed into + // the distributed scan (and re-applied above it), and the result is correct. + let half = TOTAL_ROWS / 2; + let filtered_ids = i32_values( + &run_sql( + &ctx, + &format!("SELECT id FROM target WHERE id <= {half} ORDER BY id"), + ) + .await, + ); + assert_eq!( + filtered_ids, + (1..=half).collect::>(), + "predicate-filtered distributed read" + ); +} diff --git a/crates/integrations/datafusion/public-api.txt b/crates/integrations/datafusion/public-api.txt index d24bd9fc9e..6aac904627 100644 --- a/crates/integrations/datafusion/public-api.txt +++ b/crates/integrations/datafusion/public-api.txt @@ -2,6 +2,12 @@ pub mod iceberg_datafusion pub mod iceberg_datafusion::metadata_table pub struct iceberg_datafusion::metadata_table::IcebergMetadataTableProvider impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, 
catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider @@ -13,13 +19,51 @@ pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan<'l pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub mod iceberg_datafusion::physical_plan +pub struct iceberg_datafusion::physical_plan::IcebergCommitExec +impl iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::new(table: iceberg::table::Table, catalog: alloc::sync::Arc, input: alloc::sync::Arc, schema: arrow_schema::schema::SchemaRef) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::fmt_as(&self, t: 
datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergCommitExec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::benefits_from_input_partitioning(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::execute(&self, partition: usize, context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::name(&self) -> &str +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::required_input_distribution(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergCommitExec::with_new_children(self: alloc::sync::Arc, children: alloc::vec::Vec>) -> datafusion_common::error::Result> +pub struct iceberg_datafusion::physical_plan::IcebergMetadataScan +impl iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::new(provider: iceberg_datafusion::metadata_table::IcebergMetadataTableProvider) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::provider(&self) -> &iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn 
iceberg_datafusion::physical_plan::IcebergMetadataScan::fmt_as(&self, _t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergMetadataScan +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::execute(&self, _partition: usize, _context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::name(&self) -> &str +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergMetadataScan::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> pub struct iceberg_datafusion::physical_plan::IcebergTableScan impl iceberg_datafusion::physical_plan::IcebergTableScan +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::limit(&self) -> core::option::Option +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::new(table: iceberg::table::Table, snapshot_id: core::option::Option, schema: arrow_schema::schema::SchemaRef, projection: core::option::Option<&alloc::vec::Vec>, filters: &[datafusion_expr::expr::Expr], limit: core::option::Option) -> Self pub fn iceberg_datafusion::physical_plan::IcebergTableScan::predicates(&self) -> core::option::Option<&iceberg::expr::predicate::Predicate> pub fn iceberg_datafusion::physical_plan::IcebergTableScan::projection(&self) -> core::option::Option<&[alloc::string::String]> pub fn 
iceberg_datafusion::physical_plan::IcebergTableScan::snapshot_id(&self) -> core::option::Option pub fn iceberg_datafusion::physical_plan::IcebergTableScan::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_catalog_config(self, catalog_config: core::option::Option) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_predicates(self, predicates: core::option::Option) -> Self impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergTableScan pub fn iceberg_datafusion::physical_plan::IcebergTableScan::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergTableScan @@ -31,12 +75,61 @@ pub fn iceberg_datafusion::physical_plan::IcebergTableScan::execute(&self, _part pub fn iceberg_datafusion::physical_plan::IcebergTableScan::name(&self) -> &str pub fn iceberg_datafusion::physical_plan::IcebergTableScan::properties(&self) -> &alloc::sync::Arc pub fn iceberg_datafusion::physical_plan::IcebergTableScan::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> +pub struct iceberg_datafusion::physical_plan::IcebergWriteExec +impl iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::new(table: iceberg::table::Table, input: alloc::sync::Arc, schema: arrow_schema::schema::SchemaRef) -> Self +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl core::fmt::Debug for iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn 
iceberg_datafusion::physical_plan::IcebergWriteExec::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::display::DisplayAs for iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::fmt_as(&self, t: datafusion_physical_plan::display::DisplayFormatType, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_physical_plan::execution_plan::ExecutionPlan for iceberg_datafusion::physical_plan::IcebergWriteExec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::benefits_from_input_partitioning(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::execute(&self, partition: usize, context: alloc::sync::Arc) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::maintains_input_order(&self) -> alloc::vec::Vec +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::name(&self) -> &str +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::properties(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::IcebergWriteExec::with_new_children(self: alloc::sync::Arc, children: alloc::vec::Vec>) -> datafusion_common::error::Result> +pub struct iceberg_datafusion::physical_plan::PartitionExpr +impl iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::partition_spec(&self) -> &alloc::sync::Arc +pub fn iceberg_datafusion::physical_plan::PartitionExpr::table_schema(&self) -> &iceberg::spec::schema::SchemaRef +pub fn iceberg_datafusion::physical_plan::PartitionExpr::try_new(partition_spec: alloc::sync::Arc, table_schema: iceberg::spec::schema::SchemaRef) -> datafusion_common::error::Result 
+impl core::clone::Clone for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::clone(&self) -> iceberg_datafusion::physical_plan::PartitionExpr +impl core::cmp::Eq for iceberg_datafusion::physical_plan::PartitionExpr +impl core::cmp::PartialEq for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::eq(&self, other: &Self) -> bool +impl core::fmt::Debug for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::fmt::Display for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::hash::Hash for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::hash(&self, state: &mut H) +impl datafusion_physical_expr_common::physical_expr::PhysicalExpr for iceberg_datafusion::physical_plan::PartitionExpr +pub fn iceberg_datafusion::physical_plan::PartitionExpr::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::physical_plan::PartitionExpr::children(&self) -> alloc::vec::Vec<&alloc::sync::Arc> +pub fn iceberg_datafusion::physical_plan::PartitionExpr::data_type(&self, _input_schema: &arrow_schema::schema::Schema) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::evaluate(&self, batch: &arrow_array::record_batch::RecordBatch) -> datafusion_common::error::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::fmt_sql(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +pub fn iceberg_datafusion::physical_plan::PartitionExpr::nullable(&self, _input_schema: &arrow_schema::schema::Schema) -> datafusion_common::error::Result +pub fn 
iceberg_datafusion::physical_plan::PartitionExpr::with_new_children(self: alloc::sync::Arc, _children: alloc::vec::Vec>) -> datafusion_common::error::Result> pub fn iceberg_datafusion::physical_plan::convert_filters_to_predicate(filters: &[datafusion_expr::expr::Expr]) -> core::option::Option pub fn iceberg_datafusion::physical_plan::project_with_partition(input: alloc::sync::Arc, table: &iceberg::table::Table) -> datafusion_common::error::Result> pub mod iceberg_datafusion::table pub mod iceberg_datafusion::table::metadata_table pub struct iceberg_datafusion::table::metadata_table::IcebergMetadataTableProvider impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider @@ -58,6 +151,24 @@ impl core::fmt::Debug for iceberg_datafusion::table_provider_factory::IcebergTab pub fn 
iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::table::TableProviderFactory for iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::create<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, cmd: &'life2 datafusion_expr::logical_plan::ddl::CreateExternalTable) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait +pub struct iceberg_datafusion::table::IcebergMetadataTableProvider +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> +impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> 
iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +impl core::fmt::Debug for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_catalog::table::TableProvider for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, _projection: core::option::Option<&'life2 alloc::vec::Vec>, _filters: &'life3 [datafusion_expr::expr::Expr], _limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::table::IcebergStaticTableProvider impl iceberg_datafusion::IcebergStaticTableProvider pub async fn iceberg_datafusion::IcebergStaticTableProvider::try_new_from_table(table: iceberg::table::Table) -> iceberg::error::Result @@ -74,6 +185,12 @@ pub fn iceberg_datafusion::IcebergStaticTableProvider::schema(&self) -> arrow_sc pub fn iceberg_datafusion::IcebergStaticTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergStaticTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::table::IcebergTableProvider +impl 
iceberg_datafusion::IcebergTableProvider +pub fn iceberg_datafusion::IcebergTableProvider::config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::IcebergTableProvider::snapshot_id(&self) -> core::option::Option +pub fn iceberg_datafusion::IcebergTableProvider::table_ident(&self) -> &iceberg::catalog::TableIdent +pub async fn iceberg_datafusion::IcebergTableProvider::try_new_with_config(catalog: alloc::sync::Arc, config: iceberg_datafusion::IcebergCatalogConfig, namespace: iceberg::catalog::NamespaceIdent, name: impl core::convert::Into) -> iceberg::error::Result +pub fn iceberg_datafusion::IcebergTableProvider::with_snapshot_id(self, snapshot_id: core::option::Option) -> Self impl core::clone::Clone for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::clone(&self) -> iceberg_datafusion::IcebergTableProvider impl core::fmt::Debug for iceberg_datafusion::IcebergTableProvider @@ -96,15 +213,48 @@ impl core::fmt::Debug for iceberg_datafusion::table_provider_factory::IcebergTab pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::table::TableProviderFactory for iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory pub fn iceberg_datafusion::table_provider_factory::IcebergTableProviderFactory::create<'life0, 'life1, 'life2, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, cmd: &'life2 datafusion_expr::logical_plan::ddl::CreateExternalTable) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait +pub struct iceberg_datafusion::IcebergCatalogConfig +pub iceberg_datafusion::IcebergCatalogConfig::name: alloc::string::String +pub iceberg_datafusion::IcebergCatalogConfig::props: std::collections::hash::map::HashMap +pub 
iceberg_datafusion::IcebergCatalogConfig::type: alloc::string::String +impl iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::new(type: impl core::convert::Into, name: impl core::convert::Into, props: std::collections::hash::map::HashMap) -> Self +impl core::clone::Clone for iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::clone(&self) -> iceberg_datafusion::IcebergCatalogConfig +impl core::cmp::Eq for iceberg_datafusion::IcebergCatalogConfig +impl core::cmp::PartialEq for iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::eq(&self, other: &iceberg_datafusion::IcebergCatalogConfig) -> bool +impl core::fmt::Debug for iceberg_datafusion::IcebergCatalogConfig +pub fn iceberg_datafusion::IcebergCatalogConfig::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl core::marker::StructuralPartialEq for iceberg_datafusion::IcebergCatalogConfig pub struct iceberg_datafusion::IcebergCatalogProvider impl iceberg_datafusion::IcebergCatalogProvider pub async fn iceberg_datafusion::IcebergCatalogProvider::try_new(client: alloc::sync::Arc) -> iceberg::error::Result +pub async fn iceberg_datafusion::IcebergCatalogProvider::try_new_with_config(client: alloc::sync::Arc, config: iceberg_datafusion::IcebergCatalogConfig) -> iceberg::error::Result impl core::fmt::Debug for iceberg_datafusion::IcebergCatalogProvider pub fn iceberg_datafusion::IcebergCatalogProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result impl datafusion_catalog::catalog::CatalogProvider for iceberg_datafusion::IcebergCatalogProvider pub fn iceberg_datafusion::IcebergCatalogProvider::as_any(&self) -> &dyn core::any::Any pub fn iceberg_datafusion::IcebergCatalogProvider::schema(&self, name: &str) -> core::option::Option> pub fn iceberg_datafusion::IcebergCatalogProvider::schema_names(&self) -> alloc::vec::Vec +pub struct 
iceberg_datafusion::IcebergMetadataTableProvider +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::catalog_config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::metadata_type(&self) -> &iceberg::inspect::metadata_table::MetadataTableType +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::new(table: iceberg::table::Table, type: iceberg::inspect::metadata_table::MetadataTableType) -> Self +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table(&self) -> &iceberg::table::Table +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::with_catalog_config(self, catalog_config: core::option::Option) -> Self +impl iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub async fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan(self) -> datafusion_common::error::Result>> +impl core::clone::Clone for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::clone(&self) -> iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +impl core::fmt::Debug for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result +impl datafusion_catalog::table::TableProvider for iceberg_datafusion::metadata_table::IcebergMetadataTableProvider +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::as_any(&self) -> &dyn core::any::Any +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::scan<'life0, 'life1, 'life2, 'life3, 'async_trait>(&'life0 self, _state: &'life1 dyn datafusion_session::session::Session, _projection: 
core::option::Option<&'life2 alloc::vec::Vec>, _filters: &'life3 [datafusion_expr::expr::Expr], _limit: core::option::Option) -> core::pin::Pin>> + core::marker::Send + 'async_trait)>> where Self: 'async_trait, 'life0: 'async_trait, 'life1: 'async_trait, 'life2: 'async_trait, 'life3: 'async_trait +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::schema(&self) -> arrow_schema::schema::SchemaRef +pub fn iceberg_datafusion::metadata_table::IcebergMetadataTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::IcebergStaticTableProvider impl iceberg_datafusion::IcebergStaticTableProvider pub async fn iceberg_datafusion::IcebergStaticTableProvider::try_new_from_table(table: iceberg::table::Table) -> iceberg::error::Result @@ -121,6 +271,12 @@ pub fn iceberg_datafusion::IcebergStaticTableProvider::schema(&self) -> arrow_sc pub fn iceberg_datafusion::IcebergStaticTableProvider::supports_filters_pushdown(&self, filters: &[&datafusion_expr::expr::Expr]) -> datafusion_common::error::Result> pub fn iceberg_datafusion::IcebergStaticTableProvider::table_type(&self) -> datafusion_expr::table_source::TableType pub struct iceberg_datafusion::IcebergTableProvider +impl iceberg_datafusion::IcebergTableProvider +pub fn iceberg_datafusion::IcebergTableProvider::config(&self) -> core::option::Option<&iceberg_datafusion::IcebergCatalogConfig> +pub fn iceberg_datafusion::IcebergTableProvider::snapshot_id(&self) -> core::option::Option +pub fn iceberg_datafusion::IcebergTableProvider::table_ident(&self) -> &iceberg::catalog::TableIdent +pub async fn iceberg_datafusion::IcebergTableProvider::try_new_with_config(catalog: alloc::sync::Arc, config: iceberg_datafusion::IcebergCatalogConfig, namespace: iceberg::catalog::NamespaceIdent, name: impl core::convert::Into) -> iceberg::error::Result +pub fn iceberg_datafusion::IcebergTableProvider::with_snapshot_id(self, snapshot_id: core::option::Option) -> Self impl 
core::clone::Clone for iceberg_datafusion::IcebergTableProvider pub fn iceberg_datafusion::IcebergTableProvider::clone(&self) -> iceberg_datafusion::IcebergTableProvider impl core::fmt::Debug for iceberg_datafusion::IcebergTableProvider diff --git a/crates/integrations/datafusion/src/catalog.rs b/crates/integrations/datafusion/src/catalog.rs index c3cbcc88b4..31b4ae7175 100644 --- a/crates/integrations/datafusion/src/catalog.rs +++ b/crates/integrations/datafusion/src/catalog.rs @@ -23,6 +23,7 @@ use datafusion::catalog::{CatalogProvider, SchemaProvider}; use futures::future::try_join_all; use iceberg::{Catalog, NamespaceIdent, Result}; +use crate::IcebergCatalogConfig; use crate::schema::IcebergSchemaProvider; /// Provides an interface to manage and access multiple schemas @@ -47,6 +48,24 @@ impl IcebergCatalogProvider { /// attempts to create a schema provider for each namespace, and /// collects these providers into a `HashMap`. pub async fn try_new(client: Arc) -> Result { + Self::try_new_impl(client, None).await + } + + /// Like [`try_new`](Self::try_new), but threads a serializable + /// [`IcebergCatalogConfig`] into every schema and table provider it creates, + /// so the catalog's tables can be queried by a distributed engine such as + /// Ballista. The `client` must already be built from the same `config`. + pub async fn try_new_with_config( + client: Arc, + config: IcebergCatalogConfig, + ) -> Result { + Self::try_new_impl(client, Some(config)).await + } + + async fn try_new_impl( + client: Arc, + config: Option, + ) -> Result { // TODO: // Schemas and providers should be cached and evicted based on time // As of right now; schemas might become stale. 
@@ -63,6 +82,7 @@ impl IcebergCatalogProvider { .map(|name| { IcebergSchemaProvider::try_new( client.clone(), + config.clone(), NamespaceIdent::new(name.clone()), ) }) diff --git a/crates/integrations/datafusion/src/catalog_config.rs b/crates/integrations/datafusion/src/catalog_config.rs new file mode 100644 index 0000000000..b62e500ffe --- /dev/null +++ b/crates/integrations/datafusion/src/catalog_config.rs @@ -0,0 +1,54 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +use std::collections::HashMap; + +/// A serializable description of the catalog (and storage) that backs an +/// [`IcebergTableProvider`](crate::table::IcebergTableProvider). +/// +/// This is the minimal, self-contained handle needed to *reconstruct* a catalog +/// and its associated `FileIO` on a remote node. It deliberately holds only +/// plain data (no live connections) so that distributed query engines such as +/// Ballista can serialize it, ship it to executors, and rebuild the catalog +/// there via a catalog loader (e.g. `iceberg-catalog-loader`) and the storage +/// via `FileIOBuilder::with_props`. +/// +/// The `props` map carries both the catalog connection properties (e.g. 
the +/// REST catalog URI) and the storage/`FileIO` properties (e.g. S3 endpoint and +/// credentials); in practice these live together in a single map. +#[derive(Debug, Clone, PartialEq, Eq)] +pub struct IcebergCatalogConfig { + /// The catalog type, e.g. `"rest"`, `"sql"`, `"glue"`. + pub r#type: String, + pub name: String, + /// Catalog connection and storage properties. + pub props: HashMap, +} + +impl IcebergCatalogConfig { + pub fn new( + r#type: impl Into, + name: impl Into, + props: HashMap, + ) -> Self { + Self { + r#type: r#type.into(), + name: name.into(), + props, + } + } +} diff --git a/crates/integrations/datafusion/src/lib.rs b/crates/integrations/datafusion/src/lib.rs index 4b0ea8606d..19475aeda2 100644 --- a/crates/integrations/datafusion/src/lib.rs +++ b/crates/integrations/datafusion/src/lib.rs @@ -18,6 +18,9 @@ mod catalog; pub use catalog::*; +mod catalog_config; +pub use catalog_config::*; + mod error; pub use error::*; diff --git a/crates/integrations/datafusion/src/physical_plan/commit.rs b/crates/integrations/datafusion/src/physical_plan/commit.rs index 835c804908..d3a43c4006 100644 --- a/crates/integrations/datafusion/src/physical_plan/commit.rs +++ b/crates/integrations/datafusion/src/physical_plan/commit.rs @@ -41,13 +41,17 @@ use crate::to_datafusion_error; /// IcebergCommitExec is responsible for collecting the files written and use /// [`Transaction::fast_append`] to commit the data files written. #[derive(Debug)] -pub(crate) struct IcebergCommitExec { +pub struct IcebergCommitExec { table: Table, catalog: Arc, input: Arc, schema: ArrowSchemaRef, count_schema: ArrowSchemaRef, plan_properties: Arc, + /// Optional serializable catalog/storage config, populated when this node is + /// built through a config-backed provider so it can be reconstructed on a + /// remote node by a distributed engine. 
+ catalog_config: Option, } impl IcebergCommitExec { @@ -68,9 +72,31 @@ impl IcebergCommitExec { schema, count_schema, plan_properties, + catalog_config: None, } } + /// Attaches a serializable catalog/storage config to this node so that a + /// distributed engine can reconstruct it (including the catalog) on a remote + /// node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Returns the table this node commits to. + pub fn table(&self) -> &Table { + &self.table + } + // Compute the plan properties for this execution plan fn compute_properties(schema: ArrowSchemaRef) -> Arc { Arc::new(PlanProperties::new( @@ -160,12 +186,15 @@ impl ExecutionPlan for IcebergCommitExec { ))); } - Ok(Arc::new(IcebergCommitExec::new( - self.table.clone(), - self.catalog.clone(), - children[0].clone(), - self.schema.clone(), - ))) + Ok(Arc::new( + IcebergCommitExec::new( + self.table.clone(), + self.catalog.clone(), + children[0].clone(), + self.schema.clone(), + ) + .with_catalog_config(self.catalog_config.clone()), + )) } fn execute( @@ -592,6 +621,7 @@ mod tests { let iceberg_table_provider = IcebergTableProvider::try_new( catalog.clone(), + None, namespace.clone(), table_name.to_string(), ) diff --git a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs index a1a65dec1f..d25a897cc6 100644 --- a/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/metadata_scan.rs @@ -45,6 +45,13 @@ impl IcebergMetadataScan { properties, } } + + /// Returns the metadata-table provider this node scans, so a distributed + /// engine can serialize the catalog config + table identifier + 
metadata type + /// it carries and rebuild it on a remote node. + pub fn provider(&self) -> &IcebergMetadataTableProvider { + &self.provider + } } impl DisplayAs for IcebergMetadataScan { diff --git a/crates/integrations/datafusion/src/physical_plan/mod.rs b/crates/integrations/datafusion/src/physical_plan/mod.rs index aeac30de32..024266a4e9 100644 --- a/crates/integrations/datafusion/src/physical_plan/mod.rs +++ b/crates/integrations/datafusion/src/physical_plan/mod.rs @@ -26,6 +26,9 @@ pub(crate) mod write; pub(crate) const DATA_FILES_COL_NAME: &str = "data_files"; +pub use commit::IcebergCommitExec; pub use expr_to_predicate::convert_filters_to_predicate; -pub use project::project_with_partition; +pub use metadata_scan::IcebergMetadataScan; +pub use project::{PartitionExpr, project_with_partition}; pub use scan::IcebergTableScan; +pub use write::IcebergWriteExec; diff --git a/crates/integrations/datafusion/src/physical_plan/project.rs b/crates/integrations/datafusion/src/physical_plan/project.rs index 670d961f91..00c3c848d6 100644 --- a/crates/integrations/datafusion/src/physical_plan/project.rs +++ b/crates/integrations/datafusion/src/physical_plan/project.rs @@ -30,7 +30,7 @@ use iceberg::arrow::{ PROJECTED_PARTITION_VALUE_COLUMN, PartitionValueCalculator, schema_to_arrow_schema, strip_metadata_from_schema, }; -use iceberg::spec::PartitionSpec; +use iceberg::spec::{PartitionSpec, SchemaRef}; use iceberg::table::Table; use crate::to_datafusion_error; @@ -79,10 +79,6 @@ pub fn project_with_partition( ))); } - let calculator = - PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref()) - .map_err(to_datafusion_error)?; - let mut projection_exprs: Vec<(Arc, String)> = Vec::with_capacity(input_schema.fields().len() + 1); @@ -91,26 +87,51 @@ pub fn project_with_partition( projection_exprs.push((column_expr, field.name().clone())); } - let partition_expr = Arc::new(PartitionExpr::new(calculator, partition_spec.clone())); + let partition_expr = 
Arc::new(PartitionExpr::try_new( + partition_spec.clone(), + table_schema.clone(), + )?); projection_exprs.push((partition_expr, PROJECTED_PARTITION_VALUE_COLUMN.to_string())); let projection = ProjectionExec::try_new(projection_exprs, input)?; Ok(Arc::new(projection)) } -/// PhysicalExpr implementation for partition value calculation +/// `PhysicalExpr` that computes Iceberg partition values for each input row. +/// +/// Alongside the live (non-serializable) [`PartitionValueCalculator`], it retains +/// the [`PartitionSpec`] and table schema it was built from. A distributed engine +/// can serialize those two — both are self-contained iceberg spec types — and +/// rebuild an equivalent expression on a remote node via [`PartitionExpr::try_new`]. #[derive(Debug, Clone)] -struct PartitionExpr { +pub struct PartitionExpr { calculator: Arc, partition_spec: Arc, + table_schema: SchemaRef, } impl PartitionExpr { - fn new(calculator: PartitionValueCalculator, partition_spec: Arc) -> Self { - Self { + /// Builds a partition expression from a partition spec and the table schema + /// it is bound to, constructing the underlying [`PartitionValueCalculator`]. + pub fn try_new(partition_spec: Arc, table_schema: SchemaRef) -> DFResult { + let calculator = + PartitionValueCalculator::try_new(partition_spec.as_ref(), table_schema.as_ref()) + .map_err(to_datafusion_error)?; + Ok(Self { calculator: Arc::new(calculator), partition_spec, - } + table_schema, + }) + } + + /// The partition spec whose values this expression computes. + pub fn partition_spec(&self) -> &Arc { + &self.partition_spec + } + + /// The table schema the partition values are derived from. 
+ pub fn table_schema(&self) -> &SchemaRef { + &self.table_schema } } @@ -248,8 +269,6 @@ mod tests { let input = Arc::new(EmptyExec::new(arrow_schema.clone())); - let calculator = PartitionValueCalculator::try_new(&partition_spec, &table_schema).unwrap(); - let mut projection_exprs: Vec<(Arc, String)> = Vec::with_capacity(arrow_schema.fields().len() + 1); for (i, field) in arrow_schema.fields().iter().enumerate() { @@ -257,7 +276,9 @@ mod tests { projection_exprs.push((column_expr, field.name().clone())); } - let partition_expr = Arc::new(PartitionExpr::new(calculator, partition_spec)); + let partition_expr = Arc::new( + PartitionExpr::try_new(partition_spec, Arc::new(table_schema.clone())).unwrap(), + ); projection_exprs.push((partition_expr, PROJECTED_PARTITION_VALUE_COLUMN.to_string())); let projection = ProjectionExec::try_new(projection_exprs, input).unwrap(); @@ -302,7 +323,7 @@ mod tests { let partition_spec = Arc::new(partition_spec); let calculator = PartitionValueCalculator::try_new(&partition_spec, &table_schema).unwrap(); let partition_type = calculator.partition_arrow_type().clone(); - let expr = PartitionExpr::new(calculator, partition_spec); + let expr = PartitionExpr::try_new(partition_spec, Arc::new(table_schema.clone())).unwrap(); assert_eq!(expr.data_type(&arrow_schema).unwrap(), partition_type); assert!(!expr.nullable(&arrow_schema).unwrap()); diff --git a/crates/integrations/datafusion/src/physical_plan/scan.rs b/crates/integrations/datafusion/src/physical_plan/scan.rs index 36539ae503..098af883e8 100644 --- a/crates/integrations/datafusion/src/physical_plan/scan.rs +++ b/crates/integrations/datafusion/src/physical_plan/scan.rs @@ -53,11 +53,15 @@ pub struct IcebergTableScan { predicates: Option, /// Optional limit on the number of rows to return limit: Option, + /// Optional serializable catalog/storage config, populated when this scan is + /// built through a config-backed provider so it can be reconstructed on a + /// remote node by a 
distributed engine. + catalog_config: Option, } impl IcebergTableScan { /// Creates a new [`IcebergTableScan`] object. - pub(crate) fn new( + pub fn new( table: Table, snapshot_id: Option, schema: ArrowSchemaRef, @@ -80,9 +84,36 @@ impl IcebergTableScan { projection, predicates, limit, + catalog_config: None, } } + /// Attaches a serializable catalog/storage config to this scan so that a + /// distributed engine can reconstruct it on a remote node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Replaces the scan's pushed-down filter predicate. + /// + /// `IcebergTableScan::new` derives the predicate from DataFusion `Expr` + /// filters; this setter lets a distributed engine restore an already-built + /// [`Predicate`] directly (e.g. after deserializing it), so file pruning is + /// preserved on remote nodes. + pub fn with_predicates(mut self, predicates: Option) -> Self { + self.predicates = predicates; + self + } + pub fn table(&self) -> &Table { &self.table } diff --git a/crates/integrations/datafusion/src/physical_plan/write.rs b/crates/integrations/datafusion/src/physical_plan/write.rs index 697eeec659..b9aa80d493 100644 --- a/crates/integrations/datafusion/src/physical_plan/write.rs +++ b/crates/integrations/datafusion/src/physical_plan/write.rs @@ -60,11 +60,15 @@ use crate::to_datafusion_error; /// The output of this execution plan is a record batch containing a single column with serialized /// data file information that can be used for committing the write operation to the table. 
#[derive(Debug)] -pub(crate) struct IcebergWriteExec { +pub struct IcebergWriteExec { table: Table, input: Arc, result_schema: ArrowSchemaRef, plan_properties: Arc, + /// Optional serializable catalog/storage config, populated when this node is + /// built through a config-backed provider so it can be reconstructed on a + /// remote node by a distributed engine. + catalog_config: Option, } impl IcebergWriteExec { @@ -76,9 +80,30 @@ impl IcebergWriteExec { input, result_schema: Self::make_result_schema(), plan_properties, + catalog_config: None, } } + /// Attaches a serializable catalog/storage config to this node so that a + /// distributed engine can reconstruct it on a remote node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Returns the table this node writes to. + pub fn table(&self) -> &Table { + &self.table + } + fn compute_properties( input: &Arc, schema: ArrowSchemaRef, @@ -172,11 +197,10 @@ impl ExecutionPlan for IcebergWriteExec { ))); } - Ok(Arc::new(Self::new( - self.table.clone(), - Arc::clone(&children[0]), - self.schema(), - ))) + Ok(Arc::new( + Self::new(self.table.clone(), Arc::clone(&children[0]), self.schema()) + .with_catalog_config(self.catalog_config.clone()), + )) } /// Executes the write operation for the given partition. 
diff --git a/crates/integrations/datafusion/src/schema.rs b/crates/integrations/datafusion/src/schema.rs index 508aeb303b..3bf52f0542 100644 --- a/crates/integrations/datafusion/src/schema.rs +++ b/crates/integrations/datafusion/src/schema.rs @@ -32,7 +32,7 @@ use iceberg::inspect::MetadataTableType; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableCreation, TableIdent}; use crate::table::IcebergTableProvider; -use crate::to_datafusion_error; +use crate::{IcebergCatalogConfig, to_datafusion_error}; /// Represents a [`SchemaProvider`] for the Iceberg [`Catalog`], managing /// access to table providers within a specific namespace. @@ -42,6 +42,10 @@ pub(crate) struct IcebergSchemaProvider { catalog: Arc, /// The namespace this schema represents namespace: NamespaceIdent, + /// Optional serializable catalog/storage config. When present, every table + /// provider this schema creates carries it, so catalog-registered tables can + /// be queried by a distributed engine. + config: Option, /// A concurrent map where keys are table names /// and values are dynamic references to objects implementing the /// [`TableProvider`] trait. @@ -57,8 +61,12 @@ impl IcebergSchemaProvider { /// This method retrieves a list of table names /// attempts to create a table provider for each table name, and /// collects these providers into a `HashMap`. + /// + /// When `config` is present it is threaded into every table provider this + /// schema creates, so the tables can be queried by a distributed engine. 
pub(crate) async fn try_new( client: Arc, + config: Option, namespace: NamespaceIdent, ) -> Result { // TODO: @@ -75,7 +83,14 @@ impl IcebergSchemaProvider { let providers = try_join_all( table_names .iter() - .map(|name| IcebergTableProvider::try_new(client.clone(), namespace.clone(), name)) + .map(|name| { + IcebergTableProvider::try_new( + client.clone(), + config.clone(), + namespace.clone(), + name.clone(), + ) + }) .collect::>(), ) .await?; @@ -88,6 +103,7 @@ impl IcebergSchemaProvider { Ok(IcebergSchemaProvider { catalog: client, namespace, + config, tables, }) } @@ -171,6 +187,7 @@ impl SchemaProvider for IcebergSchemaProvider { let catalog = self.catalog.clone(); let namespace = self.namespace.clone(); + let config = self.config.clone(); let tables = self.tables.clone(); let name_clone = name.clone(); @@ -189,9 +206,11 @@ impl SchemaProvider for IcebergSchemaProvider { .await .map_err(to_datafusion_error)?; - // Create a new table provider using the catalog reference + // Create a new table provider using the catalog reference, + // carrying the config so it stays distributable. 
let table_provider = IcebergTableProvider::try_new( catalog.clone(), + config.clone(), namespace.clone(), name_clone.clone(), ) @@ -315,13 +334,93 @@ mod tests { .await .unwrap(); - let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), namespace) + let provider = IcebergSchemaProvider::try_new(Arc::new(catalog), None, namespace) .await .unwrap(); (provider, temp_dir) } + #[tokio::test] + async fn test_schema_provider_with_config_propagates_to_tables() { + use iceberg::TableCreation; + use iceberg::spec::{NestedField, PrimitiveType, Schema, Type}; + + let temp_dir = TempDir::new().unwrap(); + let warehouse_path = temp_dir.path().to_str().unwrap().to_string(); + let catalog = Arc::new( + MemoryCatalogBuilder::default() + .load( + "memory", + HashMap::from([(MEMORY_CATALOG_WAREHOUSE.to_string(), warehouse_path.clone())]), + ) + .await + .unwrap(), + ); + + let namespace = NamespaceIdent::new("test_ns".to_string()); + catalog + .create_namespace(&namespace, HashMap::new()) + .await + .unwrap(); + + let schema = Schema::builder() + .with_schema_id(0) + .with_fields(vec![ + NestedField::required(1, "id", Type::Primitive(PrimitiveType::Int)).into(), + ]) + .build() + .unwrap(); + catalog + .create_table( + &namespace, + TableCreation::builder() + .name("t".to_string()) + .location(format!("{warehouse_path}/t")) + .schema(schema) + .properties(HashMap::new()) + .build(), + ) + .await + .unwrap(); + + // With config: the table provider carries it (and is therefore distributable). 
+ let config = crate::IcebergCatalogConfig::new("memory", "memory", HashMap::new()); + let with_config = + IcebergSchemaProvider::try_new(catalog.clone(), Some(config), namespace.clone()) + .await + .unwrap(); + let provider = with_config + .table("t") + .await + .unwrap() + .expect("table provider"); + let iceberg = provider + .as_any() + .downcast_ref::() + .expect("IcebergTableProvider"); + assert!( + iceberg.config().is_some(), + "try_new_with_config should propagate the config to its tables" + ); + + // Without config: providers stay config-less (legacy behavior). + let without_config = + IcebergSchemaProvider::try_new(catalog.clone(), None, namespace.clone()) + .await + .unwrap(); + let provider = without_config + .table("t") + .await + .unwrap() + .expect("table provider"); + let iceberg = provider + .as_any() + .downcast_ref::() + .expect("IcebergTableProvider"); + assert!(iceberg.config().is_none()); + } + #[tokio::test] async fn test_register_table_with_data_fails() { let (schema_provider, _temp_dir) = create_test_schema_provider().await; diff --git a/crates/integrations/datafusion/src/table/metadata_table.rs b/crates/integrations/datafusion/src/table/metadata_table.rs index 38148b4084..ab3c1befa7 100644 --- a/crates/integrations/datafusion/src/table/metadata_table.rs +++ b/crates/integrations/datafusion/src/table/metadata_table.rs @@ -41,6 +41,47 @@ use crate::to_datafusion_error; pub struct IcebergMetadataTableProvider { pub(crate) table: Table, pub(crate) r#type: MetadataTableType, + /// Optional serializable catalog/storage config, populated when this provider + /// is built through a config-backed table provider so that a distributed + /// engine can reconstruct it (reload the table from the catalog) on a remote + /// node. + catalog_config: Option, +} + +impl IcebergMetadataTableProvider { + /// Creates a metadata-table provider over an already-loaded table. 
+ pub fn new(table: Table, r#type: MetadataTableType) -> Self { + Self { + table, + r#type, + catalog_config: None, + } + } + + /// Attaches a serializable catalog/storage config so that a distributed engine + /// can reconstruct this provider on a remote node. + pub fn with_catalog_config( + mut self, + catalog_config: Option, + ) -> Self { + self.catalog_config = catalog_config; + self + } + + /// Returns the serializable catalog/storage config, if any. + pub fn catalog_config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.catalog_config.as_ref() + } + + /// Returns the table this provider inspects. + pub fn table(&self) -> &Table { + &self.table + } + + /// Returns which metadata table this provider serves. + pub fn metadata_type(&self) -> &MetadataTableType { + &self.r#type + } } #[async_trait] diff --git a/crates/integrations/datafusion/src/table/mod.rs b/crates/integrations/datafusion/src/table/mod.rs index 75b7988d8d..aa8a2a0872 100644 --- a/crates/integrations/datafusion/src/table/mod.rs +++ b/crates/integrations/datafusion/src/table/mod.rs @@ -47,7 +47,7 @@ use iceberg::inspect::MetadataTableType; use iceberg::spec::TableProperties; use iceberg::table::Table; use iceberg::{Catalog, Error, ErrorKind, NamespaceIdent, Result, TableIdent}; -use metadata_table::IcebergMetadataTableProvider; +pub use metadata_table::IcebergMetadataTableProvider; use crate::error::to_datafusion_error; use crate::physical_plan::commit::IcebergCommitExec; @@ -73,6 +73,15 @@ pub struct IcebergTableProvider { table_ident: TableIdent, /// A reference-counted arrow `Schema` (cached at construction) schema: ArrowSchemaRef, + /// Optional serializable catalog/storage config. When present, it is + /// threaded into the execution plan nodes produced by `scan`/`insert_into` + /// so that a distributed engine can reconstruct them (and their catalog and + /// storage) on remote nodes. + config: Option, + /// Optional snapshot to read. 
`None` reads the current snapshot (refreshed + /// from the catalog on each scan); `Some` pins reads to that snapshot for + /// time-travel. Writes always target the current table state. + snapshot_id: Option, } impl IcebergTableProvider { @@ -82,6 +91,7 @@ impl IcebergTableProvider { /// reference for future metadata refreshes on each operation. pub(crate) async fn try_new( catalog: Arc, + config: Option, namespace: NamespaceIdent, name: impl Into, ) -> Result { @@ -95,16 +105,59 @@ impl IcebergTableProvider { catalog, table_ident, schema, + config, + snapshot_id: None, }) } + /// Creates a catalog-backed table provider that carries a serializable + /// [`IcebergCatalogConfig`](crate::IcebergCatalogConfig). + /// + /// The `catalog` must already be built from the same `config`. The config is + /// threaded into the execution plan nodes this provider produces so that a + /// distributed engine (e.g. Ballista) can serialize those nodes and rebuild + /// the catalog/storage on remote executors. + pub async fn try_new_with_config( + catalog: Arc, + config: crate::IcebergCatalogConfig, + namespace: NamespaceIdent, + name: impl Into, + ) -> Result { + Self::try_new(catalog, Some(config), namespace, name).await + } + + /// Pins reads to a specific snapshot for time-travel. `None` (the default) + /// reads the current snapshot. The snapshot id is threaded into the scan + /// node, so it is serialized and honored by a distributed engine as well. + pub fn with_snapshot_id(mut self, snapshot_id: Option) -> Self { + self.snapshot_id = snapshot_id; + self + } + + /// Returns the snapshot this provider reads, if pinned for time-travel. + pub fn snapshot_id(&self) -> Option { + self.snapshot_id + } + + /// Returns the serializable catalog/storage config, if this provider was + /// created with one. + pub fn config(&self) -> Option<&crate::IcebergCatalogConfig> { + self.config.as_ref() + } + + /// Returns the identifier of the table this provider serves. 
+ pub fn table_ident(&self) -> &TableIdent { + &self.table_ident + } + pub(crate) async fn metadata_table( &self, r#type: MetadataTableType, ) -> Result { // Load fresh table metadata for metadata table access let table = self.catalog.load_table(&self.table_ident).await?; - Ok(IcebergMetadataTableProvider { table, r#type }) + Ok(IcebergMetadataTableProvider::new(table, r#type) + .with_catalog_config(self.config.clone())) } } @@ -136,15 +189,18 @@ impl TableProvider for IcebergTableProvider { .await .map_err(to_datafusion_error)?; - // Create scan with fresh metadata (always use current snapshot) - Ok(Arc::new(IcebergTableScan::new( - table, - None, // Always use current snapshot for catalog-backed provider - self.schema.clone(), - projection, - filters, - limit, - ))) + // Create scan with fresh metadata, honoring a pinned snapshot if set. + Ok(Arc::new( + IcebergTableScan::new( + table, + self.snapshot_id, + self.schema.clone(), + projection, + filters, + limit, + ) + .with_catalog_config(self.config.clone()), + )) } fn supports_filters_pushdown( @@ -217,21 +273,23 @@ impl TableProvider for IcebergTableProvider { sort_by_partition(repartitioned_plan)? 
}; - let write_plan = Arc::new(IcebergWriteExec::new( - table.clone(), - write_input, - self.schema.clone(), - )); + let write_plan = Arc::new( + IcebergWriteExec::new(table.clone(), write_input, self.schema.clone()) + .with_catalog_config(self.config.clone()), + ); // Merge the outputs of write_plan into one so we can commit all files together let coalesce_partitions = Arc::new(CoalescePartitionsExec::new(write_plan)); - Ok(Arc::new(IcebergCommitExec::new( - table, - self.catalog.clone(), - coalesce_partitions, - self.schema.clone(), - ))) + Ok(Arc::new( + IcebergCommitExec::new( + table, + self.catalog.clone(), + coalesce_partitions, + self.schema.clone(), + ) + .with_catalog_config(self.config.clone()), + )) } } @@ -526,10 +584,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; // Test creating a catalog-backed provider - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); // Verify the schema is loaded correctly let schema = provider.schema(); @@ -542,10 +604,14 @@ mod tests { async fn test_catalog_backed_provider_scan() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -568,10 +634,14 @@ mod tests { async fn test_catalog_backed_provider_insert() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), 
namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -595,10 +665,14 @@ mod tests { async fn test_physical_input_schema_consistent_with_logical_input_schema() { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); ctx.register_table("test_table", Arc::new(provider)) @@ -720,10 +794,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_partitioned_test_catalog_and_table(Some(true)).await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); let input_schema = provider.schema(); @@ -752,10 +830,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_partitioned_test_catalog_and_table(Some(false)).await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); let input_schema = provider.schema(); @@ -812,10 +894,14 @@ mod tests { let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; - let provider = - IcebergTableProvider::try_new(catalog.clone(), 
namespace.clone(), table_name.clone()) - .await - .unwrap(); + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); let ctx = SessionContext::new(); let state = ctx.state(); @@ -865,4 +951,40 @@ mod tests { "Limit should be None when not specified" ); } + + #[tokio::test] + async fn test_with_snapshot_id_pins_scan() { + use datafusion::datasource::TableProvider; + + let (catalog, namespace, table_name, _temp_dir) = get_test_catalog_and_table().await; + + // Default provider reads the current snapshot (None in the scan). + let provider = IcebergTableProvider::try_new( + catalog.clone(), + None, + namespace.clone(), + table_name.clone(), + ) + .await + .unwrap(); + assert_eq!(provider.snapshot_id(), None); + + // Pinning a snapshot threads it into the scan node, where the codec reads + // it — so time-travel is honored locally and distributed. + let pinned = provider.with_snapshot_id(Some(123)); + assert_eq!(pinned.snapshot_id(), Some(123)); + + let ctx = SessionContext::new(); + let state = ctx.state(); + let scan_plan = pinned.scan(&state, None, &[], None).await.unwrap(); + let iceberg_scan = scan_plan + .as_any() + .downcast_ref::() + .expect("Expected IcebergTableScan"); + assert_eq!( + iceberg_scan.snapshot_id(), + Some(123), + "pinned snapshot should propagate to the scan node" + ); + } }