From 686986d98696abdb16281aa069222204c50e4c7c Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Tue, 9 Dec 2025 15:28:52 -0500
Subject: [PATCH 1/3] add documentation for fully-native iceberg

---
 docs/source/user-guide/latest/iceberg.md | 88 ++++++++++++++++++++++++++++---
 1 file changed, 82 insertions(+), 6 deletions(-)

diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md
index 0813eeeb2c..704d99b30a 100644
--- a/docs/source/user-guide/latest/iceberg.md
+++ b/docs/source/user-guide/latest/iceberg.md
@@ -19,10 +19,15 @@
 
 # Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)
 
-**Note: Iceberg integration is a work-in-progress. It is currently necessary to build Iceberg from
-source rather than using available artifacts in Maven**
+**Note: Iceberg integration is a work-in-progress. Comet currently has two distinct Iceberg
+code paths: 1) hybrid execution (native Parquet decoding, JVM otherwise) that requires
+building Iceberg from source rather than using available artifacts in Maven, and 2) fully-native
+execution (based on [iceberg-rust](https://github.com/apache/iceberg-rust)). Directions for both
+designs are provided below.**
 
-## Build Comet
+## Hybrid Execution
+
+### Build Comet
 
 Run a Maven install so that we can compile Iceberg against latest Comet:
 
@@ -42,7 +47,7 @@ Set `COMET_JAR` env var:
 export COMET_JAR=`pwd`/spark/target/comet-spark-spark3.5_2.12-$COMET_VERSION.jar
 ```
 
-## Build Iceberg
+### Build Iceberg
 
 Clone the Iceberg repository and apply code changes needed by Comet
 
@@ -59,7 +64,7 @@ Perform a clean build
 ./gradlew clean build -x test -x integrationTest
 ```
 
-## Test
+### Test
 
 Set `ICEBERG_JAR` environment variable.
 
@@ -140,7 +145,78 @@ scala> spark.sql(s"SELECT * from t1").explain()
    +- CometBatchScan spark_catalog.default.t1[c0#26, c1#27] spark_catalog.default.t1 (branch=null) [filters=, groupedBy=] RuntimeFilters: []
 ```
 
-## Known issues
+### Known issues
 
 - Spark Runtime Filtering isn't [working](https://github.com/apache/datafusion-comet/issues/2116)
   - You can bypass the issue by either setting `spark.sql.adaptive.enabled=false` or `spark.comet.exec.broadcastExchange.enabled=false`
+
+## Fully-Native Execution
+
+Comet's fully-native Iceberg integration does not require downloading Comet or Iceberg source
+code, and instead relies
+on reflection to extract `FileScanTask`s from Iceberg, which are then serialized them to Comet
+for native execution(see [PR #2528](https://github.com/apache/datafusion-comet/pull/2528)). The
+example below uses Spark's package downloader to retrieve Comet 0.12.0 and Iceberg
+1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration
+to enable fully-native Iceberg is `spark.comet.scan.icebergNative.enabled=true`. This
+configuration should **not** be used with the hybrid Iceberg configuration
+`spark.sql.iceberg.parquet.reader-type=COMET` from above.
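+
+Before the launch example, here is a rough sketch of the reflection hand-off described above. It
+is not Comet's actual implementation (see PR #2528 for that), and the `tasks` accessor name is an
+assumption for illustration only:
+
+```scala
+import scala.collection.JavaConverters._
+
+// Walk the scan object's class hierarchy looking for a zero-argument `tasks`
+// accessor and invoke it, avoiding a compile-time dependency on any one
+// Iceberg version. The objects returned are Iceberg's FileScanTasks.
+def extractFileScanTasks(icebergScan: AnyRef): Seq[AnyRef] = {
+  val method = Iterator
+    .iterate[Class[_]](icebergScan.getClass)(_.getSuperclass)
+    .takeWhile(_ != null)
+    .flatMap(_.getDeclaredMethods)
+    .find(m => m.getName == "tasks" && m.getParameterCount == 0)
+    .getOrElse(sys.error("no tasks() accessor found"))
+  method.setAccessible(true)
+  method.invoke(icebergScan).asInstanceOf[java.util.List[AnyRef]].asScala.toList
+}
+```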
+
+```shell
+$SPARK_HOME/bin/spark-shell \
+  --packages org.apache.datafusion:comet-spark-spark3.5_2.12:0.12.0,org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.8.1,org.apache.iceberg:iceberg-core:1.8.1 \
+  --repositories https://repo1.maven.org/maven2/ \
+  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.apache.comet.CometSparkSessionExtensions \
+  --conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkCatalog \
+  --conf spark.sql.catalog.spark_catalog.type=hadoop \
+  --conf spark.sql.catalog.spark_catalog.warehouse=/tmp/warehouse \
+  --conf spark.plugins=org.apache.spark.CometPlugin \
+  --conf spark.shuffle.manager=org.apache.spark.sql.comet.execution.shuffle.CometShuffleManager \
+  --conf spark.comet.scan.icebergNative.enabled=true \
+  --conf spark.comet.explainFallback.enabled=true \
+  --conf spark.memory.offHeap.enabled=true \
+  --conf spark.memory.offHeap.size=2g
+```
+
+The same sample queries from above can be used to test Comet's fully-native Iceberg integration;
+however, the scan node to look for is `CometIcebergNativeScan`.
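+
+For example, a minimal session (the `t1` schema here is illustrative):
+
+```scala
+spark.sql("CREATE TABLE t1 (c0 INT, c1 STRING) USING iceberg")
+spark.sql("INSERT INTO t1 VALUES (0, 'a'), (1, 'b')")
+spark.sql("SELECT * FROM t1").explain()
+// Expect CometIcebergNativeScan (rather than CometBatchScan) in the plan output.
+```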
+
+### Current limitations
+
+- Iceberg table spec v3 scans will fall back.
+- Iceberg writes will fall back.
+- Iceberg table scans backed by Avro or ORC data files will fall back.
+- Iceberg table scans partitioned on `BINARY` or `DECIMAL` (with precision >28) columns will fall back.
+- Iceberg scans with residual filters (_i.e._, not partition expressions and evaluated on the
+  column values at scan time) of `truncate`, `bucket`, `year`, `month`, `day`, `hour` will fall back.

From 9e910b14e43b7d0a79125f97d5d91ebfcc68d8b1 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Tue, 9 Dec 2025 15:34:50 -0500
Subject: [PATCH 2/3] add documentation for fully-native iceberg

---
 docs/source/user-guide/latest/iceberg.md | 16 +++++++++-------
 1 file changed, 9 insertions(+), 7 deletions(-)

diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md
index 704d99b30a..a73e07c0cd 100644
--- a/docs/source/user-guide/latest/iceberg.md
+++ b/docs/source/user-guide/latest/iceberg.md
@@ -152,11 +152,12 @@ scala> spark.sql(s"SELECT * from t1").explain()
 ## Fully-Native Execution
 
-Comet's fully-native Iceberg integration does not require downloading Comet or Iceberg source
-code, and instead relies
-on reflection to extract `FileScanTask`s from Iceberg, which are then serialized them to Comet
-for native execution(see [PR #2528](https://github.com/apache/datafusion-comet/pull/2528)). The
-example below uses Spark's package downloader to retrieve Comet 0.12.0 and Iceberg
+Comet's fully-native Iceberg integration does not require modifying Iceberg source
+code. Instead, Comet relies on reflection to extract `FileScanTask`s from Iceberg, which are
+then serialized to Comet's native execution engine (see
+[PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).
+
+The example below uses Spark's package downloader to retrieve Comet 0.12.0 and Iceberg
 1.8.1, but Comet has been tested with Iceberg 1.5, 1.7, 1.8, and 1.10. The key configuration
 to enable fully-native Iceberg is `spark.comet.scan.icebergNative.enabled=true`. This
 configuration should **not** be used with the hybrid Iceberg configuration
 `spark.sql.iceberg.parquet.reader-type=COMET` from above.
@@ -218,5 +219,6 @@ however, the scan node to look for is `CometIcebergNativeScan`.
 - Iceberg writes will fall back.
 - Iceberg table scans backed by Avro or ORC data files will fall back.
 - Iceberg table scans partitioned on `BINARY` or `DECIMAL` (with precision >28) columns will fall back.
-- Iceberg scans with residual filters (_i.e._, not partition expressions and evaluated on the
-  column values at scan time) of `truncate`, `bucket`, `year`, `month`, `day`, `hour` will fall back.
+- Iceberg scans with residual filters (_i.e._, filter expressions that are not satisfied by
+  partition values and must be evaluated on the column values at scan time) involving `truncate`,
+  `bucket`, `year`, `month`, `day`, or `hour` will fall back.

From f7b2cb390a733eeae54fff6b96e6a1dbb3f85c37 Mon Sep 17 00:00:00 2001
From: Matt Butrovich
Date: Tue, 9 Dec 2025 15:49:01 -0500
Subject: [PATCH 3/3] address PR feedback.

---
 docs/source/user-guide/latest/iceberg.md | 23 ++++++++++++++++++-----
 1 file changed, 18 insertions(+), 5 deletions(-)

diff --git a/docs/source/user-guide/latest/iceberg.md b/docs/source/user-guide/latest/iceberg.md
index a73e07c0cd..9bf681cb08 100644
--- a/docs/source/user-guide/latest/iceberg.md
+++ b/docs/source/user-guide/latest/iceberg.md
@@ -20,12 +20,12 @@
 # Accelerating Apache Iceberg Parquet Scans using Comet (Experimental)
 
 **Note: Iceberg integration is a work-in-progress. Comet currently has two distinct Iceberg
-code paths: 1) hybrid execution (native Parquet decoding, JVM otherwise) that requires
-building Iceberg from source rather than using available artifacts in Maven, and 2) fully-native
-execution (based on [iceberg-rust](https://github.com/apache/iceberg-rust)). Directions for both
+code paths: 1) a hybrid reader (native Parquet decoding, JVM otherwise) that requires
+building Iceberg from source rather than using available artifacts in Maven, and 2) a fully-native
+reader (based on [iceberg-rust](https://github.com/apache/iceberg-rust)). Directions for both
 designs are provided below.**
 
-## Hybrid Execution
+## Hybrid Reader
 
 ### Build Comet
 
@@ -150,7 +150,7 @@ scala> spark.sql(s"SELECT * from t1").explain()
 - Spark Runtime Filtering isn't [working](https://github.com/apache/datafusion-comet/issues/2116)
   - You can bypass the issue by either setting `spark.sql.adaptive.enabled=false` or `spark.comet.exec.broadcastExchange.enabled=false`
 
-## Fully-Native Execution
+## Native Reader
 
 Comet's fully-native Iceberg integration does not require modifying Iceberg source
 code. Instead, Comet relies on reflection to extract `FileScanTask`s from Iceberg, which are
 then serialized to Comet's native execution engine (see
 [PR #2528](https://github.com/apache/datafusion-comet/pull/2528)).
@@ -215,10 +215,23 @@ however, the scan node to look for is `CometIcebergNativeScan`.
 
 ### Current limitations
 
+The following scenarios are not yet supported; support for them is a work in progress:
+
 - Iceberg table spec v3 scans will fall back.
 - Iceberg writes will fall back.
 - Iceberg table scans backed by Avro or ORC data files will fall back.
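 - Iceberg table scans partitioned on `BINARY` or `DECIMAL` (with precision >28) columns will fall back.
 - Iceberg scans with residual filters (_i.e._, filter expressions that are not satisfied by
   partition values and must be evaluated on the column values at scan time) involving `truncate`,
   `bucket`, `year`, `month`, `day`, or `hour` will fall back.
+
+When a query hits one of these limitations, the scan falls back to a non-native implementation;
+with `spark.comet.explainFallback.enabled=true` (set in the shell command above), the reason
+should be logged. As a quick sanity check, this sketch just looks for the native scan node in
+the physical plan string:
+
+```scala
+val plan = spark.sql("SELECT * FROM t1").queryExecution.executedPlan.toString
+if (plan.contains("CometIcebergNativeScan")) println("fully-native Iceberg scan")
+else println("fell back to another scan implementation")
+```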