diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala index f4d5f867e622..7dc228feaff8 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinSuite.scala @@ -42,38 +42,63 @@ import org.apache.spark.sql.execution.streaming.runtime.MemoryStream import org.apache.spark.sql.execution.streaming.state._ import org.apache.spark.sql.functions._ import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.test.SQLTestUtils import org.apache.spark.sql.types.{BooleanType, IntegerType, LongType, StructType} import org.apache.spark.tags.SlowSQLTest import org.apache.spark.util.Utils -trait AlsoTestWithVirtualColumnFamilyJoins extends SQLTestUtils { - /** Tests both with and without join ops using virtual column families */ +abstract class StreamingJoinSuite + extends StreamTest with StateStoreMetricsTest with BeforeAndAfter { + + sealed trait Mode + object Mode { + case object WithVCF extends Mode + case object WithoutVCF extends Mode + } + + protected def testMode: Mode + + /** Dispatches each test to VCF or non-VCF mode based on [[testMode]]. */ override protected def test(testName: String, testTags: Tag*)(testBody: => Any)( implicit pos: Position): Unit = { - // Test with virtual column family joins with changelog checkpointing enabled and disabled - // Since virtual column family joins require RocksDB, we only test with RocksDB here. - Seq("false", "true").foreach { enabled => - testWithVirtualColumnFamilyJoins( - testName + s" with (with changelog checkpointing = $enabled)", testTags: _*) { - withSQLConf( - "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> enabled - ) { - testBody + testMode match { + case Mode.WithVCF => + // Test with VCF with changelog checkpointing enabled and disabled. 
+ // Since VCF requires RocksDB, we only test with RocksDB here. + Seq("false", "true").foreach { enabled => + testWithVirtualColumnFamilyJoins( + testName + s" (with changelog checkpointing = $enabled)", testTags: _*) { + withSQLConf( + "spark.sql.streaming.stateStore.rocksdb.changelogCheckpointing.enabled" -> enabled + ) { + testBody + } + } + } + case Mode.WithoutVCF => + // Test with both RocksDB and HDFS state store providers without VCF. + val providers = Seq( + classOf[RocksDBStateStoreProvider].getName, + classOf[HDFSBackedStateStoreProvider].getName + ) + providers.foreach { provider => + testWithoutVirtualColumnFamilyJoins(testName + s" (with $provider)", testTags: _*) { + withSQLConf( + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> provider + ) { + testBody + } + } } - } } + } - // Test with both RocksDB and HDFS state store providers without virtual column family joins - val providers = Seq( - classOf[RocksDBStateStoreProvider].getName, - classOf[HDFSBackedStateStoreProvider].getName - ) - - providers.foreach { provider => - testWithoutVirtualColumnFamilyJoins(testName + s" (with $provider)", testTags: _*) { + def testWithVirtualColumnFamilyJoins(testName: String, testTags: Tag*)( + testBody: => Any): Unit = { + if (testMode == Mode.WithVCF) { + super.test(testName + " (with virtual column family joins)", testTags: _*) { withSQLConf( - SQLConf.STATE_STORE_PROVIDER_CLASS.key -> provider + SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3", + SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName ) { testBody } @@ -81,39 +106,16 @@ trait AlsoTestWithVirtualColumnFamilyJoins extends SQLTestUtils { } } - def testWithVirtualColumnFamilyJoins(testName: String, testTags: Tag*)(testBody: => Any): Unit = { - super.test(testName + " (with virtual column family joins)", testTags: _*) { - // in case tests have any code that needs to execute before every test - super.beforeEach() - // We must be using RocksDB for virtual column family 
joins - withSQLConf( - SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "3", - SQLConf.STATE_STORE_PROVIDER_CLASS.key -> classOf[RocksDBStateStoreProvider].getName - ) { - testBody - } - // in case tests have any code that needs to execute after every test - super.afterEach() - } - } - def testWithoutVirtualColumnFamilyJoins(testName: String, testTags: Tag*)( testBody: => Any): Unit = { - super.test(testName + " (without virtual column family joins)", testTags: _*) { - // in case tests have any code that needs to execute before every test - super.beforeEach() - withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "2") { - testBody + if (testMode == Mode.WithoutVCF) { + super.test(testName + " (without virtual column family joins)", testTags: _*) { + withSQLConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION.key -> "2") { + testBody + } } - // in case tests have any code that needs to execute after every test - super.afterEach() } } -} - -abstract class StreamingJoinSuite - extends StreamTest with StateStoreMetricsTest with BeforeAndAfter - with AlsoTestWithVirtualColumnFamilyJoins { import testImplicits._ @@ -318,7 +320,7 @@ abstract class StreamingJoinSuite } @SlowSQLTest -class StreamingInnerJoinSuite extends StreamingJoinSuite { +abstract class StreamingInnerJoinBase extends StreamingJoinSuite { import testImplicits._ test("stream stream inner join on non-time column") { @@ -391,55 +393,6 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { ) } - test("stream stream inner join on windows - with watermark") { - val input1 = MemoryStream[Int] - val input2 = MemoryStream[Int] - - val df1 = input1.toDF() - .select($"value" as "key", timestamp_seconds($"value") as "timestamp", - ($"value" * 2) as "leftValue") - .withWatermark("timestamp", "10 seconds") - .select($"key", window($"timestamp", "10 second"), $"leftValue") - - val df2 = input2.toDF() - .select($"value" as "key", timestamp_seconds($"value") as "timestamp", - ($"value" * 3) as 
"rightValue") - .select($"key", window($"timestamp", "10 second"), $"rightValue") - - val joined = df1.join(df2, Seq("key", "window")) - .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") - - testStream(joined)( - AddData(input1, 1), - CheckAnswer(), - assertNumStateRows(total = 1, updated = 1), - - AddData(input2, 1), - CheckAnswer((1, 10, 2, 3)), - assertNumStateRows(total = 2, updated = 1), - StopStream, - StartStream(), - - AddData(input1, 25), - CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10] - assertNumStateRows(total = 1, updated = 1), - - AddData(input2, 25), - CheckNewAnswer((25, 30, 50, 75)), - assertNumStateRows(total = 2, updated = 1), - StopStream, - StartStream(), - - AddData(input2, 1), - CheckNewAnswer(), // Should not join as < 15 removed - assertNumStateRows(total = 2, updated = 0), // row not add as 1 < state key watermark = 15 - - AddData(input1, 5), - CheckNewAnswer(), // Same reason as above - assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1) - ) - } - test("stream stream inner join with time range - with watermark - one side condition") { import org.apache.spark.sql.functions._ @@ -922,59 +875,6 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { ) } - test("SPARK-35896: metrics in StateOperatorProgress are output correctly") { - val input1 = MemoryStream[Int] - val input2 = MemoryStream[Int] - - val df1 = input1.toDF() - .select($"value" as "key", timestamp_seconds($"value") as "timestamp", - ($"value" * 2) as "leftValue") - .withWatermark("timestamp", "10 seconds") - .select($"key", window($"timestamp", "10 second"), $"leftValue") - - val df2 = input2.toDF() - .select($"value" as "key", timestamp_seconds($"value") as "timestamp", - ($"value" * 3) as "rightValue") - .select($"key", window($"timestamp", "10 second"), $"rightValue") - - val joined = df1.join(df2, Seq("key", "window")) - .select($"key", $"window.end".cast("long"), $"leftValue", 
$"rightValue") - - val useVirtualColumnFamilies = - spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >= 3 - // Number of shuffle partitions being used is 3 - val numStateStoreInstances = if (useVirtualColumnFamilies) { - // Only one state store is created per partition if we're using virtual column families - 3 - } else { - // Four state stores are created per partition otherwise - 3 * 4 - } - - testStream(joined)( - StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "3")), - AddData(input1, 1), - CheckAnswer(), - assertStateOperatorProgressMetric(operatorName = "symmetricHashJoin", - numShufflePartitions = 3, numStateStoreInstances = numStateStoreInstances), - - AddData(input2, 1), - CheckAnswer((1, 10, 2, 3)), - assertNumStateRows( - total = Seq(2), updated = Seq(1), droppedByWatermark = Seq(0), removed = Some(Seq(0))), - - AddData(input1, 25), - CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10] - assertNumStateRows( - total = Seq(1), updated = Seq(1), droppedByWatermark = Seq(0), removed = Some(Seq(2))), - - AddData(input2, 25), - CheckNewAnswer((25, 30, 50, 75)), - assertNumStateRows( - total = Seq(2), updated = Seq(1), droppedByWatermark = Seq(0), removed = Some(Seq(0))) - ) - } - test("joining non-nullable left join key with nullable right join key") { val input1 = MemoryStream[Int] val input2 = MemoryStream[JInteger] @@ -1014,91 +914,6 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { ) } - // Only ran with virtual column family joins as the previous join version uses different schemas - testWithVirtualColumnFamilyJoins( - "SPARK-51779 Verify StateSchemaV3 writes correct key and value schemas for join operator") { - withTempDir { checkpointDir => - val input1 = MemoryStream[Int] - val input2 = MemoryStream[Int] - - val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") - val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) 
as "rightValue") - val joined = df1.join(df2, "key") - - val metadataPathPostfix = "state/0/_stateSchema/default" - val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix") - val hadoopConf = spark.sessionState.newHadoopConf() - val fm = CheckpointFileManager.create(stateSchemaPath, hadoopConf) - - val keySchemaForNums = new StructType().add("field0", IntegerType, nullable = false) - val keySchemaForIndex = keySchemaForNums.add("index", LongType) - val numSchema: StructType = new StructType().add("value", LongType) - val leftIndexSchema: StructType = new StructType() - .add("key", IntegerType, nullable = false) - .add("leftValue", IntegerType, nullable = false) - .add("matched", BooleanType) - val rightIndexSchema: StructType = new StructType() - .add("key", IntegerType, nullable = false) - .add("rightValue", IntegerType, nullable = false) - .add("matched", BooleanType) - - val schemaLeftIndex = StateStoreColFamilySchema( - "left-keyWithIndexToValue", 0, - keySchemaForIndex, 0, - leftIndexSchema, - Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)), - None - ) - val schemaLeftNum = StateStoreColFamilySchema( - "left-keyToNumValues", 0, - keySchemaForNums, 0, - numSchema, - Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)), - None - ) - val schemaRightIndex = StateStoreColFamilySchema( - "right-keyWithIndexToValue", 0, - keySchemaForIndex, 0, - rightIndexSchema, - Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)), - None - ) - val schemaRightNum = StateStoreColFamilySchema( - "right-keyToNumValues", 0, - keySchemaForNums, 0, - numSchema, - Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)), - None - ) - - testStream(joined)( - StartStream(checkpointLocation = checkpointDir.getCanonicalPath), - AddData(input1, 1), - CheckAnswer(), - AddData(input2, 1, 10), - CheckNewAnswer((1, 2, 3)), - Execute { q => - val schemaFilePath = fm.list(stateSchemaPath).toSeq.head.getPath - val providerId = StateStoreProviderId( - 
StateStoreId(checkpointDir.getCanonicalPath, 0, 0), q.lastProgress.runId - ) - val checker = new StateSchemaCompatibilityChecker( - providerId, - hadoopConf, - List(schemaFilePath) - ) - val colFamilySeq = checker.readSchemaFile() - // Verify schema count and contents - assert(colFamilySeq.length == 4) - assert(colFamilySeq.map(_.toString).toSet == Set( - schemaLeftIndex, schemaLeftNum, schemaRightIndex, schemaRightNum - ).map(_.toString)) - }, - StopStream - ) - } - } - testWithVirtualColumnFamilyJoins( "SPARK-51779 Restart streaming join query with virtual column families") { withTempDir { checkpointDir => @@ -1229,9 +1044,200 @@ class StreamingInnerJoinSuite extends StreamingJoinSuite { } } +/** Adds tests that are specific to state format V1-V3 (not compatible with V4). */ +abstract class StreamingInnerJoinSuite extends StreamingInnerJoinBase { + import testImplicits._ + + test("stream stream inner join on windows - with watermark") { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "timestamp", + ($"value" * 2) as "leftValue") + .withWatermark("timestamp", "10 seconds") + .select($"key", window($"timestamp", "10 second"), $"leftValue") + + val df2 = input2.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "timestamp", + ($"value" * 3) as "rightValue") + .select($"key", window($"timestamp", "10 second"), $"rightValue") + + val joined = df1.join(df2, Seq("key", "window")) + .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") + + testStream(joined)( + AddData(input1, 1), + CheckAnswer(), + assertNumStateRows(total = 1, updated = 1), + + AddData(input2, 1), + CheckAnswer((1, 10, 2, 3)), + assertNumStateRows(total = 2, updated = 1), + StopStream, + StartStream(), + + AddData(input1, 25), + CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10] + assertNumStateRows(total = 1, updated = 
1), + + AddData(input2, 25), + CheckNewAnswer((25, 30, 50, 75)), + assertNumStateRows(total = 2, updated = 1), + StopStream, + StartStream(), + + AddData(input2, 1), + CheckNewAnswer(), // Should not join as < 15 removed + assertNumStateRows(total = 2, updated = 0), // row not added as 1 < state key watermark = 15 + + AddData(input1, 5), + CheckNewAnswer(), // Same reason as above + assertNumStateRows(total = 2, updated = 0, droppedByWatermark = 1) + ) + } + + test("SPARK-35896: metrics in StateOperatorProgress are output correctly") { + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "timestamp", + ($"value" * 2) as "leftValue") + .withWatermark("timestamp", "10 seconds") + .select($"key", window($"timestamp", "10 second"), $"leftValue") + + val df2 = input2.toDF() + .select($"value" as "key", timestamp_seconds($"value") as "timestamp", + ($"value" * 3) as "rightValue") + .select($"key", window($"timestamp", "10 second"), $"rightValue") + + val joined = df1.join(df2, Seq("key", "window")) + .select($"key", $"window.end".cast("long"), $"leftValue", $"rightValue") + + val useVirtualColumnFamilies = + spark.sessionState.conf.getConf(SQLConf.STREAMING_JOIN_STATE_FORMAT_VERSION) >= 3 + // Number of shuffle partitions being used is 3 + val numStateStoreInstances = if (useVirtualColumnFamilies) { + // Only one state store is created per partition if we're using virtual column families + 3 + } else { + // Four state stores are created per partition otherwise + 3 * 4 + } + + testStream(joined)( + StartStream(additionalConfs = Map(SQLConf.SHUFFLE_PARTITIONS.key -> "3")), + AddData(input1, 1), + CheckAnswer(), + assertStateOperatorProgressMetric(operatorName = "symmetricHashJoin", + numShufflePartitions = 3, numStateStoreInstances = numStateStoreInstances), + + AddData(input2, 1), + CheckAnswer((1, 10, 2, 3)), + assertNumStateRows( + total = Seq(2), updated = Seq(1), 
droppedByWatermark = Seq(0), removed = Some(Seq(0))), + + AddData(input1, 25), + CheckNewAnswer(), // watermark = 15, no-data-batch should remove 2 rows having window=[0,10] + assertNumStateRows( + total = Seq(1), updated = Seq(1), droppedByWatermark = Seq(0), removed = Some(Seq(2))), + + AddData(input2, 25), + CheckNewAnswer((25, 30, 50, 75)), + assertNumStateRows( + total = Seq(2), updated = Seq(1), droppedByWatermark = Seq(0), removed = Some(Seq(0))) + ) + } + + // Only run with virtual column family joins as the previous join version uses different schemas + testWithVirtualColumnFamilyJoins( + "SPARK-51779 Verify StateSchemaV3 writes correct key and value schemas for join operator") { + withTempDir { checkpointDir => + val input1 = MemoryStream[Int] + val input2 = MemoryStream[Int] + + val df1 = input1.toDF().select($"value" as "key", ($"value" * 2) as "leftValue") + val df2 = input2.toDF().select($"value" as "key", ($"value" * 3) as "rightValue") + val joined = df1.join(df2, "key") + + val metadataPathPostfix = "state/0/_stateSchema/default" + val stateSchemaPath = new Path(checkpointDir.toString, s"$metadataPathPostfix") + val hadoopConf = spark.sessionState.newHadoopConf() + val fm = CheckpointFileManager.create(stateSchemaPath, hadoopConf) + + val keySchemaForNums = new StructType().add("field0", IntegerType, nullable = false) + val keySchemaForIndex = keySchemaForNums.add("index", LongType) + val numSchema: StructType = new StructType().add("value", LongType) + val leftIndexSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("leftValue", IntegerType, nullable = false) + .add("matched", BooleanType) + val rightIndexSchema: StructType = new StructType() + .add("key", IntegerType, nullable = false) + .add("rightValue", IntegerType, nullable = false) + .add("matched", BooleanType) + + val schemaLeftIndex = StateStoreColFamilySchema( + "left-keyWithIndexToValue", 0, + keySchemaForIndex, 0, + leftIndexSchema, + 
Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)), + None + ) + val schemaLeftNum = StateStoreColFamilySchema( + "left-keyToNumValues", 0, + keySchemaForNums, 0, + numSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)), + None + ) + val schemaRightIndex = StateStoreColFamilySchema( + "right-keyWithIndexToValue", 0, + keySchemaForIndex, 0, + rightIndexSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForIndex)), + None + ) + val schemaRightNum = StateStoreColFamilySchema( + "right-keyToNumValues", 0, + keySchemaForNums, 0, + numSchema, + Some(NoPrefixKeyStateEncoderSpec(keySchemaForNums)), + None + ) + + testStream(joined)( + StartStream(checkpointLocation = checkpointDir.getCanonicalPath), + AddData(input1, 1), + CheckAnswer(), + AddData(input2, 1, 10), + CheckNewAnswer((1, 2, 3)), + Execute { q => + val schemaFilePath = fm.list(stateSchemaPath).toSeq.head.getPath + val providerId = StateStoreProviderId( + StateStoreId(checkpointDir.getCanonicalPath, 0, 0), q.lastProgress.runId + ) + val checker = new StateSchemaCompatibilityChecker( + providerId, + hadoopConf, + List(schemaFilePath) + ) + val colFamilySeq = checker.readSchemaFile() + // Verify schema count and contents + assert(colFamilySeq.length == 4) + assert(colFamilySeq.map(_.toString).toSet == Set( + schemaLeftIndex, schemaLeftNum, schemaRightIndex, schemaRightNum + ).map(_.toString)) + }, + StopStream + ) + } + } +} @SlowSQLTest -class StreamingOuterJoinSuite extends StreamingJoinSuite { +abstract class StreamingOuterJoinBase extends StreamingJoinSuite { import testImplicits._ import org.apache.spark.sql.functions._ @@ -1877,6 +1883,12 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite { } } +} + +/** Adds tests that are specific to state format V1-V3 (not compatible with V4). 
*/ +abstract class StreamingOuterJoinSuite extends StreamingOuterJoinBase { + import testImplicits._ + test( "SPARK-49829 left-outer join, input being unmatched is between WM for late event and " + "WM for eviction") { @@ -1940,7 +1952,7 @@ class StreamingOuterJoinSuite extends StreamingJoinSuite { } @SlowSQLTest -class StreamingFullOuterJoinSuite extends StreamingJoinSuite { +abstract class StreamingFullOuterJoinBase extends StreamingJoinSuite { test("windowed full outer join") { withTempDir { checkpointDir => @@ -2156,8 +2168,11 @@ class StreamingFullOuterJoinSuite extends StreamingJoinSuite { } } +/** No V3-specific tests for full outer joins. */ +abstract class StreamingFullOuterJoinSuite extends StreamingFullOuterJoinBase + @SlowSQLTest -class StreamingLeftSemiJoinSuite extends StreamingJoinSuite { +abstract class StreamingLeftSemiJoinBase extends StreamingJoinSuite { import testImplicits._ @@ -2400,6 +2415,12 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite { ) } +} + +/** Adds tests that are specific to state format V1-V3 (not compatible with V4). */ +abstract class StreamingLeftSemiJoinSuite extends StreamingLeftSemiJoinBase { + import testImplicits._ + test( "SPARK-49829 two chained stream-stream left outer joins among three input streams") { withSQLConf(SQLConf.STREAMING_NO_DATA_MICRO_BATCHES_ENABLED.key -> "false") { @@ -2525,3 +2546,45 @@ class StreamingLeftSemiJoinSuite extends StreamingJoinSuite { } } } + +// Concrete single-mode suites for parallel CI execution and failure isolation. 
+ +@SlowSQLTest +class StreamingInnerJoinWithVCFSuite extends StreamingInnerJoinSuite { + override protected def testMode = Mode.WithVCF +} + +@SlowSQLTest +class StreamingInnerJoinWithoutVCFSuite extends StreamingInnerJoinSuite { + override protected def testMode = Mode.WithoutVCF +} + +@SlowSQLTest +class StreamingOuterJoinWithVCFSuite extends StreamingOuterJoinSuite { + override protected def testMode = Mode.WithVCF +} + +@SlowSQLTest +class StreamingOuterJoinWithoutVCFSuite extends StreamingOuterJoinSuite { + override protected def testMode = Mode.WithoutVCF +} + +@SlowSQLTest +class StreamingFullOuterJoinWithVCFSuite extends StreamingFullOuterJoinSuite { + override protected def testMode = Mode.WithVCF +} + +@SlowSQLTest +class StreamingFullOuterJoinWithoutVCFSuite extends StreamingFullOuterJoinSuite { + override protected def testMode = Mode.WithoutVCF +} + +@SlowSQLTest +class StreamingLeftSemiJoinWithVCFSuite extends StreamingLeftSemiJoinSuite { + override protected def testMode = Mode.WithVCF +} + +@SlowSQLTest +class StreamingLeftSemiJoinWithoutVCFSuite extends StreamingLeftSemiJoinSuite { + override protected def testMode = Mode.WithoutVCF +} diff --git a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala index 2c9f2a5d11d6..ef4615c1254f 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/streaming/StreamingJoinV4Suite.scala @@ -18,7 +18,7 @@ package org.apache.spark.sql.streaming import org.apache.hadoop.fs.Path -import org.scalatest.{Args, Status, Tag} +import org.scalatest.Tag import org.apache.spark.sql.execution.streaming.checkpointing.CheckpointFileManager import org.apache.spark.sql.execution.streaming.operators.stateful.join.StreamingSymmetricHashJoinExec @@ -35,7 +35,8 @@ import org.apache.spark.tags.SlowSQLTest * RocksDB with virtual 
column families. The innermost withSQLConf wins, * so wrapping the test body overrides the V3 setting from the parent trait. */ -trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins { +trait TestWithV4StateFormat extends StreamingJoinSuite { + override protected def testMode: Mode = Mode.WithVCF override def testWithVirtualColumnFamilyJoins( testName: String, testTags: Tag*)(testBody: => Any): Unit = { @@ -47,40 +48,11 @@ trait TestWithV4StateFormat extends AlsoTestWithVirtualColumnFamilyJoins { } } } - - // V4 always uses virtual column families, so skip non-VCF tests. - override def testWithoutVirtualColumnFamilyJoins( - testName: String, testTags: Tag*)(testBody: => Any): Unit = {} - - // Use lazy val because the parent constructor registers tests before - // subclass vals are initialized. - private lazy val testsToSkip = Seq( - // V4's timestamp-based indexing does not support window structs - // in join keys. - "stream stream inner join on windows - with watermark", - // V4 uses 1 store with VCFs instead of V3's 4*partitions layout, - // so metric assertions about number of state store instances differ. - "SPARK-35896: metrics in StateOperatorProgress are output correctly", - // V4 uses different column families and encoder specs than V3; - // overridden in StreamingInnerJoinV4Suite with V4-specific assertions. - "SPARK-51779 Verify StateSchemaV3 writes correct key and value " + - "schemas for join operator", - // V4's key encoding is not yet supported by StateDataSource reader. 
- "SPARK-49829" - ) - - override def runTest(testName: String, args: Args): Status = { - if (testsToSkip.exists(testName.contains)) { - org.scalatest.SucceededStatus - } else { - super.runTest(testName, args) - } - } } @SlowSQLTest class StreamingInnerJoinV4Suite - extends StreamingInnerJoinSuite + extends StreamingInnerJoinBase with TestWithV4StateFormat { import testImplicits._ @@ -216,15 +188,15 @@ class StreamingInnerJoinV4Suite @SlowSQLTest class StreamingOuterJoinV4Suite - extends StreamingOuterJoinSuite + extends StreamingOuterJoinBase with TestWithV4StateFormat @SlowSQLTest class StreamingFullOuterJoinV4Suite - extends StreamingFullOuterJoinSuite + extends StreamingFullOuterJoinBase with TestWithV4StateFormat @SlowSQLTest class StreamingLeftSemiJoinV4Suite - extends StreamingLeftSemiJoinSuite + extends StreamingLeftSemiJoinBase with TestWithV4StateFormat