diff --git a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
index 6cd5aef0359a..f885bca2ebff 100644
--- a/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
+++ b/backends-velox/src/main/scala/org/apache/gluten/backendsapi/velox/VeloxSparkPlanExecApi.scala
@@ -286,6 +286,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
     GenericExpressionTransformer(substraitExprName, Seq(endDate, startDate), original)
   }
 
+  /** Transform map_from_entries to Substrait. */
+  override def genMapFromEntriesTransformer(
+      substraitExprName: String,
+      child: ExpressionTransformer,
+      expr: Expression): ExpressionTransformer = {
+    GenericExpressionTransformer(substraitExprName, Seq(child), expr)
+  }
+
   override def genPreciseTimestampConversionTransformer(
       substraitExprName: String,
       children: Seq[ExpressionTransformer],
diff --git a/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala b/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
index 97fe66319d53..748ba0d6b070 100644
--- a/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
+++ b/backends-velox/src/test/scala/org/apache/gluten/functions/ScalarFunctionsValidateSuite.scala
@@ -269,6 +269,28 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
     }
   }
 
+  test("map_from_entries") {
+    withTempPath {
+      path =>
+        Seq(
+          Seq((1, "10"), (2, "20"), (3, null)),
+          Seq((1, "10"), null, (2, "20")),
+          Seq.empty,
+          null
+        ).toDF("a")
+          .write
+          .parquet(path.getCanonicalPath)
+
+        spark.read
+          .parquet(path.getCanonicalPath)
+          .createOrReplaceTempView("test")
+
+        runQueryAndCompare("select map_from_entries(a) from test") {
+          checkGlutenPlan[ProjectExecTransformer]
+        }
+    }
+  }
+
   test("map_keys") {
     withTempPath {
       path =>
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
index 27dbb6ce4c95..73f2b57b9dfe 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/backendsapi/SparkPlanExecApi.scala
@@ -375,6 +375,14 @@ trait SparkPlanExecApi {
     GenericExpressionTransformer(substraitExprName, childrenTransformers, expr)
   }
 
+  /** Transform map_from_entries to Substrait. */
+  def genMapFromEntriesTransformer(
+      substraitExprName: String,
+      child: ExpressionTransformer,
+      expr: Expression): ExpressionTransformer = {
+    throw new UnsupportedOperationException("map_from_entries is not supported")
+  }
+
   /**
    * Generate ShuffleDependency for ColumnarShuffleExchangeExec.
    *
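Note: the three hunks above form the full dispatch chain for the new expression. The Velox backend overrides the new genMapFromEntriesTransformer hook with a GenericExpressionTransformer, the validate suite exercises it end to end, and the SparkPlanExecApi trait supplies a fail-fast default for backends that do not support it. For orientation, a minimal sketch of the Spark-side semantics the Velox kernel has to reproduce (illustrative query only, not part of the patch):

    // map_from_entries turns an array<struct<k, v>> into a map<k, v>.
    spark.sql("SELECT map_from_entries(array(struct(1, 'a'), struct(2, 'b')))").show()
    // expected: {1 -> a, 2 -> b}
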
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
index ab2cbbea71d4..73f7b8699630 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionConverter.scala
@@ -335,6 +335,11 @@ object ExpressionConverter extends SQLConfHelper with Logging {
           substraitExprName,
           replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap),
           m)
+      case m: MapFromEntries =>
+        BackendsApiManager.getSparkPlanExecApiInstance.genMapFromEntriesTransformer(
+          substraitExprName,
+          replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap),
+          m)
       case e: Explode =>
         ExplodeTransformer(
           substraitExprName,
diff --git a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
index b13aced2a62c..e432ac152143 100644
--- a/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
+++ b/gluten-substrait/src/main/scala/org/apache/gluten/expression/ExpressionMappings.scala
@@ -266,6 +266,7 @@ object ExpressionMappings {
     Sig[MapFromArrays](MAP_FROM_ARRAYS),
     Sig[MapEntries](MAP_ENTRIES),
     Sig[MapZipWith](MAP_ZIP_WITH),
+    Sig[MapFromEntries](MAP_FROM_ENTRIES),
     Sig[StringToMap](STR_TO_MAP),
     Sig[TransformKeys](TRANSFORM_KEYS),
     Sig[TransformValues](TRANSFORM_VALUES),
diff --git a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 001516de624d..36c912929182 100644
--- a/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark32/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -202,6 +202,8 @@ class VeloxTestSettings extends BackendTestSettings {
       // Rewrite in Gluten to replace Seq with Array
       .exclude("Shuffle")
       .excludeGlutenTest("Shuffle")
+      // Rewrite.
+      .exclude("MapFromEntries")
     enableSuite[GlutenDateExpressionsSuite]
       // Rewrite because Spark collect causes long overflow.
       .exclude("TIMESTAMP_MICROS")
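Together with the converter case above, the Sig[MapFromEntries](MAP_FROM_ENTRIES) entry is what lets plan validation recognize the expression at all; unmapped expressions simply fall back to vanilla Spark. A simplified, hypothetical sketch of the lookup this registration feeds (not the actual Gluten implementation):

    import org.apache.spark.sql.catalyst.expressions.{Expression, MapFromEntries}

    // Each signature ties a Catalyst expression class to a Substrait function name.
    final case class ExprSig(expClass: Class[_], name: String)
    val mappings: Seq[ExprSig] = Seq(ExprSig(classOf[MapFromEntries], "map_from_entries"))

    // During conversion, the expression's runtime class selects the name to emit.
    def substraitName(e: Expression): Option[String] =
      mappings.collectFirst { case ExprSig(c, n) if c.isInstance(e) => n }
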
.exclude("TIMESTAMP_MICROS") diff --git a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala index ca6ef9df0f48..fcbbf2a5b358 100644 --- a/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala +++ b/gluten-ut/spark32/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala @@ -17,6 +17,8 @@ package org.apache.spark.sql.catalyst.expressions import org.apache.spark.sql.GlutenTestsTrait +import org.apache.spark.sql.catalyst.InternalRow +import org.apache.spark.sql.catalyst.analysis.TypeCheckResult import org.apache.spark.sql.types._ import scala.util.Random @@ -87,4 +89,63 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !== evaluateWithUnsafeProjection(Shuffle(ai0, seed2))) } + + testGluten("MapFromEntries") { + def arrayType(keyType: DataType, valueType: DataType): DataType = { + ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true) + } + + def row(values: Any*): InternalRow = create_row(values: _*) + + // Primitive-type keys and values + val aiType = arrayType(IntegerType, IntegerType) + val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType) + val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType) + val ai2 = Literal.create(Seq.empty, aiType) + val ai3 = Literal.create(null, aiType) + // Ignore duplicated key as 'last_win' not supported by Velox for now + // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType) + // The map key is null + val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType) + val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType) + + checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20)) + checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null)) + checkEvaluation(MapFromEntries(ai2), Map.empty) + checkEvaluation(MapFromEntries(ai3), null) + + // Map key can't be null + checkExceptionInExpression[RuntimeException](MapFromEntries(ai5), "Cannot use null as map key") + checkEvaluation(MapFromEntries(ai6), null) + + // Non-primitive-type keys and values + val asType = arrayType(StringType, StringType) + val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType) + val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType) + val as2 = Literal.create(Seq.empty, asType) + val as3 = Literal.create(null, asType) + val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType) + val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType) + + checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb")) + checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null)) + checkEvaluation(MapFromEntries(as2), Map.empty) + checkEvaluation(MapFromEntries(as3), null) + + // Map key can't be null + checkExceptionInExpression[RuntimeException](MapFromEntries(as5), "Cannot use null as map key") + checkEvaluation(MapFromEntries(as6), null) + + // map key can't be map + val structOfMap = row(create_map(1 -> 1), 1) + val map = MapFromEntries( + Literal.create( + Seq(structOfMap), + arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType))) + 
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 9c51dee8a6f3..2d52286bb5d3 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -123,6 +123,8 @@ class VeloxTestSettings extends BackendTestSettings {
       // Rewrite in Gluten to replace Seq with Array
       .exclude("Shuffle")
       .excludeGlutenTest("Shuffle")
+      // Rewrite.
+      .exclude("MapFromEntries")
     enableSuite[GlutenConditionalExpressionSuite]
     enableSuite[GlutenDateExpressionsSuite]
       // Has exception in fallback execution when we use resultDF.collect in evaluation.
diff --git a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
index ca6ef9df0f48..fcbbf2a5b358 100644
--- a/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
+++ b/gluten-ut/spark33/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
@@ -17,6 +17,8 @@
 package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.GlutenTestsTrait
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
 import org.apache.spark.sql.types._
 
 import scala.util.Random
@@ -87,4 +89,63 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
       evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
         evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
   }
+
+  testGluten("MapFromEntries") {
+    def arrayType(keyType: DataType, valueType: DataType): DataType = {
+      ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
+    }
+
+    def row(values: Any*): InternalRow = create_row(values: _*)
+
+    // Primitive-type keys and values
+    val aiType = arrayType(IntegerType, IntegerType)
+    val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
+    val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
+    val ai2 = Literal.create(Seq.empty, aiType)
+    val ai3 = Literal.create(null, aiType)
+    // Skip the duplicated-key case since 'last_win' dedup is not supported by Velox for now.
+    // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
+    // The map key is null
+    val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
+    val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)
+
+    checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
+    checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
+    checkEvaluation(MapFromEntries(ai2), Map.empty)
+    checkEvaluation(MapFromEntries(ai3), null)
+
+    // Map key can't be null
+    checkExceptionInExpression[RuntimeException](MapFromEntries(ai5), "Cannot use null as map key")
+    checkEvaluation(MapFromEntries(ai6), null)
+
+    // Non-primitive-type keys and values
+    val asType = arrayType(StringType, StringType)
+    val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
+    val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
+    val as2 = Literal.create(Seq.empty, asType)
+    val as3 = Literal.create(null, asType)
+    val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
+    val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)
+
+    checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
+    checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
+    checkEvaluation(MapFromEntries(as2), Map.empty)
+    checkEvaluation(MapFromEntries(as3), null)
+
+    // Map key can't be null
+    checkExceptionInExpression[RuntimeException](MapFromEntries(as5), "Cannot use null as map key")
+    checkEvaluation(MapFromEntries(as6), null)
+
+    // Map key can't be a map
+    val structOfMap = row(create_map(1 -> 1), 1)
+    val map = MapFromEntries(
+      Literal.create(
+        Seq(structOfMap),
+        arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
+    map.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
+      case TypeCheckResult.TypeCheckFailure(msg) =>
+        assert(msg.contains("The key of map cannot be/contain map"))
+    }
+  }
 }
val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType) + val as2 = Literal.create(Seq.empty, asType) + val as3 = Literal.create(null, asType) + val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType) + val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType) + + checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb")) + checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null)) + checkEvaluation(MapFromEntries(as2), Map.empty) + checkEvaluation(MapFromEntries(as3), null) + + // Map key can't be null + checkExceptionInExpression[RuntimeException](MapFromEntries(as5), "Cannot use null as map key") + checkEvaluation(MapFromEntries(as6), null) + + // map key can't be map + val structOfMap = row(create_map(1 -> 1), 1) + val map = MapFromEntries( + Literal.create( + Seq(structOfMap), + arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType))) + map.checkInputDataTypes() match { + case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key") + case TypeCheckResult.TypeCheckFailure(msg) => + assert(msg.contains("The key of map cannot be/contain map")) + } + } } diff --git a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala index c5455b6c6b40..f7c4ba1a3b42 100644 --- a/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala +++ b/gluten-ut/spark34/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala @@ -121,6 +121,8 @@ class VeloxTestSettings extends BackendTestSettings { // Rewrite in Gluten to replace Seq with Array .exclude("Shuffle") .excludeGlutenTest("Shuffle") + // Rewrite. + .exclude("MapFromEntries") enableSuite[GlutenConditionalExpressionSuite] enableSuite[GlutenDateExpressionsSuite] // Has exception in fallback execution when we use resultDF.collect in evaluation. 
diff --git a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
index ca6ef9df0f48..4804b683276d 100644
--- a/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
+++ b/gluten-ut/spark34/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
@@ -16,7 +16,11 @@
  */
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.GlutenTestsTrait
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.types._
 
 import scala.util.Random
@@ -87,4 +91,78 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
       evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
         evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
   }
+
+  testGluten("MapFromEntries") {
+    def arrayType(keyType: DataType, valueType: DataType): DataType = {
+      ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
+    }
+
+    def row(values: Any*): InternalRow = create_row(values: _*)
+
+    // Primitive-type keys and values
+    val aiType = arrayType(IntegerType, IntegerType)
+    val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
+    val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
+    val ai2 = Literal.create(Seq.empty, aiType)
+    val ai3 = Literal.create(null, aiType)
+    // Velox doesn't support duplicated map key
+    // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
+    // The map key is null
+    val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
+    val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)
+
+    checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
+    checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
+    checkEvaluation(MapFromEntries(ai2), Map.empty)
+    checkEvaluation(MapFromEntries(ai3), null)
+
+    // Map key can't be null
+    checkErrorInExpression[SparkRuntimeException](MapFromEntries(ai5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(ai6), null)
+
+    // Non-primitive-type keys and values
+    val asType = arrayType(StringType, StringType)
+    val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
+    val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
+    val as2 = Literal.create(Seq.empty, asType)
+    val as3 = Literal.create(null, asType)
+    val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
+    val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)
+
+    checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
+    checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
+    checkEvaluation(MapFromEntries(as2), Map.empty)
+    checkEvaluation(MapFromEntries(as3), null)
+
+    // Map key can't be null
+    checkExceptionInExpression[SparkRuntimeException](MapFromEntries(as5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(as6), null)
+
+    // map key can't be map
+    val structOfMap = row(create_map(1 -> 1), 1)
+    val map = MapFromEntries(
+      Literal.create(
+        Seq(structOfMap),
+        arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
+    map.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
+      case TypeCheckResult.DataTypeMismatch(errorSubClass, messageParameters) =>
+        assert(errorSubClass === "INVALID_MAP_KEY_TYPE")
+        assert(messageParameters === Map("keyType" -> "\"MAP\""))
+    }
+
+    // accepts only arrays of pair structs
+    val mapWrongType = MapFromEntries(Literal(1))
+    assert(
+      mapWrongType.checkInputDataTypes() ==
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "1",
+            "inputSql" -> "\"1\"",
+            "inputType" -> "\"INT\"",
+            "requiredType" -> "\"ARRAY\" of pair \"STRUCT\""
+          )
+        ))
+  }
 }
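Note the version split exercised above: from Spark 3.4 on, a null map key raises SparkRuntimeException with error class NULL_MAP_KEY, rather than the bare RuntimeException the 3.2/3.3 suites expect. A hypothetical snippet showing how that surfaces to a caller (intercept comes from ScalaTest; the SQL is illustrative):

    import org.apache.spark.SparkRuntimeException

    val e = intercept[SparkRuntimeException] {
      spark.sql("SELECT map_from_entries(array(struct(CAST(NULL AS INT), 1)))").collect()
    }
    assert(e.getErrorClass == "NULL_MAP_KEY")
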
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 6d30450e626c..a075ea98f49e 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -121,6 +121,8 @@ class VeloxTestSettings extends BackendTestSettings {
       // Rewrite in Gluten to replace Seq with Array
       .exclude("Shuffle")
       .excludeGlutenTest("Shuffle")
+      // Rewrite.
+      .exclude("MapFromEntries")
     enableSuite[GlutenConditionalExpressionSuite]
     enableSuite[GlutenDateExpressionsSuite]
       // Has exception in fallback execution when we use resultDF.collect in evaluation.
diff --git a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
index ca6ef9df0f48..4804b683276d 100644
--- a/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
+++ b/gluten-ut/spark35/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
@@ -16,7 +16,11 @@
  */
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.GlutenTestsTrait
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
 import org.apache.spark.sql.types._
 
 import scala.util.Random
@@ -87,4 +91,78 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
       evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
         evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
   }
+
+  testGluten("MapFromEntries") {
+    def arrayType(keyType: DataType, valueType: DataType): DataType = {
+      ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
+    }
+
+    def row(values: Any*): InternalRow = create_row(values: _*)
+
+    // Primitive-type keys and values
+    val aiType = arrayType(IntegerType, IntegerType)
+    val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
+    val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
+    val ai2 = Literal.create(Seq.empty, aiType)
+    val ai3 = Literal.create(null, aiType)
+    // Velox doesn't support duplicated map key
+    // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
+    // The map key is null
+    val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
+    val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)
+
+    checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
+    checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
+    checkEvaluation(MapFromEntries(ai2), Map.empty)
+    checkEvaluation(MapFromEntries(ai3), null)
+
+    // Map key can't be null
+    checkErrorInExpression[SparkRuntimeException](MapFromEntries(ai5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(ai6), null)
+
+    // Non-primitive-type keys and values
+    val asType = arrayType(StringType, StringType)
+    val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
+    val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
+    val as2 = Literal.create(Seq.empty, asType)
+    val as3 = Literal.create(null, asType)
+    val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
+    val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)
+
+    checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
+    checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
+    checkEvaluation(MapFromEntries(as2), Map.empty)
+    checkEvaluation(MapFromEntries(as3), null)
+
+    // Map key can't be null
+    checkExceptionInExpression[SparkRuntimeException](MapFromEntries(as5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(as6), null)
+
+    // map key can't be map
+    val structOfMap = row(create_map(1 -> 1), 1)
+    val map = MapFromEntries(
+      Literal.create(
+        Seq(structOfMap),
+        arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
+    map.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
+      case TypeCheckResult.DataTypeMismatch(errorSubClass, messageParameters) =>
+        assert(errorSubClass === "INVALID_MAP_KEY_TYPE")
+        assert(messageParameters === Map("keyType" -> "\"MAP\""))
+    }
+
+    // accepts only arrays of pair structs
+    val mapWrongType = MapFromEntries(Literal(1))
+    assert(
+      mapWrongType.checkInputDataTypes() ==
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> "1",
+            "inputSql" -> "\"1\"",
+            "inputType" -> "\"INT\"",
+            "requiredType" -> "\"ARRAY\" of pair \"STRUCT\""
+          )
+        ))
+  }
 }
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index d0716932b756..160fcae2a95b 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -133,6 +133,8 @@ class VeloxTestSettings extends BackendTestSettings {
       // Rewrite in Gluten to replace Seq with Array
       .exclude("Shuffle")
       .excludeGlutenTest("Shuffle")
+      // Rewrite.
+      .exclude("MapFromEntries")
     enableSuite[GlutenConditionalExpressionSuite]
     enableSuite[GlutenDateExpressionsSuite]
       // Has exception in fallback execution when we use resultDF.collect in evaluation.
diff --git a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
index ca6ef9df0f48..c33315c0a02a 100644
--- a/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
+++ b/gluten-ut/spark40/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
@@ -16,7 +16,12 @@
  */
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.GlutenTestsTrait
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
 import org.apache.spark.sql.types._
 
 import scala.util.Random
@@ -87,4 +92,78 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
       evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
         evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
   }
+
+  testGluten("MapFromEntries") {
+    def arrayType(keyType: DataType, valueType: DataType): DataType = {
+      ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
+    }
+
+    def row(values: Any*): InternalRow = create_row(values: _*)
+
+    // Primitive-type keys and values
+    val aiType = arrayType(IntegerType, IntegerType)
+    val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
+    val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
+    val ai2 = Literal.create(Seq.empty, aiType)
+    val ai3 = Literal.create(null, aiType)
+    // Velox doesn't support duplicated map key
+    // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
+    // The map key is null
+    val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
+    val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)
+
+    checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
+    checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
+    checkEvaluation(MapFromEntries(ai2), Map.empty)
+    checkEvaluation(MapFromEntries(ai3), null)
+
+    // Map key can't be null
+    checkErrorInExpression[SparkRuntimeException](MapFromEntries(ai5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(ai6), null)
+
+    // Non-primitive-type keys and values
+    val asType = arrayType(StringType, StringType)
+    val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
+    val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
+    val as2 = Literal.create(Seq.empty, asType)
+    val as3 = Literal.create(null, asType)
+    val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
+    val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)
+
+    checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
+    checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
+    checkEvaluation(MapFromEntries(as2), Map.empty)
+    checkEvaluation(MapFromEntries(as3), null)
+
+    // Map key can't be null
+    checkExceptionInExpression[SparkRuntimeException](MapFromEntries(as5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(as6), null)
+
+    // map key can't be map
+    val structOfMap = row(create_map(1 -> 1), 1)
+    val map = MapFromEntries(
+      Literal.create(
+        Seq(structOfMap),
+        arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
+    map.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
+      case TypeCheckResult.DataTypeMismatch(errorSubClass, messageParameters) =>
+        assert(errorSubClass === "INVALID_MAP_KEY_TYPE")
+        assert(messageParameters === Map("keyType" -> "\"MAP\""))
+    }
+
+    // accepts only arrays of pair structs
+    val mapWrongType = MapFromEntries(Literal(1))
+    assert(
+      mapWrongType.checkInputDataTypes() ==
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> ordinalNumber(0),
+            "inputSql" -> "\"1\"",
+            "inputType" -> "\"INT\"",
+            "requiredType" -> "\"ARRAY\" of pair \"STRUCT\""
+          )
+        ))
+  }
 }
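The only drift from the 3.4/3.5 suites is paramIndex: Spark 4.x renders it through TypeUtils.ordinalNumber, which turns a 0-based index into the word used in error messages, where 3.4/3.5 hard-code the literal "1". For reference:

    import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber

    ordinalNumber(0) // "first"  -- the value the assertion above expects
    ordinalNumber(1) // "second"
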
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
index 47a1ff3d66e7..45539b768a26 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/gluten/utils/velox/VeloxTestSettings.scala
@@ -138,6 +138,8 @@ class VeloxTestSettings extends BackendTestSettings {
       // Rewrite in Gluten to replace Seq with Array
       .exclude("Shuffle")
       .excludeGlutenTest("Shuffle")
+      // Rewrite.
+      .exclude("MapFromEntries")
     enableSuite[GlutenConditionalExpressionSuite]
     enableSuite[GlutenConstraintExpressionSuite]
     enableSuite[GlutenDateExpressionsSuite]
diff --git a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
index ca6ef9df0f48..c33315c0a02a 100644
--- a/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
+++ b/gluten-ut/spark41/src/test/scala/org/apache/spark/sql/catalyst/expressions/GlutenCollectionExpressionsSuite.scala
@@ -16,7 +16,12 @@
  */
 package org.apache.spark.sql.catalyst.expressions
 
+import org.apache.spark.SparkRuntimeException
 import org.apache.spark.sql.GlutenTestsTrait
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
+import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
+import org.apache.spark.sql.catalyst.util.TypeUtils.ordinalNumber
 import org.apache.spark.sql.types._
 
 import scala.util.Random
@@ -87,4 +92,78 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
       evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
         evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
   }
+
+  testGluten("MapFromEntries") {
+    def arrayType(keyType: DataType, valueType: DataType): DataType = {
+      ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
+    }
+
+    def row(values: Any*): InternalRow = create_row(values: _*)
+
+    // Primitive-type keys and values
+    val aiType = arrayType(IntegerType, IntegerType)
+    val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
+    val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
+    val ai2 = Literal.create(Seq.empty, aiType)
+    val ai3 = Literal.create(null, aiType)
+    // Velox doesn't support duplicated map key
+    // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
+    // The map key is null
+    val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
+    val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)
+
+    checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
+    checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
+    checkEvaluation(MapFromEntries(ai2), Map.empty)
+    checkEvaluation(MapFromEntries(ai3), null)
+
+    // Map key can't be null
+    checkErrorInExpression[SparkRuntimeException](MapFromEntries(ai5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(ai6), null)
+
+    // Non-primitive-type keys and values
+    val asType = arrayType(StringType, StringType)
+    val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
+    val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
+    val as2 = Literal.create(Seq.empty, asType)
+    val as3 = Literal.create(null, asType)
+    val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
+    val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)
+
+    checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
+    checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
+    checkEvaluation(MapFromEntries(as2), Map.empty)
+    checkEvaluation(MapFromEntries(as3), null)
+
+    // Map key can't be null
+    checkExceptionInExpression[SparkRuntimeException](MapFromEntries(as5), "NULL_MAP_KEY")
+    checkEvaluation(MapFromEntries(as6), null)
+
+    // map key can't be map
+    val structOfMap = row(create_map(1 -> 1), 1)
+    val map = MapFromEntries(
+      Literal.create(
+        Seq(structOfMap),
+        arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
+    map.checkInputDataTypes() match {
+      case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
+      case TypeCheckResult.DataTypeMismatch(errorSubClass, messageParameters) =>
+        assert(errorSubClass === "INVALID_MAP_KEY_TYPE")
+        assert(messageParameters === Map("keyType" -> "\"MAP\""))
+    }
+
+    // accepts only arrays of pair structs
+    val mapWrongType = MapFromEntries(Literal(1))
+    assert(
+      mapWrongType.checkInputDataTypes() ==
+        DataTypeMismatch(
+          errorSubClass = "UNEXPECTED_INPUT_TYPE",
+          messageParameters = Map(
+            "paramIndex" -> ordinalNumber(0),
+            "inputSql" -> "\"1\"",
+            "inputType" -> "\"INT\"",
+            "requiredType" -> "\"ARRAY\" of pair \"STRUCT\""
+          )
+        ))
+  }
 }
diff --git a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
index b168cf4fa250..d4afb7ff739f 100644
--- a/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
+++ b/shims/common/src/main/scala/org/apache/gluten/expression/ExpressionNames.scala
@@ -305,6 +305,7 @@ object ExpressionNames {
   final val MAP_ZIP_WITH = "map_zip_with"
   final val TRANSFORM_KEYS = "transform_keys"
   final val TRANSFORM_VALUES = "transform_values"
+  final val MAP_FROM_ENTRIES = "map_from_entries"
   final val STR_TO_MAP = "str_to_map"
   final val MAP_FILTER = "map_filter"
   final val MAP_CONTAINS_KEY = "map_contains_key"
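With the MAP_FROM_ENTRIES name registered end to end, offload can be smoke-tested directly. A minimal sketch in the spirit of the validate suite above (assumes the "test" view and the ProjectExecTransformer import from that suite; with AQE enabled, the transformer may sit under an adaptive plan node):

    val df = spark.sql("SELECT map_from_entries(a) FROM test")
    // The projection should be offloaded to Gluten rather than falling back to Spark.
    assert(df.queryExecution.executedPlan.collect {
      case p: ProjectExecTransformer => p
    }.nonEmpty)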