Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,14 @@ import org.apache.spark.sql.connector.read.InputPartition
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.types.StructType

// fileSchema is read from the data files. partitionSchema carries supported metadata columns
// (for example _file) that are materialized as per-file constant values in the native scan.
final case class IcebergScanPlan(
fileTasks: Seq[FileScanTask],
fileFormat: FileFormat,
readSchema: StructType)
readSchema: StructType,
fileSchema: StructType,
partitionSchema: StructType)

object IcebergScanSupport extends Logging {

Expand All @@ -48,20 +52,33 @@ object IcebergScanSupport extends Logging {
}

val readSchema = scan.readSchema
// Native scan does not support Iceberg metadata columns (e.g. _file, _pos).
if (hasMetadataColumns(readSchema)) {
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
// Native scan can project file-level metadata columns such as _file via partition values.
// Metadata columns that require per-row materialization (for example _pos) still fallback.
if (unsupportedMetadataColumns.nonEmpty) {
return None
}

if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
// Supported metadata columns are materialized via per-file constant values rather than
// read from the Iceberg data file payload.
val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))

if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
return None
}

if (!partitionSchema.fields.forall(field =>
NativeConverters.isTypeSupported(field.dataType))) {
return None
}

val partitions = inputPartitions(exec)
// Empty scan (e.g. empty table) should still build a plan to return no rows.
if (partitions.isEmpty) {
logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
return Some(
IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema, fileSchema, partitionSchema))
}

val icebergPartitions = partitions.flatMap(icebergPartition)
Expand Down Expand Up @@ -93,11 +110,19 @@ object IcebergScanSupport extends Logging {
return None
}

Some(IcebergScanPlan(fileTasks, format, readSchema))
Some(IcebergScanPlan(fileTasks, format, readSchema, fileSchema, partitionSchema))
}

private def hasMetadataColumns(schema: StructType): Boolean =
schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
// Names of metadata columns in `schema` that the native scan cannot serve.
// Supported metadata columns (currently only _file) are excluded; anything
// else (for example _pos) forces a fallback to the Spark scan.
private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] =
  schema.fields
    .filter(field =>
      MetadataColumns.isMetadataColumn(field.name) && !isSupportedMetadataColumn(field))
    .map(_.name)

// The only metadata column the native scan can materialize is Iceberg's
// file-path column (_file), injected as a per-file constant partition value.
private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean =
  MetadataColumns.FILE_PATH.name() == field.name

private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
// Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import java.util.UUID
import scala.collection.JavaConverters._

import org.apache.hadoop.fs.FileSystem
import org.apache.iceberg.{FileFormat, FileScanTask}
import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
import org.apache.spark.Partition
import org.apache.spark.TaskContext
import org.apache.spark.broadcast.Broadcast
Expand All @@ -33,13 +33,14 @@ import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims}
import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Literal
import org.apache.spark.sql.execution.LeafExecNode
import org.apache.spark.sql.execution.SparkPlan
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
import org.apache.spark.sql.execution.metric.SQLMetric
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.types.{StringType, StructType}
import org.apache.spark.util.SerializableConfiguration

import org.apache.auron.{protobuf => pb}
Expand All @@ -57,31 +58,36 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
override val output = basedScan.output
override val outputPartitioning = basedScan.outputPartitioning

private lazy val readSchema: StructType = plan.readSchema
private lazy val fileSchema: StructType = plan.fileSchema
private lazy val partitionSchema: StructType = plan.partitionSchema
private lazy val projectableSchema: StructType =
StructType(fileSchema.fields ++ partitionSchema.fields)
private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks

private lazy val partitions: Array[FilePartition] = buildFilePartitions()
private lazy val fileSizes: Map[String, Long] = buildFileSizes()

private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(readSchema)
private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
private lazy val nativePartitionSchema: pb.Schema =
NativeConverters.convertSchema(StructType(Nil))
NativeConverters.convertSchema(partitionSchema)

private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis

private lazy val fieldIndexByName: Map[String, Int] = {
if (caseSensitive) {
readSchema.fieldNames.zipWithIndex.toMap
projectableSchema.fieldNames.zipWithIndex.toMap
} else {
readSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
projectableSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
}
}

private def fieldIndexFor(name: String): Int = {
if (caseSensitive) {
fieldIndexByName.getOrElse(name, readSchema.fieldIndex(name))
fieldIndexByName.getOrElse(name, projectableSchema.fieldIndex(name))
} else {
fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), readSchema.fieldIndex(name))
fieldIndexByName.getOrElse(
name.toLowerCase(Locale.ROOT),
projectableSchema.fieldIndex(name))
}
}

Expand All @@ -98,6 +104,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
.setPath(filePath)
.setSize(size)
.setLastModifiedNs(0)
.addAllPartitionValues(metadataPartitionValues(filePath).asJava)
.setRange(
pb.FileRange
.newBuilder()
Expand All @@ -112,6 +119,17 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
.build()
}

// Builds the per-file constant scalar values for the columns of
// `partitionSchema`, in schema order. Currently only _file is supported,
// materialized as the scanned file's path; any other metadata column
// reaching this point indicates a planning bug, so fail loudly.
private def metadataPartitionValues(filePath: String): Seq[pb.ScalarValue] =
  partitionSchema.fields.map { field =>
    if (field.name == MetadataColumns.FILE_PATH.name()) {
      NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral
    } else {
      throw new IllegalStateException(
        s"unsupported Iceberg metadata column in native scan: ${field.name}")
    }
  }

override def doExecuteNative(): NativeRDD = {
if (partitions.isEmpty) {
return new EmptyNativeRDD(sparkContext)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.iceberg.{FileFormat, FileScanTask}
import org.apache.iceberg.data.{GenericAppenderFactory, Record}
import org.apache.iceberg.deletes.PositionDelete
import org.apache.iceberg.spark.Spark3Util
import org.apache.spark.sql.Row
import org.apache.spark.sql.{DataFrame, Row}

class AuronIcebergIntegrationSuite
extends org.apache.spark.sql.QueryTest
Expand Down Expand Up @@ -143,13 +143,36 @@ class AuronIcebergIntegrationSuite
}
}

test("iceberg scan falls back when reading metadata columns") {
test("iceberg native scan supports _file metadata column") {
withTable("local.db.t4") {
sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
val df = sql("select _file from local.db.t4")
df.collect()
val plan = df.queryExecution.executedPlan.toString()
assert(!plan.contains("NativeIcebergTableScan"))
checkSparkAnswerAndOperator("select _file from local.db.t4")
}
}

// Mixing a regular data column with the _file metadata column must still run
// on the native scan: the data column is read from the file while _file is
// supplied as a per-file constant partition value.
test("iceberg native scan supports data columns with _file metadata column") {
withTable("local.db.t4_mixed") {
sql("create table local.db.t4_mixed using iceberg as select 1 as id, 'a' as v")
checkSparkAnswerAndOperator("select id, _file from local.db.t4_mixed")
}
}

// Projecting _file before a data column must preserve the projected column
// order in the native scan's output (metadata columns are appended to the
// file schema internally, so ordering is worth pinning down).
test("iceberg native scan preserves projected order for _file metadata column") {
withTable("local.db.t4_metadata_first") {
sql("create table local.db.t4_metadata_first using iceberg as select 1 as id, 'a' as v")
checkSparkAnswerAndOperator("select _file, id from local.db.t4_metadata_first")
}
}

// _pos requires per-row materialization, which the native scan cannot
// provide, so planning must fall back to the regular Spark scan.
test("iceberg scan falls back when reading unsupported metadata columns") {
  withTable("local.db.t4_pos") {
    sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v")
    withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
      val result = sql("select _pos from local.db.t4_pos")
      result.collect()
      val executedPlan = result.queryExecution.executedPlan.toString()
      assert(!executedPlan.contains("NativeIcebergTableScan"))
    }
  }
}

Expand Down Expand Up @@ -238,4 +261,20 @@ class AuronIcebergIntegrationSuite
taskIterable.close()
}
}

// Runs `sqlText` twice: first with Auron disabled to capture the reference
// answer, then with the native Iceberg scan enabled. Asserts the enabled run
// both matches the reference rows and actually planned a
// NativeIcebergTableScan. Returns the DataFrame from the enabled run.
// NOTE: withSQLConf evaluates a by-name Unit block, so results are carried
// out through local vars.
private def checkSparkAnswerAndOperator(sqlText: String): DataFrame = {
  var expectedRows: Seq[Row] = Seq.empty
  withSQLConf("spark.auron.enable" -> "false") {
    expectedRows = sql(sqlText).collect().toSeq
  }

  var nativeDf: DataFrame = null
  withSQLConf("spark.auron.enable" -> "true", "spark.auron.enable.iceberg.scan" -> "true") {
    nativeDf = sql(sqlText)
    checkAnswer(nativeDf, expectedRows)
    val executedPlan = nativeDf.queryExecution.executedPlan.toString()
    assert(executedPlan.contains("NativeIcebergTableScan"))
  }
  nativeDf
}
}
Loading