From 2cafb1db45da0c761015a248082424891395e8db Mon Sep 17 00:00:00 2001 From: guihuawen Date: Mon, 9 Feb 2026 22:28:54 +0800 Subject: [PATCH 01/14] [AURON #2080]Support Hive Parquet table to native --- spark-extension-shims-spark/pom.xml | 44 +++ ...pache.spark.sql.auron.AuronConvertProvider | 18 ++ .../auron/plan/HiveConvertProvider.scala | 68 +++++ .../plan/NativeParquetHiveTableScanExec.scala | 272 ++++++++++++++++++ .../org/apache/auron/BaseAuronSQLSuite.scala | 8 + .../hive/execution/BaseAuronHiveSuite.scala | 74 +++++ .../HiveParquetTableScanExecSuite.scala | 53 ++++ .../auron/plan/NativeHiveTableScanBase.scala | 2 +- 8 files changed, 538 insertions(+), 1 deletion(-) create mode 100644 spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider create mode 100644 spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala create mode 100644 spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala create mode 100644 spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala create mode 100644 spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala diff --git a/spark-extension-shims-spark/pom.xml b/spark-extension-shims-spark/pom.xml index 4c75845bf..3d88ebaf6 100644 --- a/spark-extension-shims-spark/pom.xml +++ b/spark-extension-shims-spark/pom.xml @@ -55,11 +55,27 @@ spark-core_${scalaVersion} provided + + org.apache.spark + spark-catalyst_${scalaVersion} + provided + + + org.apache.arrow + arrow-memory-netty + + + org.apache.spark spark-hive_${scalaVersion} provided + + org.apache.hadoop + hadoop-client-api + ${hadoopVersion} + org.apache.spark spark-sql_${scalaVersion} @@ -103,15 +119,43 @@ spark-core_${scalaVersion} test-jar + + org.apache.arrow + arrow-memory-core + ${arrowVersion} + org.apache.spark spark-catalyst_${scalaVersion} test-jar + + + org.apache.arrow + arrow-memory-netty + + + + org.apache.spark spark-sql_${scalaVersion} test-jar + + org.apache.spark + spark-hive_${scalaVersion} + test-jar + + + + + + + + + + + diff --git a/spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider b/spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider new file mode 100644 index 000000000..8e5cc72ed --- /dev/null +++ b/spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala new file mode 100644 index 000000000..4f457b332 --- /dev/null +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.auron.AuronConverters.getBooleanConf +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +class HiveConvertProvider extends AuronConvertProvider with Logging { + override def isEnabled: Boolean = + getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true) + + private def enableHiveTableScanExec: Boolean = + getBooleanConf("spark.auron.enable.parquetHiveTableScanExec", defaultValue = false) + + override def isSupported(exec: SparkPlan): Boolean = + exec match { + case e: HiveTableScanExec + if enableHiveTableScanExec && + e.relation.tableMeta.provider.isDefined && + e.relation.tableMeta.provider.get.equals("hive") => + true + case _ => false + } + + override def convert(exec: SparkPlan): SparkPlan = { + exec match { + case hiveExec: HiveTableScanExec if enableHiveTableScanExec + && HiveTableUtil.isParquetTable(hiveExec) => + convertParquetHiveTableScanExec(hiveExec) + case _ => exec + } + } + + private def convertParquetHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = { + AuronConverters.addRenameColumnsExec(NativeParquetHiveTableScanExec(hiveExec)) + } +} + +object HiveTableUtil { + private val parquetFormat = "MapredParquetInputFormat" + + def isParquetTable(basedHiveScan: HiveTableScanExec): Boolean = { + if (HiveClientImpl.toHiveTable(basedHiveScan.relation.tableMeta).getInputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) { + true + } else { + false + } + } + +} diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala new file mode 100644 index 000000000..cf6986116 --- /dev/null +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala @@ -0,0 +1,272 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +import org.apache.auron.{protobuf => pb} +import org.apache.auron.metric.SparkMetricNode + +case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = + HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = + ParquetHiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector(deserializer.getObjectInspector, ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI.getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max( + nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeParquetHiveTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + case "other" => + throw new Exception("HiveTableExec only support parquet") + } + }, + friendlyName = "NativeRDD.ParquetHiveTableScan") + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)( + newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + newJobConf.set("mapred.input.dir", partPath.toString) + arrayPartitionedFile ++= getArrayPartitionedFile( + newJobConf, + inputFormatClass, + partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } else { + newJobConf.set("mapred.input.dir", nativeTableDesc.getProperties().getProperty("location")) + val inputFormatClass = + nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition ++= FilePartition.getFilePartitions( + SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)) + + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) + } + + private def getArrayPartitionedFile( + newJobConf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]], + partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = { + val allInputSplits = + getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions) + val inputSplits = if (ignoreEmptySplits) { + allInputSplits.filter(_.getLength > 0) + } else { + allInputSplits + } + + val arrayFilePartition = ArrayBuffer[PartitionedFile]() + for (i <- 0 until inputSplits.size) { + val inputSplit = inputSplits(i) + if (inputSplit.isInstanceOf[FileSplit]) { + val orcInputSplit = inputSplit.asInstanceOf[FileSplit] + arrayFilePartition += + Shims.get.getPartitionedFile( + partitionInternalRow, + orcInputSplit.getPath.toString, + orcInputSplit.getStart, + orcInputSplit.getLength) + } + } + arrayFilePartition + } + + private def getInputFormat( + conf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]]) + : InputFormat[Writable, Writable] = { + val newInputFormat = ReflectionUtils + .newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[Writable, Writable]] + newInputFormat match { + case c: Configurable => c.setConf(conf) + case _ => + } + newInputFormat + } + +} + +object ParquetHiveTableUtil { + private val parquetFormat = "MapredParquetInputFormat" + + def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = { + if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) { + "parquet" + } else { + "other" + } + } + +} diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala index cd3ce9759..0129739db 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala @@ -20,6 +20,8 @@ import java.io.File import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.auron.util.SparkVersionUtil @@ -61,6 +63,12 @@ trait BaseAuronSQLSuite extends SharedSparkSession { .set("spark.ui.enabled", "false") .set("spark.sql.warehouse.dir", warehouseDir) .set("spark.auron.udf.singleChildFallback.enabled", "false") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString) + .set( + "spark.sql.hive.metastore.barrierPrefixes", + "org.apache.spark.sql.hive.execution.PairSerDe") + .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") if (SparkVersionUtil.isSparkV40OrGreater) { // Spark 4.0+: Disable session artifact isolation, align with Spark 3.x behavior diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala new file mode 100644 index 000000000..a8324ad4c --- /dev/null +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.commons.io.FileUtils +import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.test.TestHiveContext +import org.scalatest.BeforeAndAfterAll + +import java.io.File + +trait BaseAuronHiveSuite extends SparkFunSuite with BeforeAndAfterAll { + + protected val spark: SparkSession = TestAuronHive.sparkSession + + protected val suiteWorkspace: String = getClass.getResource("/").getPath + "auron-tests-workdir" + protected val warehouseDir: String = suiteWorkspace + "/spark-warehouse" + protected val metastoreDir: String = suiteWorkspace + "/meta" + + protected def resetSuiteWorkspace(): Unit = { + val workdir = new File(suiteWorkspace) + if (workdir.exists()) { + FileUtils.forceDelete(workdir) + } + FileUtils.forceMkdir(workdir) + FileUtils.forceMkdir(new File(warehouseDir)) + FileUtils.forceMkdir(new File(metastoreDir)) + } + + override def beforeAll(): Unit = { + // Prepare a clean workspace before SparkSession initialization + resetSuiteWorkspace() + super.beforeAll() + spark.sparkContext.setLogLevel("WARN") + } + +} + +object TestAuronHive + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension") + .set( + "spark.shuffle.manager", + "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager") + .set("spark.memory.offHeap.enabled", "false") + .set("spark.auron.enable", "true") + .set("spark.ui.enabled", "false") + .set( + "spark.sql.warehouse.dir", + getClass.getResource("/").getPath + "auron-tests-workdir/spark-warehouse") + .set("spark.auron.udf.singleChildFallback.enabled", "false") + .set("spark.auron.enable.parquetHiveTableScanExec", "true") + .set("spark.sql.hive.convertMetastoreParquet", "false") + )) {} diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala new file mode 100644 index 000000000..7477b257b --- /dev/null +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.AuronQueryTest +import org.apache.spark.sql.hive.execution.auron.plan.NativeParquetHiveTableScanExec + +class HiveParquetTableScanExecSuite extends AuronQueryTest with BaseAuronHiveSuite { + + test("test hive parquet table without partition to native") { + withTempView("t1") { + spark.sql("create table t1 (a string) stored as parquet") + spark.sql("insert into t1 values(1)") + val df = spark.sql("select * from t1") + assert(df.collect().toList.head.get(0) == "1") + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case e: NativeParquetHiveTableScanExec => + e + }.size == 1) + } + } + + test("test hive parquet table partition to native") { + withTempView("t1") { + spark.sql("create table t1 (a string) stored as parquet partitioned by(pt string)") + spark.sql("insert into t1 partition(pt='2026-03-10') values('1')") + spark.sql("insert into t1 partition(pt='2026-03-11') values('1')") + val df = spark.sql("select * from t1 where pt = '2026-03-10'") + df.show() + assert(df.collect().toList.head.get(0) == "1") + assert(df.collect().toList.head.get(1) == "2026-03-10") + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case e: NativeParquetHiveTableScanExec => + e + }.size == 1) + } + } + +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala index 6dfc8be79..3f7e93ad8 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala @@ -85,7 +85,7 @@ abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec) val nativePartitionedFile = (file: PartitionedFile) => { val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => NativeConverters - .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType)) + .convertExpr(Literal.create(file.partitionValues.get(index, field.dataType), field.dataType)) .getLiteral } pb.PartitionedFile From 6ff2420b7f40ebc49376a090cd1d9e5bf40a93c2 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Wed, 11 Mar 2026 19:55:11 +0800 Subject: [PATCH 02/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- spark-extension-shims-spark/pom.xml | 1 - .../execution/auron/plan/HiveConvertProvider.scala | 13 +++++++++---- .../auron/plan/NativeParquetHiveTableScanExec.scala | 2 +- .../sql/hive/execution/BaseAuronHiveSuite.scala | 7 +++---- .../apache/spark/sql/auron/AuronConverters.scala | 6 +++--- .../sql/auron/AuronSparkSessionExtension.scala | 2 +- .../org/apache/spark/sql/auron/NativeHelper.scala | 2 +- .../spark/sql/auron/util/TaskContextHelper.scala | 2 +- .../plan/NativeParquetInsertIntoHiveTableBase.scala | 7 ++++--- .../auron/plan/NativeHiveTableScanBase.scala | 3 ++- 10 files changed, 25 insertions(+), 20 deletions(-) diff --git a/spark-extension-shims-spark/pom.xml b/spark-extension-shims-spark/pom.xml index 3d88ebaf6..f3e0d7ce3 100644 --- a/spark-extension-shims-spark/pom.xml +++ b/spark-extension-shims-spark/pom.xml @@ -135,7 +135,6 @@ - org.apache.spark diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala index 4f457b332..99f821b93 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala @@ -17,7 +17,7 @@ package org.apache.spark.sql.hive.execution.auron.plan import org.apache.spark.internal.Logging -import org.apache.spark.sql.auron.{AuronConvertProvider, AuronConverters} +import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider} import org.apache.spark.sql.auron.AuronConverters.getBooleanConf import org.apache.spark.sql.execution.SparkPlan import org.apache.spark.sql.hive.client.HiveClientImpl @@ -42,8 +42,9 @@ class HiveConvertProvider extends AuronConvertProvider with Logging { override def convert(exec: SparkPlan): SparkPlan = { exec match { - case hiveExec: HiveTableScanExec if enableHiveTableScanExec - && HiveTableUtil.isParquetTable(hiveExec) => + case hiveExec: HiveTableScanExec + if enableHiveTableScanExec + && HiveTableUtil.isParquetTable(hiveExec) => convertParquetHiveTableScanExec(hiveExec) case _ => exec } @@ -58,7 +59,11 @@ object HiveTableUtil { private val parquetFormat = "MapredParquetInputFormat" def isParquetTable(basedHiveScan: HiveTableScanExec): Boolean = { - if (HiveClientImpl.toHiveTable(basedHiveScan.relation.tableMeta).getInputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) { + if (HiveClientImpl + .toHiveTable(basedHiveScan.relation.tableMeta) + .getInputFormatClass + .getSimpleName + .equalsIgnoreCase(parquetFormat)) { true } else { false diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala index cf6986116..669c14749 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala @@ -262,7 +262,7 @@ object ParquetHiveTableUtil { private val parquetFormat = "MapredParquetInputFormat" def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = { - if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) { + if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) { "parquet" } else { "other" diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala index a8324ad4c..99bce5eac 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -16,14 +16,14 @@ */ package org.apache.spark.sql.hive.execution +import java.io.File + import org.apache.commons.io.FileUtils import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.test.TestHiveContext import org.scalatest.BeforeAndAfterAll -import java.io.File - trait BaseAuronHiveSuite extends SparkFunSuite with BeforeAndAfterAll { protected val spark: SparkSession = TestAuronHive.sparkSession @@ -70,5 +70,4 @@ object TestAuronHive getClass.getResource("/").getPath + "auron-tests-workdir/spark-warehouse") .set("spark.auron.udf.singleChildFallback.enabled", "false") .set("spark.auron.enable.parquetHiveTableScanExec", "true") - .set("spark.sql.hive.convertMetastoreParquet", "false") - )) {} + .set("spark.sql.hive.convertMetastoreParquet", "false"))) {} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index d01c82710..824dd6759 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -424,7 +424,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -435,7 +435,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -443,7 +443,7 @@ object AuronConverters extends Logging { case p => throw new NotImplementedError( s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse( - "unknown")}, class: ${p.getClass.getName}") + "unknown")}, class: ${p.getClass.getName}") } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index b68b04954..47492aa3d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -91,7 +91,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu dumpSimpleSparkPlanTreeNode(sparkPlanTransformed) logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed - .treeString(verbose = true, addSuffix = true)}") + .treeString(verbose = true, addSuffix = true)}") // post-transform Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index e16656471..7a1e34724 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -74,7 +74,7 @@ object NativeHelper extends Logging { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( - heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala index 8e5d7353f..eecec5ef4 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala @@ -47,7 +47,7 @@ object TaskContextHelper extends Logging { val thread = Thread.currentThread() val threadName = if (context != null) { s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context - .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" + .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" } else { "auron native task " + thread.getName } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index d43f7d17d..b9fb7f99b 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -69,9 +69,10 @@ abstract class NativeParquetInsertIntoHiveTableBase( .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) .toSeq :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) - :+ ("bytes_written", - SQLMetrics - .createSizeMetric(sparkContext, "Native.bytes_written")): _*) + :+ ( + "bytes_written", + SQLMetrics + .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = { val hadoopConf = sparkContext.hadoopConfiguration diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala index 3f7e93ad8..951c3b7fb 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala @@ -85,7 +85,8 @@ abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec) val nativePartitionedFile = (file: PartitionedFile) => { val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => NativeConverters - .convertExpr(Literal.create(file.partitionValues.get(index, field.dataType), field.dataType)) + .convertExpr( + Literal.create(file.partitionValues.get(index, field.dataType), field.dataType)) .getLiteral } pb.PartitionedFile From 9dd77be1dfcc4568a47ad3e6f397ddbdc5288a56 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Wed, 11 Mar 2026 23:50:03 +0800 Subject: [PATCH 03/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../auron/shuffle/uniffle/AuronUniffleShuffleReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala index 94e139ac4..48938a706 100644 --- a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala +++ b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala @@ -172,7 +172,7 @@ class AuronUniffleShuffleReader[K, C]( } if (!emptyPartitionIds.isEmpty) { logDebug(s"Found ${emptyPartitionIds - .size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}") + .size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}") } iterators = shuffleDataIterList.iterator() if (iterators.hasNext) { From 1defeaed692ed1f11f5d877e703ce00048763979 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Thu, 12 Mar 2026 00:14:54 +0800 Subject: [PATCH 04/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../scala/org/apache/spark/sql/auron/AuronConverters.scala | 6 +++--- .../apache/spark/sql/auron/AuronSparkSessionExtension.scala | 2 +- .../scala/org/apache/spark/sql/auron/NativeHelper.scala | 2 +- .../org/apache/spark/sql/auron/util/TaskContextHelper.scala | 2 +- 4 files changed, 6 insertions(+), 6 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala index 824dd6759..d01c82710 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronConverters.scala @@ -424,7 +424,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"Parquet scan with timestamp type is not supported for table: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.parquet.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -435,7 +435,7 @@ object AuronConverters extends Logging { assert( !exec.requiredSchema.exists(e => existTimestampType(e.dataType)), s"ORC scan with timestamp type is not supported for tableIdentifier: ${tableIdentifier - .getOrElse("unknown")}. " + + .getOrElse("unknown")}. " + "Set spark.auron.enable.scan.orc.timestamp=true to enable timestamp support " + "or remove timestamp columns from the query.") } @@ -443,7 +443,7 @@ object AuronConverters extends Logging { case p => throw new NotImplementedError( s"Cannot convert FileSourceScanExec tableIdentifier: ${tableIdentifier.getOrElse( - "unknown")}, class: ${p.getClass.getName}") + "unknown")}, class: ${p.getClass.getName}") } } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala index 47492aa3d..b68b04954 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/AuronSparkSessionExtension.scala @@ -91,7 +91,7 @@ case class AuronColumnarOverrides(sparkSession: SparkSession) extends ColumnarRu dumpSimpleSparkPlanTreeNode(sparkPlanTransformed) logInfo(s"Transformed spark plan after preColumnarTransitions:\n${sparkPlanTransformed - .treeString(verbose = true, addSuffix = true)}") + .treeString(verbose = true, addSuffix = true)}") // post-transform Shims.get.postTransform(sparkPlanTransformed, sparkSession.sparkContext) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala index 7a1e34724..e16656471 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/NativeHelper.scala @@ -74,7 +74,7 @@ object NativeHelper extends Logging { val heapMemory = Runtime.getRuntime.maxMemory() val offheapMemory = totalMemory - heapMemory logWarning(s"memory total: ${Utils.bytesToString(totalMemory)}, onheap: ${Utils.bytesToString( - heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") + heapMemory)}, offheap: ${Utils.bytesToString(offheapMemory)}") offheapMemory } diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala index eecec5ef4..8e5d7353f 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/auron/util/TaskContextHelper.scala @@ -47,7 +47,7 @@ object TaskContextHelper extends Logging { val thread = Thread.currentThread() val threadName = if (context != null) { s"auron native task ${context.partitionId()}.${context.attemptNumber()} in stage ${context - .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" + .stageId()}.${context.stageAttemptNumber()} (TID ${context.taskAttemptId()})" } else { "auron native task " + thread.getName } From aee86b15ad81b3e6c0a0f330d0c5b3852dedd173 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sat, 14 Mar 2026 21:58:38 +0800 Subject: [PATCH 05/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../hive/execution/BaseAuronHiveSuite.scala | 30 ++----------------- 1 file changed, 2 insertions(+), 28 deletions(-) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala index 99bce5eac..b2b8da28f 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -16,39 +16,13 @@ */ package org.apache.spark.sql.hive.execution -import java.io.File - -import org.apache.commons.io.FileUtils -import org.apache.spark.{SparkConf, SparkContext, SparkFunSuite} +import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.test.TestHiveContext -import org.scalatest.BeforeAndAfterAll -trait BaseAuronHiveSuite extends SparkFunSuite with BeforeAndAfterAll { +trait BaseAuronHiveSuite { protected val spark: SparkSession = TestAuronHive.sparkSession - - protected val suiteWorkspace: String = getClass.getResource("/").getPath + "auron-tests-workdir" - protected val warehouseDir: String = suiteWorkspace + "/spark-warehouse" - protected val metastoreDir: String = suiteWorkspace + "/meta" - - protected def resetSuiteWorkspace(): Unit = { - val workdir = new File(suiteWorkspace) - if (workdir.exists()) { - FileUtils.forceDelete(workdir) - } - FileUtils.forceMkdir(workdir) - FileUtils.forceMkdir(new File(warehouseDir)) - FileUtils.forceMkdir(new File(metastoreDir)) - } - - override def beforeAll(): Unit = { - // Prepare a clean workspace before SparkSession initialization - resetSuiteWorkspace() - super.beforeAll() - spark.sparkContext.setLogLevel("WARN") - } - } object TestAuronHive From b0263ad562aa5277c533a01981e2c3c9de3c2f19 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sat, 14 Mar 2026 22:27:49 +0800 Subject: [PATCH 06/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../auron/plan/NativeParquetInsertIntoHiveTableBase.scala | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index b9fb7f99b..ae379e93d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -69,9 +69,8 @@ abstract class NativeParquetInsertIntoHiveTableBase( .filterKeys(Set("stage_id", "output_rows", "elapsed_compute")) .toSeq :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) - :+ ( - "bytes_written", - SQLMetrics + :+ ("bytes_written", + SQLMetrics .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = { From 3ab061fe1d43f80ba1880b7f79f7beca73e2d4ea Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sat, 14 Mar 2026 22:40:23 +0800 Subject: [PATCH 07/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../auron/plan/NativeParquetInsertIntoHiveTableBase.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala index ae379e93d..d43f7d17d 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/execution/auron/plan/NativeParquetInsertIntoHiveTableBase.scala @@ -71,7 +71,7 @@ abstract class NativeParquetInsertIntoHiveTableBase( :+ ("io_time", SQLMetrics.createNanoTimingMetric(sparkContext, "Native.io_time")) :+ ("bytes_written", SQLMetrics - .createSizeMetric(sparkContext, "Native.bytes_written")): _*) + .createSizeMetric(sparkContext, "Native.bytes_written")): _*) def check(): Unit = { val hadoopConf = sparkContext.hadoopConfiguration From bad36767beb4feebbf6e8cb2706778be1ed2655c Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sat, 14 Mar 2026 22:51:56 +0800 Subject: [PATCH 08/14] Revert "[AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec" This reverts commit 9dd77be1dfcc4568a47ad3e6f397ddbdc5288a56. --- .../auron/shuffle/uniffle/AuronUniffleShuffleReader.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala index 48938a706..94e139ac4 100644 --- a/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala +++ b/thirdparty/auron-uniffle/src/main/scala/org/apache/spark/sql/execution/auron/shuffle/uniffle/AuronUniffleShuffleReader.scala @@ -172,7 +172,7 @@ class AuronUniffleShuffleReader[K, C]( } if (!emptyPartitionIds.isEmpty) { logDebug(s"Found ${emptyPartitionIds - .size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}") + .size()} empty shuffle partitions: ${emptyPartitionIds.asScala.mkString(",")}") } iterators = shuffleDataIterList.iterator() if (iterators.hasNext) { From 88af8c8c33b89f48e74ce7f8249c309817351ffb Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sun, 15 Mar 2026 10:22:38 +0800 Subject: [PATCH 09/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../plan/NativeParquetHiveTableScanExec.scala | 89 +++++++++++++------ 1 file changed, 60 insertions(+), 29 deletions(-) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala index 669c14749..20c3ea8af 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala @@ -17,10 +17,8 @@ package org.apache.spark.sql.hive.execution.auron.plan import java.util.UUID - import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer - import org.apache.hadoop.conf.Configurable import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} @@ -43,8 +41,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveTableScanExec - -import org.apache.auron.{protobuf => pb} +import org.apache.auron.{sparkver, protobuf => pb} import org.apache.auron.metric.SparkMetricNode case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) @@ -163,35 +160,69 @@ case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) friendlyName = "NativeRDD.ParquetHiveTableScan") } + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0 / 4.1") + def getPrunedPartitions(newJobConf: JobConf): Array[PartitionedFile] = { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)( + newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + newJobConf.set("mapred.input.dir", partPath.toString) + arrayPartitionedFile ++= getArrayPartitionedFile( + newJobConf, + inputFormatClass, + partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + + @sparkver("3.0") + def getPrunedPartitions(newJobConf: JobConf): Array[PartitionedFile] = { + val partitions = basedHiveScan.rawPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)( + newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + newJobConf.set("mapred.input.dir", partPath.toString) + arrayPartitionedFile ++= getArrayPartitionedFile( + newJobConf, + inputFormatClass, + partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + override def getFilePartitions(): Array[FilePartition] = { val newJobConf = new JobConf(nativeHadoopConf) val arrayFilePartition = ArrayBuffer[FilePartition]() val partitionedFiles = if (relation.isPartitioned) { - val partitions = basedHiveScan.prunedPartitions - val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() - partitions.foreach { partition => - val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) - val partPath = partition.getDataLocation - HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)( - newJobConf) - val partitionValues = partition.getTPartition.getValues - - val partitionInternalRow = new GenericInternalRow(partitionValues.size()) - for (partitionIndex <- 0 until partitionValues.size) { - partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) - } - - val inputFormatClass = partDesc.getInputFileFormatClass - .asInstanceOf[Class[newInputClass[Writable, Writable]]] - newJobConf.set("mapred.input.dir", partPath.toString) - arrayPartitionedFile ++= getArrayPartitionedFile( - newJobConf, - inputFormatClass, - partitionInternalRow) - } - arrayPartitionedFile - .sortBy(_.length)(implicitly[Ordering[Long]].reverse) - .toArray + getPrunedPartitions(newJobConf) } else { newJobConf.set("mapred.input.dir", nativeTableDesc.getProperties().getProperty("location")) val inputFormatClass = From e54828280f0c11f12f5efd7cbae55ed942135ed9 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sun, 15 Mar 2026 10:34:50 +0800 Subject: [PATCH 10/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../auron/plan/NativeParquetHiveTableScanExec.scala | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala index 20c3ea8af..f497e017c 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala @@ -17,8 +17,10 @@ package org.apache.spark.sql.hive.execution.auron.plan import java.util.UUID + import scala.collection.JavaConverters._ import scala.collection.mutable.ArrayBuffer + import org.apache.hadoop.conf.Configurable import org.apache.hadoop.hive.ql.exec.Utilities import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} @@ -41,6 +43,7 @@ import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFil import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveTableScanExec + import org.apache.auron.{sparkver, protobuf => pb} import org.apache.auron.metric.SparkMetricNode @@ -167,8 +170,7 @@ case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) partitions.foreach { partition => val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) val partPath = partition.getDataLocation - HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)( - newJobConf) + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) val partitionValues = partition.getTPartition.getValues val partitionInternalRow = new GenericInternalRow(partitionValues.size()) @@ -196,8 +198,7 @@ case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) partitions.foreach { partition => val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) val partPath = partition.getDataLocation - HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)( - newJobConf) + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) val partitionValues = partition.getTPartition.getValues val partitionInternalRow = new GenericInternalRow(partitionValues.size()) From a496556439cd2d05d94ecc3f5c6eb12c1279812e Mon Sep 17 00:00:00 2001 From: guihuawen Date: Sun, 15 Mar 2026 10:46:28 +0800 Subject: [PATCH 11/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../execution/auron/plan/NativeParquetHiveTableScanExec.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala index f497e017c..2779de109 100644 --- a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala @@ -44,7 +44,7 @@ import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} import org.apache.spark.sql.hive.client.HiveClientImpl import org.apache.spark.sql.hive.execution.HiveTableScanExec -import org.apache.auron.{sparkver, protobuf => pb} +import org.apache.auron.{protobuf => pb, sparkver} import org.apache.auron.metric.SparkMetricNode case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) From d2becbfe48c1bffa15e185c8c020c964e74fcd24 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Tue, 17 Mar 2026 01:57:39 +0800 Subject: [PATCH 12/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../hive/execution/BaseAuronHiveSuite.scala | 46 ++++++++++++++++--- .../HiveParquetTableScanExecSuite.scala | 3 +- 2 files changed, 41 insertions(+), 8 deletions(-) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala index b2b8da28f..0382e1df9 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -16,16 +16,48 @@ */ package org.apache.spark.sql.hive.execution +import org.apache.commons.io.FileUtils +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.{QueryTest, SparkSession} import org.apache.spark.sql.hive.test.TestHiveContext +import org.apache.spark.sql.test.SQLTestUtils +import org.scalatest.BeforeAndAfterEach -trait BaseAuronHiveSuite { +import java.io.File + +trait BaseAuronHiveSuite extends QueryTest + with SQLTestUtils + with BeforeAndAfterEach + with AdaptiveSparkPlanHelper { + protected val suiteWorkspace: String = getClass.getResource("/").getPath + "auron-tests-workdir" + protected val warehouseDir: String = suiteWorkspace + "/spark-warehouse" + protected val metastoreDir: String = suiteWorkspace + "/meta" protected val spark: SparkSession = TestAuronHive.sparkSession -} -object TestAuronHive + protected def resetSuiteWorkspace(): Unit = { + val workdir = new File(suiteWorkspace) + if (workdir.exists()) { + FileUtils.forceDelete(workdir) + } + FileUtils.forceMkdir(workdir) + FileUtils.forceMkdir(new File(warehouseDir)) + FileUtils.forceMkdir(new File(metastoreDir)) + } + + override def beforeAll(): Unit = { + // Prepare a clean workspace before SparkSession initialization + resetSuiteWorkspace() + super.beforeAll() + spark.sparkContext.setLogLevel("WARN") + } + + override def afterAll(): Unit = { + super.afterAll() + } + + object TestAuronHive extends TestHiveContext( new SparkContext( System.getProperty("spark.sql.test.master", "local[1]"), @@ -40,8 +72,10 @@ object TestAuronHive .set("spark.auron.enable", "true") .set("spark.ui.enabled", "false") .set( - "spark.sql.warehouse.dir", - getClass.getResource("/").getPath + "auron-tests-workdir/spark-warehouse") + "spark.sql.warehouse.dir", warehouseDir) .set("spark.auron.udf.singleChildFallback.enabled", "false") .set("spark.auron.enable.parquetHiveTableScanExec", "true") .set("spark.sql.hive.convertMetastoreParquet", "false"))) {} +} + + diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala index 7477b257b..29f0ad467 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala @@ -16,10 +16,9 @@ */ package org.apache.spark.sql.hive.execution -import org.apache.spark.sql.AuronQueryTest import org.apache.spark.sql.hive.execution.auron.plan.NativeParquetHiveTableScanExec -class HiveParquetTableScanExecSuite extends AuronQueryTest with BaseAuronHiveSuite { +class HiveParquetTableScanExecSuite extends BaseAuronHiveSuite { test("test hive parquet table without partition to native") { withTempView("t1") { From 3c45e3370fc50e7475590449c814e5ae6f17d9c8 Mon Sep 17 00:00:00 2001 From: guihuawen Date: Tue, 17 Mar 2026 02:00:18 +0800 Subject: [PATCH 13/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../hive/execution/BaseAuronHiveSuite.scala | 52 +++++++++---------- 1 file changed, 25 insertions(+), 27 deletions(-) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala index 0382e1df9..73e635db9 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -16,20 +16,21 @@ */ package org.apache.spark.sql.hive.execution +import java.io.File + import org.apache.commons.io.FileUtils -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.sql.{QueryTest, SparkSession} +import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.hive.test.TestHiveContext import org.apache.spark.sql.test.SQLTestUtils import org.scalatest.BeforeAndAfterEach -import java.io.File - -trait BaseAuronHiveSuite extends QueryTest - with SQLTestUtils - with BeforeAndAfterEach - with AdaptiveSparkPlanHelper { +trait BaseAuronHiveSuite + extends QueryTest + with SQLTestUtils + with BeforeAndAfterEach + with AdaptiveSparkPlanHelper { protected val suiteWorkspace: String = getClass.getResource("/").getPath + "auron-tests-workdir" protected val warehouseDir: String = suiteWorkspace + "/spark-warehouse" protected val metastoreDir: String = suiteWorkspace + "/meta" @@ -58,24 +59,21 @@ trait BaseAuronHiveSuite extends QueryTest } object TestAuronHive - extends TestHiveContext( - new SparkContext( - System.getProperty("spark.sql.test.master", "local[1]"), - "TestSQLContext", - new SparkConf() - .set("spark.sql.test", "") - .set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension") - .set( - "spark.shuffle.manager", - "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager") - .set("spark.memory.offHeap.enabled", "false") - .set("spark.auron.enable", "true") - .set("spark.ui.enabled", "false") - .set( - "spark.sql.warehouse.dir", warehouseDir) - .set("spark.auron.udf.singleChildFallback.enabled", "false") - .set("spark.auron.enable.parquetHiveTableScanExec", "true") - .set("spark.sql.hive.convertMetastoreParquet", "false"))) {} + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension") + .set( + "spark.shuffle.manager", + "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager") + .set("spark.memory.offHeap.enabled", "false") + .set("spark.auron.enable", "true") + .set("spark.ui.enabled", "false") + .set("spark.sql.warehouse.dir", warehouseDir) + .set("spark.auron.udf.singleChildFallback.enabled", "false") + .set("spark.auron.enable.parquetHiveTableScanExec", "true") + .set("spark.sql.hive.convertMetastoreParquet", "false"))) {} } - - From 6b0631fa7f87888aa2d80f6865666eba9fe0a54a Mon Sep 17 00:00:00 2001 From: guihuawen Date: Mon, 23 Mar 2026 07:46:11 +0800 Subject: [PATCH 14/14] [AURON #2080] Support Hive Parquet table to NativeParquetHiveTableScanExec --- .../org/apache/auron/BaseAuronSQLSuite.scala | 1 + .../hive/execution/BaseAuronHiveSuite.scala | 53 ++++++++----------- .../HiveParquetTableScanExecSuite.scala | 17 +++--- 3 files changed, 32 insertions(+), 39 deletions(-) diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala index 0129739db..8c91a3e22 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala @@ -69,6 +69,7 @@ trait BaseAuronSQLSuite extends SharedSparkSession { "spark.sql.hive.metastore.barrierPrefixes", "org.apache.spark.sql.hive.execution.PairSerDe") .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") + .set("spark.sql.catalogImplementation", "hive") if (SparkVersionUtil.isSparkV40OrGreater) { // Spark 4.0+: Disable session artifact isolation, align with Spark 3.x behavior diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala index 73e635db9..f2a7f95a5 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -16,47 +16,38 @@ */ package org.apache.spark.sql.hive.execution -import java.io.File -import org.apache.commons.io.FileUtils import org.apache.spark.{SparkConf, SparkContext} -import org.apache.spark.sql.{QueryTest, SparkSession} -import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper +import org.apache.spark.sql.SparkSession import org.apache.spark.sql.hive.test.TestHiveContext -import org.apache.spark.sql.test.SQLTestUtils -import org.scalatest.BeforeAndAfterEach -trait BaseAuronHiveSuite - extends QueryTest - with SQLTestUtils - with BeforeAndAfterEach - with AdaptiveSparkPlanHelper { +trait BaseAuronHiveSuite { protected val suiteWorkspace: String = getClass.getResource("/").getPath + "auron-tests-workdir" protected val warehouseDir: String = suiteWorkspace + "/spark-warehouse" protected val metastoreDir: String = suiteWorkspace + "/meta" protected val spark: SparkSession = TestAuronHive.sparkSession - protected def resetSuiteWorkspace(): Unit = { - val workdir = new File(suiteWorkspace) - if (workdir.exists()) { - FileUtils.forceDelete(workdir) - } - FileUtils.forceMkdir(workdir) - FileUtils.forceMkdir(new File(warehouseDir)) - FileUtils.forceMkdir(new File(metastoreDir)) - } - - override def beforeAll(): Unit = { - // Prepare a clean workspace before SparkSession initialization - resetSuiteWorkspace() - super.beforeAll() - spark.sparkContext.setLogLevel("WARN") - } - - override def afterAll(): Unit = { - super.afterAll() - } +// protected def resetSuiteWorkspace(): Unit = { +// val workdir = new File(suiteWorkspace) +// if (workdir.exists()) { +// FileUtils.forceDelete(workdir) +// } +// FileUtils.forceMkdir(workdir) +// FileUtils.forceMkdir(new File(warehouseDir)) +// FileUtils.forceMkdir(new File(metastoreDir)) +// } +// +// override def beforeAll(): Unit = { +// // Prepare a clean workspace before SparkSession initialization +// resetSuiteWorkspace() +// super.beforeAll() +// spark.sparkContext.setLogLevel("WARN") +// } +// +// override def afterAll(): Unit = { +// super.afterAll() +// } object TestAuronHive extends TestHiveContext( diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala index 29f0ad467..efddfb9dd 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala @@ -16,15 +16,16 @@ */ package org.apache.spark.sql.hive.execution +import org.apache.spark.sql.AuronQueryTest import org.apache.spark.sql.hive.execution.auron.plan.NativeParquetHiveTableScanExec -class HiveParquetTableScanExecSuite extends BaseAuronHiveSuite { +class HiveParquetTableScanExecSuite extends AuronQueryTest with BaseAuronHiveSuite { test("test hive parquet table without partition to native") { withTempView("t1") { - spark.sql("create table t1 (a string) stored as parquet") - spark.sql("insert into t1 values(1)") - val df = spark.sql("select * from t1") + TestAuronHive.sql("create table t1 (a string) stored as parquet") + TestAuronHive.sql("insert into t1 values(1)") + val df = TestAuronHive.sql("select * from t1") assert(df.collect().toList.head.get(0) == "1") val plan = df.queryExecution.executedPlan assert(collect(plan) { case e: NativeParquetHiveTableScanExec => @@ -35,10 +36,10 @@ class HiveParquetTableScanExecSuite extends BaseAuronHiveSuite { test("test hive parquet table partition to native") { withTempView("t1") { - spark.sql("create table t1 (a string) stored as parquet partitioned by(pt string)") - spark.sql("insert into t1 partition(pt='2026-03-10') values('1')") - spark.sql("insert into t1 partition(pt='2026-03-11') values('1')") - val df = spark.sql("select * from t1 where pt = '2026-03-10'") + TestAuronHive.sql("create table t1 (a string) stored as parquet partitioned by(pt string)") + TestAuronHive.sql("insert into t1 partition(pt='2026-03-10') values('1')") + TestAuronHive.sql("insert into t1 partition(pt='2026-03-11') values('1')") + val df = TestAuronHive.sql("select * from t1 where pt = '2026-03-10'") df.show() assert(df.collect().toList.head.get(0) == "1") assert(df.collect().toList.head.get(1) == "2026-03-10")