diff --git a/python/pyspark/sql/tests/test_functions.py b/python/pyspark/sql/tests/test_functions.py index 10aa01e5a6005..c9ca0fca96a72 100644 --- a/python/pyspark/sql/tests/test_functions.py +++ b/python/pyspark/sql/tests/test_functions.py @@ -84,6 +84,7 @@ def test_function_parity(self): # Functions that we expect to be missing in python until they are added to pyspark expected_missing_in_py = { "unix_nanos", # SPARK-57527: PySpark support tracked as a follow-up + "timestamp_nanos", # SPARK-57526: PySpark support tracked as a follow-up } self.assertEqual( diff --git a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala index 76748f0ae9420..8aea50291cdc5 100644 --- a/sql/api/src/main/scala/org/apache/spark/sql/functions.scala +++ b/sql/api/src/main/scala/org/apache/spark/sql/functions.scala @@ -8569,6 +8569,15 @@ object functions { */ def timestamp_micros(e: Column): Column = Column.fn("timestamp_micros", e) + /** + * Creates a timestamp with the local time zone and nanosecond precision (TIMESTAMP_LTZ(9)) from + * the number of nanoseconds since UTC epoch. + * + * @group datetime_funcs + * @since 4.3.0 + */ + def timestamp_nanos(e: Column): Column = Column.fn("timestamp_nanos", e) + /** * Gets the difference between the timestamps in the specified units by truncating the fraction * part. diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala index 2c47fca543a98..415a842c9bf46 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/FunctionRegistry.scala @@ -774,6 +774,7 @@ object FunctionRegistry { expression[SecondsToTimestamp]("timestamp_seconds"), expression[MillisToTimestamp]("timestamp_millis"), expression[MicrosToTimestamp]("timestamp_micros"), + expression[NanosToTimestamp]("timestamp_nanos"), expression[UnixSeconds]("unix_seconds"), expression[UnixMillis]("unix_millis"), expression[UnixMicros]("unix_micros"), diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala index 3fbef82ef246e..3f773e5bb6dc5 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/datetimeExpressions.scala @@ -759,6 +759,98 @@ case class MicrosToTimestamp(child: Expression) copy(child = newChild) } +// scalastyle:off line.size.limit line.contains.tab +@ExpressionDescription( + usage = "_FUNC_(nanoseconds) - Creates timestamp with the local time zone and nanosecond precision (TIMESTAMP_LTZ(9)) from the number of nanoseconds since UTC epoch.", + examples = """ + Examples: + > SET spark.sql.timestampNanosTypes.enabled=true; + spark.sql.timestampNanosTypes.enabled true + > SELECT _FUNC_(1230219000123456789); + 2008-12-25 07:30:00.123456789 + """, + group = "datetime_funcs", + since = "4.3.0") +// scalastyle:on line.size.limit line.contains.tab +case class NanosToTimestamp(child: Expression) + extends UnaryExpression with ExpectsInputTypes { + override def nullIntolerant: Boolean = true + + // Accepts an integral or DECIMAL nanosecond count only. DECIMAL is required to span the full + // [0001, 9999] calendar range: nanos for year 9999 (~2.5e20) overflow a 64-bit BIGINT, the same + // reason the inverse `unix_nanos` returns DECIMAL(21, 0); an integral argument is widened to + // BigInteger directly. FLOAT/DOUBLE/STRING are intentionally rejected at analysis rather than + // implicitly coerced: a fractional or string nanosecond count is not meaningful, and the implicit + // DECIMAL coercion (FLOAT -> DECIMAL(14, 7), DOUBLE -> DECIMAL(30, 15)) would silently overflow + // for realistic magnitudes. + override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(IntegralType, DecimalType)) + + override def dataType: DataType = TimestampLTZNanosType(9) + + // Maps the integer nanosecond count to the (epochMicros, nanosWithinMicro) pair with floor + // semantics, so the sub-microsecond remainder is always in [0, 999] (matching the negative-input + // behavior of `floorDiv`/`floorMod`). When `epochMicros` overflows 64 bits -- i.e. the input is + // outside the representable timestamp range -- `longValueExact` throws, which is surfaced as a + // DATETIME_OVERFLOW error. + // + // Like the sibling `timestamp_micros`/`timestamp_millis`/`timestamp_seconds` constructors, the + // result is not validated against the [0001, 9999] calendar range: only the 64-bit `epochMicros` + // boundary is guarded, so a count whose `epochMicros` still fits in a long but lands past year + // 9999 (up to the long-micros maximum, ~year 294247) yields an out-of-range value rather than an + // error. This is intentional, keeping the nanosecond constructor consistent with its micro peers. + override def nullSafeEval(input: Any): Any = { + val n = child.dataType match { + case _: DecimalType => + input.asInstanceOf[Decimal].toJavaBigDecimal + .setScale(0, java.math.RoundingMode.FLOOR).toBigInteger + case _: IntegralType => + BigInteger.valueOf(input.asInstanceOf[Number].longValue()) + } + val thousand = BigInteger.valueOf(NANOS_PER_MICROS) + val rem = n.mod(thousand) + val micros = try { + n.subtract(rem).divide(thousand).longValueExact() + } catch { + case _: ArithmeticException => throw QueryExecutionErrors.timestampNanosOverflowError(n) + } + TimestampNanosVal.fromParts(micros, rem.shortValueExact()) + } + + override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = { + nullSafeCodeGen(ctx, ev, c => { + val n = ctx.freshName("nanos") + val thousand = ctx.freshName("thousand") + val rem = ctx.freshName("rem") + val micros = ctx.freshName("micros") + val toBigInteger = child.dataType match { + case _: DecimalType => + s"$c.toJavaBigDecimal().setScale(0, java.math.RoundingMode.FLOOR).toBigInteger()" + case _: IntegralType => + s"java.math.BigInteger.valueOf((long) $c)" + } + val errors = QueryExecutionErrors.getClass.getName.stripSuffix("$") + s""" + |java.math.BigInteger $n = $toBigInteger; + |java.math.BigInteger $thousand = java.math.BigInteger.valueOf(${NANOS_PER_MICROS}L); + |java.math.BigInteger $rem = $n.mod($thousand); + |long $micros; + |try { + | $micros = $n.subtract($rem).divide($thousand).longValueExact(); + |} catch (java.lang.ArithmeticException e) { + | throw $errors.timestampNanosOverflowError($n); + |} + |${ev.value} = org.apache.spark.unsafe.types.TimestampNanosVal.fromParts( + | $micros, $rem.shortValueExact()); + |""".stripMargin + }) + } + + override def prettyName: String = "timestamp_nanos" + + override protected def withNewChildInternal(newChild: Expression): NanosToTimestamp = + copy(child = newChild) +} + abstract class TimestampToLongBase extends UnaryExpression with ExpectsInputTypes { override def nullIntolerant: Boolean = true diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala index 48c3ef0c6a936..f4db9c9041f2a 100644 --- a/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala +++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/errors/QueryExecutionErrors.scala @@ -2646,6 +2646,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE summary = "") } + def timestampNanosOverflowError(nanos: java.math.BigInteger): SparkArithmeticException = { + new SparkArithmeticException( + errorClass = "DATETIME_OVERFLOW", + messageParameters = Map( + "operation" -> + s"create a TIMESTAMP_LTZ(9) from $nanos nanoseconds since the epoch"), + context = Array.empty, + summary = "") + } + def timeAddIntervalOverflowError( time: Long, timePrecision: Int, diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala index 8771123ad1202..d6b18a9370e0c 100644 --- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala +++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/DateExpressionsSuite.scala @@ -1743,6 +1743,65 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper { } } + test("SPARK-57526: timestamp_nanos builds a TIMESTAMP_LTZ(9) from nanoseconds") { + import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils._ + + // DECIMAL input is accepted as-is; a wide DECIMAL(38, 0) holds every input below. + def tsNanos(n: BigInt): NanosToTimestamp = + NanosToTimestamp(Literal.create(Decimal(BigDecimal(n), 38, 0), DecimalType(38, 0))) + + assert(tsNanos(0).dataType === TimestampLTZNanosType(9)) + + // The JIRA example: 1230219000123456789 ns -> 1230219000123456 micros + 789 ns. + checkEvaluation(tsNanos(BigInt("1230219000123456789")), nanosVal(1230219000123456L, 789)) + + // An integral argument is accepted directly (widened to BigInteger), exercising the + // IntegralType eval/codegen path rather than the DECIMAL one. Cover every integral width + // (TINYINT/SMALLINT/INT/BIGINT) so the `(long)` codegen cast is checked for each. + checkEvaluation(NanosToTimestamp(Literal(2.toByte)), nanosVal(0L, 2)) + checkEvaluation(NanosToTimestamp(Literal(1000.toShort)), nanosVal(1L, 0)) + checkEvaluation(NanosToTimestamp(Literal(1000)), nanosVal(1L, 0)) + checkEvaluation( + NanosToTimestamp(Literal(1230219000123456789L)), nanosVal(1230219000123456L, 789)) + checkEvaluation(NanosToTimestamp(Literal(-1L)), nanosVal(-1L, 999)) + + // FLOAT/DOUBLE/STRING are rejected at analysis: a fractional or string nanosecond count is not + // meaningful, and the implicit DECIMAL coercion would silently overflow for realistic values. + Seq(Literal(1.0f), Literal(1.0d), Literal("1")).foreach { lit => + val mismatch = NanosToTimestamp(lit).checkInputDataTypes().asInstanceOf[DataTypeMismatch] + assert(mismatch.errorSubClass == "UNEXPECTED_INPUT_TYPE") + } + + // Pre-epoch / negative inputs use floor semantics, so nanosWithinMicro stays in [0, 999]: + // -1 ns floors to epochMicros = -1 with a 999 ns remainder. + checkEvaluation(tsNanos(BigInt(-1)), nanosVal(-1L, 999)) + checkEvaluation(tsNanos(BigInt(-1000)), nanosVal(-1L, 0)) + checkEvaluation(tsNanos(BigInt(-1500)), nanosVal(-2L, 500)) + + // NULL input. + checkEvaluation( + NanosToTimestamp(Literal.create(null, DecimalType(38, 0))), null) + + // Full [0001, 9999] range: a DECIMAL nanosecond count far beyond a 64-bit BIGINT decodes + // losslessly back to the original value (proving the function spans the whole calendar range). + Seq( + localDateTimeToNanosVal(timestampNTZ(9999, 12, 31, 23, 59, 59, 999999999)), + localDateTimeToNanosVal(timestampNTZ(1, 1, 1, 0, 0, 0, 1)) + ).foreach { v => + val n = BigInt(v.epochMicros) * NANOS_PER_MICROS + v.nanosWithinMicro.toInt + checkEvaluation(tsNanos(n), v) + // Round-trips with the inverse unix_nanos for the same full-range values. + checkEvaluation(UnixNanos(tsNanos(n)), Decimal(BigDecimal(n), 21, 0)) + } + + // Out-of-range input: epochMicros overflows a 64-bit long, surfaced as DATETIME_OVERFLOW. + checkErrorInExpression[SparkArithmeticException]( + tsNanos(BigInt("10000000000000000000000000")), + condition = "DATETIME_OVERFLOW", + parameters = Map("operation" -> + "create a TIMESTAMP_LTZ(9) from 10000000000000000000000000 nanoseconds since the epoch")) + } + test("TIMESTAMP_SECONDS") { def testIntegralFunc(value: Number): Unit = { checkEvaluation( diff --git a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md index 3ff81b7f57f02..6297aece4cbbd 100644 --- a/sql/core/src/test/resources/sql-functions/sql-expression-schema.md +++ b/sql/core/src/test/resources/sql-functions/sql-expression-schema.md @@ -255,6 +255,7 @@ | org.apache.spark.sql.catalyst.expressions.Murmur3Hash | hash | SELECT hash('Spark', array(123), 2) | struct | | org.apache.spark.sql.catalyst.expressions.NTile | ntile | SELECT a, b, ntile(2) OVER (PARTITION BY a ORDER BY b) FROM VALUES ('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b) | struct | | org.apache.spark.sql.catalyst.expressions.NaNvl | nanvl | SELECT nanvl(cast('NaN' as double), 123) | struct | +| org.apache.spark.sql.catalyst.expressions.NanosToTimestamp | timestamp_nanos | SELECT timestamp_nanos(1230219000123456789) | struct | | org.apache.spark.sql.catalyst.expressions.NextDay | next_day | SELECT next_day('2015-01-14', 'TU') | struct | | org.apache.spark.sql.catalyst.expressions.Not | ! | SELECT ! true | struct<(NOT true):boolean> | | org.apache.spark.sql.catalyst.expressions.Not | not | SELECT not true | struct<(NOT true):boolean> | diff --git a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out index a4dadf760088d..9c15c197d8e7b 100644 --- a/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out +++ b/sql/core/src/test/resources/sql-tests/analyzer-results/timestamp-ltz-nanos.sql.out @@ -762,3 +762,62 @@ SELECT unix_nanos(NULL :: timestamp_ltz(9)) -- !query analysis Project [unix_nanos(cast(null as timestamp_ltz(9))) AS unix_nanos(CAST(NULL AS TIMESTAMP_LTZ(9)))#x] +- OneRowRelation + + +-- !query +SELECT timestamp_nanos(1230219000123456789) +-- !query analysis +Project [timestamp_nanos(1230219000123456789) AS timestamp_nanos(1230219000123456789)#x] ++- OneRowRelation + + +-- !query +SELECT timestamp_nanos(-1) +-- !query analysis +Project [timestamp_nanos(-1) AS timestamp_nanos(-1)#x] ++- OneRowRelation + + +-- !query +SELECT timestamp_nanos(253402300799999999999BD) +-- !query analysis +Project [timestamp_nanos(253402300799999999999) AS timestamp_nanos(253402300799999999999)#x] ++- OneRowRelation + + +-- !query +SELECT timestamp_nanos(10000000000000000000000000BD) +-- !query analysis +Project [timestamp_nanos(10000000000000000000000000) AS timestamp_nanos(10000000000000000000000000)#x] ++- OneRowRelation + + +-- !query +SELECT timestamp_nanos(1.0D) +-- !query analysis +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"1.0\"", + "inputType" : "\"DOUBLE\"", + "paramIndex" : "first", + "requiredType" : "(\"INTEGRAL\" or \"DECIMAL\")", + "sqlExpr" : "\"timestamp_nanos(1.0)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 28, + "fragment" : "timestamp_nanos(1.0D)" + } ] +} + + +-- !query +SELECT timestamp_nanos(CAST(NULL AS BIGINT)) +-- !query analysis +Project [timestamp_nanos(cast(null as bigint)) AS timestamp_nanos(CAST(NULL AS BIGINT))#x] ++- OneRowRelation diff --git a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql index e208704196ba3..bad3c1aee842f 100644 --- a/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql +++ b/sql/core/src/test/resources/sql-tests/inputs/timestamp-ltz-nanos.sql @@ -215,3 +215,17 @@ SELECT unix_nanos(TIMESTAMP_LTZ '9999-12-31 23:59:59.999999999 UTC'); SELECT unix_nanos(TIMESTAMP_LTZ '1960-01-01 00:00:00.000000001 UTC'); -- NULL nanosecond timestamp. SELECT unix_nanos(NULL :: timestamp_ltz(9)); + +-- SPARK-57526: timestamp_nanos builds a TIMESTAMP_LTZ(9) from a nanosecond count since the epoch. +-- An integral argument is accepted directly; the LTZ result renders in the session zone. +SELECT timestamp_nanos(1230219000123456789); +-- Negative input floors toward the past, so the sub-microsecond remainder stays in [0, 999]. +SELECT timestamp_nanos(-1); +-- DECIMAL input reaches beyond a 64-bit BIGINT, up to year 9999 (nanos ~ 2.5e20). +SELECT timestamp_nanos(253402300799999999999BD); +-- Out-of-range input: epochMicros overflows a 64-bit long, so the conversion fails at runtime. +SELECT timestamp_nanos(10000000000000000000000000BD); +-- DOUBLE is rejected at analysis: only integral and DECIMAL nanosecond counts are accepted. +SELECT timestamp_nanos(1.0D); +-- NULL input. +SELECT timestamp_nanos(CAST(NULL AS BIGINT)); diff --git a/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out b/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out index 1f75f01da848a..84987fcb433cc 100644 --- a/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out +++ b/sql/core/src/test/resources/sql-tests/results/timestamp-ltz-nanos.sql.out @@ -854,3 +854,76 @@ SELECT unix_nanos(NULL :: timestamp_ltz(9)) struct -- !query output NULL + + +-- !query +SELECT timestamp_nanos(1230219000123456789) +-- !query schema +struct +-- !query output +2008-12-25 07:30:00.123456789 + + +-- !query +SELECT timestamp_nanos(-1) +-- !query schema +struct +-- !query output +1969-12-31 15:59:59.999999999 + + +-- !query +SELECT timestamp_nanos(253402300799999999999BD) +-- !query schema +struct +-- !query output +9999-12-31 15:59:59.999999999 + + +-- !query +SELECT timestamp_nanos(10000000000000000000000000BD) +-- !query schema +struct<> +-- !query output +org.apache.spark.SparkArithmeticException +{ + "errorClass" : "DATETIME_OVERFLOW", + "sqlState" : "22008", + "messageParameters" : { + "operation" : "create a TIMESTAMP_LTZ(9) from 10000000000000000000000000 nanoseconds since the epoch" + } +} + + +-- !query +SELECT timestamp_nanos(1.0D) +-- !query schema +struct<> +-- !query output +org.apache.spark.sql.catalyst.ExtendedAnalysisException +{ + "errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE", + "sqlState" : "42K09", + "messageParameters" : { + "inputSql" : "\"1.0\"", + "inputType" : "\"DOUBLE\"", + "paramIndex" : "first", + "requiredType" : "(\"INTEGRAL\" or \"DECIMAL\")", + "sqlExpr" : "\"timestamp_nanos(1.0)\"" + }, + "queryContext" : [ { + "objectType" : "", + "objectName" : "", + "startIndex" : 8, + "stopIndex" : 28, + "fragment" : "timestamp_nanos(1.0D)" + } ] +} + + +-- !query +SELECT timestamp_nanos(CAST(NULL AS BIGINT)) +-- !query schema +struct +-- !query output +NULL diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala index da2e9d3a8d885..ab830da6b2abc 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/TimestampNanosFunctionsSuiteBase.scala @@ -481,6 +481,42 @@ abstract class TimestampNanosFunctionsSuiteBase extends SharedSparkSession { checkAnswer(ltz.select(unix_nanos(col("c"))), Row(null)) } } + + test("SPARK-57526: timestamp_nanos builds nanosecond-precision TIMESTAMP_LTZ values") { + // 1230219000123456789 ns since the epoch -> 2008-12-25 15:30:00.123456789 UTC. The result is a + // TIMESTAMP_LTZ(9); collecting it yields the absolute Instant regardless of the session zone. + val nanos = 1230219000123456789L + val instant = Instant.parse("2008-12-25T15:30:00.123456789Z") + val sqlRes = spark.sql(s"SELECT timestamp_nanos($nanos)") + val colRes = spark.range(1).select(timestamp_nanos(lit(nanos))) + // The SQL and Scala Column API agree, return the expected instant, and keep the LTZ(9) type. + checkAnswer(sqlRes, colRes) + checkAnswer(sqlRes, Row(instant)) + assert(sqlRes.schema.head.dataType === TimestampLTZNanosType(9)) + + // A BIGINT argument is accepted directly through the dedicated IntegralType path (widened to + // BigInteger, no DECIMAL coercion), so the integral literal works without a cast. + checkAnswer(spark.sql(s"SELECT timestamp_nanos(${nanos}L)"), Row(instant)) + + // DECIMAL input reaches the full [0001, 9999] calendar range, beyond a 64-bit BIGINT of nanos. + Seq( + Instant.parse("9999-12-31T23:59:59.999999999Z"), + Instant.parse("0001-01-01T00:00:00.000000001Z") + ).foreach { i => + val n = BigInt(i.getEpochSecond) * 1000000000L + i.getNano + checkAnswer( + spark.range(1).select(timestamp_nanos(lit(BigDecimal(n).bigDecimal))), + Row(i)) + } + } + + test("SPARK-57526: timestamp_nanos over NULL input") { + val df = spark.createDataFrame( + spark.sparkContext.parallelize(Seq(Row(null))), + new StructType().add("n", LongType)) + checkAnswer(df.select(timestamp_nanos(col("n"))), Row(null)) + checkAnswer(df.selectExpr("timestamp_nanos(n)"), Row(null)) + } } // Runs the nanosecond timestamp function tests with ANSI mode enabled explicitly.