From 494d06943ccd9d96cce7cea5cd922970135a8e13 Mon Sep 17 00:00:00 2001 From: Vlad Slavlotski Date: Sat, 6 Jun 2026 21:02:35 +0500 Subject: [PATCH 1/4] =?UTF-8?q?feat:=20route=20Map=20=E2=86=92=20Map=20cas?= =?UTF-8?q?ts=20to=20native=20cast=5Fmap=5Fto=5Fmap?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scala/org/apache/comet/expressions/CometCast.scala | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 400229a402..f00a8a9e1a 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -21,7 +21,7 @@ package org.apache.comet.expressions import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, Literal} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampNTZType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, MapType, NullType, StructType, TimestampNTZType, TimestampType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withInfo} @@ -198,6 +198,14 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } } Compatible() + case (from_map: MapType, to_map: MapType) => + // Native cast_map_to_map recursively casts keys and values, so support is + // determined by whether both inner casts are individually supported. + isSupported(from_map.keyType, to_map.keyType, timeZoneId, evalMode) match { + case Compatible(_) => + isSupported(from_map.valueType, to_map.valueType, timeZoneId, evalMode) + case other => other + } case (DataTypes.DateType, toType) => canCastFromDate(toType, evalMode) case _ => unsupported(fromType, toType) } From 7b314eb58d7979fcdff3b37a01567c08f4c50b86 Mon Sep 17 00:00:00 2001 From: Vlad Slavlotski Date: Sun, 7 Jun 2026 11:31:03 +0500 Subject: [PATCH 2/4] add tests cast_map_to_map --- .../org/apache/comet/CometCastSuite.scala | 24 ++++++++++++++++++- 1 file changed, 23 insertions(+), 1 deletion(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 552089c922..78370a61df 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -31,7 +31,7 @@ import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper import org.apache.spark.sql.functions.{col, monotonically_increasing_id} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DataTypes, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DataTypes, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType} import org.apache.comet.CometSparkSessionExtensions.isSpark41Plus import org.apache.comet.expressions.{CometCast, CometEvalMode} @@ -1592,6 +1592,28 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { testArrayCastMatrix(types, ArrayType(_), generateArrays(100, _)) } + test("cast MapType to MapType") { + // https://github.com/apache/datafusion-comet/issues/4491 + // Native cast_map_to_map already handles the Parquet `key_value` vs + // Spark `entries` field-name difference, so we only need to verify that + // the planner routes Map→Map casts into it. + import scala.collection.JavaConverters._ + val schema = + StructType(Seq(StructField("a", MapType(IntegerType, IntegerType), nullable = true))) + val rows = Range(0, 100).map { i => + if (i % 10 == 0) Row(null) + else Row(Map(i -> (i + 1), (i + 2) -> (i + 3))) + } + val input = spark.createDataFrame(rows.asJava, schema) + + Seq( + MapType(LongType, LongType), + MapType(IntegerType, StringType), + MapType(StringType, DoubleType)).foreach { toType => + castTest(input, toType) + } + } + // https://github.com/apache/datafusion-comet/issues/3906 ignore("cast nested ArrayType to nested ArrayType") { val types = Seq( From 570026afd68244e47078a5c5ddb34bd52ff4c89f Mon Sep 17 00:00:00 2001 From: Vlad Slavlotski Date: Sun, 7 Jun 2026 11:46:13 +0500 Subject: [PATCH 3/4] fix merge conflict --- spark/src/test/scala/org/apache/comet/CometCastSuite.scala | 3 +++ 1 file changed, 3 insertions(+) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 1fcb542ae9..cb74d64e24 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -1651,6 +1651,9 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { MapType(IntegerType, StringType), MapType(StringType, DoubleType)).foreach { toType => castTest(input, toType) + } + } + test("cast ArrayType(DateType) to unsupported ArrayType falls back") { val fromType = ArrayType(DateType) val unsupportedElementTypes = From 34f508691a5605b30ddcb981239c286f05d05c7d Mon Sep 17 00:00:00 2001 From: Vlad Slavlotski Date: Sun, 7 Jun 2026 15:12:20 +0500 Subject: [PATCH 4/4] =?UTF-8?q?pin=20USE=5FV1=5FSOURCE=5FLIST=20and=20add?= =?UTF-8?q?=20propagation=20tests=20for=20Map=E2=86=92Map=20cast?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../org/apache/comet/CometCastSuite.scala | 45 ++++++++++++++++--- 1 file changed, 39 insertions(+), 6 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index cb74d64e24..aac1bc0081 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -1636,24 +1636,57 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { // https://github.com/apache/datafusion-comet/issues/4491 // Native cast_map_to_map already handles the Parquet `key_value` vs // Spark `entries` field-name difference, so we only need to verify that - // the planner routes Map→Map casts into it. + // the planner routes Map→Map casts into it. The map column must be read + // natively for the cast to be exercised by Comet, which only happens + // under the V1 Parquet scan, so we pin USE_V1_SOURCE_LIST=parquet. import scala.collection.JavaConverters._ val schema = StructType(Seq(StructField("a", MapType(IntegerType, IntegerType), nullable = true))) val rows = Range(0, 100).map { i => if (i % 10 == 0) Row(null) + else if (i % 7 == 0) Row(Map.empty[Int, Int]) else Row(Map(i -> (i + 1), (i + 2) -> (i + 3))) } val input = spark.createDataFrame(rows.asJava, schema) - Seq( - MapType(LongType, LongType), - MapType(IntegerType, StringType), - MapType(StringType, DoubleType)).foreach { toType => - castTest(input, toType) + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + Seq( + MapType(LongType, LongType), + MapType(IntegerType, StringType), + MapType(StringType, DoubleType)).foreach { toType => + castTest(input, toType) + } } } + test("cast MapType propagates Incompatible from inner value cast") { + // Float → Decimal is Incompatible due to rounding (see canCastFromFloat). + // The Map arm must propagate that Incompatible up rather than silently + // marking the whole Map → Map cast Compatible. + assert( + CometCast.isSupported( + MapType(IntegerType, FloatType), + MapType(IntegerType, DecimalType(10, 2)), + None, + CometEvalMode.LEGACY) == + Incompatible(Some("There can be rounding differences"))) + } + + test("cast MapType propagates Unsupported from nested value cast") { + // Map> → Map: the inner Map → String + // cast is Unsupported, and that must propagate through the outer Map + // arm rather than being silently swallowed. + val innerFrom = MapType(IntegerType, IntegerType) + val expectedMessage = s"Cast from $innerFrom to ${DataTypes.StringType} is not supported" + assert( + CometCast.isSupported( + MapType(IntegerType, innerFrom), + MapType(IntegerType, StringType), + None, + CometEvalMode.LEGACY) == + Unsupported(Some(expectedMessage))) + } + test("cast ArrayType(DateType) to unsupported ArrayType falls back") { val fromType = ArrayType(DateType) val unsupportedElementTypes =