Skip to content

Commit 9feb4d6

Browse files
committed
[AURON #2175][iceberg] Add native support for the _file metadata column
Signed-off-by: weimingdiit <weimingdiit@gmail.com>
1 parent 7484470 commit 9feb4d6

3 files changed

Lines changed: 86 additions & 20 deletions

File tree

thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/auron/iceberg/IcebergScanSupport.scala

Lines changed: 29 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,9 @@ import org.apache.spark.sql.types.StructType
3030
final case class IcebergScanPlan(
3131
fileTasks: Seq[FileScanTask],
3232
fileFormat: FileFormat,
33-
readSchema: StructType)
33+
readSchema: StructType,
34+
fileSchema: StructType,
35+
partitionSchema: StructType)
3436

3537
object IcebergScanSupport extends Logging {
3638

@@ -48,20 +50,31 @@ object IcebergScanSupport extends Logging {
4850
}
4951

5052
val readSchema = scan.readSchema
51-
// Native scan does not support Iceberg metadata columns (e.g. _file, _pos).
52-
if (hasMetadataColumns(readSchema)) {
53+
val unsupportedMetadataColumns = collectUnsupportedMetadataColumns(readSchema)
54+
// Native scan can project file-level metadata columns such as _file via partition values.
55+
// Metadata columns that require per-row materialization (for example _pos) still fall back.
56+
if (unsupportedMetadataColumns.nonEmpty) {
5357
return None
5458
}
5559

56-
if (!readSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
60+
val fileSchema = StructType(readSchema.fields.filterNot(isSupportedMetadataColumn))
61+
val partitionSchema = StructType(readSchema.fields.filter(isSupportedMetadataColumn))
62+
63+
if (!fileSchema.fields.forall(field => NativeConverters.isTypeSupported(field.dataType))) {
64+
return None
65+
}
66+
67+
if (!partitionSchema.fields.forall(field =>
68+
NativeConverters.isTypeSupported(field.dataType))) {
5769
return None
5870
}
5971

6072
val partitions = inputPartitions(exec)
6173
// Empty scan (e.g. empty table) should still build a plan to return no rows.
6274
if (partitions.isEmpty) {
6375
logWarning(s"Native Iceberg scan planned with empty partitions for $scanClassName.")
64-
return Some(IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema))
76+
return Some(
77+
IcebergScanPlan(Seq.empty, FileFormat.PARQUET, readSchema, fileSchema, partitionSchema))
6578
}
6679

6780
val icebergPartitions = partitions.flatMap(icebergPartition)
@@ -93,11 +106,19 @@ object IcebergScanSupport extends Logging {
93106
return None
94107
}
95108

96-
Some(IcebergScanPlan(fileTasks, format, readSchema))
109+
Some(IcebergScanPlan(fileTasks, format, readSchema, fileSchema, partitionSchema))
97110
}
98111

99-
private def hasMetadataColumns(schema: StructType): Boolean =
100-
schema.fields.exists(field => MetadataColumns.isMetadataColumn(field.name))
112+
private def collectUnsupportedMetadataColumns(schema: StructType): Seq[String] =
113+
schema.fields.collect {
114+
case field
115+
if MetadataColumns.isMetadataColumn(field.name) &&
116+
!isSupportedMetadataColumn(field) =>
117+
field.name
118+
}
119+
120+
private def isSupportedMetadataColumn(field: org.apache.spark.sql.types.StructField): Boolean =
121+
field.name == MetadataColumns.FILE_PATH.name()
101122

102123
private def inputPartitions(exec: BatchScanExec): Seq[InputPartition] = {
103124
// Prefer DataSource V2 batch API; if not available, fallback to exec methods via reflection.

thirdparty/auron-iceberg/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeIcebergTableScanExec.scala

Lines changed: 27 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import java.util.UUID
2424
import scala.collection.JavaConverters._
2525

2626
import org.apache.hadoop.fs.FileSystem
27-
import org.apache.iceberg.{FileFormat, FileScanTask}
27+
import org.apache.iceberg.{FileFormat, FileScanTask, MetadataColumns}
2828
import org.apache.spark.Partition
2929
import org.apache.spark.TaskContext
3030
import org.apache.spark.broadcast.Broadcast
@@ -33,13 +33,14 @@ import org.apache.spark.sql.SparkSession
3333
import org.apache.spark.sql.auron.{EmptyNativeRDD, NativeConverters, NativeHelper, NativeRDD, NativeSupports, Shims}
3434
import org.apache.spark.sql.auron.iceberg.IcebergScanPlan
3535
import org.apache.spark.sql.catalyst.InternalRow
36+
import org.apache.spark.sql.catalyst.expressions.Literal
3637
import org.apache.spark.sql.execution.LeafExecNode
3738
import org.apache.spark.sql.execution.SparkPlan
3839
import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile}
3940
import org.apache.spark.sql.execution.datasources.v2.BatchScanExec
4041
import org.apache.spark.sql.execution.metric.SQLMetric
4142
import org.apache.spark.sql.internal.SQLConf
42-
import org.apache.spark.sql.types.StructType
43+
import org.apache.spark.sql.types.{StringType, StructType}
4344
import org.apache.spark.util.SerializableConfiguration
4445

4546
import org.apache.auron.{protobuf => pb}
@@ -57,31 +58,36 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
5758
override val output = basedScan.output
5859
override val outputPartitioning = basedScan.outputPartitioning
5960

60-
private lazy val readSchema: StructType = plan.readSchema
61+
private lazy val fileSchema: StructType = plan.fileSchema
62+
private lazy val partitionSchema: StructType = plan.partitionSchema
63+
private lazy val projectableSchema: StructType =
64+
StructType(fileSchema.fields ++ partitionSchema.fields)
6165
private lazy val fileTasks: Seq[FileScanTask] = plan.fileTasks
6266

6367
private lazy val partitions: Array[FilePartition] = buildFilePartitions()
6468
private lazy val fileSizes: Map[String, Long] = buildFileSizes()
6569

66-
private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(readSchema)
70+
private lazy val nativeFileSchema: pb.Schema = NativeConverters.convertSchema(fileSchema)
6771
private lazy val nativePartitionSchema: pb.Schema =
68-
NativeConverters.convertSchema(StructType(Nil))
72+
NativeConverters.convertSchema(partitionSchema)
6973

7074
private lazy val caseSensitive: Boolean = SQLConf.get.caseSensitiveAnalysis
7175

7276
private lazy val fieldIndexByName: Map[String, Int] = {
7377
if (caseSensitive) {
74-
readSchema.fieldNames.zipWithIndex.toMap
78+
projectableSchema.fieldNames.zipWithIndex.toMap
7579
} else {
76-
readSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
80+
projectableSchema.fieldNames.map(_.toLowerCase(Locale.ROOT)).zipWithIndex.toMap
7781
}
7882
}
7983

8084
private def fieldIndexFor(name: String): Int = {
8185
if (caseSensitive) {
82-
fieldIndexByName.getOrElse(name, readSchema.fieldIndex(name))
86+
fieldIndexByName.getOrElse(name, projectableSchema.fieldIndex(name))
8387
} else {
84-
fieldIndexByName.getOrElse(name.toLowerCase(Locale.ROOT), readSchema.fieldIndex(name))
88+
fieldIndexByName.getOrElse(
89+
name.toLowerCase(Locale.ROOT),
90+
projectableSchema.fieldIndex(name))
8591
}
8692
}
8793

@@ -98,6 +104,7 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
98104
.setPath(filePath)
99105
.setSize(size)
100106
.setLastModifiedNs(0)
107+
.addAllPartitionValues(metadataPartitionValues(filePath).asJava)
101108
.setRange(
102109
pb.FileRange
103110
.newBuilder()
@@ -112,6 +119,17 @@ case class NativeIcebergTableScanExec(basedScan: BatchScanExec, plan: IcebergSca
112119
.build()
113120
}
114121

122+
private def metadataPartitionValues(filePath: String): Seq[pb.ScalarValue] =
123+
partitionSchema.fields.map { field =>
124+
field.name match {
125+
case name if name == MetadataColumns.FILE_PATH.name() =>
126+
NativeConverters.convertExpr(Literal.create(filePath, StringType)).getLiteral
127+
case name =>
128+
throw new IllegalStateException(
129+
s"unsupported Iceberg metadata column in native scan: $name")
130+
}
131+
}
132+
115133
override def doExecuteNative(): NativeRDD = {
116134
if (partitions.isEmpty) {
117135
return new EmptyNativeRDD(sparkContext)

thirdparty/auron-iceberg/src/test/scala/org/apache/auron/iceberg/AuronIcebergIntegrationSuite.scala

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ import org.apache.iceberg.{FileFormat, FileScanTask}
2424
import org.apache.iceberg.data.{GenericAppenderFactory, Record}
2525
import org.apache.iceberg.deletes.PositionDelete
2626
import org.apache.iceberg.spark.Spark3Util
27-
import org.apache.spark.sql.Row
27+
import org.apache.spark.sql.{DataFrame, Row}
2828

2929
class AuronIcebergIntegrationSuite
3030
extends org.apache.spark.sql.QueryTest
@@ -143,10 +143,24 @@ class AuronIcebergIntegrationSuite
143143
}
144144
}
145145

146-
test("iceberg scan falls back when reading metadata columns") {
146+
test("iceberg native scan supports _file metadata column") {
147147
withTable("local.db.t4") {
148148
sql("create table local.db.t4 using iceberg as select 1 as id, 'a' as v")
149-
val df = sql("select _file from local.db.t4")
149+
checkSparkAnswerAndOperator("select _file from local.db.t4")
150+
}
151+
}
152+
153+
test("iceberg native scan supports data columns with _file metadata column") {
154+
withTable("local.db.t4_mixed") {
155+
sql("create table local.db.t4_mixed using iceberg as select 1 as id, 'a' as v")
156+
checkSparkAnswerAndOperator("select id, _file from local.db.t4_mixed")
157+
}
158+
}
159+
160+
test("iceberg scan falls back when reading unsupported metadata columns") {
161+
withTable("local.db.t4_pos") {
162+
sql("create table local.db.t4_pos using iceberg as select 1 as id, 'a' as v")
163+
val df = sql("select _pos from local.db.t4_pos")
150164
df.collect()
151165
val plan = df.queryExecution.executedPlan.toString()
152166
assert(!plan.contains("NativeIcebergTableScan"))
@@ -238,4 +252,17 @@ class AuronIcebergIntegrationSuite
238252
taskIterable.close()
239253
}
240254
}
255+
256+
private def checkSparkAnswerAndOperator(sqlText: String): DataFrame = {
257+
var expected: Seq[Row] = Nil
258+
withSQLConf("spark.auron.enable" -> "false") {
259+
expected = sql(sqlText).collect().toSeq
260+
}
261+
262+
val df = sql(sqlText)
263+
checkAnswer(df, expected)
264+
val plan = df.queryExecution.executedPlan.toString()
265+
assert(plan.contains("NativeIcebergTableScan"))
266+
df
267+
}
241268
}

0 commit comments

Comments (0)