Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Sets;
import java.io.Serializable;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
Expand Down Expand Up @@ -82,6 +83,10 @@ public enum PostgreSQLVersion {
private static final String NO_PAD_SPACE_RETURN_TYPE = "TEXT";

private final PostgreSQLVersion version;
private final Map<ColumnKey, String> columnCastWrappers =
new java.util.concurrent.ConcurrentHashMap<>();
private final Map<ColumnKey, String> columnParameterCastWrappers =
new java.util.concurrent.ConcurrentHashMap<>();

public PostgreSQLDialectAdapter(PostgreSQLVersion version) {
this.version = version;
Expand Down Expand Up @@ -344,6 +349,9 @@ public ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableI
try (ResultSet resultSet = statement.executeQuery()) {
while (resultSet.next()) {
final String tableName = resultSet.getString("table_name");
final String typeName = resultSet.getString("type_name");
final SourceColumnIndexInfo.IndexType indexType =
indexTypeFrom(resultSet.getString("type_category"), typeName);
SourceColumnIndexInfo.Builder indexBuilder =
SourceColumnIndexInfo.builder()
.setColumnName(resultSet.getString("column_name"))
Expand All @@ -352,16 +360,30 @@ public ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableI
.setIsPrimary(resultSet.getBoolean("is_primary"))
.setCardinality(resultSet.getLong("cardinality"))
.setOrdinalPosition(resultSet.getLong("ordinal_position"))
.setIndexType(indexTypeFrom(resultSet.getString("type_category")));
.setIndexType(indexType);

String collation = resultSet.getString("collation");
if ("uuid".equalsIgnoreCase(typeName)) {
// PostgreSQL UUID type lacks a physical collation, but sorts lexicographically/binary.
// We map it to a virtual "UUID" collation to trigger the static UUID mapper in
// CollationMapper.
collation = "UUID";
}
if (collation != null) {
String charset = resultSet.getString("charset");
String typeName = resultSet.getString("type_name");
if (charset == null) {
charset = "UTF8";
}
Integer typeLength = resultSet.getInt("type_length");
if (resultSet.wasNull()) {
typeLength = null;
}
if (typeLength == null && "uuid".equalsIgnoreCase(typeName)) {
// A standard UUID has 36 characters, but since hyphens are stripped inside
// CollationMapper,
// the serialized representation length is exactly 32 characters.
typeLength = 32;
}
// Collation PAD SPACE is not supported in Postgresql
// (https://www.postgresql.org/docs/current/infoschema-collations.html)
// The only way to have blank space padding is for specific types with fixed length
Expand All @@ -375,6 +397,13 @@ public ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableI
.build());
indexBuilder.setStringMaxLength(typeLength == null ? VARCHAR_MAX_LENGTH : typeLength);
}
if ("uuid".equalsIgnoreCase(typeName)) {
ColumnKey key = new ColumnKey(tableName, resultSet.getString("column_name"));
// PostgreSQL requires explicit casting to TEXT for MIN/MAX boundary queries on UUIDs,
// and requires explicit casting back to UUID when binding parameter placeholders.
columnCastWrappers.put(key, "CAST(%s AS TEXT)");
columnParameterCastWrappers.put(key, "CAST(? AS uuid)");
}
if (builders.containsKey(tableName)) {
builders.get(tableName).add(indexBuilder.build());
}
Expand Down Expand Up @@ -431,7 +460,7 @@ public ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> discoverTableI
*/
@Override
public String getReadQuery(String tableName, ImmutableList<String> partitionColumns) {
return addWhereClause("SELECT * FROM " + tableName, partitionColumns);
return addWhereClause("SELECT * FROM " + tableName, tableName, partitionColumns);
}

/**
Expand All @@ -449,7 +478,8 @@ public String getReadQuery(String tableName, ImmutableList<String> partitionColu
@Override
public String getCountQuery(
String tableName, ImmutableList<String> partitionColumns, long timeoutMillis) {
return addWhereClause(String.format("SELECT COUNT(*) FROM %s", tableName), partitionColumns);
return addWhereClause(
String.format("SELECT COUNT(*) FROM %s", tableName), tableName, partitionColumns);
}

/**
Expand All @@ -463,8 +493,14 @@ public String getCountQuery(
@Override
public String getBoundaryQuery(
String tableName, ImmutableList<String> partitionColumns, String colName) {
String selectCol = colName;
String wrapper = columnCastWrappers.get(new ColumnKey(tableName, colName));
if (wrapper != null) {
selectCol = String.format(wrapper, colName);
}
return addWhereClause(
String.format("SELECT MIN(%s), MAX(%s) FROM %s", colName, colName, tableName),
String.format("SELECT MIN(%s), MAX(%s) FROM %s", selectCol, selectCol, tableName),
tableName,
partitionColumns);
}

Expand Down Expand Up @@ -506,7 +542,8 @@ public String getCollationsOrderQuery(String dbCharset, String dbCollation, bool
return replaceTagsAndSanitize(query, tags);
}

private String addWhereClause(String query, ImmutableList<String> partitionColumns) {
private String addWhereClause(
String query, String tableName, ImmutableList<String> partitionColumns) {
StringBuilder queryBuilder = new StringBuilder();
queryBuilder.append(query);
if (!partitionColumns.isEmpty()) {
Expand All @@ -517,10 +554,14 @@ private String addWhereClause(String query, ImmutableList<String> partitionColum
// `(exclude col = FALSE) OR (col >= range.start() AND (col < range.end() OR
// (range.isLast() = TRUE AND col = range.end()))`
.map(
partitionColumn ->
String.format(
"((? = FALSE) OR (%1$s >= ? AND (%1$s < ? OR (? = TRUE AND %1$s = ?))))",
partitionColumn))
partitionColumn -> {
String paramPlaceholder =
columnParameterCastWrappers.getOrDefault(
new ColumnKey(tableName, partitionColumn), "?");
return String.format(
"((? = FALSE) OR (%1$s >= %2$s AND (%1$s < %2$s OR (? = TRUE AND %1$s = %2$s))))",
partitionColumn, paramPlaceholder);
})
.collect(Collectors.joining(" AND ")));
}
return queryBuilder.toString();
Expand All @@ -530,14 +571,28 @@ private String addWhereClause(String query, ImmutableList<String> partitionColum
* Ref <a
* href="https://www.postgresql.org/docs/16/catalog-pg-type.html#CATALOG-TYPCATEGORY-TABLE"></a>.
*/
private SourceColumnIndexInfo.IndexType indexTypeFrom(String typeCategory) {
private SourceColumnIndexInfo.IndexType indexTypeFrom(String typeCategory, String typeName) {
switch (typeCategory) {
case "N":
return SourceColumnIndexInfo.IndexType.NUMERIC;
case "D":
return SourceColumnIndexInfo.IndexType.TIME_STAMP;
case "S":
return SourceColumnIndexInfo.IndexType.STRING;
case "U":
return indexTypeForUserDefinedType(typeName);
default:
return SourceColumnIndexInfo.IndexType.OTHER;
}
}

private SourceColumnIndexInfo.IndexType indexTypeForUserDefinedType(String typeName) {
if (typeName == null) {
return SourceColumnIndexInfo.IndexType.OTHER;
}
switch (typeName.toUpperCase()) {
case "UUID":
return SourceColumnIndexInfo.IndexType.STRING;
default:
return SourceColumnIndexInfo.IndexType.OTHER;
}
Expand All @@ -551,4 +606,45 @@ private boolean isBlankPaddedType(String typeName, @Nullable Integer typeLength)
|| upperTypeName.equals("CHAR")
|| upperTypeName.equals("BPCHAR"));
}

/**
* A composite key representing a specific table and column combination. Used to track
* type-specific state across different lifecycle phases (e.g., mapping unorderable types
* discovered during index inspection to custom SQL casting wrappers used during query
* generation). Normalizes identifiers to ensure case-insensitive, quote-stripped matching logic
* is consistent.
*/
private static final class ColumnKey implements Serializable {
private final String tableName;
private final String columnName;

public ColumnKey(String tableName, String columnName) {
this.tableName = clean(tableName);
this.columnName = clean(columnName);
}

private static String clean(String identifier) {
if (identifier == null) {
return "";
}
return identifier.replace("\"", "").toLowerCase();
}

@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof ColumnKey)) {
return false;
}
ColumnKey that = (ColumnKey) o;
return tableName.equals(that.tableName) && columnName.equals(that.columnName);
}

@Override
public int hashCode() {
return java.util.Objects.hash(tableName, columnName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,11 @@ public BigInteger mapString(@Nullable String element, int lengthToPad) {
if (element == null) {
return BigInteger.valueOf(-1);
}
if ("UUID".equalsIgnoreCase(this.collationReference().dbCollation())) {
// For UUID columns, we strip hyphens and normalize to lowercase so that we can
// map the canonical 32 hexadecimal characters directly to BigInteger.
element = element.replace("-", "").toLowerCase();
}
BigInteger ret = BigInteger.ZERO;

// MySQL ignores empty character in string comparisons.
Expand Down Expand Up @@ -204,6 +209,19 @@ public String unMapString(BigInteger element) {
index++;
}
String ret = word.reverse().toString();
if ("UUID".equalsIgnoreCase(this.collationReference().dbCollation())) {
// Convert the unmapped 32-character hex string back to the standard 36-character
// UUID format by padding with leading zeros and re-inserting hyphens at correct offsets.
String hex = org.apache.commons.lang3.StringUtils.leftPad(ret, 32, '0');
ret =
String.format(
"%s-%s-%s-%s-%s",
hex.substring(0, 8),
hex.substring(8, 12),
hex.substring(12, 16),
hex.substring(16, 20),
hex.substring(20, 32));
}
return ret;
}

Expand All @@ -227,6 +245,12 @@ public static CollationMapper fromDB(
UniformSplitterDBAdapter dbAdapter,
CollationReference collationReference)
throws SQLException {
if ("UUID".equalsIgnoreCase(collationReference.dbCollation())) {
// Standard UUID values reside in the hexadecimal character range.
// We use a static, pre-compiled base-16 character mapper to avoid expensive database
// collation queries.
return buildStaticUuidMapper(collationReference);
}
String query =
dbAdapter.getCollationsOrderQuery(
collationReference.dbCharacterSet(),
Expand Down Expand Up @@ -345,4 +369,23 @@ public CollationMapper build() {
return autoBuild();
}
}

private static CollationMapper buildStaticUuidMapper(CollationReference collationReference) {
Builder builder = builder(collationReference);
String hexChars = "0123456789abcdef";
for (int i = 0; i < hexChars.length(); i++) {
char c = hexChars.charAt(i);
builder.addCharacter(
CollationOrderRow.builder()
.setCharsetChar(c)
.setEquivalentChar(c)
.setCodepointRank((long) i)
.setEquivalentCharPadSpace(c)
.setCodepointRankPadSpace((long) i)
.setIsEmpty(false)
.setIsSpace(false)
.build());
}
return builder.build();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import com.google.cloud.teleport.v2.source.reader.io.jdbc.JdbcSchemaReference;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.ResourceUtils;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.dialectadapter.postgresql.PostgreSQLDialectAdapter.PostgreSQLVersion;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.range.PartitionColumn;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.uniformsplitter.stringmapper.CollationReference;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo;
import com.google.cloud.teleport.v2.source.reader.io.schema.SourceColumnIndexInfo.IndexType;
Expand Down Expand Up @@ -238,7 +239,7 @@ public void testDiscoverTableIndexes() throws SQLException, RetriableSchemaDisco
when(mockResultSet.getString("collation")).thenReturn(null, "en_US", "en_US", null);
when(mockResultSet.getInt("type_length")).thenReturn(100, 0);
when(mockResultSet.wasNull()).thenReturn(false, true);
when(mockResultSet.getString("type_name")).thenReturn("char", "text");
when(mockResultSet.getString("type_name")).thenReturn("bigint", "char", "text", "timestamp");
when(mockResultSet.getString("charset")).thenReturn("UTF8", "UTF8");

assertThat(adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables))
Expand Down Expand Up @@ -297,6 +298,85 @@ public void testDiscoverTableIndexes() throws SQLException, RetriableSchemaDisco
.build()));
}

@Test
public void testDiscoverTableIndexesWithUuid()
throws SQLException, RetriableSchemaDiscoveryException {
ImmutableList<String> tables = ImmutableList.of("my_schema.table1");

when(mockDataSource.getConnection()).thenReturn(mockConnection);
when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement);
when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet);
when(mockResultSet.next()).thenReturn(true, false);
when(mockResultSet.getString("table_name")).thenReturn("my_schema.table1");
when(mockResultSet.getString("column_name")).thenReturn("col_uuid");
when(mockResultSet.getString("index_name")).thenReturn("table_uuid_idx");
when(mockResultSet.getBoolean("is_unique")).thenReturn(true);
when(mockResultSet.getBoolean("is_primary")).thenReturn(true);
when(mockResultSet.getLong("cardinality")).thenReturn(1L);
when(mockResultSet.getLong("ordinal_position")).thenReturn(1L);
when(mockResultSet.getString("type_category")).thenReturn("U");
when(mockResultSet.getString("type_name")).thenReturn("uuid");
when(mockResultSet.getInt("type_length")).thenReturn(0);
when(mockResultSet.wasNull()).thenReturn(true);

ImmutableMap<String, ImmutableList<SourceColumnIndexInfo>> indexes =
adapter.discoverTableIndexes(mockDataSource, sourceSchemaReference, tables);

// 1. Assert discovered index info matches expected schema mapping with standard C collation
assertThat(indexes)
.containsExactly(
"my_schema.table1",
ImmutableList.of(
SourceColumnIndexInfo.builder()
.setColumnName("col_uuid")
.setIndexName("table_uuid_idx")
.setIsUnique(true)
.setIsPrimary(true)
.setCardinality(1L)
.setOrdinalPosition(1L)
.setIndexType(SourceColumnIndexInfo.IndexType.STRING)
.setCollationReference(
CollationReference.builder()
.setDbCharacterSet("UTF8")
.setDbCollation("UUID")
.setPadSpace(false)
.build())
.setStringMaxLength(32)
.build()));

SourceColumnIndexInfo info = indexes.get("my_schema.table1").get(0);

// 2. Assert that a PartitionColumn can be built successfully from this index info (precondition
// check)
PartitionColumn partitionColumn =
PartitionColumn.builder()
.setColumnName(info.columnName())
.setColumnClass(String.class)
.setStringCollation(info.collationReference())
.setStringMaxLength(info.stringMaxLength())
.build();

assertThat(partitionColumn).isNotNull();
assertThat(partitionColumn.stringCollation()).isEqualTo(info.collationReference());
assertThat(partitionColumn.stringMaxLength()).isEqualTo(32);

// 3. Assert that getBoundaryQuery correctly wraps this discovered UUID column in a CAST
// statement
assertThat(adapter.getBoundaryQuery("my_schema.table1", ImmutableList.of(), "col_uuid"))
.isEqualTo(
"SELECT MIN(CAST(col_uuid AS TEXT)), MAX(CAST(col_uuid AS TEXT)) FROM my_schema.table1");

// 4. Assert that getReadQuery generates the correct query for UUID column
assertThat(adapter.getReadQuery("my_schema.table1", ImmutableList.of("col_uuid")))
.isEqualTo(
"SELECT * FROM my_schema.table1 WHERE ((? = FALSE) OR (col_uuid >= CAST(? AS uuid) AND (col_uuid < CAST(? AS uuid) OR (? = TRUE AND col_uuid = CAST(? AS uuid)))))");

// 5. Assert that getCountQuery generates the correct query for UUID column
assertThat(adapter.getCountQuery("my_schema.table1", ImmutableList.of("col_uuid"), 1000L))
.isEqualTo(
"SELECT COUNT(*) FROM my_schema.table1 WHERE ((? = FALSE) OR (col_uuid >= CAST(? AS uuid) AND (col_uuid < CAST(? AS uuid) OR (? = TRUE AND col_uuid = CAST(? AS uuid)))))");
}

@Test
public void testDiscoverTableIndexesBulk()
throws SQLException, RetriableSchemaDiscoveryException {
Expand Down
Loading
Loading