Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions python/pyspark/sql/tests/test_functions.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ def test_function_parity(self):
# Functions that we expect to be missing in python until they are added to pyspark
expected_missing_in_py = {
"unix_nanos", # SPARK-57527: PySpark support tracked as a follow-up
"timestamp_nanos", # SPARK-57526: PySpark support tracked as a follow-up
}

self.assertEqual(
Expand Down
9 changes: 9 additions & 0 deletions sql/api/src/main/scala/org/apache/spark/sql/functions.scala
Original file line number Diff line number Diff line change
Expand Up @@ -8569,6 +8569,15 @@ object functions {
*/
def timestamp_micros(e: Column): Column = Column.fn("timestamp_micros", e)

/**
* Creates a timestamp with the local time zone and nanosecond precision (TIMESTAMP_LTZ(9)) from
* the number of nanoseconds since UTC epoch.
*
* @group datetime_funcs
* @since 4.3.0
*/
def timestamp_nanos(e: Column): Column = Column.fn("timestamp_nanos", e)

/**
* Gets the difference between the timestamps in the specified units by truncating the fraction
* part.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -774,6 +774,7 @@ object FunctionRegistry {
expression[SecondsToTimestamp]("timestamp_seconds"),
expression[MillisToTimestamp]("timestamp_millis"),
expression[MicrosToTimestamp]("timestamp_micros"),
expression[NanosToTimestamp]("timestamp_nanos"),
expression[UnixSeconds]("unix_seconds"),
expression[UnixMillis]("unix_millis"),
expression[UnixMicros]("unix_micros"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -759,6 +759,98 @@ case class MicrosToTimestamp(child: Expression)
copy(child = newChild)
}

// scalastyle:off line.size.limit line.contains.tab
@ExpressionDescription(
usage = "_FUNC_(nanoseconds) - Creates timestamp with the local time zone and nanosecond precision (TIMESTAMP_LTZ(9)) from the number of nanoseconds since UTC epoch.",
examples = """
Examples:
> SET spark.sql.timestampNanosTypes.enabled=true;
spark.sql.timestampNanosTypes.enabled true
> SELECT _FUNC_(1230219000123456789);
2008-12-25 07:30:00.123456789
""",
group = "datetime_funcs",
since = "4.3.0")
// scalastyle:on line.size.limit line.contains.tab
case class NanosToTimestamp(child: Expression)
extends UnaryExpression with ExpectsInputTypes {
override def nullIntolerant: Boolean = true

// Accepts an integral or DECIMAL nanosecond count only. DECIMAL is required to span the full
// [0001, 9999] calendar range: nanos for year 9999 (~2.5e20) overflow a 64-bit BIGINT, the same
// reason the inverse `unix_nanos` returns DECIMAL(21, 0); an integral argument is widened to
// BigInteger directly. FLOAT/DOUBLE/STRING are intentionally rejected at analysis rather than
// implicitly coerced: a fractional or string nanosecond count is not meaningful, and the implicit
// DECIMAL coercion (FLOAT -> DECIMAL(14, 7), DOUBLE -> DECIMAL(30, 15)) would silently overflow
// for realistic magnitudes.
override def inputTypes: Seq[AbstractDataType] = Seq(TypeCollection(IntegralType, DecimalType))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TypeCollection(IntegralType, DecimalType) accepts a DECIMAL of any scale (no implicit coercion, since this mixes in ExpectsInputTypes), so timestamp_nanos(CAST(x AS DECIMAL(p, s>0))) passes analysis and the fraction is silently floored by setScale(0, FLOOR) below. The rationale comment justifies rejecting fractional FLOAT/DOUBLE as "not meaningful" — could we extend it to say a fractional DECIMAL is accepted and floored to whole nanoseconds, so the asymmetry is deliberate? (Note timestamp_seconds's DECIMAL branch errors on sub-resolution input instead — a sentence on why nanos floors would help.)


override def dataType: DataType = TimestampLTZNanosType(9)

// Maps the integer nanosecond count to the (epochMicros, nanosWithinMicro) pair with floor
// semantics, so the sub-microsecond remainder is always in [0, 999] (matching the negative-input
// behavior of `floorDiv`/`floorMod`). When `epochMicros` overflows 64 bits -- i.e. the input is
// outside the representable timestamp range -- `longValueExact` throws, which is surfaced as a
// DATETIME_OVERFLOW error.
//
// Like the sibling `timestamp_micros`/`timestamp_millis`/`timestamp_seconds` constructors, the
// result is not validated against the [0001, 9999] calendar range: only the 64-bit `epochMicros`
// boundary is guarded, so a count whose `epochMicros` still fits in a long but lands past year
// 9999 (up to the long-micros maximum, ~year 294247) yields an out-of-range value rather than an
// error. This is intentional, keeping the nanosecond constructor consistent with its micro peers.
override def nullSafeEval(input: Any): Any = {
val n = child.dataType match {
case _: DecimalType =>
input.asInstanceOf[Decimal].toJavaBigDecimal
.setScale(0, java.math.RoundingMode.FLOOR).toBigInteger
case _: IntegralType =>
BigInteger.valueOf(input.asInstanceOf[Number].longValue())
}
val thousand = BigInteger.valueOf(NANOS_PER_MICROS)
val rem = n.mod(thousand)
val micros = try {
n.subtract(rem).divide(thousand).longValueExact()
} catch {
case _: ArithmeticException => throw QueryExecutionErrors.timestampNanosOverflowError(n)

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One question here for my curiosity: Overflow guard only catches epochMicros not fitting in a 64-bit long, not the documented calendar range. This is consistent with timestamp_micros (which also does no calendar-range validation); so I'm wondering - is it intentional?

Inputs whose epochMicros fits in a long but represents a year > 9999 (or < 0001) — up to ~year 292471 — silently produce an out-of-range TimestampNanosVal, since fromParts validates only nanosWithinMicro.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Intentional. It matches the sibling timestamp_micros/timestamp_millis/timestamp_seconds, which likewise guard only the 64-bit boundary (Math.multiplyExact) and do not validate the [0001, 9999] calendar range, so an epochMicros that fits in a long but lands past year 9999 (up to the long-micros maximum, ~year 294247) yields an out-of-range value rather than an error. I added an inline comment in e81da36 documenting this so the behavior is explicit. I kept it consistent with the micro constructors rather than introducing calendar-range validation here; happy to add that in a follow-up if we'd prefer the stricter behavior across all of them.

}
TimestampNanosVal.fromParts(micros, rem.shortValueExact())
}

override protected def doGenCode(ctx: CodegenContext, ev: ExprCode): ExprCode = {
nullSafeCodeGen(ctx, ev, c => {
val n = ctx.freshName("nanos")
val thousand = ctx.freshName("thousand")
val rem = ctx.freshName("rem")
val micros = ctx.freshName("micros")
val toBigInteger = child.dataType match {
case _: DecimalType =>
s"$c.toJavaBigDecimal().setScale(0, java.math.RoundingMode.FLOOR).toBigInteger()"
case _: IntegralType =>
s"java.math.BigInteger.valueOf((long) $c)"
}
val errors = QueryExecutionErrors.getClass.getName.stripSuffix("$")
s"""
|java.math.BigInteger $n = $toBigInteger;
|java.math.BigInteger $thousand = java.math.BigInteger.valueOf(${NANOS_PER_MICROS}L);
|java.math.BigInteger $rem = $n.mod($thousand);
|long $micros;
|try {
| $micros = $n.subtract($rem).divide($thousand).longValueExact();
|} catch (java.lang.ArithmeticException e) {
| throw $errors.timestampNanosOverflowError($n);
|}
|${ev.value} = org.apache.spark.unsafe.types.TimestampNanosVal.fromParts(
| $micros, $rem.shortValueExact());
|""".stripMargin
})
}

override def prettyName: String = "timestamp_nanos"

override protected def withNewChildInternal(newChild: Expression): NanosToTimestamp =
copy(child = newChild)
}

abstract class TimestampToLongBase extends UnaryExpression
with ExpectsInputTypes {
override def nullIntolerant: Boolean = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2646,6 +2646,16 @@ private[sql] object QueryExecutionErrors extends QueryErrorsBase with ExecutionE
summary = "")
}

def timestampNanosOverflowError(nanos: java.math.BigInteger): SparkArithmeticException = {
new SparkArithmeticException(
errorClass = "DATETIME_OVERFLOW",
messageParameters = Map(
"operation" ->
s"create a TIMESTAMP_LTZ(9) from $nanos nanoseconds since the epoch"),
context = Array.empty,
summary = "")
}

def timeAddIntervalOverflowError(
time: Long,
timePrecision: Int,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1743,6 +1743,65 @@ class DateExpressionsSuite extends SparkFunSuite with ExpressionEvalHelper {
}
}

test("SPARK-57526: timestamp_nanos builds a TIMESTAMP_LTZ(9) from nanoseconds") {
import org.apache.spark.sql.catalyst.util.TimestampNanosTestUtils._

// DECIMAL input is accepted as-is; a wide DECIMAL(38, 0) holds every input below.
def tsNanos(n: BigInt): NanosToTimestamp =
NanosToTimestamp(Literal.create(Decimal(BigDecimal(n), 38, 0), DecimalType(38, 0)))

assert(tsNanos(0).dataType === TimestampLTZNanosType(9))

// The JIRA example: 1230219000123456789 ns -> 1230219000123456 micros + 789 ns.
checkEvaluation(tsNanos(BigInt("1230219000123456789")), nanosVal(1230219000123456L, 789))

// An integral argument is accepted directly (widened to BigInteger), exercising the
// IntegralType eval/codegen path rather than the DECIMAL one. Cover every integral width
// (TINYINT/SMALLINT/INT/BIGINT) so the `(long)` codegen cast is checked for each.
checkEvaluation(NanosToTimestamp(Literal(2.toByte)), nanosVal(0L, 2))
checkEvaluation(NanosToTimestamp(Literal(1000.toShort)), nanosVal(1L, 0))
checkEvaluation(NanosToTimestamp(Literal(1000)), nanosVal(1L, 0))
checkEvaluation(
NanosToTimestamp(Literal(1230219000123456789L)), nanosVal(1230219000123456L, 789))
checkEvaluation(NanosToTimestamp(Literal(-1L)), nanosVal(-1L, 999))

// FLOAT/DOUBLE/STRING are rejected at analysis: a fractional or string nanosecond count is not
// meaningful, and the implicit DECIMAL coercion would silently overflow for realistic values.
Seq(Literal(1.0f), Literal(1.0d), Literal("1")).foreach { lit =>
val mismatch = NanosToTimestamp(lit).checkInputDataTypes().asInstanceOf[DataTypeMismatch]
assert(mismatch.errorSubClass == "UNEXPECTED_INPUT_TYPE")
}

// Pre-epoch / negative inputs use floor semantics, so nanosWithinMicro stays in [0, 999]:
// -1 ns floors to epochMicros = -1 with a 999 ns remainder.
checkEvaluation(tsNanos(BigInt(-1)), nanosVal(-1L, 999))
checkEvaluation(tsNanos(BigInt(-1000)), nanosVal(-1L, 0))
checkEvaluation(tsNanos(BigInt(-1500)), nanosVal(-2L, 500))

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The setScale(0, FLOOR) direction is never actually exercised — every DECIMAL input in the suite is scale 0, so the scale-reduction is a no-op under test. Could we add one positive and one negative scale>0 case here, e.g. tsNanos-style Decimal(BigDecimal("1500.9"), ...) -> nanosVal(1L, 500) and especially Decimal(BigDecimal("-0.5"), ...) -> nanosVal(-1L, 999)? The negative one pins FLOOR-toward-−∞ vs HALF_UP/DOWN.


// NULL input.
checkEvaluation(
NanosToTimestamp(Literal.create(null, DecimalType(38, 0))), null)

// Full [0001, 9999] range: a DECIMAL nanosecond count far beyond a 64-bit BIGINT decodes
// losslessly back to the original value (proving the function spans the whole calendar range).
Seq(
localDateTimeToNanosVal(timestampNTZ(9999, 12, 31, 23, 59, 59, 999999999)),
localDateTimeToNanosVal(timestampNTZ(1, 1, 1, 0, 0, 0, 1))
).foreach { v =>
val n = BigInt(v.epochMicros) * NANOS_PER_MICROS + v.nanosWithinMicro.toInt
checkEvaluation(tsNanos(n), v)
// Round-trips with the inverse unix_nanos for the same full-range values.
checkEvaluation(UnixNanos(tsNanos(n)), Decimal(BigDecimal(n), 21, 0))
}

// Out-of-range input: epochMicros overflows a 64-bit long, surfaced as DATETIME_OVERFLOW.
checkErrorInExpression[SparkArithmeticException](
tsNanos(BigInt("10000000000000000000000000")),
condition = "DATETIME_OVERFLOW",
parameters = Map("operation" ->
"create a TIMESTAMP_LTZ(9) from 10000000000000000000000000 nanoseconds since the epoch"))
}

test("TIMESTAMP_SECONDS") {
def testIntegralFunc(value: Number): Unit = {
checkEvaluation(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@
| org.apache.spark.sql.catalyst.expressions.Murmur3Hash | hash | SELECT hash('Spark', array(123), 2) | struct<hash(Spark, array(123), 2):int> |
| org.apache.spark.sql.catalyst.expressions.NTile | ntile | SELECT a, b, ntile(2) OVER (PARTITION BY a ORDER BY b) FROM VALUES ('A1', 2), ('A1', 1), ('A2', 3), ('A1', 1) tab(a, b) | struct<a:string,b:int,ntile(2) OVER (PARTITION BY a ORDER BY b ASC NULLS FIRST ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW):int> |
| org.apache.spark.sql.catalyst.expressions.NaNvl | nanvl | SELECT nanvl(cast('NaN' as double), 123) | struct<nanvl(CAST(NaN AS DOUBLE), 123):double> |
| org.apache.spark.sql.catalyst.expressions.NanosToTimestamp | timestamp_nanos | SELECT timestamp_nanos(1230219000123456789) | struct<timestamp_nanos(1230219000123456789):timestamp_ltz(9)> |
| org.apache.spark.sql.catalyst.expressions.NextDay | next_day | SELECT next_day('2015-01-14', 'TU') | struct<next_day(2015-01-14, TU):date> |
| org.apache.spark.sql.catalyst.expressions.Not | ! | SELECT ! true | struct<(NOT true):boolean> |
| org.apache.spark.sql.catalyst.expressions.Not | not | SELECT not true | struct<(NOT true):boolean> |
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -762,3 +762,62 @@ SELECT unix_nanos(NULL :: timestamp_ltz(9))
-- !query analysis
Project [unix_nanos(cast(null as timestamp_ltz(9))) AS unix_nanos(CAST(NULL AS TIMESTAMP_LTZ(9)))#x]
+- OneRowRelation


-- !query
SELECT timestamp_nanos(1230219000123456789)
-- !query analysis
Project [timestamp_nanos(1230219000123456789) AS timestamp_nanos(1230219000123456789)#x]
+- OneRowRelation


-- !query
SELECT timestamp_nanos(-1)
-- !query analysis
Project [timestamp_nanos(-1) AS timestamp_nanos(-1)#x]
+- OneRowRelation


-- !query
SELECT timestamp_nanos(253402300799999999999BD)
-- !query analysis
Project [timestamp_nanos(253402300799999999999) AS timestamp_nanos(253402300799999999999)#x]
+- OneRowRelation


-- !query
SELECT timestamp_nanos(10000000000000000000000000BD)
-- !query analysis
Project [timestamp_nanos(10000000000000000000000000) AS timestamp_nanos(10000000000000000000000000)#x]
+- OneRowRelation


-- !query
SELECT timestamp_nanos(1.0D)
-- !query analysis
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
"sqlState" : "42K09",
"messageParameters" : {
"inputSql" : "\"1.0\"",
"inputType" : "\"DOUBLE\"",
"paramIndex" : "first",
"requiredType" : "(\"INTEGRAL\" or \"DECIMAL\")",
"sqlExpr" : "\"timestamp_nanos(1.0)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 28,
"fragment" : "timestamp_nanos(1.0D)"
} ]
}


-- !query
SELECT timestamp_nanos(CAST(NULL AS BIGINT))
-- !query analysis
Project [timestamp_nanos(cast(null as bigint)) AS timestamp_nanos(CAST(NULL AS BIGINT))#x]
+- OneRowRelation
Original file line number Diff line number Diff line change
Expand Up @@ -215,3 +215,17 @@ SELECT unix_nanos(TIMESTAMP_LTZ '9999-12-31 23:59:59.999999999 UTC');
SELECT unix_nanos(TIMESTAMP_LTZ '1960-01-01 00:00:00.000000001 UTC');
-- NULL nanosecond timestamp.
SELECT unix_nanos(NULL :: timestamp_ltz(9));

-- SPARK-57526: timestamp_nanos builds a TIMESTAMP_LTZ(9) from a nanosecond count since the epoch.
-- An integral argument is accepted directly; the LTZ result renders in the session zone.
SELECT timestamp_nanos(1230219000123456789);
-- Negative input floors toward the past, so the sub-microsecond remainder stays in [0, 999].
SELECT timestamp_nanos(-1);
-- DECIMAL input reaches beyond a 64-bit BIGINT, up to year 9999 (nanos ~ 2.5e20).
SELECT timestamp_nanos(253402300799999999999BD);
-- Out-of-range input: epochMicros overflows a 64-bit long, so the conversion fails at runtime.
SELECT timestamp_nanos(10000000000000000000000000BD);
-- DOUBLE is rejected at analysis: only integral and DECIMAL nanosecond counts are accepted.
SELECT timestamp_nanos(1.0D);
-- NULL input.
SELECT timestamp_nanos(CAST(NULL AS BIGINT));
Original file line number Diff line number Diff line change
Expand Up @@ -854,3 +854,76 @@ SELECT unix_nanos(NULL :: timestamp_ltz(9))
struct<unix_nanos(CAST(NULL AS TIMESTAMP_LTZ(9))):decimal(21,0)>
-- !query output
NULL


-- !query
SELECT timestamp_nanos(1230219000123456789)
-- !query schema
struct<timestamp_nanos(1230219000123456789):timestamp_ltz(9)>
-- !query output
2008-12-25 07:30:00.123456789


-- !query
SELECT timestamp_nanos(-1)
-- !query schema
struct<timestamp_nanos(-1):timestamp_ltz(9)>
-- !query output
1969-12-31 15:59:59.999999999


-- !query
SELECT timestamp_nanos(253402300799999999999BD)
-- !query schema
struct<timestamp_nanos(253402300799999999999):timestamp_ltz(9)>
-- !query output
9999-12-31 15:59:59.999999999


-- !query
SELECT timestamp_nanos(10000000000000000000000000BD)
-- !query schema
struct<>
-- !query output
org.apache.spark.SparkArithmeticException
{
"errorClass" : "DATETIME_OVERFLOW",
"sqlState" : "22008",
"messageParameters" : {
"operation" : "create a TIMESTAMP_LTZ(9) from 10000000000000000000000000 nanoseconds since the epoch"
}
}


-- !query
SELECT timestamp_nanos(1.0D)
-- !query schema
struct<>
-- !query output
org.apache.spark.sql.catalyst.ExtendedAnalysisException
{
"errorClass" : "DATATYPE_MISMATCH.UNEXPECTED_INPUT_TYPE",
"sqlState" : "42K09",
"messageParameters" : {
"inputSql" : "\"1.0\"",
"inputType" : "\"DOUBLE\"",
"paramIndex" : "first",
"requiredType" : "(\"INTEGRAL\" or \"DECIMAL\")",
"sqlExpr" : "\"timestamp_nanos(1.0)\""
},
"queryContext" : [ {
"objectType" : "",
"objectName" : "",
"startIndex" : 8,
"stopIndex" : 28,
"fragment" : "timestamp_nanos(1.0D)"
} ]
}


-- !query
SELECT timestamp_nanos(CAST(NULL AS BIGINT))
-- !query schema
struct<timestamp_nanos(CAST(NULL AS BIGINT)):timestamp_ltz(9)>
-- !query output
NULL
Loading