diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml
index 1e4b6fb24af..eb4c60a56ff 100644
--- a/.github/workflows/velox_backend_x86.yml
+++ b/.github/workflows/velox_backend_x86.yml
@@ -1432,7 +1432,7 @@ jobs:
export PATH=$JAVA_HOME/bin:$PATH
java -version
$MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \
- -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
+ -Pspark-ut -Pdelta -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \
-DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.spark.tags.SlowHiveTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest
- name: Upload test report
if: always()
diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
index 6df44e779d2..137956f5c24 100644
--- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
+++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala
@@ -16,7 +16,7 @@
*/
package org.apache.spark.sql.delta
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._
@@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat(
tablePath: Option[String] = None,
isCDCRead: Boolean = false)
extends GlutenParquetFileFormat
- with LoggingShims {
+ with Logging {
// Validate either we have all arguments for DV enabled read or none of them.
if (hasTablePath) {
SparkSession.getActiveSession.map { session =>
@@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat(
case AlwaysTrue() => Some(AlwaysTrue())
case AlwaysFalse() => Some(AlwaysFalse())
case _ =>
- logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}")
+ logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}")
None
}
}
diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
index f609a6130b8..73b9a8fdbaf 100644
--- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
+++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala
@@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory
import org.apache.gluten.extension.columnar.transition.{Convention, Transitions}
import org.apache.spark._
-import org.apache.spark.internal.{LoggingShims, MDC}
+import org.apache.spark.internal.{Logging, MDC => SparkMDC}
import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils}
import org.apache.spark.shuffle.FetchFailedException
import org.apache.spark.sql.SparkSession
@@ -63,7 +63,7 @@ import java.util.{Date, UUID}
* values to data files. Specifically L123-126, L132, and L140 where it adds option
* WRITE_PARTITION_COLUMNS
*/
-object GlutenDeltaFileFormatWriter extends LoggingShims {
+object GlutenDeltaFileFormatWriter extends Logging {
/**
* A variable used in tests to check whether the output ordering of the query matches the
@@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
val ret = f
val commitMsgs = ret.map(_.commitMsg)
- logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+ logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) }
- logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
- log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.")
+ logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " +
+ log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.")
processStats(description.statsTrackers, ret.map(_.summary.stats), duration)
logInfo(log"Finished processing stats for write job " +
- log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.")
+ log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.")
// return a set of all the partition paths that were updated during this job
ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty)
} catch {
case cause: Throwable =>
- logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
+ logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause)
committer.abortJob(job)
throw cause
}
@@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims {
})(catchBlock = {
// If there is an error, abort the task
dataWriter.abort()
- logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.")
+ logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.")
}, finallyBlock = {
dataWriter.close()
})
diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
index a9ae4492135..85cde8a79b5 100644
--- a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
+++ b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala
@@ -17,8 +17,19 @@
package org.apache.spark.sql.delta
object DeltaInsertIntoTableSuiteShims {
- val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+ private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1")
- // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT.
- val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+ val INSERT_INTO_TMP_VIEW_ERROR_MSG =
+ if (isSpark41) {
+ "[TABLE_OR_VIEW_NOT_FOUND]"
+ } else {
+ "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]"
+ }
+
+ val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG =
+ if (isSpark41) {
+ "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION"
+ } else {
+ "INVALID_DEFAULT_VALUE.NOT_CONSTANT"
+ }
}
diff --git a/pom.xml b/pom.xml
index 2be98cafe12..bb061c7434e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -1351,8 +1351,8 @@
spark-sql-columnar-shims-spark41
4.1.1
1.10.0
- delta-spark
- 4.0.0
+ delta-spark_4.1
+ 4.1.0
40
1.1.0
2.18.2
diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
new file mode 100644
index 00000000000..4a675a26d98
--- /dev/null
+++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.streaming
+
+import org.apache.spark.sql.{Encoder, SQLContext}
+import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream}
+
+object MemoryStream {
+ def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+ RuntimeMemoryStream[A](implicitly[Encoder[A]], sqlContext.sparkSession)
+ }
+
+ def apply[A: Encoder](
+ numPartitions: Int)(
+ implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = {
+ RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession)
+ }
+}