Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -286,6 +286,14 @@ class VeloxSparkPlanExecApi extends SparkPlanExecApi with Logging {
GenericExpressionTransformer(substraitExprName, Seq(endDate, startDate), original)
}

/**
 * Transform map_from_entries to Substrait.
 *
 * Velox supports this expression natively, so it is routed through the generic
 * transformer with its single child (the array-of-struct entries argument).
 */
override def genMapFromEntriesTransformer(
    substraitExprName: String,
    child: ExpressionTransformer,
    expr: Expression): ExpressionTransformer =
  GenericExpressionTransformer(substraitExprName, Seq(child), expr)

override def genPreciseTimestampConversionTransformer(
substraitExprName: String,
children: Seq[ExpressionTransformer],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,28 @@ abstract class ScalarFunctionsValidateSuite extends FunctionsValidateSuite {
}
}

test("map_from_entries") {
  withTempPath {
    dir =>
      // Entry arrays covering null values, null entries, an empty array and a null array.
      val entryArrays = Seq(
        Seq((1, "10"), (2, "20"), (3, null)),
        Seq((1, "10"), null, (2, "20")),
        Seq.empty,
        null
      )
      val location = dir.getCanonicalPath
      entryArrays.toDF("a").write.parquet(location)

      val df = spark.read.parquet(location)
      df.createOrReplaceTempView("test")

      // The projection must be fully offloaded to the native backend.
      runQueryAndCompare("select map_from_entries(a) from test") {
        checkGlutenPlan[ProjectExecTransformer]
      }
  }
}

test("map_keys") {
withTempPath {
path =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -375,6 +375,14 @@ trait SparkPlanExecApi {
GenericExpressionTransformer(substraitExprName, childrenTransformers, expr)
}

/**
 * Transform map_from_entries to Substrait.
 *
 * Default implementation for backends without native support; backends that do support the
 * expression (e.g. Velox) override this. Reaching this default fails validation/offload.
 */
def genMapFromEntriesTransformer(
    substraitExprName: String,
    child: ExpressionTransformer,
    expr: Expression): ExpressionTransformer =
  throw new UnsupportedOperationException("map_from_entries is not supported")

/**
* Generate ShuffleDependency for ColumnarShuffleExchangeExec.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -335,6 +335,11 @@ object ExpressionConverter extends SQLConfHelper with Logging {
substraitExprName,
replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap),
m)
case m: MapFromEntries =>
BackendsApiManager.getSparkPlanExecApiInstance.genMapFromEntriesTransformer(
substraitExprName,
replaceWithExpressionTransformer0(m.child, attributeSeq, expressionsMap),
m)
case e: Explode =>
ExplodeTransformer(
substraitExprName,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -266,6 +266,7 @@ object ExpressionMappings {
Sig[MapFromArrays](MAP_FROM_ARRAYS),
Sig[MapEntries](MAP_ENTRIES),
Sig[MapZipWith](MAP_ZIP_WITH),
Sig[MapFromEntries](MAP_FROM_ENTRIES),
Sig[StringToMap](STR_TO_MAP),
Sig[TransformKeys](TRANSFORM_KEYS),
Sig[TransformValues](TRANSFORM_VALUES),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,8 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite in Gluten to replace Seq with Array
.exclude("Shuffle")
.excludeGlutenTest("Shuffle")
// Rewritten in Gluten: the duplicated-key case is dropped ('last_win' not supported by Velox).
.exclude("MapFromEntries")
enableSuite[GlutenDateExpressionsSuite]
// Rewrite because Spark collect causes long overflow.
.exclude("TIMESTAMP_MICROS")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.GlutenTestsTrait
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.types._

import scala.util.Random
Expand Down Expand Up @@ -87,4 +89,63 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
}

// Gluten rewrite of Spark's CollectionExpressionsSuite "MapFromEntries" test.
// The upstream duplicated-key case (ai4 below) is dropped because Velox does not
// implement Spark's 'last_win' duplicate-key policy.
testGluten("MapFromEntries") {
// Builds ARRAY<STRUCT<a: keyType, b: valueType>> — the input type map_from_entries expects.
def arrayType(keyType: DataType, valueType: DataType): DataType = {
ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
}

// Shorthand for constructing entry (struct) rows.
def row(values: Any*): InternalRow = create_row(values: _*)

// Primitive-type keys and values
val aiType = arrayType(IntegerType, IntegerType)
val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
// Empty entry array and null array input.
val ai2 = Literal.create(Seq.empty, aiType)
val ai3 = Literal.create(null, aiType)
// Ignore duplicated key as 'last_win' not supported by Velox for now
// val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
// The map key is null
val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
// Null entries (structs) inside the array.
val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)

checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
checkEvaluation(MapFromEntries(ai2), Map.empty)
checkEvaluation(MapFromEntries(ai3), null)

// Map key can't be null
checkExceptionInExpression[RuntimeException](MapFromEntries(ai5), "Cannot use null as map key")
// A null entry makes the whole result null.
checkEvaluation(MapFromEntries(ai6), null)

// Non-primitive-type keys and values
val asType = arrayType(StringType, StringType)
val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
val as2 = Literal.create(Seq.empty, asType)
val as3 = Literal.create(null, asType)
val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)

checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
checkEvaluation(MapFromEntries(as2), Map.empty)
checkEvaluation(MapFromEntries(as3), null)

// Map key can't be null
checkExceptionInExpression[RuntimeException](MapFromEntries(as5), "Cannot use null as map key")
checkEvaluation(MapFromEntries(as6), null)

// map key can't be map
val structOfMap = row(create_map(1 -> 1), 1)
val map = MapFromEntries(
Literal.create(
Seq(structOfMap),
arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
// Type check must reject a map-typed key (analysis-time, not evaluation-time).
map.checkInputDataTypes() match {
case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
case TypeCheckResult.TypeCheckFailure(msg) =>
assert(msg.contains("The key of map cannot be/contain map"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite in Gluten to replace Seq with Array
.exclude("Shuffle")
.excludeGlutenTest("Shuffle")
// Rewritten in Gluten: the duplicated-key case is dropped ('last_win' not supported by Velox).
.exclude("MapFromEntries")
enableSuite[GlutenConditionalExpressionSuite]
enableSuite[GlutenDateExpressionsSuite]
// Has exception in fallback execution when we use resultDF.collect in evaluation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.sql.GlutenTestsTrait
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.types._

import scala.util.Random
Expand Down Expand Up @@ -87,4 +89,63 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
}

// Gluten rewrite of Spark's CollectionExpressionsSuite "MapFromEntries" test.
// The upstream duplicated-key case (ai4 below) is dropped because Velox does not
// implement Spark's 'last_win' duplicate-key policy.
testGluten("MapFromEntries") {
// Builds ARRAY<STRUCT<a: keyType, b: valueType>> — the input type map_from_entries expects.
def arrayType(keyType: DataType, valueType: DataType): DataType = {
ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
}

// Shorthand for constructing entry (struct) rows.
def row(values: Any*): InternalRow = create_row(values: _*)

// Primitive-type keys and values
val aiType = arrayType(IntegerType, IntegerType)
val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
// Empty entry array and null array input.
val ai2 = Literal.create(Seq.empty, aiType)
val ai3 = Literal.create(null, aiType)
// Ignore duplicated key as 'last_win' not supported by Velox for now
// val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
// The map key is null
val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
// Null entries (structs) inside the array.
val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)

checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
checkEvaluation(MapFromEntries(ai2), Map.empty)
checkEvaluation(MapFromEntries(ai3), null)

// Map key can't be null
checkExceptionInExpression[RuntimeException](MapFromEntries(ai5), "Cannot use null as map key")
// A null entry makes the whole result null.
checkEvaluation(MapFromEntries(ai6), null)

// Non-primitive-type keys and values
val asType = arrayType(StringType, StringType)
val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
val as2 = Literal.create(Seq.empty, asType)
val as3 = Literal.create(null, asType)
val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)

checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
checkEvaluation(MapFromEntries(as2), Map.empty)
checkEvaluation(MapFromEntries(as3), null)

// Map key can't be null
checkExceptionInExpression[RuntimeException](MapFromEntries(as5), "Cannot use null as map key")
checkEvaluation(MapFromEntries(as6), null)

// map key can't be map
val structOfMap = row(create_map(1 -> 1), 1)
val map = MapFromEntries(
Literal.create(
Seq(structOfMap),
arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
// Type check must reject a map-typed key (analysis-time, not evaluation-time).
map.checkInputDataTypes() match {
case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
case TypeCheckResult.TypeCheckFailure(msg) =>
assert(msg.contains("The key of map cannot be/contain map"))
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite in Gluten to replace Seq with Array
.exclude("Shuffle")
.excludeGlutenTest("Shuffle")
// Rewritten in Gluten: the duplicated-key case is dropped ('last_win' not supported by Velox).
.exclude("MapFromEntries")
enableSuite[GlutenConditionalExpressionSuite]
enableSuite[GlutenDateExpressionsSuite]
// Has exception in fallback execution when we use resultDF.collect in evaluation.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,11 @@
*/
package org.apache.spark.sql.catalyst.expressions

import org.apache.spark.SparkRuntimeException
import org.apache.spark.sql.GlutenTestsTrait
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult
import org.apache.spark.sql.catalyst.analysis.TypeCheckResult.DataTypeMismatch
import org.apache.spark.sql.types._

import scala.util.Random
Expand Down Expand Up @@ -87,4 +91,78 @@ class GlutenCollectionExpressionsSuite extends CollectionExpressionsSuite with G
evaluateWithUnsafeProjection(Shuffle(ai0, seed1)) !==
evaluateWithUnsafeProjection(Shuffle(ai0, seed2)))
}

// Gluten rewrite of Spark's CollectionExpressionsSuite "MapFromEntries" test.
// The upstream duplicated-key case (ai4 below) is dropped because Velox does not
// support duplicated map keys.
testGluten("MapFromEntries") {
  // Builds ARRAY<STRUCT<a: keyType, b: valueType>> — the input type map_from_entries expects.
  def arrayType(keyType: DataType, valueType: DataType): DataType = {
    ArrayType(StructType(Seq(StructField("a", keyType), StructField("b", valueType))), true)
  }

  // Shorthand for constructing entry (struct) rows.
  def row(values: Any*): InternalRow = create_row(values: _*)

  // Primitive-type keys and values
  val aiType = arrayType(IntegerType, IntegerType)
  val ai0 = Literal.create(Seq(row(1, 10), row(2, 20), row(3, 20)), aiType)
  val ai1 = Literal.create(Seq(row(1, null), row(2, 20), row(3, null)), aiType)
  // Empty entry array and null array input.
  val ai2 = Literal.create(Seq.empty, aiType)
  val ai3 = Literal.create(null, aiType)
  // Velox doesn't support duplicated map key
  // val ai4 = Literal.create(Seq(row(1, 10), row(1, 20)), aiType)
  // The map key is null
  val ai5 = Literal.create(Seq(row(1, 10), row(null, 20)), aiType)
  // Null entries (structs) inside the array.
  val ai6 = Literal.create(Seq(null, row(2, 20), null), aiType)

  checkEvaluation(MapFromEntries(ai0), create_map(1 -> 10, 2 -> 20, 3 -> 20))
  checkEvaluation(MapFromEntries(ai1), create_map(1 -> null, 2 -> 20, 3 -> null))
  checkEvaluation(MapFromEntries(ai2), Map.empty)
  checkEvaluation(MapFromEntries(ai3), null)

  // Map key can't be null
  checkErrorInExpression[SparkRuntimeException](MapFromEntries(ai5), "NULL_MAP_KEY")
  // A null entry makes the whole result null.
  checkEvaluation(MapFromEntries(ai6), null)

  // Non-primitive-type keys and values
  val asType = arrayType(StringType, StringType)
  val as0 = Literal.create(Seq(row("a", "aa"), row("b", "bb"), row("c", "bb")), asType)
  val as1 = Literal.create(Seq(row("a", null), row("b", "bb"), row("c", null)), asType)
  val as2 = Literal.create(Seq.empty, asType)
  val as3 = Literal.create(null, asType)
  val as5 = Literal.create(Seq(row("a", "aa"), row(null, "bb")), asType)
  val as6 = Literal.create(Seq(null, row("b", "bb"), null), asType)

  checkEvaluation(MapFromEntries(as0), create_map("a" -> "aa", "b" -> "bb", "c" -> "bb"))
  checkEvaluation(MapFromEntries(as1), create_map("a" -> null, "b" -> "bb", "c" -> null))
  checkEvaluation(MapFromEntries(as2), Map.empty)
  checkEvaluation(MapFromEntries(as3), null)

  // Map key can't be null.
  // Uses checkErrorInExpression (not checkExceptionInExpression) so the string-key case is
  // verified the same way as the primitive-key case above: both assert on the NULL_MAP_KEY
  // error class of SparkRuntimeException.
  checkErrorInExpression[SparkRuntimeException](MapFromEntries(as5), "NULL_MAP_KEY")
  checkEvaluation(MapFromEntries(as6), null)

  // map key can't be map
  val structOfMap = row(create_map(1 -> 1), 1)
  val map = MapFromEntries(
    Literal.create(
      Seq(structOfMap),
      arrayType(keyType = MapType(IntegerType, IntegerType), valueType = IntegerType)))
  // Type check must reject a map-typed key via the Spark 3.4+ DataTypeMismatch result.
  map.checkInputDataTypes() match {
    case TypeCheckResult.TypeCheckSuccess => fail("should not allow map as map key")
    case TypeCheckResult.DataTypeMismatch(errorSubClass, messageParameters) =>
      assert(errorSubClass === "INVALID_MAP_KEY_TYPE")
      assert(messageParameters === Map("keyType" -> "\"MAP<INT, INT>\""))
  }

  // accepts only arrays of pair structs
  val mapWrongType = MapFromEntries(Literal(1))
  assert(
    mapWrongType.checkInputDataTypes() ==
      DataTypeMismatch(
        errorSubClass = "UNEXPECTED_INPUT_TYPE",
        messageParameters = Map(
          "paramIndex" -> "1",
          "inputSql" -> "\"1\"",
          "inputType" -> "\"INT\"",
          "requiredType" -> "\"ARRAY\" of pair \"STRUCT\""
        )
      ))
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,8 @@ class VeloxTestSettings extends BackendTestSettings {
// Rewrite in Gluten to replace Seq with Array
.exclude("Shuffle")
.excludeGlutenTest("Shuffle")
// Rewritten in Gluten: the duplicated-key case is dropped (Velox doesn't support duplicated map keys).
.exclude("MapFromEntries")
enableSuite[GlutenConditionalExpressionSuite]
enableSuite[GlutenDateExpressionsSuite]
// Has exception in fallback execution when we use resultDF.collect in evaluation.
Expand Down
Loading
Loading