Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

import scala.PartialFunction;

import org.apache.spark.SparkUnsupportedOperationException;
import org.apache.spark.annotation.Evolving;
import org.apache.spark.sql.types.DataType;
import org.apache.spark.sql.types.Decimal;
Expand Down Expand Up @@ -328,12 +327,28 @@ public CalendarInterval getInterval(int rowId) {
return new CalendarInterval(months, days, microseconds);
}

/**
* Returns the nanosecond NTZ timestamp value for {@code rowId}, or null if the slot is null.
* <p>
* To support this type, implementations must implement {@link #getChild(int)} and define 2 child
* vectors: child 0 is a long vector holding {@code epochMicros}; child 1 is a short vector
* holding {@code nanosWithinMicro} (values in [0, 999]).
*/
public TimestampNanosVal getTimestampNTZNanos(int rowId) {
throw SparkUnsupportedOperationException.apply();
if (isNullAt(rowId)) return null;
return TimestampNanosVal.fromTrustedRowBytes(
getChild(0).getLong(rowId), getChild(1).getShort(rowId));
}

/**
* Returns the nanosecond LTZ timestamp value for {@code rowId}, or null if the slot is null.
* <p>
* Storage layout is identical to {@link #getTimestampNTZNanos(int)}.
*/
public TimestampNanosVal getTimestampLTZNanos(int rowId) {
throw SparkUnsupportedOperationException.apply();
if (isNullAt(rowId)) return null;
return TimestampNanosVal.fromTrustedRowBytes(
getChild(0).getLong(rowId), getChild(1).getShort(rowId));
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@
import org.apache.spark.sql.vectorized.ColumnarBatch;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

Expand Down Expand Up @@ -106,6 +107,10 @@ public static void populate(
} else if (pdt instanceof PhysicalCalendarIntervalType) {
// The value of `numRows` is irrelevant.
col.setCalendarInterval((CalendarInterval) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalTimestampNTZNanosType) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two else if branches are identical. appendValue below at line 178 already collapses both nanos types into one condition with ||. Suggest the same here:

} else if (pdt instanceof PhysicalTimestampNTZNanosType ||
    pdt instanceof PhysicalTimestampLTZNanosType) {
  col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t));
}

col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalTimestampLTZNanosType) {
col.setTimestampNanosVal((TimestampNanosVal) row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalVariantType) {
col.setVariant((VariantVal)row.get(fieldIdx, t));
} else if (pdt instanceof PhysicalStructType) {
Expand Down Expand Up @@ -171,7 +176,8 @@ public static Map<Integer, Integer> toJavaIntMap(ColumnarMap map) {

private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
if (o == null) {
if (t instanceof CalendarIntervalType || t instanceof VariantType) {
if (t instanceof CalendarIntervalType || t instanceof VariantType ||
t instanceof TimestampNTZNanosType || t instanceof TimestampLTZNanosType) {
dst.appendStruct(true);
} else {
dst.appendNull();
Expand Down Expand Up @@ -219,6 +225,11 @@ private static void appendValue(WritableColumnVector dst, DataType t, Object o)
dst.appendStruct(false);
dst.getChild(0).appendByteArray(v.getValue(), 0, v.getValue().length);
dst.getChild(1).appendByteArray(v.getMetadata(), 0, v.getMetadata().length);
} else if (t instanceof TimestampNTZNanosType || t instanceof TimestampLTZNanosType) {
TimestampNanosVal v = (TimestampNanosVal) o;
dst.appendStruct(false);
dst.getChild(0).appendLong(v.epochMicros);
dst.getChild(1).appendShort(v.nanosWithinMicro);
} else if (t instanceof DateType) {
dst.appendInt(DateTimeUtils.fromJavaDate((Date) o));
} else if (t instanceof TimestampType) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.spark.sql.vectorized.ColumnarArray;
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;
import org.apache.spark.unsafe.types.VariantVal;

Expand Down Expand Up @@ -73,6 +74,11 @@ public ConstantColumnVector(int numRows, DataType type) {
this.childData[0] = new ConstantColumnVector(1, DataTypes.IntegerType);
this.childData[1] = new ConstantColumnVector(1, DataTypes.IntegerType);
this.childData[2] = new ConstantColumnVector(1, DataTypes.LongType);
} else if (type instanceof TimestampNTZNanosType || type instanceof TimestampLTZNanosType) {
// Two columns. EpochMicros as Long. NanosWithinMicro as Short.
this.childData = new ConstantColumnVector[2];
this.childData[0] = new ConstantColumnVector(1, DataTypes.LongType);
this.childData[1] = new ConstantColumnVector(1, DataTypes.ShortType);
} else if (type instanceof VariantType) {
this.childData = new ConstantColumnVector[2];
this.childData[0] = new ConstantColumnVector(1, DataTypes.BinaryType);
Expand Down Expand Up @@ -359,6 +365,14 @@ public void setCalendarInterval(CalendarInterval value) {
this.childData[2].setLong(value.microseconds);
}

/**
* Sets the nanosecond timestamp `value` for all rows
*/
public void setTimestampNanosVal(TimestampNanosVal value) {
this.childData[0].setLong(value.epochMicros);
this.childData[1].setShort(value.nanosWithinMicro);
}

/**
* Sets the Variant `value` for all rows
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,10 @@ public InternalRow copy() {
row.update(i, getMap(i).copy());
} else if (dt instanceof VariantType) {
row.update(i, getVariant(i));
} else if (dt instanceof TimestampNTZNanosType) {
row.update(i, getTimestampNTZNanos(i));
} else if (dt instanceof TimestampLTZNanosType) {
row.update(i, getTimestampLTZNanos(i));
} else {
throw new RuntimeException("Not implemented. " + dt);
}
Expand Down Expand Up @@ -232,6 +236,10 @@ public Object get(int ordinal, DataType dataType) {
return getMap(ordinal);
} else if (dataType instanceof VariantType) {
return getVariant(ordinal);
} else if (dataType instanceof TimestampNTZNanosType) {
return getTimestampNTZNanos(ordinal);
} else if (dataType instanceof TimestampLTZNanosType) {
return getTimestampLTZNanos(ordinal);
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3192", Map.of("dt", dataType.toString()));
Expand Down Expand Up @@ -261,6 +269,10 @@ public void update(int ordinal, Object value) {
setDecimal(ordinal, d, t.precision());
} else if (dt instanceof CalendarIntervalType) {
setInterval(ordinal, (CalendarInterval) value);
} else if (dt instanceof TimestampNTZNanosType) {
setTimestampNTZNanos(ordinal, (TimestampNanosVal) value);
} else if (dt instanceof TimestampLTZNanosType) {
setTimestampLTZNanos(ordinal, (TimestampNanosVal) value);
} else {
throw new SparkUnsupportedOperationException(
"_LEGACY_ERROR_TEMP_3192", Map.of("dt", dt.toString()));
Expand Down Expand Up @@ -326,4 +338,14 @@ public void setInterval(int ordinal, CalendarInterval value) {
columns[ordinal].putNotNull(rowId);
columns[ordinal].putInterval(rowId, value);
}

public void setTimestampNTZNanos(int ordinal, TimestampNanosVal value) {
columns[ordinal].putNotNull(rowId);
columns[ordinal].putTimestampNTZNanos(rowId, value);
}

public void setTimestampLTZNanos(int ordinal, TimestampNanosVal value) {
columns[ordinal].putNotNull(rowId);
columns[ordinal].putTimestampLTZNanos(rowId, value);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.apache.spark.sql.vectorized.ColumnarMap;
import org.apache.spark.unsafe.array.ByteArrayMethods;
import org.apache.spark.unsafe.types.CalendarInterval;
import org.apache.spark.unsafe.types.TimestampNanosVal;
import org.apache.spark.unsafe.types.UTF8String;

/**
Expand Down Expand Up @@ -493,6 +494,16 @@ public void putInterval(int rowId, CalendarInterval value) {
getChild(2).putLong(rowId, value.microseconds);
}

public void putTimestampNTZNanos(int rowId, TimestampNanosVal value) {
getChild(0).putLong(rowId, value.epochMicros);
getChild(1).putShort(rowId, value.nanosWithinMicro);
}

public void putTimestampLTZNanos(int rowId, TimestampNanosVal value) {
getChild(0).putLong(rowId, value.epochMicros);
getChild(1).putShort(rowId, value.nanosWithinMicro);
}

@Override
public UTF8String getUTF8String(int rowId) {
if (isNullAt(rowId)) return null;
Expand Down Expand Up @@ -751,7 +762,10 @@ public final int appendStruct(boolean isNull) {
putNull(elementsAppended);
elementsAppended++;
for (WritableColumnVector c: childColumns) {
if (c.type instanceof StructType || c.type instanceof VariantType) {
if (c.type instanceof StructType || c.type instanceof VariantType
|| c.type instanceof CalendarIntervalType
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Adding CalendarIntervalType here isn't just supporting the new nanos types — it also fixes a previously-latent bug for nested struct-of-interval. Pre-PR, when an outer struct column was appended as null and one of its child columns was a CalendarInterval, the interval child took the else branch (c.appendNull()), advancing only the interval's own cursor and leaving its three grandchild primitive columns (months/days/microseconds) un-advanced. Subsequent rows would then write into the wrong grandchild slots — silent skew.

The fix is correct, but:

  1. The PR description doesn't mention this. Worth one line so reviewers don't miss the interval-semantics change.
  2. There's no test exercising the new recursion. The minimum case is a struct-of-interval column with at least one null parent row, then read back the next non-null row's children to verify they aren't shifted. Same shape extends to struct-of-TimestampNanos.

Up to you whether to split out into a separate commit or keep bundled.

|| c.type instanceof TimestampNTZNanosType
|| c.type instanceof TimestampLTZNanosType) {
c.appendStruct(true);
} else {
c.appendNull();
Expand Down Expand Up @@ -1056,6 +1070,11 @@ protected WritableColumnVector(int capacity, DataType dataType) {
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[1] = reserveNewColumn(capacity, DataTypes.IntegerType);
this.childColumns[2] = reserveNewColumn(capacity, DataTypes.LongType);
} else if (type instanceof TimestampNTZNanosType || type instanceof TimestampLTZNanosType) {
// Two columns. EpochMicros as Long. NanosWithinMicro as Short.
this.childColumns = new WritableColumnVector[2];
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.LongType);
this.childColumns[1] = reserveNewColumn(capacity, DataTypes.ShortType);
} else if (type instanceof VariantType) {
this.childColumns = new WritableColumnVector[2];
this.childColumns[0] = reserveNewColumn(capacity, DataTypes.BinaryType);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,8 @@ private object RowToColumnConverter {
case _: GeometryType => GeometryConverter
case CalendarIntervalType => CalendarConverter
case VariantType => VariantConverter
case _: TimestampNTZNanosType => TimestampNTZNanosConverter
case _: TimestampLTZNanosType => TimestampLTZNanosConverter
case at: ArrayType => ArrayConverter(getConverterForType(at.elementType, at.containsNull))
case st: StructType => new StructConverter(st.fields.map(
(f) => getConverterForType(f.dataType, f.nullable)))
Expand All @@ -284,6 +286,8 @@ private object RowToColumnConverter {
if (nullable) {
dataType match {
case CalendarIntervalType | VariantType => new StructNullableTypeConverter(core)
case _: TimestampNTZNanosType | _: TimestampLTZNanosType =>
new StructNullableTypeConverter(core)
case st: StructType => new StructNullableTypeConverter(core)
case _ => new BasicNullableTypeConverter(core)
}
Expand Down Expand Up @@ -374,6 +378,24 @@ private object RowToColumnConverter {
}
}

private object TimestampNTZNanosConverter extends TypeConverter {
override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
val v = row.getTimestampNTZNanos(column)
cv.appendStruct(false)
cv.getChild(0).appendLong(v.epochMicros)
cv.getChild(1).appendShort(v.nanosWithinMicro)
}
}

private object TimestampLTZNanosConverter extends TypeConverter {
override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
val v = row.getTimestampLTZNanos(column)
cv.appendStruct(false)
cv.getChild(0).appendLong(v.epochMicros)
cv.getChild(1).appendShort(v.nanosWithinMicro)
}
}

private case class ArrayConverter(childConverter: TypeConverter) extends TypeConverter {
override def append(row: SpecializedGetters, column: Int, cv: WritableColumnVector): Unit = {
val values = row.getArray(column)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.util.{ArrayBasedMapData, ArrayData, GenericArrayData}
import org.apache.spark.sql.execution.vectorized.{OnHeapColumnVector, WritableColumnVector}
import org.apache.spark.sql.types._
import org.apache.spark.unsafe.types.UTF8String
import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String}
import org.apache.spark.util.ArrayImplicits._

class RowToColumnConverterSuite extends SparkFunSuite {
Expand Down Expand Up @@ -130,6 +130,60 @@ class RowToColumnConverterSuite extends SparkFunSuite {
}
}

test("TimestampNTZNanosType column roundtrip") {
val t = TimestampNTZNanosType(9)
val schema = StructType(Seq(StructField("ts", t)))
val values = Seq(
TimestampNanosVal.fromParts(0L, 0.toShort),
TimestampNanosVal.fromParts(1_000_000L, 999.toShort),
TimestampNanosVal.fromParts(-1L, 123.toShort))
val rows = values.map(v => InternalRow(v))
val vectors = convertRows(rows, schema)
values.zipWithIndex.foreach { case (v, i) =>
assert(vectors.head.getTimestampNTZNanos(i) === v)
}
}

test("TimestampNTZNanosType column with nulls") {
val t = TimestampNTZNanosType(9)
val schema = StructType(Seq(StructField("ts", t, nullable = true)))
val rows = Seq(
InternalRow(TimestampNanosVal.fromParts(100L, 42.toShort)),
InternalRow(null),
InternalRow(TimestampNanosVal.fromParts(200L, 1.toShort)))
val vectors = convertRows(rows, schema)
assert(vectors.head.getTimestampNTZNanos(0) === TimestampNanosVal.fromParts(100L, 42.toShort))
assert(vectors.head.isNullAt(1))
assert(vectors.head.getTimestampNTZNanos(2) === TimestampNanosVal.fromParts(200L, 1.toShort))
}

test("TimestampLTZNanosType column roundtrip") {
val t = TimestampLTZNanosType(9)
val schema = StructType(Seq(StructField("ts", t)))
val values = Seq(
TimestampNanosVal.fromParts(0L, 0.toShort),
TimestampNanosVal.fromParts(1_000_000L, 999.toShort),
TimestampNanosVal.fromParts(-1L, 123.toShort))
val rows = values.map(v => InternalRow(v))
val vectors = convertRows(rows, schema)
values.zipWithIndex.foreach { case (v, i) =>
assert(vectors.head.getTimestampLTZNanos(i) === v)
}
}

test("TimestampLTZNanosType column with nulls") {
val t = TimestampLTZNanosType(9)
val schema = StructType(Seq(StructField("ts", t, nullable = true)))
val rows = Seq(
InternalRow(TimestampNanosVal.fromParts(100L, 42.toShort)),
InternalRow(null),
InternalRow(TimestampNanosVal.fromParts(200L, 1.toShort)))
val vectors = convertRows(rows, schema)
assert(vectors.head.getTimestampLTZNanos(0) === TimestampNanosVal.fromParts(100L, 42.toShort))
assert(vectors.head.isNullAt(1))
assert(vectors.head.getTimestampLTZNanos(2) === TimestampNanosVal.fromParts(200L, 1.toShort))
}

test("multiple columns") {
val schema = StructType(
Seq(StructField("s", ShortType), StructField("i", IntegerType), StructField("l", LongType)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ import org.apache.spark.sql.execution.columnar.compression.ColumnBuilderHelper
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.sql.vectorized.ColumnarArray
import org.apache.spark.unsafe.types.{UTF8String, VariantVal}
import org.apache.spark.unsafe.types.{TimestampNanosVal, UTF8String, VariantVal}
import org.apache.spark.util.ArrayImplicits._

class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
Expand Down Expand Up @@ -379,6 +379,50 @@ class ColumnVectorSuite extends SparkFunSuite with SQLHelper {
}
}

testVectors("timestamp_ntz_nanos", 10, TimestampNTZNanosType(9)) { testVector =>
val values = (0 until 10).map(i => TimestampNanosVal.fromParts(i * 1000L, i.toShort))
values.foreach { v =>
testVector.putNotNull(testVector.elementsAppended)
testVector.putTimestampNTZNanos(testVector.elementsAppended, v)
testVector.elementsAppended += 1
}
values.zipWithIndex.foreach { case (v, i) =>
assert(testVector.getTimestampNTZNanos(i) === v)
}
testVector.putNull(0)
assert(testVector.isNullAt(0))
}

testVectors("timestamp_ltz_nanos", 10, TimestampLTZNanosType(9)) { testVector =>
val values = (0 until 10).map(i => TimestampNanosVal.fromParts(i * 1000L, i.toShort))
values.foreach { v =>
testVector.putNotNull(testVector.elementsAppended)
testVector.putTimestampLTZNanos(testVector.elementsAppended, v)
testVector.elementsAppended += 1
}
values.zipWithIndex.foreach { case (v, i) =>
assert(testVector.getTimestampLTZNanos(i) === v)
}
testVector.putNull(0)
assert(testVector.isNullAt(0))
}

testVectors("mutable ColumnarRow with TimestampNTZNanosType", 5,
TimestampNTZNanosType(9)) { testVector =>
val mutableRow = new MutableColumnarRow(Array(testVector))
val values = (0 until 5).map(i => TimestampNanosVal.fromParts(i * 100L, i.toShort))
values.zipWithIndex.foreach { case (v, i) =>
mutableRow.rowId = i
mutableRow.setTimestampNTZNanos(0, v)
}
values.zipWithIndex.foreach { case (v, i) =>
mutableRow.rowId = i
assert(mutableRow.getTimestampNTZNanos(0) === v)
assert(mutableRow.get(0, TimestampNTZNanosType(9)) === v)
assert(mutableRow.copy().get(0, TimestampNTZNanosType(9)) === v)
}
}
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The PR adds a MutableColumnarRow test for TimestampNTZNanosType but not for TimestampLTZNanosType. The LTZ paths (setTimestampLTZNanos at MutableColumnarRow.java:348, the update(TimestampLTZNanosType) and get(TimestampLTZNanosType) dispatches at MutableColumnarRow.java:240,272, and the copy() branch at MutableColumnarRow.java:104) aren't exercised. Adding a parallel mutable ColumnarRow with TimestampLTZNanosType block right after this one closes the gap.


testVectors("mutable ColumnarRow with TimestampNTZType", 10, TimestampNTZType) { testVector =>
val mutableRow = new MutableColumnarRow(Array(testVector))
(0 until 10).foreach { i =>
Expand Down
Loading