From 98d8e1c9ac51e2c07d57c53a3352e8bd0463d20d Mon Sep 17 00:00:00 2001 From: Mo Ghazal <18381746+q8webmaster@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:07:34 +0200 Subject: [PATCH 1/4] [cdc] Fix PostgreSQL timestamp(3) incorrectly mapped to BIGINT in PostgresRecordParser --- .../paimon/flink/action/cdc/postgres/PostgresRecordParser.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java index 8c5be3b6a13c..9375c1630ed1 100644 --- a/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java +++ b/paimon-flink/paimon-flink-cdc/src/main/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParser.java @@ -166,6 +166,8 @@ private DataType extractFieldType(DebeziumEvent.Field field) { return DataTypes.TIMESTAMP(6); } else if (MicroTime.SCHEMA_NAME.equals(field.name())) { return DataTypes.TIME(6); + } else if (Timestamp.SCHEMA_NAME.equals(field.name())) { + return DataTypes.TIMESTAMP(3); } return DataTypes.BIGINT(); case "float": From 4bdaed263bef509f7a8c55eaa6a3651362963efe Mon Sep 17 00:00:00 2001 From: Mo Ghazal <18381746+q8webmaster@users.noreply.github.com> Date: Fri, 12 Jun 2026 18:12:35 +0200 Subject: [PATCH 2/4] [cdc] Add unit test for PostgresRecordParser.extractFieldType timestamp mapping --- .../postgres/PostgresRecordParserTest.java | 153 ++++++++++++++++++ 1 file changed, 153 insertions(+) create mode 100644 paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java new file mode 100644 index 000000000000..5354ff83dbbb --- /dev/null +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java @@ -0,0 +1,153 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.paimon.flink.action.cdc.postgres; + +import org.apache.paimon.flink.action.cdc.CdcSourceRecord; +import org.apache.paimon.flink.action.cdc.TypeMapping; +import org.apache.paimon.flink.sink.cdc.RichCdcMultiplexRecord; +import org.apache.paimon.types.DataField; +import org.apache.paimon.types.DataTypes; + +import org.apache.flink.api.common.functions.util.ListCollector; +import org.apache.flink.configuration.Configuration; +import org.junit.jupiter.api.Test; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +import static org.assertj.core.api.Assertions.assertThat; + +/** Unit tests for {@link PostgresRecordParser} field type extraction. */ +public class PostgresRecordParserTest { + + /** + * Verifies that {@code io.debezium.time.Timestamp} (int64, millisecond precision) is mapped to + * {@code TIMESTAMP(3)}, not {@code BIGINT}. + * + *

PostgreSQL {@code timestamp(n)} columns with {@code n <= 3} are encoded by Debezium using + * the {@code io.debezium.time.Timestamp} logical type (epoch-millis int64). The JDBC schema + * path maps the same columns to {@code TIMESTAMP(n)}, so without this fix the two paths + * disagree and schema evolution crashes with "Cannot convert field from TIMESTAMP(3) to BIGINT". + */ + @Test + public void testTimestampMillisFieldMapsToTimestamp3() throws Exception { + String json = debeziumJson("io.debezium.time.Timestamp"); + List out = parse(json); + + assertThat(out).isNotEmpty(); + DataField field = findField(out.get(0), "ts_col"); + assertThat(field.type()) + .as("io.debezium.time.Timestamp (int64) must map to TIMESTAMP(3), not BIGINT") + .isEqualTo(DataTypes.TIMESTAMP(3).nullable()); + } + + /** Verifies that {@code io.debezium.time.MicroTimestamp} (int64) still maps to TIMESTAMP(6). */ + @Test + public void testMicroTimestampFieldMapsToTimestamp6() throws Exception { + String json = debeziumJson("io.debezium.time.MicroTimestamp"); + List out = parse(json); + + assertThat(out).isNotEmpty(); + DataField field = findField(out.get(0), "ts_col"); + assertThat(field.type()) + .as("io.debezium.time.MicroTimestamp (int64) must map to TIMESTAMP(6)") + .isEqualTo(DataTypes.TIMESTAMP(6).nullable()); + } + + /** + * Verifies that a plain int64 field (no logical type name) still maps to BIGINT — i.e. the fix + * does not break the default fallthrough. + */ + @Test + public void testPlainInt64FieldMapsToBigint() throws Exception { + String json = debeziumJson(null); + List out = parse(json); + + assertThat(out).isNotEmpty(); + DataField field = findField(out.get(0), "ts_col"); + assertThat(field.type()) + .as("int64 with no logical type name must remain BIGINT") + .isEqualTo(DataTypes.BIGINT().nullable()); + } + + // ------------------------------------------------------------------------- + // helpers + // ------------------------------------------------------------------------- + + private List parse(String json) throws Exception { + PostgresRecordParser parser = + new PostgresRecordParser( + new Configuration(), + Collections.emptyList(), + TypeMapping.defaultMapping(), + new org.apache.paimon.flink.action.cdc.CdcMetadataConverter[0]); + List out = new ArrayList<>(); + parser.flatMap(new CdcSourceRecord(json), new ListCollector<>(out)); + return out; + } + + private DataField findField(RichCdcMultiplexRecord record, String fieldName) { + return record.cdcSchema().fields().stream() + .filter(f -> f.name().equals(fieldName)) + .findFirst() + .orElseThrow( + () -> + new AssertionError( + "Field '" + fieldName + "' not found in schema: " + + record.cdcSchema().fields())); + } + + /** + * Builds a minimal Debezium PostgreSQL CDC JSON event containing one int64 column {@code + * ts_col} with the given logical type {@code schemaName} (may be null for a plain int64). + */ + private static String debeziumJson(String schemaName) { + String nameField = schemaName == null ? "" : "\"name\":\"" + schemaName + "\","; + return "{" + + "\"schema\":{" + + " \"type\":\"struct\"," + + " \"fields\":[" + + " {" + + " \"type\":\"struct\"," + + " \"optional\":true," + + " \"field\":\"after\"," + + " \"fields\":[" + + " {\"type\":\"int32\",\"optional\":false,\"field\":\"id\"}," + + " {\"type\":\"int64\",\"optional\":true," + + nameField + + " \"field\":\"ts_col\"}" + + " ]" + + " }," + + " {" + + " \"type\":\"struct\"," + + " \"optional\":false," + + " \"field\":\"source\"," + + " \"fields\":[]" + + " }" + + " ]" + + "}," + + "\"payload\":{" + + " \"op\":\"r\"," + + " \"after\":{\"id\":1,\"ts_col\":1700000000000}," + + " \"source\":{\"db\":\"testdb\",\"table\":\"test_table\"}" + + "}" + + "}"; + } +} From 2d2a89cfe7e2efdf780d1186595b841017d396fc Mon Sep 17 00:00:00 2001 From: Mo Ghazal <18381746+q8webmaster@users.noreply.github.com> Date: Sat, 13 Jun 2026 15:02:53 +0200 Subject: [PATCH 3/4] [cdc] Fix Spotless formatting in PostgresRecordParserTest --- .../action/cdc/postgres/PostgresRecordParserTest.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java index 5354ff83dbbb..f75fcdf61e84 100644 --- a/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java +++ b/paimon-flink/paimon-flink-cdc/src/test/java/org/apache/paimon/flink/action/cdc/postgres/PostgresRecordParserTest.java @@ -44,7 +44,8 @@ public class PostgresRecordParserTest { *

PostgreSQL {@code timestamp(n)} columns with {@code n <= 3} are encoded by Debezium using * the {@code io.debezium.time.Timestamp} logical type (epoch-millis int64). The JDBC schema * path maps the same columns to {@code TIMESTAMP(n)}, so without this fix the two paths - * disagree and schema evolution crashes with "Cannot convert field from TIMESTAMP(3) to BIGINT". + * disagree and schema evolution crashes with "Cannot convert field from TIMESTAMP(3) to + * BIGINT". */ @Test public void testTimestampMillisFieldMapsToTimestamp3() throws Exception { @@ -110,7 +111,9 @@ private DataField findField(RichCdcMultiplexRecord record, String fieldName) { .orElseThrow( () -> new AssertionError( - "Field '" + fieldName + "' not found in schema: " + "Field '" + + fieldName + + "' not found in schema: " + record.cdcSchema().fields())); } From 87e09ec21bcba56bb11e91531fdeedd7fdbc712e Mon Sep 17 00:00:00 2001 From: Q8Webmaster Date: Sat, 13 Jun 2026 17:15:39 +0200 Subject: [PATCH 4/4] ci: retrigger after CI timeout