Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ public static Type createTimestampWithLogicalType(
return Types.primitive(INT64, repetition)
.as(
LogicalTypeAnnotation.timestampType(
isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MILLIS))
isAdjustToUTC, LogicalTypeAnnotation.TimeUnit.MICROS))
.named(name);
} else if (precision > 6) {
return Types.primitive(PrimitiveType.PrimitiveTypeName.INT96, repetition).named(name);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -223,8 +223,8 @@ private SimpleColStats toTimestampStats(Statistics<?> stats, int precision) {
if (precision <= 3) {
LongStatistics longStats = (LongStatistics) stats;
return new SimpleColStats(
Timestamp.fromEpochMillis(longStats.getMin()),
Timestamp.fromEpochMillis(longStats.getMax()),
Timestamp.fromMicros(longStats.getMin()),
Timestamp.fromMicros(longStats.getMax()),
stats.getNumNulls());
} else if (precision <= 6) {
LongStatistics longStats = (LongStatistics) stats;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@
import static org.apache.paimon.utils.Preconditions.checkArgument;

/**
* Parquet write timestamp precision 0-3 as int64 mills, 4-6 as int64 micros, 7-9 as int96, this
* class wrap the real vector to provide {@link TimestampColumnVector} interface.
* Parquet write timestamp precision 0-6 as int64 micros, 7-9 as int96, this class wrap the real
* vector to provide {@link TimestampColumnVector} interface.
*/
public class ParquetTimestampVector implements TimestampColumnVector {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@
import org.apache.paimon.data.columnar.writable.WritableColumnVector;
import org.apache.paimon.data.columnar.writable.WritableIntVector;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.TimestampType;

import org.apache.parquet.CorruptDeltaByteArrays;
import org.apache.parquet.VersionParser.ParsedVersion;
Expand Down Expand Up @@ -115,14 +117,18 @@ public VectorizedColumnReader(
}

private boolean isLazyDecodingSupported(
PrimitiveType.PrimitiveTypeName typeName, ColumnVector columnVector) {
PrimitiveType.PrimitiveTypeName typeName,
DataType dataType,
ColumnVector columnVector) {
boolean isSupported = false;
switch (typeName) {
case INT32:
isSupported = columnVector instanceof IntColumnVector;
break;
case INT64:
isSupported = columnVector instanceof LongColumnVector;
isSupported =
columnVector instanceof LongColumnVector
&& !isLowPrecisionTimestamp(dataType);
break;
case FLOAT:
isSupported = columnVector instanceof FloatColumnVector;
Expand All @@ -139,6 +145,17 @@ private boolean isLazyDecodingSupported(
return isSupported;
}

private static boolean isLowPrecisionTimestamp(DataType dataType) {
switch (dataType.getTypeRoot()) {
case TIMESTAMP_WITHOUT_TIME_ZONE:
return ((TimestampType) dataType).getPrecision() <= 3;
case TIMESTAMP_WITH_LOCAL_TIME_ZONE:
return ((LocalZonedTimestampType) dataType).getPrecision() <= 3;
default:
return false;
}
}

/** Reads `total` rows from this columnReader into column. */
void readBatch(
int total,
Expand Down Expand Up @@ -198,12 +215,9 @@ void readBatch(
(VectorizedValuesReader) dataColumn);
}

// TIMESTAMP_MILLIS encoded as INT64 can't be lazily decoded as we need to post
// process
// the values to add microseconds precision.
if (column.hasDictionary()
|| (startRowId == pageFirstRowIndex
&& isLazyDecodingSupported(typeName, column))) {
&& isLazyDecodingSupported(typeName, type, column))) {
column.setDictionary(new ParquetDictionary(dictionary));
} else {
updater.decodeDictionaryIds(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ public void write(InternalArray arrayData, int ordinal) {
}

private void writeTimestamp(Timestamp value) {
recordConsumer.addLong(value.getMillisecond());
recordConsumer.addLong(value.toMicros());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,11 +299,7 @@ private static Comparable<?> toParquetObject(
} else if (value instanceof Timestamp) {
Timestamp timestamp = (Timestamp) value;
int precision = getTimestampPrecision(type);
if (precision <= 3) {
// milliseconds
return timestamp.getMillisecond();
} else if (precision <= 6) {
// microseconds
if (precision <= 6) {
return timestamp.toMicros();
}
// precision > 6 uses INT96, not supported
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -447,7 +447,7 @@ public void testInFilterDecimal64Bit() {

@Test
public void testTimestampMillis() {
// precision <= 3 uses milliseconds (INT64)
// precision <= 3 now uses microseconds (MICROS annotation, matching the writer)
int precision = 3;
PredicateBuilder builder =
new PredicateBuilder(
Expand All @@ -456,16 +456,16 @@ public void testTimestampMillis() {
new DataField(0, "ts1", new TimestampType(precision)))));

Timestamp value = Timestamp.fromEpochMillis(1704067200000L); // 2024-01-01 00:00:00
long expectedMillis = value.getMillisecond();
long expectedMicros = value.toMicros();

test(builder.isNull(0), "eq(ts1, null)", true);
test(builder.isNotNull(0), "noteq(ts1, null)", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true);
test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMillis + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true);
test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMillis + ")", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true);
test(builder.lessOrEqual(0, value), "lteq(ts1, " + expectedMicros + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true);
test(builder.greaterOrEqual(0, value), "gteq(ts1, " + expectedMicros + ")", true);
}

@Test
Expand Down Expand Up @@ -493,7 +493,7 @@ public void testTimestampMicros() {

@Test
public void testLocalZonedTimestampMillis() {
// precision <= 3 uses milliseconds (INT64)
// precision <= 3 now uses microseconds (MICROS annotation, matching the writer)
int precision = 3;
PredicateBuilder builder =
new PredicateBuilder(
Expand All @@ -505,14 +505,14 @@ public void testLocalZonedTimestampMillis() {
new LocalZonedTimestampType(precision)))));

Timestamp value = Timestamp.fromEpochMillis(1704067200000L);
long expectedMillis = value.getMillisecond();
long expectedMicros = value.toMicros();

test(builder.isNull(0), "eq(ts1, null)", true);
test(builder.isNotNull(0), "noteq(ts1, null)", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMillis + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMillis + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMillis + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMillis + ")", true);
test(builder.equal(0, value), "eq(ts1, " + expectedMicros + ")", true);
test(builder.notEqual(0, value), "noteq(ts1, " + expectedMicros + ")", true);
test(builder.lessThan(0, value), "lt(ts1, " + expectedMicros + ")", true);
test(builder.greaterThan(0, value), "gt(ts1, " + expectedMicros + ")", true);
}

@Test
Expand Down Expand Up @@ -555,22 +555,22 @@ public void testInFilterTimestampMillis() {
test(
builder.in(0, Arrays.asList(v1, v2, v3)),
"or(or(eq(ts1, "
+ v1.getMillisecond()
+ v1.toMicros()
+ "), eq(ts1, "
+ v2.getMillisecond()
+ v2.toMicros()
+ ")), eq(ts1, "
+ v3.getMillisecond()
+ v3.toMicros()
+ "))",
true);

test(
builder.notIn(0, Arrays.asList(v1, v2, v3)),
"and(and(noteq(ts1, "
+ v1.getMillisecond()
+ v1.toMicros()
+ "), noteq(ts1, "
+ v2.getMillisecond()
+ v2.toMicros()
+ ")), noteq(ts1, "
+ v3.getMillisecond()
+ v3.toMicros()
+ "))",
true);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,25 @@
import org.apache.paimon.types.ArrayType;
import org.apache.paimon.types.DataField;
import org.apache.paimon.types.DataTypes;
import org.apache.paimon.types.LocalZonedTimestampType;
import org.apache.paimon.types.MapType;
import org.apache.paimon.types.RowType;
import org.apache.paimon.types.TimestampType;

import org.apache.parquet.schema.LogicalTypeAnnotation;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.Type;
import org.apache.parquet.schema.Types;
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;

import java.util.Arrays;

import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToPaimonRowType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.convertToParquetMessageType;
import static org.apache.paimon.format.parquet.ParquetSchemaConverter.createTimestampWithLogicalType;
import static org.apache.paimon.types.DataTypesTest.assertThat;
import static org.apache.parquet.schema.LogicalTypeAnnotation.TimeUnit.MICROS;
import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64;

/** Test for {@link ParquetSchemaConverter}. */
Expand Down Expand Up @@ -139,6 +144,115 @@ public void testParquetTimestampNanosSchemaConvert() {
public void testPaimonParquetSchemaConvert() {
MessageType messageType = convertToParquetMessageType(ALL_TYPES);
RowType rowType = convertToPaimonRowType(messageType);
assertThat(ALL_TYPES).isEqualTo(rowType);
// TIMESTAMP(n<=3) is written with a MICROS annotation (for Iceberg v2 compatibility) and
// therefore reads back as TIMESTAMP(6). All other types round-trip exactly.
RowType expected =
new RowType(
Arrays.asList(
new DataField(0, "string", DataTypes.STRING()),
new DataField(1, "stringNotNull", DataTypes.STRING().notNull()),
new DataField(2, "boolean", DataTypes.BOOLEAN()),
new DataField(3, "bytes", DataTypes.BYTES()),
new DataField(4, "decimal(9,2)", DataTypes.DECIMAL(9, 2)),
new DataField(5, "decimal(18,2)", DataTypes.DECIMAL(18, 2)),
new DataField(6, "decimal(27,2)", DataTypes.DECIMAL(27, 2)),
new DataField(7, "tinyint", DataTypes.TINYINT()),
new DataField(8, "smallint", DataTypes.SMALLINT()),
new DataField(9, "int", DataTypes.INT()),
new DataField(10, "bigint", DataTypes.BIGINT()),
new DataField(11, "float", DataTypes.FLOAT()),
new DataField(12, "double", DataTypes.DOUBLE()),
new DataField(13, "date", DataTypes.DATE()),
new DataField(14, "time", DataTypes.TIME()),
new DataField(15, "timestamp(3)", new TimestampType(6)),
new DataField(16, "timestamp", DataTypes.TIMESTAMP()),
new DataField(
17, "timestampLtz(3)", new LocalZonedTimestampType(6)),
new DataField(
18,
"timestampLtz",
DataTypes.TIMESTAMP_WITH_LOCAL_TIME_ZONE()),
new DataField(19, "array", new ArrayType(DataTypes.STRING())),
new DataField(
20,
"map",
new MapType(DataTypes.STRING(), DataTypes.STRING())),
new DataField(
21,
"row",
new RowType(
Arrays.asList(
new DataField(
22,
"f1",
DataTypes.INT().notNull()),
new DataField(
23, "f2", DataTypes.STRING())))),
new DataField(
24,
"nested",
new RowType(
Arrays.asList(
new DataField(
25,
"f1",
new MapType(
DataTypes.STRING(),
new ArrayType(
DataTypes
.STRING()))),
new DataField(
26,
"f2",
new RowType(
Arrays.asList(
new DataField(
27,
"f1",
DataTypes
.INT()
.notNull()),
new DataField(
28,
"f2",
DataTypes
.STRING())))
.notNull()))))));
assertThat(expected).isEqualTo(rowType);
}

@Test
public void testLowPrecisionTimestampUseMicrosAnnotation() {
// TIMESTAMP(n<=3) must emit a MICROS Parquet annotation, not MILLIS, so that Iceberg v2
// readers (e.g. Athena, Trino) can interpret the column as "timestamp"/"timestamptz".
// The Iceberg v2 spec only allows INT64 MICROS for those logical types; MILLIS is
// Iceberg v3 only (https://iceberg.apache.org/spec/#parquet).
for (int precision = 0; precision <= 3; precision++) {
Type tsType =
createTimestampWithLogicalType(
"ts", precision, Type.Repetition.OPTIONAL, false);
Type tsLtzType =
createTimestampWithLogicalType(
"ts_ltz", precision, Type.Repetition.OPTIONAL, true);

LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsAnnotation =
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
tsType.getLogicalTypeAnnotation();
LogicalTypeAnnotation.TimestampLogicalTypeAnnotation tsLtzAnnotation =
(LogicalTypeAnnotation.TimestampLogicalTypeAnnotation)
tsLtzType.getLogicalTypeAnnotation();

Assertions.assertThat(tsAnnotation.getUnit())
.as("TIMESTAMP(%d) should use MICROS annotation", precision)
.isEqualTo(MICROS);
Assertions.assertThat(tsLtzAnnotation.getUnit())
.as(
"TIMESTAMP_WITH_LOCAL_TIME_ZONE(%d) should use MICROS annotation",
precision)
.isEqualTo(MICROS);
Assertions.assertThat(tsLtzAnnotation.isAdjustedToUTC())
.as("TIMESTAMP_WITH_LOCAL_TIME_ZONE(%d) should be UTC-adjusted", precision)
.isTrue();
}
}
}