From 2929e216e90f625215d403e6b813cea9ee465dae Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 12 May 2026 10:41:43 +0530 Subject: [PATCH 1/6] initial changes --- .../postgresql/PostgreSQLDialectAdapter.java | 11 ++++-- .../PostgreSQLDialectAdapterTest.java | 35 +++++++++++++++++++ 2 files changed, 43 insertions(+), 3 deletions(-) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java index 6f23a63bf9..3bdddb4f70 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java @@ -344,6 +344,7 @@ public ImmutableMap> discoverTableI try (ResultSet resultSet = statement.executeQuery()) { while (resultSet.next()) { final String tableName = resultSet.getString("table_name"); + final String typeName = resultSet.getString("type_name"); SourceColumnIndexInfo.Builder indexBuilder = SourceColumnIndexInfo.builder() .setColumnName(resultSet.getString("column_name")) @@ -352,12 +353,11 @@ public ImmutableMap> discoverTableI .setIsPrimary(resultSet.getBoolean("is_primary")) .setCardinality(resultSet.getLong("cardinality")) .setOrdinalPosition(resultSet.getLong("ordinal_position")) - .setIndexType(indexTypeFrom(resultSet.getString("type_category"))); + .setIndexType(indexTypeFrom(resultSet.getString("type_category"), typeName)); String collation = resultSet.getString("collation"); if (collation != null) { String charset = resultSet.getString("charset"); - String typeName = resultSet.getString("type_name"); Integer typeLength = resultSet.getInt("type_length"); if (resultSet.wasNull()) { typeLength = null; @@ -374,6 +374,8 @@ public ImmutableMap> discoverTableI .setPadSpace(shouldPadSpace) .build()); indexBuilder.setStringMaxLength(typeLength == null ? VARCHAR_MAX_LENGTH : typeLength); + } else if ("uuid".equalsIgnoreCase(typeName)) { + indexBuilder.setStringMaxLength(36); } if (builders.containsKey(tableName)) { builders.get(tableName).add(indexBuilder.build()); @@ -530,7 +532,10 @@ private String addWhereClause(String query, ImmutableList partitionColum * Ref . */ - private SourceColumnIndexInfo.IndexType indexTypeFrom(String typeCategory) { + private SourceColumnIndexInfo.IndexType indexTypeFrom(String typeCategory, String typeName) { + if ("uuid".equalsIgnoreCase(typeName)) { + return SourceColumnIndexInfo.IndexType.STRING; + } switch (typeCategory) { case "N": return SourceColumnIndexInfo.IndexType.NUMERIC; diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java index db1318e066..b100defb83 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java @@ -297,6 +297,41 @@ public void testDiscoverTableIndexes() throws SQLException, RetriableSchemaDisco .build())); } + @Test + public void testDiscoverTableIndexesWithUuid() + throws SQLException, RetriableSchemaDiscoveryException { + ImmutableList tables = ImmutableList.of("my_schema.table1"); + + when(mockDataSource.getConnection()).thenReturn(mockConnection); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet); + when(mockResultSet.next()).thenReturn(true, false); + when(mockResultSet.getString("table_name")).thenReturn("my_schema.table1"); + when(mockResultSet.getString("column_name")).thenReturn("col_uuid"); + when(mockResultSet.getString("index_name")).thenReturn("table_uuid_idx"); + when(mockResultSet.getBoolean("is_unique")).thenReturn(true); + when(mockResultSet.getBoolean("is_primary")).thenReturn(true); + when(mockResultSet.getLong("cardinality")).thenReturn(1L); + when(mockResultSet.getLong("ordinal_position")).thenReturn(1L); + when(mockResultSet.getString("type_category")).thenReturn("U"); + when(mockResultSet.getString("type_name")).thenReturn("uuid"); + + assertThat(adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables)) + .containsExactly( + "my_schema.table1", + ImmutableList.of( + SourceColumnIndexInfo.builder() + .setColumnName("col_uuid") + .setIndexName("table_uuid_idx") + .setIsUnique(true) + .setIsPrimary(true) + .setCardinality(1L) + .setOrdinalPosition(1L) + .setIndexType(SourceColumnIndexInfo.IndexType.STRING) + .setStringMaxLength(36) + .build())); + } + @Test public void testDiscoverTableIndexesBulk() throws SQLException, RetriableSchemaDiscoveryException { From dfa515f5fe82ffc17b7e8ee4a8d53e5eaa656b84 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 12 May 2026 10:41:43 +0530 Subject: [PATCH 2/6] initial changes --- .../dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java index b100defb83..d0440a65a4 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java @@ -238,7 +238,7 @@ public void testDiscoverTableIndexes() throws SQLException, RetriableSchemaDisco when(mockResultSet.getString("collation")).thenReturn(null, "en_US", "en_US", null); when(mockResultSet.getInt("type_length")).thenReturn(100, 0); when(mockResultSet.wasNull()).thenReturn(false, true); - when(mockResultSet.getString("type_name")).thenReturn("char", "text"); + when(mockResultSet.getString("type_name")).thenReturn("bigint", "char", "text", "timestamp"); when(mockResultSet.getString("charset")).thenReturn("UTF8", "UTF8"); assertThat(adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables)) From 8d62dc25710c87374ce53551bc736e92488b43c2 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 12 May 2026 15:28:29 +0530 Subject: [PATCH 3/6] review changes --- .../postgresql/PostgreSQLDialectAdapter.java | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java index 3bdddb4f70..422eb19503 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java @@ -533,9 +533,6 @@ private String addWhereClause(String query, ImmutableList partitionColum * href="https://www.postgresql.org/docs/16/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE">. */ private SourceColumnIndexInfo.IndexType indexTypeFrom(String typeCategory, String typeName) { - if ("uuid".equalsIgnoreCase(typeName)) { - return SourceColumnIndexInfo.IndexType.STRING; - } switch (typeCategory) { case "N": return SourceColumnIndexInfo.IndexType.NUMERIC; @@ -543,6 +540,20 @@ private SourceColumnIndexInfo.IndexType indexTypeFrom(String typeCategory, Strin return SourceColumnIndexInfo.IndexType.TIME_STAMP; case "S": return SourceColumnIndexInfo.IndexType.STRING; + case "U": + return indexTypeForUserDefinedType(typeName); + default: + return SourceColumnIndexInfo.IndexType.OTHER; + } + } + + private SourceColumnIndexInfo.IndexType indexTypeForUserDefinedType(String typeName) { + if (typeName == null) { + return SourceColumnIndexInfo.IndexType.OTHER; + } + switch (typeName.toUpperCase()) { + case "UUID": + return SourceColumnIndexInfo.IndexType.STRING; default: return SourceColumnIndexInfo.IndexType.OTHER; } From dbe97994bb46b6b25986f5c104b38c6ee3c4e954 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 12 May 2026 15:43:08 +0530 Subject: [PATCH 4/6] add IT --- .../PostgreSQLWithUniformizationIT.java | 21 +++++++++++++++---- .../DataTypesIT/postgresql-few-data-types.sql | 5 +++++ ...stgresql-spanner-schema-few-data-types.sql | 1 + 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLWithUniformizationIT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLWithUniformizationIT.java index 0e5567074b..0a0b2bb40e 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLWithUniformizationIT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/PostgreSQLWithUniformizationIT.java @@ -108,16 +108,29 @@ public void withUniformizationTest() throws Exception { private Map>> getExpectedData() { HashMap>> result = new HashMap<>(); - result.put("bigint", createRows("-9223372036854775808", "9223372036854775807", "42", "NULL")); - result.put("bigserial", createRows("-9223372036854775808", "9223372036854775807", "42")); + result.put( + "bigint", + createRows("bigint", "-9223372036854775808", "9223372036854775807", "42", "NULL")); + result.put( + "bigserial", createRows("bigserial", "-9223372036854775808", "9223372036854775807", "42")); + result.put( + "uuid_pk", + createRows( + "uuid_pk", + "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11", + "b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a22")); return result; } - private List> createRows(Object... values) { + private List> createRows(String colPrefix, Object... values) { List> rows = new ArrayList<>(); for (int i = 0; i < values.length; i++) { Map row = new HashMap<>(); - row.put("id", i + 1); + if (colPrefix.toLowerCase().contains("_pk")) { + row.put("id", values[i]); + } else { + row.put("id", i + 1); + } row.put("col", values[i]); rows.add(row); } diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-few-data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-few-data-types.sql index f01e5a74eb..5e4b4db537 100644 --- a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-few-data-types.sql +++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-few-data-types.sql @@ -5,3 +5,8 @@ CREATE TABLE t_bigserial (id serial primary key, col bigserial); INSERT INTO t_bigint (col) VALUES (-9223372036854775808), (9223372036854775807), (42), (NULL); INSERT INTO t_bigserial (col) VALUES (-9223372036854775808), (9223372036854775807), (42); + +CREATE TABLE t_uuid_pk (id uuid PRIMARY KEY, col uuid NOT NULL); +INSERT INTO t_uuid_pk (id, col) VALUES +('a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid, 'a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11'::uuid), +('b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a22'::uuid, 'b0eebc99-9c0b-4ef8-bb6d-6bb9bd380a22'::uuid); diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql index b9083ec5b6..ee2fcba7f9 100644 --- a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql +++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql @@ -1,2 +1,3 @@ CREATE TABLE IF NOT EXISTS t_bigint (id INT64, col INT64) PRIMARY KEY (id); CREATE TABLE IF NOT EXISTS t_bigserial (id INT64, col INT64) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_uuid_pk (id STRING(MAX), col STRING(MAX)) PRIMARY KEY (id); From 41dea52d634e9d48dcf11f0bf7d1ce138a4bbd0b Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 12 May 2026 16:21:29 +0530 Subject: [PATCH 5/6] fix IT --- .../postgresql/PostgreSQLDialectAdapter.java | 15 ++++++++++++--- .../postgresql/PostgreSQLDialectAdapterTest.java | 8 ++++++++ 2 files changed, 20 insertions(+), 3 deletions(-) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java index 422eb19503..53671033d2 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java @@ -345,6 +345,8 @@ public ImmutableMap> discoverTableI while (resultSet.next()) { final String tableName = resultSet.getString("table_name"); final String typeName = resultSet.getString("type_name"); + final SourceColumnIndexInfo.IndexType indexType = + indexTypeFrom(resultSet.getString("type_category"), typeName); SourceColumnIndexInfo.Builder indexBuilder = SourceColumnIndexInfo.builder() .setColumnName(resultSet.getString("column_name")) @@ -353,15 +355,24 @@ public ImmutableMap> discoverTableI .setIsPrimary(resultSet.getBoolean("is_primary")) .setCardinality(resultSet.getLong("cardinality")) .setOrdinalPosition(resultSet.getLong("ordinal_position")) - .setIndexType(indexTypeFrom(resultSet.getString("type_category"), typeName)); + .setIndexType(indexType); String collation = resultSet.getString("collation"); + if (collation == null && indexType == SourceColumnIndexInfo.IndexType.STRING) { + collation = "C"; + } if (collation != null) { String charset = resultSet.getString("charset"); + if (charset == null) { + charset = "UTF8"; + } Integer typeLength = resultSet.getInt("type_length"); if (resultSet.wasNull()) { typeLength = null; } + if (typeLength == null && "uuid".equalsIgnoreCase(typeName)) { + typeLength = 36; + } // Collation PAD SPACE is not supported in Postgresql // (https://www.postgresql.org/docs/current/infoschema-collations.html) // The only way to have blank space padding is for specific types with fixed length @@ -374,8 +385,6 @@ public ImmutableMap> discoverTableI .setPadSpace(shouldPadSpace) .build()); indexBuilder.setStringMaxLength(typeLength == null ? VARCHAR_MAX_LENGTH : typeLength); - } else if ("uuid".equalsIgnoreCase(typeName)) { - indexBuilder.setStringMaxLength(36); } if (builders.containsKey(tableName)) { builders.get(tableName).add(indexBuilder.build()); diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java index d0440a65a4..88148df26d 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java @@ -315,6 +315,8 @@ public void testDiscoverTableIndexesWithUuid() when(mockResultSet.getLong("ordinal_position")).thenReturn(1L); when(mockResultSet.getString("type_category")).thenReturn("U"); when(mockResultSet.getString("type_name")).thenReturn("uuid"); + when(mockResultSet.getInt("type_length")).thenReturn(0); + when(mockResultSet.wasNull()).thenReturn(true); assertThat(adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables)) .containsExactly( @@ -328,6 +330,12 @@ public void testDiscoverTableIndexesWithUuid() .setCardinality(1L) .setOrdinalPosition(1L) .setIndexType(SourceColumnIndexInfo.IndexType.STRING) + .setCollationReference( + CollationReference.builder() + .setDbCharacterSet("UTF8") + .setDbCollation("C") + .setPadSpace(false) + .build()) .setStringMaxLength(36) .build())); } From 4fd97faab3f3a7ad1eb7c9c98dc1190dd529c2e5 Mon Sep 17 00:00:00 2001 From: aasthabharill Date: Tue, 12 May 2026 16:21:29 +0530 Subject: [PATCH 6/6] fix IT --- .../postgresql/PostgreSQLDialectAdapter.java | 93 ++++++++++++++++--- .../stringmapper/CollationMapper.java | 43 +++++++++ .../PostgreSQLDialectAdapterTest.java | 43 ++++++++- .../stringmapper/CollationMapperTest.java | 26 ++++++ ...stgresql-spanner-schema-few-data-types.sql | 2 +- 5 files changed, 192 insertions(+), 15 deletions(-) diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java index 53671033d2..d0efb65293 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapter.java @@ -34,6 +34,7 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import com.google.common.collect.Sets; +import java.io.Serializable; import java.sql.Connection; import java.sql.PreparedStatement; import java.sql.ResultSet; @@ -82,6 +83,10 @@ public enum PostgreSQLVersion { private static final String NO_PAD_SPACE_RETURN_TYPE = "TEXT"; private final PostgreSQLVersion version; + private final Map columnCastWrappers = + new java.util.concurrent.ConcurrentHashMap<>(); + private final Map columnParameterCastWrappers = + new java.util.concurrent.ConcurrentHashMap<>(); public PostgreSQLDialectAdapter(PostgreSQLVersion version) { this.version = version; @@ -358,8 +363,11 @@ public ImmutableMap> discoverTableI .setIndexType(indexType); String collation = resultSet.getString("collation"); - if (collation == null && indexType == SourceColumnIndexInfo.IndexType.STRING) { - collation = "C"; + if ("uuid".equalsIgnoreCase(typeName)) { + // PostgreSQL UUID type lacks a physical collation, but sorts lexicographically/binary. + // We map it to a virtual "UUID" collation to trigger the static UUID mapper in + // CollationMapper. + collation = "UUID"; } if (collation != null) { String charset = resultSet.getString("charset"); @@ -371,7 +379,10 @@ public ImmutableMap> discoverTableI typeLength = null; } if (typeLength == null && "uuid".equalsIgnoreCase(typeName)) { - typeLength = 36; + // A standard UUID has 36 characters, but since hyphens are stripped inside + // CollationMapper, + // the serialized representation length is exactly 32 characters. + typeLength = 32; } // Collation PAD SPACE is not supported in Postgresql // (https://www.postgresql.org/docs/current/infoschema-collations.html) @@ -386,6 +397,13 @@ public ImmutableMap> discoverTableI .build()); indexBuilder.setStringMaxLength(typeLength == null ? VARCHAR_MAX_LENGTH : typeLength); } + if ("uuid".equalsIgnoreCase(typeName)) { + ColumnKey key = new ColumnKey(tableName, resultSet.getString("column_name")); + // PostgreSQL requires explicit casting to TEXT for MIN/MAX boundary queries on UUIDs, + // and requires explicit casting back to UUID when binding parameter placeholders. + columnCastWrappers.put(key, "CAST(%s AS TEXT)"); + columnParameterCastWrappers.put(key, "CAST(? AS uuid)"); + } if (builders.containsKey(tableName)) { builders.get(tableName).add(indexBuilder.build()); } @@ -442,7 +460,7 @@ public ImmutableMap> discoverTableI */ @Override public String getReadQuery(String tableName, ImmutableList partitionColumns) { - return addWhereClause("SELECT * FROM " + tableName, partitionColumns); + return addWhereClause("SELECT * FROM " + tableName, tableName, partitionColumns); } /** @@ -460,7 +478,8 @@ public String getReadQuery(String tableName, ImmutableList partitionColu @Override public String getCountQuery( String tableName, ImmutableList partitionColumns, long timeoutMillis) { - return addWhereClause(String.format("SELECT COUNT(*) FROM %s", tableName), partitionColumns); + return addWhereClause( + String.format("SELECT COUNT(*) FROM %s", tableName), tableName, partitionColumns); } /** @@ -474,8 +493,14 @@ public String getCountQuery( @Override public String getBoundaryQuery( String tableName, ImmutableList partitionColumns, String colName) { + String selectCol = colName; + String wrapper = columnCastWrappers.get(new ColumnKey(tableName, colName)); + if (wrapper != null) { + selectCol = String.format(wrapper, colName); + } return addWhereClause( - String.format("SELECT MIN(%s), MAX(%s) FROM %s", colName, colName, tableName), + String.format("SELECT MIN(%s), MAX(%s) FROM %s", selectCol, selectCol, tableName), + tableName, partitionColumns); } @@ -517,7 +542,8 @@ public String getCollationsOrderQuery(String dbCharset, String dbCollation, bool return replaceTagsAndSanitize(query, tags); } - private String addWhereClause(String query, ImmutableList partitionColumns) { + private String addWhereClause( + String query, String tableName, ImmutableList partitionColumns) { StringBuilder queryBuilder = new StringBuilder(); queryBuilder.append(query); if (!partitionColumns.isEmpty()) { @@ -528,10 +554,14 @@ private String addWhereClause(String query, ImmutableList partitionColum // `(exclude col = FALSE) OR (col >= range.start() AND (col < range.end() OR // (range.isLast() = TRUE AND col = range.end()))` .map( - partitionColumn -> - String.format( - "((? = FALSE) OR (%1$s >= ? AND (%1$s < ? OR (? = TRUE AND %1$s = ?))))", - partitionColumn)) + partitionColumn -> { + String paramPlaceholder = + columnParameterCastWrappers.getOrDefault( + new ColumnKey(tableName, partitionColumn), "?"); + return String.format( + "((? = FALSE) OR (%1$s >= %2$s AND (%1$s < %2$s OR (? = TRUE AND %1$s = %2$s))))", + partitionColumn, paramPlaceholder); + }) .collect(Collectors.joining(" AND "))); } return queryBuilder.toString(); @@ -576,4 +606,45 @@ private boolean isBlankPaddedType(String typeName, @Nullable Integer typeLength) || upperTypeName.equals("CHAR") || upperTypeName.equals("BPCHAR")); } + + /** + * A composite key representing a specific table and column combination. Used to track + * type-specific state across different lifecycle phases (e.g., mapping unorderable types + * discovered during index inspection to custom SQL casting wrappers used during query + * generation). Normalizes identifiers to ensure case-insensitive, quote-stripped matching logic + * is consistent. + */ + private static final class ColumnKey implements Serializable { + private final String tableName; + private final String columnName; + + public ColumnKey(String tableName, String columnName) { + this.tableName = clean(tableName); + this.columnName = clean(columnName); + } + + private static String clean(String identifier) { + if (identifier == null) { + return ""; + } + return identifier.replace("\"", "").toLowerCase(); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (!(o instanceof ColumnKey)) { + return false; + } + ColumnKey that = (ColumnKey) o; + return tableName.equals(that.tableName) && columnName.equals(that.columnName); + } + + @Override + public int hashCode() { + return java.util.Objects.hash(tableName, columnName); + } + } } diff --git a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java index ff0214987b..593126046c 100644 --- a/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java +++ b/v2/sourcedb-to-spanner/src/main/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapper.java @@ -131,6 +131,11 @@ public BigInteger mapString(@Nullable String element, int lengthToPad) { if (element == null) { return BigInteger.valueOf(-1); } + if ("UUID".equalsIgnoreCase(this.collationReference().dbCollation())) { + // For UUID columns, we strip hyphens and normalize to lowercase so that we can + // map the canonical 32 hexadecimal characters directly to BigInteger. + element = element.replace("-", "").toLowerCase(); + } BigInteger ret = BigInteger.ZERO; // MySQL ignores empty character in string comparisons. @@ -204,6 +209,19 @@ public String unMapString(BigInteger element) { index++; } String ret = word.reverse().toString(); + if ("UUID".equalsIgnoreCase(this.collationReference().dbCollation())) { + // Convert the unmapped 32-character hex string back to the standard 36-character + // UUID format by padding with leading zeros and re-inserting hyphens at correct offsets. + String hex = org.apache.commons.lang3.StringUtils.leftPad(ret, 32, '0'); + ret = + String.format( + "%s-%s-%s-%s-%s", + hex.substring(0, 8), + hex.substring(8, 12), + hex.substring(12, 16), + hex.substring(16, 20), + hex.substring(20, 32)); + } return ret; } @@ -227,6 +245,12 @@ public static CollationMapper fromDB( UniformSplitterDBAdapter dbAdapter, CollationReference collationReference) throws SQLException { + if ("UUID".equalsIgnoreCase(collationReference.dbCollation())) { + // Standard UUID values reside in the hexadecimal character range. + // We use a static, pre-compiled base-16 character mapper to avoid expensive database + // collation queries. + return buildStaticUuidMapper(collationReference); + } String query = dbAdapter.getCollationsOrderQuery( collationReference.dbCharacterSet(), @@ -345,4 +369,23 @@ public CollationMapper build() { return autoBuild(); } } + + private static CollationMapper buildStaticUuidMapper(CollationReference collationReference) { + Builder builder = builder(collationReference); + String hexChars = "0123456789abcdef"; + for (int i = 0; i < hexChars.length(); i++) { + char c = hexChars.charAt(i); + builder.addCharacter( + CollationOrderRow.builder() + .setCharsetChar(c) + .setEquivalentChar(c) + .setCodepointRank((long) i) + .setEquivalentCharPadSpace(c) + .setCodepointRankPadSpace((long) i) + .setIsEmpty(false) + .setIsSpace(false) + .build()); + } + return builder.build(); + } } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java index 88148df26d..9c492c9875 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/dialectadapter/postgresql/PostgreSQLDialectAdapterTest.java @@ -25,6 +25,7 @@ import com.google.cloud.teleport.v2.source.reader.io.jdbc.JdbcSchemaReference; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.ResourceUtils; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.postgresql.PostgreSQLDialectAdapter.PostgreSQLVersion; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.PartitionColumn; import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference; import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo; import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo.IndexType; @@ -318,7 +319,11 @@ public void testDiscoverTableIndexesWithUuid() when(mockResultSet.getInt("type_length")).thenReturn(0); when(mockResultSet.wasNull()).thenReturn(true); - assertThat(adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables)) + ImmutableMap> indexes = + adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables); + + // 1. Assert discovered index info matches expected schema mapping with standard C collation + assertThat(indexes) .containsExactly( "my_schema.table1", ImmutableList.of( @@ -333,11 +338,43 @@ public void testDiscoverTableIndexesWithUuid() .setCollationReference( CollationReference.builder() .setDbCharacterSet("UTF8") - .setDbCollation("C") + .setDbCollation("UUID") .setPadSpace(false) .build()) - .setStringMaxLength(36) + .setStringMaxLength(32) .build())); + + SourceColumnIndexInfo info = indexes.get("my_schema.table1").get(0); + + // 2. Assert that a PartitionColumn can be built successfully from this index info (precondition + // check) + PartitionColumn partitionColumn = + PartitionColumn.builder() + .setColumnName(info.columnName()) + .setColumnClass(String.class) + .setStringCollation(info.collationReference()) + .setStringMaxLength(info.stringMaxLength()) + .build(); + + assertThat(partitionColumn).isNotNull(); + assertThat(partitionColumn.stringCollation()).isEqualTo(info.collationReference()); + assertThat(partitionColumn.stringMaxLength()).isEqualTo(32); + + // 3. Assert that getBoundaryQuery correctly wraps this discovered UUID column in a CAST + // statement + assertThat(adapter.getBoundaryQuery("my_schema.table1", ImmutableList.of(), "col_uuid")) + .isEqualTo( + "SELECT MIN(CAST(col_uuid AS TEXT)), MAX(CAST(col_uuid AS TEXT)) FROM my_schema.table1"); + + // 4. Assert that getReadQuery generates the correct query for UUID column + assertThat(adapter.getReadQuery("my_schema.table1", ImmutableList.of("col_uuid"))) + .isEqualTo( + "SELECT * FROM my_schema.table1 WHERE ((? = FALSE) OR (col_uuid >= CAST(? AS uuid) AND (col_uuid < CAST(? AS uuid) OR (? = TRUE AND col_uuid = CAST(? AS uuid)))))"); + + // 5. Assert that getCountQuery generates the correct query for UUID column + assertThat(adapter.getCountQuery("my_schema.table1", ImmutableList.of("col_uuid"), 1000L)) + .isEqualTo( + "SELECT COUNT(*) FROM my_schema.table1 WHERE ((? = FALSE) OR (col_uuid >= CAST(? AS uuid) AND (col_uuid < CAST(? AS uuid) OR (? = TRUE AND col_uuid = CAST(? AS uuid)))))"); } @Test diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapperTest.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapperTest.java index afd9c9d22d..0f28c08b47 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapperTest.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/source/reader/io/jdbc/uniformsplitter/stringmapper/CollationMapperTest.java @@ -29,6 +29,8 @@ import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter; import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.mysql.MysqlDialectAdapter.MySqlVersion; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.postgresql.PostgreSQLDialectAdapter; +import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.postgresql.PostgreSQLDialectAdapter.PostgreSQLVersion; import java.io.IOException; import java.math.BigInteger; import java.sql.Connection; @@ -474,4 +476,28 @@ public void testUtf8Mb4UnicodeCi() throws SQLException, IOException { // Check that trailing spaces are ignored. assertThat(collationMapper.mapString("a", 1).equals(collationMapper.mapString("a ", 1))); } + + @Test + public void testUuidCollationMapper() throws SQLException { + CollationReference uuidCollation = + CollationReference.builder() + .setDbCharacterSet("UTF8") + .setDbCollation("UUID") + .setPadSpace(false) + .build(); + CollationMapper collationMapper = + CollationMapper.fromDB( + mockConnection, new PostgreSQLDialectAdapter(PostgreSQLVersion.DEFAULT), uuidCollation); + + assertThat(collationMapper.collationReference().dbCollation()).isEqualTo("UUID"); + + // Map a UUID string + String uuidStr = "a0eebc99-9c0b-4ef8-bb6d-6bb9bd380a11"; + BigInteger mapped = collationMapper.mapString(uuidStr, 32); + assertThat(mapped).isNotNull(); + + // Unmap back + String unmapped = collationMapper.unMapString(mapped); + assertThat(unmapped).isEqualTo(uuidStr.toLowerCase()); + } } diff --git a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql index ee2fcba7f9..e261b61efb 100644 --- a/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql +++ b/v2/sourcedb-to-spanner/src/test/resources/DataTypesIT/postgresql-spanner-schema-few-data-types.sql @@ -1,3 +1,3 @@ CREATE TABLE IF NOT EXISTS t_bigint (id INT64, col INT64) PRIMARY KEY (id); CREATE TABLE IF NOT EXISTS t_bigserial (id INT64, col INT64) PRIMARY KEY (id); -CREATE TABLE IF NOT EXISTS t_uuid_pk (id STRING(MAX), col STRING(MAX)) PRIMARY KEY (id); +CREATE TABLE IF NOT EXISTS t_uuid_pk (id STRING(MAX), col STRING(MAX)) PRIMARY KEY (id); \ No newline at end of file