From 8cc885c58c6cdde2836f8e5a45a3d023100cc087 Mon Sep 17 00:00:00 2001 From: gittihub-jpg Date: Mon, 1 Jun 2026 22:30:22 +0200 Subject: [PATCH 1/3] feat: use native cast_map_to_map for CAST(MapType AS MapType) (#4491) Fixes #4491 Signed-off-by: gittihub-jpg --- .../org/apache/comet/expressions/CometCast.scala | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala index 8ecfdfe49c..0917511cb4 100644 --- a/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala +++ b/spark/src/main/scala/org/apache/comet/expressions/CometCast.scala @@ -21,7 +21,7 @@ package org.apache.comet.expressions import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Expression, Literal} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, NullType, StructType, TimestampNTZType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, DataType, DataTypes, DecimalType, MapType, NullType, StructType, TimestampNTZType, TimestampType} import org.apache.comet.CometConf import org.apache.comet.CometSparkSessionExtensions.{isSpark40Plus, withFallbackReason} @@ -199,6 +199,20 @@ object CometCast extends CometExpressionSerde[Cast] with CometExprShim { } Compatible() case (DataTypes.DateType, toType) => canCastFromDate(toType, evalMode) + case (from_map: MapType, to_map: MapType) => + isSupported(from_map.keyType, to_map.keyType, timeZoneId, evalMode) match { + case Compatible(_) => + // all good + case other => + return other + } + isSupported(from_map.valueType, to_map.valueType, timeZoneId, evalMode) match { + case Compatible(_) => + // all good + case other => + return other + } + Compatible() case _ => unsupported(fromType, toType) } } From dcbe41b6aa9fa5d1ba6f40e59a7fbf94cd79cbc1 Mon Sep 17 00:00:00 2001 From: gittihub-jpg Date: Wed, 3 Jun 
2026 22:51:13 +0200 Subject: [PATCH 2/3] test: add CometCastSuite coverage for CAST(MapType AS MapType) Adds a test that writes a map column to Parquet, reads it back via the V1 scan so Comet reads the map natively, and asserts the map-to-map cast runs through Comet (checkSparkAnswerAndOperator) rather than falling back to Spark. --- .../org/apache/comet/CometCastSuite.scala | 35 +++++++++++++++++-- 1 file changed, 33 insertions(+), 2 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 4968809c29..8ca19329df 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -30,9 +30,9 @@ import org.apache.spark.sql.{CometTestBase, DataFrame, Row, SaveMode} import org.apache.spark.sql.catalyst.expressions.Cast import org.apache.spark.sql.catalyst.parser.ParseException import org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanHelper -import org.apache.spark.sql.functions.{col, monotonically_increasing_id} +import org.apache.spark.sql.functions.{col, map, monotonically_increasing_id, when} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DataTypes, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, ShortType, StringType, StructField, StructType, TimestampType} +import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, DataType, DataTypes, DateType, DecimalType, DoubleType, FloatType, IntegerType, LongType, MapType, ShortType, StringType, StructField, StructType, TimestampType} import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.rules.CometScanTypeChecker @@ -1496,6 +1496,37 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + // https://github.com/apache/datafusion-comet/issues/4491 + // 
CometCast.isSupported now routes (MapType, MapType) casts to the native cast_map_to_map + // instead of falling back to Spark. The map column must be read natively for the cast to be + // exercised by Comet, which only happens under the V1 Parquet scan (see CometMapExpressionSuite), + // so we set USE_V1_SOURCE_LIST and assert that a Comet operator (not a Spark fallback) runs. + test("cast MapType to MapType") { + withTempPath { dir => + // create input file with Comet disabled + withSQLConf(CometConf.COMET_ENABLED.key -> "false") { + val df = spark + .range(5) + // Spark does not allow null as a key but does allow null as a + // value, and the entire map can be null + .select( + when( + col("id") > 1, + map(col("id").cast(IntegerType), when(col("id") > 2, col("id").cast(IntegerType)))) + .alias("map1")) + df.write.parquet(dir.toString()) + } + + withSQLConf(SQLConf.USE_V1_SOURCE_LIST.key -> "parquet") { + val df = spark.read.parquet(dir.toString()) + // map<int, int> -> map<long, string>: both inner casts (int->long, int->string) are + // supported by Comet, so the whole cast should run natively without falling back. 
+ checkSparkAnswerAndOperator( + df.select(col("map1").cast(MapType(LongType, StringType)).alias("casted"))) + } + } + } + test("cast between decimals with different precision and scale") { val rowData = Seq( Row(BigDecimal("12345.6789")), From ac8feda00161190afbf765acede9e0a94fc248e6 Mon Sep 17 00:00:00 2001 From: Ric Date: Wed, 10 Jun 2026 21:20:06 +0200 Subject: [PATCH 3/3] fix: address PR feedback for apache/datafusion-comet#4491 Signed-off-by: Ric --- .../org/apache/comet/CometCastSuite.scala | 30 +++++++++++++++---- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala index 8ca19329df..eede1ecae6 100644 --- a/spark/src/test/scala/org/apache/comet/CometCastSuite.scala +++ b/spark/src/test/scala/org/apache/comet/CometCastSuite.scala @@ -36,7 +36,7 @@ import org.apache.spark.sql.types.{ArrayType, BinaryType, BooleanType, ByteType, import org.apache.comet.expressions.{CometCast, CometEvalMode} import org.apache.comet.rules.CometScanTypeChecker -import org.apache.comet.serde.{Compatible, Incompatible} +import org.apache.comet.serde.{Compatible, Incompatible, Unsupported} class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { @@ -1509,11 +1509,13 @@ class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { .range(5) // Spark does not allow null as a key but does allow null as a // value, and the entire map can be null - .select( - when( - col("id") > 1, + .select(when(col("id") === 4, map().cast(MapType(IntegerType, IntegerType))) + .otherwise( map(col("id").cast(IntegerType), when(col("id") > 2, col("id").cast(IntegerType)))) - .alias("map1")) + .alias("map1")) + // Add an empty-map row to exercise empty/null handling in the native cast path + .unionByName( + spark.range(1).select(map().cast(MapType(IntegerType, IntegerType)).alias("map1"))) df.write.parquet(dir.toString()) } @@ -1527,6 +1529,24 @@ 
class CometCastSuite extends CometTestBase with AdaptiveSparkPlanHelper { } } + test("cast MapType(Integer, Float) to MapType(Integer, Decimal) is incompatible") { + val fromType = MapType(IntegerType, FloatType) + val toType = MapType(IntegerType, DecimalType(10, 2)) + assert( + CometCast + .isSupported(fromType, toType, None, CometEvalMode.LEGACY) + .isInstanceOf[Incompatible]) + } + + test("cast nested MapType to MapType with unsupported inner cast falls back") { + val fromType = MapType(IntegerType, MapType(IntegerType, IntegerType)) + val toType = MapType(IntegerType, StringType) + assert( + CometCast + .isSupported(fromType, toType, None, CometEvalMode.LEGACY) + .isInstanceOf[Unsupported]) + } + test("cast between decimals with different precision and scale") { val rowData = Seq( Row(BigDecimal("12345.6789")),