From 8cb01ca46b75f5eecaaabc5f6a967f92d7dfbad7 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Tue, 12 May 2026 16:27:44 +0300 Subject: [PATCH 1/8] Fix Delta 4.0 Spark 4.1 package build --- .github/workflows/velox_backend_x86.yml | 2 +- .../delta/GlutenDeltaParquetFileFormat.scala | 6 +- .../files/GlutenDeltaFileFormatWriter.scala | 16 +-- .../streaming/CheckpointFileManager.scala | 103 ++++++++++++++++++ .../execution/streaming/MemoryStream.scala | 32 ++++++ 5 files changed, 147 insertions(+), 12 deletions(-) create mode 100644 shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala create mode 100644 shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index e06d0d05549..5c068a83165 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -1376,7 +1376,7 @@ jobs: export PATH=$JAVA_HOME/bin:$PATH java -version $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \ - -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ + -Pspark-ut -Pdelta -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala index 6df44e779d2..137956f5c24 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/GlutenDeltaParquetFileFormat.scala @@ -16,7 +16,7 @@ */ package org.apache.spark.sql.delta -import org.apache.spark.internal.{LoggingShims, MDC} +import org.apache.spark.internal.{Logging, MDC => SparkMDC} import org.apache.spark.sql.SparkSession import org.apache.spark.sql.catalyst.InternalRow import org.apache.spark.sql.delta.GlutenDeltaParquetFileFormat._ @@ -62,7 +62,7 @@ case class GlutenDeltaParquetFileFormat( tablePath: Option[String] = None, isCDCRead: Boolean = false) extends GlutenParquetFileFormat - with LoggingShims { + with Logging { // Validate either we have all arguments for DV enabled read or none of them. if (hasTablePath) { SparkSession.getActiveSession.map { session => @@ -528,7 +528,7 @@ case class GlutenDeltaParquetFileFormat( case AlwaysTrue() => Some(AlwaysTrue()) case AlwaysFalse() => Some(AlwaysFalse()) case _ => - logError(log"Failed to translate filter ${MDC(DeltaLogKeys.FILTER, filter)}") + logError(log"Failed to translate filter ${SparkMDC.of(DeltaLogKeys.FILTER, filter)}") None } } diff --git a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala index f609a6130b8..73b9a8fdbaf 100644 --- a/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala +++ b/backends-velox/src-delta40/main/scala/org/apache/spark/sql/delta/files/GlutenDeltaFileFormatWriter.scala @@ -24,7 +24,7 @@ import org.apache.gluten.execution.datasource.GlutenFormatFactory import org.apache.gluten.extension.columnar.transition.{Convention, Transitions} import org.apache.spark._ -import org.apache.spark.internal.{LoggingShims, MDC} +import org.apache.spark.internal.{Logging, MDC => SparkMDC} import org.apache.spark.internal.io.{FileCommitProtocol, SparkHadoopWriterUtils} import org.apache.spark.shuffle.FetchFailedException import org.apache.spark.sql.SparkSession @@ -63,7 +63,7 @@ import java.util.{Date, UUID} * values to data files. Specifically L123-126, L132, and L140 where it adds option * WRITE_PARTITION_COLUMNS */ -object GlutenDeltaFileFormatWriter extends LoggingShims { +object GlutenDeltaFileFormatWriter extends Logging { /** * A variable used in tests to check whether the output ordering of the query matches the @@ -343,20 +343,20 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { val ret = f val commitMsgs = ret.map(_.commitMsg) - logInfo(log"Start to commit write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.") + logInfo(log"Start to commit write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.") val (_, duration) = Utils.timeTakenMs { committer.commitJob(job, commitMsgs) } - logInfo(log"Write Job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)} committed. " + - log"Elapsed time: ${MDC(DeltaLogKeys.DURATION, duration)} ms.") + logInfo(log"Write Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)} committed. " + + log"Elapsed time: ${SparkMDC.of(DeltaLogKeys.DURATION, duration)} ms.") processStats(description.statsTrackers, ret.map(_.summary.stats), duration) logInfo(log"Finished processing stats for write job " + - log"${MDC(DeltaLogKeys.JOB_ID, description.uuid)}.") + log"${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}.") // return a set of all the partition paths that were updated during this job ret.map(_.summary.updatedPartitions).reduceOption(_ ++ _).getOrElse(Set.empty) } catch { case cause: Throwable => - logError(log"Aborting job ${MDC(DeltaLogKeys.JOB_ID, description.uuid)}", cause) + logError(log"Aborting job ${SparkMDC.of(DeltaLogKeys.JOB_ID, description.uuid)}", cause) committer.abortJob(job) throw cause } @@ -490,7 +490,7 @@ object GlutenDeltaFileFormatWriter extends LoggingShims { })(catchBlock = { // If there is an error, abort the task dataWriter.abort() - logError(log"Job ${MDC(DeltaLogKeys.JOB_ID, jobId)} aborted.") + logError(log"Job ${SparkMDC.of(DeltaLogKeys.JOB_ID, jobId)} aborted.") }, finallyBlock = { dataWriter.close() }) diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala new file mode 100644 index 00000000000..9595883c75f --- /dev/null +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager => Spark41CheckpointFileManager} + +import org.apache.hadoop.conf.Configuration +import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path, PathFilter} + +import java.io.OutputStream + +/** + * Binary compatibility shim for Delta 4.0, which was compiled against Spark 4.0's + * CheckpointFileManager package before Spark 4.1 moved it under streaming.checkpointing. + */ +trait CheckpointFileManager { + def createAtomic( + path: Path, + overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream + + def open(path: Path): FSDataInputStream + + def list(path: Path, filter: PathFilter): Array[FileStatus] + + def list(path: Path): Array[FileStatus] = { + list( + path, + new PathFilter { + override def accept(path: Path): Boolean = true + }) + } + + def mkdirs(path: Path): Unit + + def exists(path: Path): Boolean + + def delete(path: Path): Unit + + def isLocal: Boolean + + def createCheckpointDirectory(): Path +} + +object CheckpointFileManager { + def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = { + new Spark41CheckpointFileManagerAdapter( + Spark41CheckpointFileManager.create(path, hadoopConf)) + } + + abstract class CancellableFSDataOutputStream(outputStream: OutputStream) + extends org.apache.hadoop.fs.FSDataOutputStream( + outputStream, + null.asInstanceOf[FileSystem.Statistics]) { + def cancel(): Unit + } + + private class Spark41CheckpointFileManagerAdapter( + delegate: Spark41CheckpointFileManager) + extends CheckpointFileManager { + override def createAtomic( + path: Path, + overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { + new CancellableFSDataOutputStreamAdapter(delegate.createAtomic(path, overwriteIfPossible)) + } + + override def open(path: Path): FSDataInputStream = delegate.open(path) + + override def list(path: Path, filter: PathFilter): Array[FileStatus] = + delegate.list(path, filter) + + override def mkdirs(path: Path): Unit = delegate.mkdirs(path) + + override def exists(path: Path): Boolean = delegate.exists(path) + + override def delete(path: Path): Unit = delegate.delete(path) + + override def isLocal: Boolean = delegate.isLocal + + override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory() + } + + private class CancellableFSDataOutputStreamAdapter( + delegate: Spark41CheckpointFileManager.CancellableFSDataOutputStream) + extends CancellableFSDataOutputStream(delegate) { + override def close(): Unit = delegate.close() + + override def cancel(): Unit = delegate.cancel() + } +} diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala new file mode 100644 index 00000000000..0101198e360 --- /dev/null +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala @@ -0,0 +1,32 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.spark.sql.execution.streaming + +import org.apache.spark.sql.{Encoder, SQLContext} +import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream} + +object MemoryStream { + def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = { + RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext) + } + + def apply[A: Encoder]( + numPartitions: Int)( + implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = { + RuntimeMemoryStream[A](numPartitions)(implicitly[Encoder[A]], sqlContext.sparkSession) + } +} From 1e7ade93c0e8f32c9018dcae9193d97f51dae6c3 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 13 May 2026 13:33:35 +0300 Subject: [PATCH 2/8] Use Delta 4.1 for Spark 4.1 profile --- pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pom.xml b/pom.xml index 676c7252ac1..e3becb24d17 100644 --- a/pom.xml +++ b/pom.xml @@ -1332,8 +1332,8 @@ spark-sql-columnar-shims-spark41 4.1.1 1.10.0 - delta-spark - 4.0.0 + delta-spark_4.1 + 4.1.0 40 1.1.0 2.18.2 From f6bc30fdfecce55f3ef33583a3ad247faf3d6bb7 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 13 May 2026 14:19:13 +0300 Subject: [PATCH 3/8] Align Delta tests with Spark 4.1 errors --- .../delta/DeltaInsertIntoTableSuiteShims.scala | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala index a9ae4492135..85cde8a79b5 100644 --- a/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala +++ b/backends-velox/src-delta40/test/scala/org/apache/spark/sql/delta/DeltaInsertIntoTableSuiteShims.scala @@ -17,8 +17,19 @@ package org.apache.spark.sql.delta object DeltaInsertIntoTableSuiteShims { - val INSERT_INTO_TMP_VIEW_ERROR_MSG = "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]" + private val isSpark41 = org.apache.spark.SPARK_VERSION.startsWith("4.1") - // Spark 4.0.1 reports non-constant defaults with NOT_CONSTANT. - val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = "INVALID_DEFAULT_VALUE.NOT_CONSTANT" + val INSERT_INTO_TMP_VIEW_ERROR_MSG = + if (isSpark41) { + "[TABLE_OR_VIEW_NOT_FOUND]" + } else { + "[EXPECT_TABLE_NOT_VIEW.NO_ALTERNATIVE]" + } + + val INVALID_COLUMN_DEFAULT_VALUE_ERROR_MSG = + if (isSpark41) { + "INVALID_DEFAULT_VALUE.UNRESOLVED_EXPRESSION" + } else { + "INVALID_DEFAULT_VALUE.NOT_CONSTANT" + } } From 513e63d1075f4e9c9c0406987c8cdea277fa4fb5 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 13 May 2026 15:29:14 +0300 Subject: [PATCH 4/8] Add Spark 4.1 Delta compile coverage --- .github/workflows/velox_backend_x86.yml | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index 5c068a83165..b60158ee1e1 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -1375,8 +1375,10 @@ jobs: export JAVA_HOME=/usr/lib/jvm/java-17-openjdk export PATH=$JAVA_HOME/bin:$PATH java -version + $MVN_CMD clean test-compile -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \ + -Pspark-ut -Pdelta -DskipTests -Dmaven.source.skip $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \ - -Pspark-ut -Pdelta -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ + -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() From a24230c2ff52f5a23a0a6a3c2c09099b18ee8f0f Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Wed, 13 May 2026 20:37:16 +0300 Subject: [PATCH 5/8] Trigger CI From 0be42c91e902e9b12571084b11533dd09fbf7942 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Mon, 18 May 2026 13:08:05 +0300 Subject: [PATCH 6/8] Address Spark 4.1 Delta review comments --- .github/workflows/velox_backend_x86.yml | 4 +- .../streaming/CheckpointFileManager.scala | 103 ------------------ .../execution/streaming/MemoryStream.scala | 4 + 3 files changed, 5 insertions(+), 106 deletions(-) delete mode 100644 shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala diff --git a/.github/workflows/velox_backend_x86.yml b/.github/workflows/velox_backend_x86.yml index b60158ee1e1..5c068a83165 100644 --- a/.github/workflows/velox_backend_x86.yml +++ b/.github/workflows/velox_backend_x86.yml @@ -1375,10 +1375,8 @@ jobs: export JAVA_HOME=/usr/lib/jvm/java-17-openjdk export PATH=$JAVA_HOME/bin:$PATH java -version - $MVN_CMD clean test-compile -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \ - -Pspark-ut -Pdelta -DskipTests -Dmaven.source.skip $MVN_CMD clean test -Pspark-4.1 -Pscala-2.13 -Pjava-17 -Pbackends-velox \ - -Pspark-ut -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ + -Pspark-ut -Pdelta -DargLine="-Dspark.test.home=/opt/shims/spark41/spark_home/" \ -DtagsToExclude=org.apache.spark.tags.ExtendedSQLTest,org.apache.gluten.tags.UDFTest,org.apache.gluten.tags.EnhancedFeaturesTest,org.apache.gluten.tags.SkipTest - name: Upload test report if: always() diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala deleted file mode 100644 index 9595883c75f..00000000000 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/CheckpointFileManager.scala +++ /dev/null @@ -1,103 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.spark.sql.execution.streaming - -import org.apache.spark.sql.execution.streaming.checkpointing.{CheckpointFileManager => Spark41CheckpointFileManager} - -import org.apache.hadoop.conf.Configuration -import org.apache.hadoop.fs.{FileStatus, FileSystem, FSDataInputStream, Path, PathFilter} - -import java.io.OutputStream - -/** - * Binary compatibility shim for Delta 4.0, which was compiled against Spark 4.0's - * CheckpointFileManager package before Spark 4.1 moved it under streaming.checkpointing. - */ -trait CheckpointFileManager { - def createAtomic( - path: Path, - overwriteIfPossible: Boolean): CheckpointFileManager.CancellableFSDataOutputStream - - def open(path: Path): FSDataInputStream - - def list(path: Path, filter: PathFilter): Array[FileStatus] - - def list(path: Path): Array[FileStatus] = { - list( - path, - new PathFilter { - override def accept(path: Path): Boolean = true - }) - } - - def mkdirs(path: Path): Unit - - def exists(path: Path): Boolean - - def delete(path: Path): Unit - - def isLocal: Boolean - - def createCheckpointDirectory(): Path -} - -object CheckpointFileManager { - def create(path: Path, hadoopConf: Configuration): CheckpointFileManager = { - new Spark41CheckpointFileManagerAdapter( - Spark41CheckpointFileManager.create(path, hadoopConf)) - } - - abstract class CancellableFSDataOutputStream(outputStream: OutputStream) - extends org.apache.hadoop.fs.FSDataOutputStream( - outputStream, - null.asInstanceOf[FileSystem.Statistics]) { - def cancel(): Unit - } - - private class Spark41CheckpointFileManagerAdapter( - delegate: Spark41CheckpointFileManager) - extends CheckpointFileManager { - override def createAtomic( - path: Path, - overwriteIfPossible: Boolean): CancellableFSDataOutputStream = { - new CancellableFSDataOutputStreamAdapter(delegate.createAtomic(path, overwriteIfPossible)) - } - - override def open(path: Path): FSDataInputStream = delegate.open(path) - - override def list(path: Path, filter: PathFilter): Array[FileStatus] = - delegate.list(path, filter) - - override def mkdirs(path: Path): Unit = delegate.mkdirs(path) - - override def exists(path: Path): Boolean = delegate.exists(path) - - override def delete(path: Path): Unit = delegate.delete(path) - - override def isLocal: Boolean = delegate.isLocal - - override def createCheckpointDirectory(): Path = delegate.createCheckpointDirectory() - } - - private class CancellableFSDataOutputStreamAdapter( - delegate: Spark41CheckpointFileManager.CancellableFSDataOutputStream) - extends CancellableFSDataOutputStream(delegate) { - override def close(): Unit = delegate.close() - - override def cancel(): Unit = delegate.cancel() - } -} diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala index 0101198e360..29a5048fef0 100644 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala +++ b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala @@ -19,6 +19,10 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.{Encoder, SQLContext} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream} +/** + * Compatibility wrapper for Delta test sources that still import Spark 4.0's MemoryStream package. + * Spark 4.1 moved MemoryStream under execution.streaming.runtime. + */ object MemoryStream { def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = { RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext) From f2200aacaa1cbd2e598315e90fb135057520d87c Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Mon, 18 May 2026 13:28:43 +0300 Subject: [PATCH 7/8] Move Spark 4.1 MemoryStream shim to Delta tests --- backends-velox/pom.xml | 26 +++++++++++++++++++ .../execution/streaming/MemoryStream.scala | 6 +---- 2 files changed, 27 insertions(+), 5 deletions(-) rename {shims/spark41/src/main => backends-velox/src-delta40-spark41/test}/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala (84%) diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 4432547a0fe..88d59ac48f5 100644 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -543,6 +543,32 @@ + + spark-4.1 + + + + org.codehaus.mojo + build-helper-maven-plugin + + + add-spark41-delta-test-sources + + add-test-source + + generate-test-sources + + + ${project.basedir}/src-delta40-spark41/test/scala + ${project.basedir}/src-delta40-spark41/test/java + + + + + + + + hudi diff --git a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/backends-velox/src-delta40-spark41/test/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala similarity index 84% rename from shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala rename to backends-velox/src-delta40-spark41/test/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala index 29a5048fef0..4a675a26d98 100644 --- a/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala +++ b/backends-velox/src-delta40-spark41/test/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala @@ -19,13 +19,9 @@ package org.apache.spark.sql.execution.streaming import org.apache.spark.sql.{Encoder, SQLContext} import org.apache.spark.sql.execution.streaming.runtime.{MemoryStream => RuntimeMemoryStream} -/** - * Compatibility wrapper for Delta test sources that still import Spark 4.0's MemoryStream package. - * Spark 4.1 moved MemoryStream under execution.streaming.runtime. - */ object MemoryStream { def apply[A: Encoder](implicit sqlContext: SQLContext): RuntimeMemoryStream[A] = { - RuntimeMemoryStream[A]()(implicitly[Encoder[A]], sqlContext) + RuntimeMemoryStream[A](implicitly[Encoder[A]], sqlContext.sparkSession) } def apply[A: Encoder]( From b238da09cb7336634452c0e18464e9cf6391a490 Mon Sep 17 00:00:00 2001 From: Mohammad Linjawi Date: Mon, 18 May 2026 20:55:25 +0300 Subject: [PATCH 8/8] Move MemoryStream compatibility to Spark 4.1 shim --- backends-velox/pom.xml | 26 ------------------- .../execution/streaming/MemoryStream.scala | 0 2 files changed, 26 deletions(-) rename {backends-velox/src-delta40-spark41/test => shims/spark41/src/main}/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala (100%) diff --git a/backends-velox/pom.xml b/backends-velox/pom.xml index 88d59ac48f5..4432547a0fe 100644 --- a/backends-velox/pom.xml +++ b/backends-velox/pom.xml @@ -543,32 +543,6 @@ - - spark-4.1 - - - - org.codehaus.mojo - build-helper-maven-plugin - - - add-spark41-delta-test-sources - - add-test-source - - generate-test-sources - - - ${project.basedir}/src-delta40-spark41/test/scala - ${project.basedir}/src-delta40-spark41/test/java - - - - - - - - hudi diff --git a/backends-velox/src-delta40-spark41/test/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala b/shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala similarity index 100% rename from backends-velox/src-delta40-spark41/test/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala rename to shims/spark41/src/main/scala/org/apache/spark/sql/execution/streaming/MemoryStream.scala