From f51273998f90d1f305df77c5ca577518954c9fcb Mon Sep 17 00:00:00 2001
From: weimingdiit
Date: Mon, 6 Apr 2026 18:22:32 +0800
Subject: [PATCH] [AURON #2175] Add native support for the _file metadata
 column

Native Iceberg scans previously fell back to the JVM reader whenever a query
referenced any metadata column. The _file column is constant for all rows of
a given data file, so it can instead be materialized through the scan's
partition-value mechanism: the read schema is split into a file schema (read
from the data files) and a partition schema (per-file constants), and _file
is bound to each file's path. Metadata columns that vary per row, such as
_pos, still fall back to the JVM scan.

Signed-off-by: weimingdiit
---
 .../auron/iceberg/IcebergScanSupport.scala    | 41 ++++++++++++---
 .../plan/NativeIcebergTableScanExec.scala     | 36 +++++++++----
 .../AuronIcebergIntegrationSuite.scala        | 51 ++++++++++++++++---
 3 files changed, 105 insertions(+), 23 deletions(-)

diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
index 7ac671638..405141733 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala
@@ -27,10 +27,14 @@ import org.apache.spark.sql.connector.read.InputPartition
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.types.StructType
 
+// fileSchema is read from the data files. partitionSchema carries supported metadata columns
+// (for example _file) that are materialized as per-file constant values in the native scan.
 final case class IcebergScanPlan(
     fileTasks: Seq[FileScanTask],
     fileFormat: FileFormat,
-    readSchema: StructType)
+    readSchema: StructType,
+    fileSchema: StructType,
+    partitionSchema: StructType)
 
 object IcebergScanSupport extends Logging {
 
@@ -48,12 +52,24 @@
     }
 
     val readSchema = scan.readSchema
-    // Native scan does not support Iceberg metadata columns (e.g. _file, _pos).
-    if (hasMetadataColumns(readSchema)) {
+    val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
+    // Native scan can project file-level metadata columns such as _file via partition values.
+    // Metadata columns that require per-row materialization (for example _pos) still fall back.
+    if (unsupportedMetadataColumns.nonEmpty) {
       return None
     }
 
-    if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
+    val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
+    // Supported metadata columns are materialized via per-file constant values rather than
+    // read from the Iceberg data file payload.
+    val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))
+
+    if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
+      return None
+    }
+
+    if (!partitionSchema.fields.forall(field =>
+        NativeConverters.isTypeSupported(field.dataType))) {
       return None
     }
 
@@ -61,7 +77,8 @@
     // Empty scan (e.g. empty table) should still build a plan to return no rows.
     if (partitions.isEmpty) {
       logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
-      return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
+      return Some(
+        IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema, fileSchema, partitionSchema))
     }
 
     val icebergPartitions = partitions.flatMap(icebergPartition)
@@ -93,11 +110,19 @@
       return None
     }
 
-    Some(IcebergScanPlan(fileTasks, format, readSchema))
+    Some(IcebergScanPlan(fileTasks, format, readSchema, fileSchema, partitionSchema))
   }
 
-  private def hasMetadataColumns(schema: StructType): Boolean =
-    schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
+  private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] =
+    schema.fields.collect {
+      case field
+          if MetadataColumns.isMetadataColumn(field.name) &&
+            !isSupportedMetadataColumn(field) =>
+        field.name
+    }
+
+  private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean =
+    field.name == MetadataColumns.FILE_PATH.name()
 
   private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
     // Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.
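Reviewer note: the heart of this file's change is splitting readSchema into a schema read from the data files and a schema of per-file constants. A stand-alone sketch of that split (the object and helper names below are illustrative, not part of the patch):

```scala
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}

// Hypothetical sketch of the split IcebergScanSupport performs: metadata
// columns that the native scan can satisfy with a per-file constant (only
// _file today) move into the "partition" schema; everything else stays in
// the schema that is actually read from the data files.
object SchemaSplitSketch {
  private val perFileConstantColumns = Set("_file")

  def split(readSchema: StructType): (StructType, StructType) = {
    val (constants, data) =
      readSchema.fields.partition(f => perFileConstantColumns.contains(f.name))
    (StructType(data), StructType(constants))
  }

  def main(args: Array[String]): Unit = {
    val readSchema =
      StructType(Seq(StructField("id", IntegerType), StructField("_file", StringType)))
    val (fileSchema, partitionSchema) = split(readSchema)
    println(fileSchema.fieldNames.mkString(","))      // id
    println(partitionSchema.fieldNames.mkString(",")) // _file
  }
}
```

Note that the patch keeps readSchema on IcebergScanPlan alongside the two derived schemas, so consumers can still recover the full projection in its original column order.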
diff --git a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
index 63927e0b0..58d8a735c 100644
--- a/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
+++ b/thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala
@@ -24,7 +24,7 @@ import java.util.UUID
 import scala.collection.JavaConverters._
 
 import org.apache.hadoop.fs.FileSystem
-import org.apache.iceberg.{FileFormat, FileScanTask}
+import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
 import org.apache.spark.Partition
 import org.apache.spark.TaskContext
 import org.apache.spark.broadcast.Broadcast
@@ -33,13 +33,14 @@ import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims}
 import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.Literal
 import org.apache.spark.sql.execution.LeafExecNode
 import org.apache.spark.sql.execution.SparkPlan
 import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
 import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
 import org.apache.spark.sql.execution.metric.SQLMetric
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types.{StringType, StructType}
 import org.apache.spark.util.SerializableConfiguration
 
 import org.apache.auron.{protobuf => pb}
@@ -57,31 +58,36 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
   override val output = basedScan.output
   override val outputPartitioning = basedScan.outputPartitioning
 
-  private lazy val readSchema: StructType = plan.readSchema
+  private lazy val fileSchema: StructType = plan.fileSchema
+  private lazy val partitionSchema: StructType = plan.partitionSchema
+  private lazy val projectableSchema: StructType =
+    StructType(fileSchema.fields ++ partitionSchema.fields)
 
   private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
   private lazy val partitions: Array[FilePartition] = buildFilePartitions()
   private lazy val fileSizes: Map[String, Long] = buildFileSizes()
 
-  private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(readSchema)
+  private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
   private lazy val nativePartitionSchema: pb.Schema =
-    NativeConverters.convertSchema(StructType(Nil))
+    NativeConverters.convertSchema(partitionSchema)
 
   private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
 
   private lazy val fieldIndexByName: Map[String, Int] = {
     if (caseSensitive) {
-      readSchema.fieldNames.zipWithIndex.toMap
+      projectableSchema.fieldNames.zipWithIndex.toMap
     } else {
-      readSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
+      projectableSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
     }
   }
 
   private def fieldIndexFor(name: String): Int = {
     if (caseSensitive) {
-      fieldIndexByName.getOrElse(name, readSchema.fieldIndex(name))
+      fieldIndexByName.getOrElse(name, projectableSchema.fieldIndex(name))
     } else {
-      fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), readSchema.fieldIndex(name))
+      fieldIndexByName.getOrElse(
+        name.toLowerCase(Locale.ROOT),
+        projectableSchema.fieldIndex(name))
     }
   }
 
@@ -98,6 +104,7 @@
       .setPath(filePath)
       .setSize(size)
       .setLastModifiedNs(0)
+      .addAllPartitionValues(metadataPartitionValues(filePath).asJava)
       .setRange(
         pb.FileRange
           .newBuilder()
@@ -112,6 +119,17 @@
       .build()
   }
 
+  private def metadataPartitionValues(filePath: String): Seq[pb.ScalarValue] =
+    partitionSchema.fields.map { field =>
+      field.name match {
+        case name if name == MetadataColumns.FILE_PATH.name() =>
+          NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral
+        case name =>
+          throw new IllegalStateException(
+            s"unsupported Iceberg metadata column in native scan: $name")
+      }
+    }
+
   override def doExecuteNative(): NativeRDD = {
     if (partitions.isEmpty) {
       return new EmptyNativeRDD(sparkContext)
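Reviewer note: metadataPartitionValues above is what actually binds _file to a value. Each pb.PartitionedFile carries one scalar per partition-schema column, so the native reader treats _file exactly like a Hive-style partition value: constant for every row produced from that file. A sketch of the same idea without the protobuf types (FileSlice and perFileConstants are illustrative names, not the patch's API):

```scala
// Illustrative model, not the patch's protobuf API: planning attaches one
// constant value per supported metadata column to each file slice.
object PerFileConstantSketch {
  final case class FileSlice(path: String, start: Long, length: Long)

  def perFileConstants(slice: FileSlice, partitionColumns: Seq[String]): Seq[String] =
    partitionColumns.map {
      case "_file" => slice.path // same value for every row read from this slice
      case other =>
        // Columns such as _pos vary per row and cannot be per-file constants;
        // IcebergScanSupport rejects them during planning, before this point.
        throw new IllegalStateException(s"not a per-file constant column: $other")
    }

  def main(args: Array[String]): Unit = {
    val slice = FileSlice("s3://bucket/db/t/data-0001.parquet", 0L, 1024L)
    println(perFileConstants(slice, Seq("_file")))
  }
}
```

This also explains why the IllegalStateException branch in the patch should be unreachable in practice: planning already rejected unsupported metadata columns, so hitting it would indicate a planner bug rather than a user error.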
diff --git a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
index 6472fcd86..746986e10 100644
--- a/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
+++ b/thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala
@@ -24,7 +24,7 @@ import org.apache.iceberg.{FileFormat, FileScanTask}
 import org.apache.iceberg.data.{GenericAppenderFactory, Record}
 import org.apache.iceberg.deletes.PositionDelete
 import org.apache.iceberg.spark.Spark3Util
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{DataFrame, Row}
 
 class AuronIcebergIntegrationSuite
     extends org.apache.spark.sql.QueryTest
@@ -143,13 +143,36 @@
     }
   }
 
-  test("iceberg scan falls back when reading metadata columns") {
+  test("iceberg native scan supports _file metadata column") {
     withTable("local.db.t4") {
       sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
-      val df = sql("select _file from local.db.t4")
-      df.collect()
-      val plan = df.queryExecution.executedPlan.toString()
-      assert(!plan.contains("NativeIcebergTableScan"))
+      checkSparkAnswerAndOperator("select _file from local.db.t4")
+    }
+  }
+
+  test("iceberg native scan supports data columns with _file metadata column") {
+    withTable("local.db.t4_mixed") {
+      sql("create table local.db.t4_mixed using iceberg as select 1 as id, 'a' as v")
+      checkSparkAnswerAndOperator("select id, _file from local.db.t4_mixed")
+    }
+  }
+
+  test("iceberg native scan preserves projected order for _file metadata column") {
+    withTable("local.db.t4_metadata_first") {
+      sql("create table local.db.t4_metadata_first using iceberg as select 1 as id, 'a' as v")
+      checkSparkAnswerAndOperator("select _file, id from local.db.t4_metadata_first")
+    }
+  }
+
+  test("iceberg scan falls back when reading unsupported metadata columns") {
+    withTable("local.db.t4_pos") {
+      sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v")
+      withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
+        val df = sql("select _pos from local.db.t4_pos")
+        df.collect()
+        val plan = df.queryExecution.executedPlan.toString()
+        assert(!plan.contains("NativeIcebergTableScan"))
+      }
     }
   }
 
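Reviewer note: the fallback test asserts on the rendered physical plan. The idiom, extracted into a helper (illustrative, assuming a SparkSession is in scope):

```scala
import org.apache.spark.sql.SparkSession

object PlanCheckSketch {
  // Run the query to completion (as the tests do), then look for the native
  // operator's name in the final physical plan string.
  def usesNativeIcebergScan(spark: SparkSession, sqlText: String): Boolean = {
    val df = spark.sql(sqlText)
    df.collect() // execute end-to-end so any adaptive re-planning settles
    df.queryExecution.executedPlan.toString().contains("NativeIcebergTableScan")
  }
}
```

Note the _pos test pins both spark.auron.enable and spark.auron.enable.iceberg.scan to true before asserting the native operator is absent; without that, the assertion could pass vacuously because the extension was never active.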
@@ -238,4 +261,20 @@
       taskIterable.close()
     }
   }
+
+  private def checkSparkAnswerAndOperator(sqlText: String): DataFrame = {
+    var expected: Seq[Row] = Nil
+    withSQLConf("spark.auron.enable" -> "false") {
+      expected = sql(sqlText).collect().toSeq
+    }
+
+    var df: DataFrame = null
+    withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
+      df = sql(sqlText)
+      checkAnswer(df, expected)
+      val plan = df.queryExecution.executedPlan.toString()
+      assert(plan.contains("NativeIcebergTableScan"))
+    }
+    df
+  }
 }
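Reviewer note: checkSparkAnswerAndOperator computes the expected rows with spark.auron.enable set to false, then re-runs the query natively and checks both the answer and the operator, returning the native DataFrame for follow-up assertions. A hypothetical caller mirroring the tests above (the table name and assertion are illustrative, not part of the patch):

```scala
// Hypothetical additional test showing how the returned DataFrame can be
// used for further assertions inside the suite.
test("iceberg native scan keeps _file first when projected first") {
  withTable("local.db.t5") {
    sql("create table local.db.t5 using iceberg as select 1 as id, 'a' as v")
    val df = checkSparkAnswerAndOperator("select _file, id from local.db.t5")
    assert(df.schema.fieldNames.sameElements(Array("_file", "id")))
  }
}
```

Comparing against vanilla Spark at run time, rather than against stored golden values, is the right call here: _file contains absolute paths that differ across environments, so a golden-file comparison would be flaky by construction.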