diff --git a/spark/src/main/scala/org/apache/comet/serde/datetime.scala b/spark/src/main/scala/org/apache/comet/serde/datetime.scala index 1487af1bd2..5a3a76cc3e 100644 --- a/spark/src/main/scala/org/apache/comet/serde/datetime.scala +++ b/spark/src/main/scala/org/apache/comet/serde/datetime.scala @@ -23,7 +23,7 @@ import java.util.Locale import org.apache.spark.sql.catalyst.expressions.{AddMonths, Attribute, ConvertTimezone, DateAdd, DateDiff, DateFormatClass, DateFromUnixDate, DateSub, DayOfMonth, DayOfWeek, DayOfYear, Days, FromUTCTimestamp, GetDateField, GetTimestamp, Hour, Hours, LastDay, Literal, MakeDate, MakeTimestamp, MicrosToTimestamp, MillisToTimestamp, Minute, Month, MonthsBetween, NextDay, Quarter, Second, SecondsToTimestamp, ToUnixTimestamp, ToUTCTimestamp, TruncDate, TruncTimestamp, UnixDate, UnixMicros, UnixMillis, UnixSeconds, UnixTimestamp, WeekDay, WeekOfYear, Year} import org.apache.spark.sql.internal.SQLConf -import org.apache.spark.sql.types.{DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType} +import org.apache.spark.sql.types.{DataType, DateType, DoubleType, FloatType, IntegerType, LongType, StringType, TimestampNTZType, TimestampType} import org.apache.spark.unsafe.types.UTF8String import org.apache.comet.CometConf @@ -179,23 +179,24 @@ object CometQuarter extends CometExpressionSerde[Quarter] with CometExprGetDateF } } -object CometHour extends CometExpressionSerde[Hour] with CodegenDispatchFallback { +private object TimeFieldSerde { + val timestampNtzIncompatReason: String = + "Incorrectly applies timezone conversion to TimestampNTZ inputs" + + " (https://github.com/apache/datafusion-comet/issues/3180)" - val incompatReason: String = "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)" + def supportLevelForChild(childType: DataType): SupportLevel = childType match { + case TimestampNTZType => Incompatible(Some(timestampNtzIncompatReason)) + case _ => Compatible() + } +} - override def getIncompatibleReasons(): Seq[String] = Seq(incompatReason) +object CometHour extends CometExpressionSerde[Hour] with CodegenDispatchFallback { - override def getSupportLevel(expr: Hour): SupportLevel = { - if (expr.child.dataType == TimestampNTZType) { - Incompatible( - Some( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)")) - } else { - Compatible() - } - } + override def getIncompatibleReasons(): Seq[String] = + Seq(TimeFieldSerde.timestampNtzIncompatReason) + + override def getSupportLevel(expr: Hour): SupportLevel = + TimeFieldSerde.supportLevelForChild(expr.child.dataType) override def convert( expr: Hour, @@ -224,20 +225,11 @@ object CometHour extends CometExpressionSerde[Hour] with CodegenDispatchFallback object CometMinute extends CometExpressionSerde[Minute] with CodegenDispatchFallback { - override def getIncompatibleReasons(): Seq[String] = Seq( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)") - - override def getSupportLevel(expr: Minute): SupportLevel = { - if (expr.child.dataType == TimestampNTZType) { - Incompatible( - Some( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)")) - } else { - Compatible() - } - } + override def getIncompatibleReasons(): Seq[String] = + Seq(TimeFieldSerde.timestampNtzIncompatReason) + + override def getSupportLevel(expr: Minute): SupportLevel = + TimeFieldSerde.supportLevelForChild(expr.child.dataType) override def convert( expr: Minute, @@ -266,20 +258,11 @@ object CometMinute extends CometExpressionSerde[Minute] with CodegenDispatchFall object CometSecond extends CometExpressionSerde[Second] with CodegenDispatchFallback { - override def getIncompatibleReasons(): Seq[String] = Seq( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)") - - override def getSupportLevel(expr: Second): SupportLevel = { - if (expr.child.dataType == TimestampNTZType) { - Incompatible( - Some( - "Incorrectly applies timezone conversion to TimestampNTZ inputs" + - " (https://github.com/apache/datafusion-comet/issues/3180)")) - } else { - Compatible() - } - } + override def getIncompatibleReasons(): Seq[String] = + Seq(TimeFieldSerde.timestampNtzIncompatReason) + + override def getSupportLevel(expr: Second): SupportLevel = + TimeFieldSerde.supportLevelForChild(expr.child.dataType) override def convert( expr: Second, @@ -469,6 +452,11 @@ object CometMakeDate extends CometExpressionSerde[MakeDate] { object CometSecondsToTimestamp extends CometScalarFunction[SecondsToTimestamp]("seconds_to_timestamp") { + + override def getUnsupportedReasons(): Seq[String] = Seq( + "Only `IntegerType`, `LongType`, `FloatType`, and `DoubleType` inputs are supported." + + " `DecimalType`, `ByteType`, and `ShortType` fall back to Spark.") + override def getSupportLevel(expr: SecondsToTimestamp): SupportLevel = expr.child.dataType match { case IntegerType | LongType | FloatType | DoubleType => Compatible() @@ -514,8 +502,14 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] with CodegenDispat val supportedFormats: Seq[String] = Seq("year", "yyyy", "yy", "quarter", "mon", "month", "mm", "week") - override def getIncompatibleReasons(): Seq[String] = Seq( - "Non-literal format strings will throw an exception instead of returning NULL") + private val nonLiteralFormatIncompatReason: String = + "Non-literal format strings will throw an exception instead of returning NULL" + + private def unsupportedFormatReason(fmt: Any): String = + s"Format $fmt is not supported. Only the following formats are supported: " + + supportedFormats.mkString(", ") + + override def getIncompatibleReasons(): Seq[String] = Seq(nonLiteralFormatIncompatReason) override def getUnsupportedReasons(): Seq[String] = Seq( "Only the following formats are supported: " + supportedFormats.mkString(", ")) @@ -526,11 +520,10 @@ object CometTruncDate extends CometExpressionSerde[TruncDate] with CodegenDispat if (supportedFormats.contains(fmt.toString.toLowerCase(Locale.ROOT))) { Compatible() } else { - Unsupported(Some(s"Format $fmt is not supported")) + Unsupported(Some(unsupportedFormatReason(fmt))) } case _ => - Incompatible( - Some("Invalid format strings will throw an exception instead of returning NULL")) + Incompatible(Some(nonLiteralFormatIncompatReason)) } } @@ -555,10 +548,6 @@ object CometTruncTimestamp extends CometExpressionSerde[TruncTimestamp] with CodegenDispatchFallback { - override def getIncompatibleReasons(): Seq[String] = Seq( - "Produces incorrect results when used with non-UTC timezones. Compatible when timezone is" + - " UTC. (https://github.com/apache/datafusion-comet/issues/2649)") - val supportedFormats: Seq[String] = Seq( "year", @@ -577,6 +566,23 @@ object CometTruncTimestamp "millisecond", "microsecond") + private val nonUtcIncompatReason: String = + "Produces incorrect results when used with non-UTC timezones. Compatible when timezone is" + + " UTC. (https://github.com/apache/datafusion-comet/issues/2649)" + + private val nonLiteralFormatIncompatReason: String = + "Non-literal format strings will throw an exception instead of returning NULL" + + private def unsupportedFormatReason(fmt: Any): String = + s"Format $fmt is not supported. Only the following formats are supported: " + + supportedFormats.mkString(", ") + + override def getIncompatibleReasons(): Seq[String] = + Seq(nonUtcIncompatReason, nonLiteralFormatIncompatReason) + + override def getUnsupportedReasons(): Seq[String] = Seq( + "Only the following formats are supported: " + supportedFormats.mkString(", ")) + override def getSupportLevel(expr: TruncTimestamp): SupportLevel = { val timezone = expr.timeZoneId.getOrElse("UTC") val isUtc = timezone == "UTC" || timezone == "Etc/UTC" @@ -586,17 +592,13 @@ object CometTruncTimestamp if (isUtc) { Compatible() } else { - Incompatible( - Some( - s"Incorrect results in non-UTC timezone '$timezone'" + - " (https://github.com/apache/datafusion-comet/issues/2649)")) + Incompatible(Some(nonUtcIncompatReason)) } } else { - Unsupported(Some(s"Format $fmt is not supported")) + Unsupported(Some(unsupportedFormatReason(fmt))) } case _ => - Incompatible( - Some("Invalid format strings will throw an exception instead of returning NULL")) + Incompatible(Some(nonLiteralFormatIncompatReason)) } } @@ -734,24 +736,27 @@ object CometDateFormat extends CometExpressionSerde[DateFormatClass] { * without applying any session timezone offset. */ object CometHours extends CometExpressionSerde[Hours] { + + override def getUnsupportedReasons(): Seq[String] = Seq( + "Only `TimestampType` and `TimestampNTZType` inputs are supported.") + + override def getSupportLevel(expr: Hours): SupportLevel = expr.child.dataType match { + case TimestampType | TimestampNTZType => Compatible() + case other => Unsupported(Some(s"Hours does not support input type: $other")) + } + override def convert( expr: Hours, inputs: Seq[Attribute], binding: Boolean): Option[ExprOuterClass.Expr] = { - val optExpr = expr.child.dataType match { - case TimestampType | TimestampNTZType => - exprToProtoInternal(expr.child, inputs, binding).map { childExpr => - val builder = ExprOuterClass.HoursTransform.newBuilder() - builder.setChild(childExpr) + val optExpr = exprToProtoInternal(expr.child, inputs, binding).map { childExpr => + val builder = ExprOuterClass.HoursTransform.newBuilder() + builder.setChild(childExpr) - ExprOuterClass.Expr - .newBuilder() - .setHoursTransform(builder) - .build() - } - case other => - withFallbackReason(expr, s"Hours does not support input type: $other") - None + ExprOuterClass.Expr + .newBuilder() + .setHoursTransform(builder) + .build() } optExprWithFallbackReason(optExpr, expr, expr.child) } @@ -768,6 +773,16 @@ object CometHours extends CometExpressionSerde[Hours] { * The first cast respects the session timezone to correctly determine the date boundary. */ object CometDays extends CometExpressionSerde[Days] { + + override def getUnsupportedReasons(): Seq[String] = Seq( + "Only `DateType` and `TimestampType` inputs are supported." + + " `TimestampNTZType` is not supported.") + + override def getSupportLevel(expr: Days): SupportLevel = expr.child.dataType match { + case DateType | TimestampType => Compatible() + case other => Unsupported(Some(s"Days does not support input type: $other")) + } + override def convert( expr: Days, inputs: Seq[Attribute], @@ -782,9 +797,7 @@ object CometDays extends CometExpressionSerde[Days] { childExpr.flatMap { child => CometCast.castToProto(expr, Some(timezone), DateType, child, CometEvalMode.LEGACY) } - case other => - withFallbackReason(expr, s"Days does not support input type: $other") - None + case _ => None } // Convert DateType to IntegerType (days since epoch)