diff --git a/spark-extension-shims-spark/pom.xml b/spark-extension-shims-spark/pom.xml index 4c75845bf..f3e0d7ce3 100644 --- a/spark-extension-shims-spark/pom.xml +++ b/spark-extension-shims-spark/pom.xml @@ -55,11 +55,27 @@ spark-core_${scalaVersion} provided + + org.apache.spark + spark-catalyst_${scalaVersion} + provided + + + org.apache.arrow + arrow-memory-netty + + + org.apache.spark spark-hive_${scalaVersion} provided + + org.apache.hadoop + hadoop-client-api + ${hadoopVersion} + org.apache.spark spark-sql_${scalaVersion} @@ -103,15 +119,42 @@ spark-core_${scalaVersion} test-jar + + org.apache.arrow + arrow-memory-core + ${arrowVersion} + org.apache.spark spark-catalyst_${scalaVersion} test-jar + + + org.apache.arrow + arrow-memory-netty + + + org.apache.spark spark-sql_${scalaVersion} test-jar + + org.apache.spark + spark-hive_${scalaVersion} + test-jar + + + + + + + + + + + diff --git a/spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider b/spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider new file mode 100644 index 000000000..8e5cc72ed --- /dev/null +++ b/spark-extension-shims-spark/src/main/resources/META-INF/services/org.apache.spark.sql.auron.AuronConvertProvider @@ -0,0 +1,18 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +org.apache.spark.sql.hive.execution.auron.plan.HiveConvertProvider diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala new file mode 100644 index 000000000..99f821b93 --- /dev/null +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/HiveConvertProvider.scala @@ -0,0 +1,73 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import org.apache.spark.internal.Logging +import org.apache.spark.sql.auron.{AuronConverters, AuronConvertProvider} +import org.apache.spark.sql.auron.AuronConverters.getBooleanConf +import org.apache.spark.sql.execution.SparkPlan +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +class HiveConvertProvider extends AuronConvertProvider with Logging { + override def isEnabled: Boolean = + getBooleanConf("spark.auron.enable.hiveTable", defaultValue = true) + + private def enableHiveTableScanExec: Boolean = + getBooleanConf("spark.auron.enable.parquetHiveTableScanExec", defaultValue = false) + + override def isSupported(exec: SparkPlan): Boolean = + exec match { + case e: HiveTableScanExec + if enableHiveTableScanExec && + e.relation.tableMeta.provider.isDefined && + e.relation.tableMeta.provider.get.equals("hive") => + true + case _ => false + } + + override def convert(exec: SparkPlan): SparkPlan = { + exec match { + case hiveExec: HiveTableScanExec + if enableHiveTableScanExec + && HiveTableUtil.isParquetTable(hiveExec) => + convertParquetHiveTableScanExec(hiveExec) + case _ => exec + } + } + + private def convertParquetHiveTableScanExec(hiveExec: HiveTableScanExec): SparkPlan = { + AuronConverters.addRenameColumnsExec(NativeParquetHiveTableScanExec(hiveExec)) + } +} + +object HiveTableUtil { + private val parquetFormat = "MapredParquetInputFormat" + + def isParquetTable(basedHiveScan: HiveTableScanExec): Boolean = { + if (HiveClientImpl + .toHiveTable(basedHiveScan.relation.tableMeta) + .getInputFormatClass + .getSimpleName + .equalsIgnoreCase(parquetFormat)) { + true + } else { + false + } + } + +} diff --git a/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala new file mode 100644 index 000000000..2779de109 --- /dev/null +++ b/spark-extension-shims-spark/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeParquetHiveTableScanExec.scala @@ -0,0 +1,304 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution.auron.plan + +import java.util.UUID + +import scala.collection.JavaConverters._ +import scala.collection.mutable.ArrayBuffer + +import org.apache.hadoop.conf.Configurable +import org.apache.hadoop.hive.ql.exec.Utilities +import org.apache.hadoop.hive.ql.metadata.{Table => HiveTable} +import org.apache.hadoop.hive.ql.plan.TableDesc +import org.apache.hadoop.hive.serde.serdeConstants +import org.apache.hadoop.hive.serde2.objectinspector.{ObjectInspectorUtils, StructObjectInspector} +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils.ObjectInspectorCopyOption +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils +import org.apache.hadoop.io.Writable +import org.apache.hadoop.mapred.{FileSplit, InputFormat, JobConf} +import org.apache.hadoop.mapreduce.{InputFormat => newInputClass} +import org.apache.hadoop.util.ReflectionUtils +import org.apache.spark.{Partition, TaskContext} +import org.apache.spark.internal.Logging +import org.apache.spark.internal.config.HADOOP_RDD_IGNORE_EMPTY_SPLITS +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.auron.{NativeRDD, Shims} +import org.apache.spark.sql.catalyst.expressions.{AttributeMap, GenericInternalRow} +import org.apache.spark.sql.execution.datasources.{FilePartition, PartitionedFile} +import org.apache.spark.sql.hive.{HadoopTableReader, HiveShim} +import org.apache.spark.sql.hive.client.HiveClientImpl +import org.apache.spark.sql.hive.execution.HiveTableScanExec + +import org.apache.auron.{protobuf => pb, sparkver} +import org.apache.auron.metric.SparkMetricNode + +case class NativeParquetHiveTableScanExec(basedHiveScan: HiveTableScanExec) + extends NativeHiveTableScanBase(basedHiveScan) + with Logging { + + @transient private lazy val nativeTable: HiveTable = + HiveClientImpl.toHiveTable(relation.tableMeta) + @transient private lazy val fileFormat = + ParquetHiveTableUtil.getFileFormat(nativeTable.getInputFormatClass) + @transient private lazy val nativeTableDesc = new TableDesc( + nativeTable.getInputFormatClass, + nativeTable.getOutputFormatClass, + nativeTable.getMetadata) + + @transient private lazy val nativeHadoopConf = { + val hiveConf = SparkSession.getActiveSession.get.sessionState.newHadoopConf() + // append columns ids and names before broadcast + val columnOrdinals = AttributeMap(relation.dataCols.zipWithIndex) + val neededColumnIDs = output.flatMap(columnOrdinals.get).map(o => o: Integer) + val neededColumnNames = output.filter(columnOrdinals.contains).map(_.name) + + HiveShim.appendReadColumns(hiveConf, neededColumnIDs, neededColumnNames) + + val deserializer = nativeTableDesc.getDeserializerClass.getConstructor().newInstance() + deserializer.initialize(hiveConf, nativeTableDesc.getProperties) + + // Specifies types and object inspectors of columns to be scanned. + val structOI = ObjectInspectorUtils + .getStandardObjectInspector(deserializer.getObjectInspector, ObjectInspectorCopyOption.JAVA) + .asInstanceOf[StructObjectInspector] + + val columnTypeNames = structOI.getAllStructFieldRefs.asScala + .map(_.getFieldObjectInspector) + .map(TypeInfoUtils.getTypeInfoFromObjectInspector(_).getTypeName) + .mkString(",") + + hiveConf.set(serdeConstants.LIST_COLUMN_TYPES, columnTypeNames) + hiveConf.set(serdeConstants.LIST_COLUMNS, relation.dataCols.map(_.name).mkString(",")) + hiveConf + } + + private val minPartitions = if (SparkSession.getActiveSession.get.sparkContext.isLocal) { + 0 // will splitted based on block by default. + } else { + math.max( + nativeHadoopConf.getInt("mapreduce.job.maps", 1), + SparkSession.getActiveSession.get.sparkContext.defaultMinPartitions) + } + + private val ignoreEmptySplits = + SparkSession.getActiveSession.get.sparkContext.conf.get(HADOOP_RDD_IGNORE_EMPTY_SPLITS) + + override val nodeName: String = + s"NativeHiveTableScan $tableName" + + override def doExecuteNative(): NativeRDD = { + val nativeMetrics = SparkMetricNode( + metrics, + Nil, + Some({ + case ("bytes_scanned", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incBytesRead(v) + case ("output_rows", v) => + val inputMetric = TaskContext.get.taskMetrics().inputMetrics + inputMetric.incRecordsRead(v) + case _ => + })) + val nativeFileSchema = this.nativeFileSchema + val nativeFileGroups = this.nativeFileGroups + val nativePartitionSchema = this.nativePartitionSchema + + val projection = schema.map(field => relation.schema.fieldIndex(field.name)) + val broadcastedHadoopConf = this.broadcastedHadoopConf + val numPartitions = partitions.length + + new NativeRDD( + sparkContext, + nativeMetrics, + partitions.asInstanceOf[Array[Partition]], + None, + Nil, + rddShuffleReadFull = true, + (partition, _) => { + val resourceId = s"NativeParquetHiveTableScan:${UUID.randomUUID().toString}" + putJniBridgeResource(resourceId, broadcastedHadoopConf) + + val nativeFileGroup = nativeFileGroups(partition.asInstanceOf[FilePartition]) + val nativeFileScanConf = pb.FileScanExecConf + .newBuilder() + .setNumPartitions(numPartitions) + .setPartitionIndex(partition.index) + .setStatistics(pb.Statistics.getDefaultInstance) + .setSchema(nativeFileSchema) + .setFileGroup(nativeFileGroup) + .addAllProjection(projection.map(Integer.valueOf).asJava) + .setPartitionSchema(nativePartitionSchema) + .build() + fileFormat match { + case "parquet" => + val nativeParquetScanExecBuilder = pb.ParquetScanExecNode + .newBuilder() + .setBaseConf(nativeFileScanConf) + .setFsResourceId(resourceId) + .addAllPruningPredicates(new java.util.ArrayList()) // not support this filter + + pb.PhysicalPlanNode + .newBuilder() + .setParquetScan(nativeParquetScanExecBuilder.build()) + .build() + case "other" => + throw new Exception("HiveTableExec only support parquet") + } + }, + friendlyName = "NativeRDD.ParquetHiveTableScan") + } + + @sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5 / 4.0 / 4.1") + def getPrunedPartitions(newJobConf: JobConf): Array[PartitionedFile] = { + val partitions = basedHiveScan.prunedPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + newJobConf.set("mapred.input.dir", partPath.toString) + arrayPartitionedFile ++= getArrayPartitionedFile( + newJobConf, + inputFormatClass, + partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + + @sparkver("3.0") + def getPrunedPartitions(newJobConf: JobConf): Array[PartitionedFile] = { + val partitions = basedHiveScan.rawPartitions + val arrayPartitionedFile = ArrayBuffer[PartitionedFile]() + partitions.foreach { partition => + val partDesc = Utilities.getPartitionDescFromTableDesc(nativeTableDesc, partition, true) + val partPath = partition.getDataLocation + HadoopTableReader.initializeLocalJobConfFunc(partPath.toString, nativeTableDesc)(newJobConf) + val partitionValues = partition.getTPartition.getValues + + val partitionInternalRow = new GenericInternalRow(partitionValues.size()) + for (partitionIndex <- 0 until partitionValues.size) { + partitionInternalRow.update(partitionIndex, partitionValues.get(partitionIndex)) + } + + val inputFormatClass = partDesc.getInputFileFormatClass + .asInstanceOf[Class[newInputClass[Writable, Writable]]] + newJobConf.set("mapred.input.dir", partPath.toString) + arrayPartitionedFile ++= getArrayPartitionedFile( + newJobConf, + inputFormatClass, + partitionInternalRow) + } + arrayPartitionedFile + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + + override def getFilePartitions(): Array[FilePartition] = { + val newJobConf = new JobConf(nativeHadoopConf) + val arrayFilePartition = ArrayBuffer[FilePartition]() + val partitionedFiles = if (relation.isPartitioned) { + getPrunedPartitions(newJobConf) + } else { + newJobConf.set("mapred.input.dir", nativeTableDesc.getProperties().getProperty("location")) + val inputFormatClass = + nativeTable.getInputFormatClass.asInstanceOf[Class[newInputClass[Writable, Writable]]] + getArrayPartitionedFile(newJobConf, inputFormatClass, new GenericInternalRow(0)) + .sortBy(_.length)(implicitly[Ordering[Long]].reverse) + .toArray + } + arrayFilePartition ++= FilePartition.getFilePartitions( + SparkSession.getActiveSession.get, + partitionedFiles, + getMaxSplitBytes(SparkSession.getActiveSession.get)) + + arrayFilePartition.toArray + } + + private def getMaxSplitBytes(sparkSession: SparkSession): Long = { + val defaultMaxSplitBytes = sparkSession.sessionState.conf.filesMaxPartitionBytes + val openCostInBytes = sparkSession.sessionState.conf.filesOpenCostInBytes + Math.min(defaultMaxSplitBytes, openCostInBytes) + } + + private def getArrayPartitionedFile( + newJobConf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]], + partitionInternalRow: GenericInternalRow): ArrayBuffer[PartitionedFile] = { + val allInputSplits = + getInputFormat(newJobConf, inputFormatClass).getSplits(newJobConf, minPartitions) + val inputSplits = if (ignoreEmptySplits) { + allInputSplits.filter(_.getLength > 0) + } else { + allInputSplits + } + + val arrayFilePartition = ArrayBuffer[PartitionedFile]() + for (i <- 0 until inputSplits.size) { + val inputSplit = inputSplits(i) + if (inputSplit.isInstanceOf[FileSplit]) { + val orcInputSplit = inputSplit.asInstanceOf[FileSplit] + arrayFilePartition += + Shims.get.getPartitionedFile( + partitionInternalRow, + orcInputSplit.getPath.toString, + orcInputSplit.getStart, + orcInputSplit.getLength) + } + } + arrayFilePartition + } + + private def getInputFormat( + conf: JobConf, + inputFormatClass: Class[newInputClass[Writable, Writable]]) + : InputFormat[Writable, Writable] = { + val newInputFormat = ReflectionUtils + .newInstance(inputFormatClass.asInstanceOf[Class[_]], conf) + .asInstanceOf[InputFormat[Writable, Writable]] + newInputFormat match { + case c: Configurable => c.setConf(conf) + case _ => + } + newInputFormat + } + +} + +object ParquetHiveTableUtil { + private val parquetFormat = "MapredParquetInputFormat" + + def getFileFormat(inputFormatClass: Class[_ <: InputFormat[_, _]]): String = { + if (inputFormatClass.getSimpleName.equalsIgnoreCase(parquetFormat)) { + "parquet" + } else { + "other" + } + } + +} diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala index cd3ce9759..8c91a3e22 100644 --- a/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala +++ b/spark-extension-shims-spark/src/test/scala/org/apache/auron/BaseAuronSQLSuite.scala @@ -20,6 +20,8 @@ import java.io.File import org.apache.commons.io.FileUtils import org.apache.spark.SparkConf +import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode +import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession import org.apache.auron.util.SparkVersionUtil @@ -61,6 +63,13 @@ trait BaseAuronSQLSuite extends SharedSparkSession { .set("spark.ui.enabled", "false") .set("spark.sql.warehouse.dir", warehouseDir) .set("spark.auron.udf.singleChildFallback.enabled", "false") + .set(SQLConf.CODEGEN_FALLBACK.key, "false") + .set(SQLConf.CODEGEN_FACTORY_MODE.key, CodegenObjectFactoryMode.CODEGEN_ONLY.toString) + .set( + "spark.sql.hive.metastore.barrierPrefixes", + "org.apache.spark.sql.hive.execution.PairSerDe") + .set("spark.hadoop.hive.metastore.disallow.incompatible.col.type.changes", "false") + .set("spark.sql.catalogImplementation", "hive") if (SparkVersionUtil.isSparkV40OrGreater) { // Spark 4.0+: Disable session artifact isolation, align with Spark 3.x behavior diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala new file mode 100644 index 000000000..f2a7f95a5 --- /dev/null +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/BaseAuronHiveSuite.scala @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + + +import org.apache.spark.{SparkConf, SparkContext} +import org.apache.spark.sql.SparkSession +import org.apache.spark.sql.hive.test.TestHiveContext + +trait BaseAuronHiveSuite { + protected val suiteWorkspace: String = getClass.getResource("/").getPath + "auron-tests-workdir" + protected val warehouseDir: String = suiteWorkspace + "/spark-warehouse" + protected val metastoreDir: String = suiteWorkspace + "/meta" + + protected val spark: SparkSession = TestAuronHive.sparkSession + +// protected def resetSuiteWorkspace(): Unit = { +// val workdir = new File(suiteWorkspace) +// if (workdir.exists()) { +// FileUtils.forceDelete(workdir) +// } +// FileUtils.forceMkdir(workdir) +// FileUtils.forceMkdir(new File(warehouseDir)) +// FileUtils.forceMkdir(new File(metastoreDir)) +// } +// +// override def beforeAll(): Unit = { +// // Prepare a clean workspace before SparkSession initialization +// resetSuiteWorkspace() +// super.beforeAll() +// spark.sparkContext.setLogLevel("WARN") +// } +// +// override def afterAll(): Unit = { +// super.afterAll() +// } + + object TestAuronHive + extends TestHiveContext( + new SparkContext( + System.getProperty("spark.sql.test.master", "local[1]"), + "TestSQLContext", + new SparkConf() + .set("spark.sql.test", "") + .set("spark.sql.extensions", "org.apache.spark.sql.auron.AuronSparkSessionExtension") + .set( + "spark.shuffle.manager", + "org.apache.spark.sql.execution.auron.shuffle.AuronShuffleManager") + .set("spark.memory.offHeap.enabled", "false") + .set("spark.auron.enable", "true") + .set("spark.ui.enabled", "false") + .set("spark.sql.warehouse.dir", warehouseDir) + .set("spark.auron.udf.singleChildFallback.enabled", "false") + .set("spark.auron.enable.parquetHiveTableScanExec", "true") + .set("spark.sql.hive.convertMetastoreParquet", "false"))) {} +} diff --git a/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala new file mode 100644 index 000000000..efddfb9dd --- /dev/null +++ b/spark-extension-shims-spark/src/test/scala/org/apache/spark/sql/hive/execution/HiveParquetTableScanExecSuite.scala @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.hive.execution + +import org.apache.spark.sql.AuronQueryTest +import org.apache.spark.sql.hive.execution.auron.plan.NativeParquetHiveTableScanExec + +class HiveParquetTableScanExecSuite extends AuronQueryTest with BaseAuronHiveSuite { + + test("test hive parquet table without partition to native") { + withTempView("t1") { + TestAuronHive.sql("create table t1 (a string) stored as parquet") + TestAuronHive.sql("insert into t1 values(1)") + val df = TestAuronHive.sql("select * from t1") + assert(df.collect().toList.head.get(0) == "1") + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case e: NativeParquetHiveTableScanExec => + e + }.size == 1) + } + } + + test("test hive parquet table partition to native") { + withTempView("t1") { + TestAuronHive.sql("create table t1 (a string) stored as parquet partitioned by(pt string)") + TestAuronHive.sql("insert into t1 partition(pt='2026-03-10') values('1')") + TestAuronHive.sql("insert into t1 partition(pt='2026-03-11') values('1')") + val df = TestAuronHive.sql("select * from t1 where pt = '2026-03-10'") + df.show() + assert(df.collect().toList.head.get(0) == "1") + assert(df.collect().toList.head.get(1) == "2026-03-10") + val plan = df.queryExecution.executedPlan + assert(collect(plan) { case e: NativeParquetHiveTableScanExec => + e + }.size == 1) + } + } + +} diff --git a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala index 6dfc8be79..951c3b7fb 100644 --- a/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala +++ b/spark-extension/src/main/scala/org/apache/spark/sql/hive/execution/auron/plan/NativeHiveTableScanBase.scala @@ -85,7 +85,8 @@ abstract class NativeHiveTableScanBase(basedHiveScan: HiveTableScanExec) val nativePartitionedFile = (file: PartitionedFile) => { val nativePartitionValues = partitionSchema.zipWithIndex.map { case (field, index) => NativeConverters - .convertExpr(Literal(file.partitionValues.get(index, field.dataType), field.dataType)) + .convertExpr( + Literal.create(file.partitionValues.get(index, field.dataType), field.dataType)) .getLiteral } pb.PartitionedFile