diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index 8c5be3b6a13c..9375c1630ed1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -166,6 +166,8 @@ private DataType extractFieldType(DebeziumEvent.Field field) { return DataTypes.TIMESTAMP(6); } else if (MicroTime.SCHEMA_NAME.equals(field.name())) { return DataTypes.TIME(6); + } else if (Timestamp.SCHEMA_NAME.equals(field.name())) { + return DataTypes.TIMESTAMP(3); } return DataTypes.BIGINT(); case "float": diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java new file mode 100644 index 000000000000..f75fcdf61e84 --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java @@ -0,0 +1,156 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.postgres; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link PostgresRecordParser} field type extraction. */ +public class PostgresRecordParserTest { + + /** + * Verifies that {@code io.debezium.time.Timestamp} (int64, millisecond precision) is mapped to + * {@code TIMESTAMP(3)}, not {@code BIGINT}. + * + *
PostgreSQL {@code timestamp(n)} columns with {@code n <= 3} are encoded by Debezium using
+ * the {@code io.debezium.time.Timestamp} logical type (epoch-millis int64). The JDBC schema
+ * path maps the same columns to {@code TIMESTAMP(n)}, so without this fix the two paths
+ * disagree and schema evolution crashes with "Cannot convert field from TIMESTAMP(3) to
+ * BIGINT".
+ */
+ @Test
+ public void testTimestampMillisFieldMapsToTimestamp3() throws Exception {
+ String json = debeziumJson("io.debezium.time.Timestamp");
+ List