Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ public class Constants {
/* The value for Postgres databases in the source type key */
public static final String POSTGRES_SOURCE_TYPE = "postgresql";

/* The value for Spanner databases in the source type key */
public static final String SPANNER_SOURCE_TYPE = "spanner";

/* The run mode for retryDLQ */
public static final String RUN_MODE_RETRY_DLQ = "retryDLQ";

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.shard;

import java.util.Objects;

/**
* Represents a shard targeting a Cloud Spanner database. The {@code projectId} is stored as a
* dedicated field; {@code instanceId} maps to the parent's {@code namespace} field and {@code
* databaseId} maps to the parent's {@code dbName} field.
*/
public class SpannerShard extends Shard {

  /** GCP project hosting the target Spanner instance; not representable in the parent class. */
  private final String projectId;

  /**
   * Creates a shard targeting a Cloud Spanner database.
   *
   * @param logicalShardId logical identifier for this shard
   * @param projectId GCP project that hosts the Spanner instance
   * @param instanceId Spanner instance id (stored in the parent's {@code namespace} field)
   * @param databaseId Spanner database id (stored in the parent's {@code dbName} field)
   */
  public SpannerShard(
      String logicalShardId, String projectId, String instanceId, String databaseId) {
    super();
    this.projectId = projectId;
    setLogicalShardId(logicalShardId);
    setNamespace(instanceId);
    setDbName(databaseId);
  }

  public String getProjectId() {
    return projectId;
  }

  /** Returns the Spanner instance id (backed by the parent's {@code namespace} field). */
  public String getInstanceId() {
    return getNamespace();
  }

  /** Returns the Spanner database id (backed by the parent's {@code dbName} field). */
  public String getDatabaseId() {
    return getDbName();
  }

  @Override
  public String toString() {
    return String.format(
        "SpannerShard{logicalShardId='%s', projectId='%s', instanceId='%s', databaseId='%s'}",
        getLogicalShardId(), projectId, getInstanceId(), getDatabaseId());
  }

  @Override
  public boolean equals(Object o) {
    if (this == o) {
      return true;
    }
    if (!(o instanceof SpannerShard)) {
      return false;
    }
    SpannerShard that = (SpannerShard) o;
    // logicalShardId is part of this shard's identity (set in the constructor and printed by
    // toString()), so it must participate in equality; otherwise two distinct logical shards
    // pointing at the same database would wrongly compare equal.
    return Objects.equals(getLogicalShardId(), that.getLogicalShardId())
        && Objects.equals(projectId, that.projectId)
        && Objects.equals(getInstanceId(), that.getInstanceId())
        && Objects.equals(getDatabaseId(), that.getDatabaseId());
  }

  @Override
  public int hashCode() {
    // Must stay consistent with equals(): same four identity fields.
    return Objects.hash(getLogicalShardId(), projectId, getInstanceId(), getDatabaseId());
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.migrations.utils;

import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard;
import com.google.cloud.teleport.v2.spanner.migrations.shard.SpannerShard;
import com.google.gson.FieldNamingPolicy;
import com.google.gson.GsonBuilder;
import com.google.gson.reflect.TypeToken;
import java.io.IOException;
import java.io.InputStream;
import java.lang.reflect.Type;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Reads a JSON array of Spanner shard configurations from GCS and returns a list of {@link
* SpannerShard} instances.
*
* <p>Each entry in the JSON file must contain the following fields:
*
* <pre>
* [
* {
* "logicalShardId": "shard1",
* "projectId": "my-gcp-project",
* "instanceId": "my-spanner-instance",
* "databaseId": "my-database"
* }
* ]
* </pre>
*/
public class SpannerShardFileReader {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerShardFileReader.class);

  /**
   * Reads Spanner shard configuration from the given GCS file path.
   *
   * @param shardsFilePath GCS path to the JSON shard config file.
   * @return list of {@link SpannerShard} objects, sorted by logicalShardId.
   * @throws RuntimeException if the file cannot be read, does not contain a JSON array, or an
   *     entry is missing one of the required fields (projectId, instanceId, databaseId).
   */
  public List<Shard> getSpannerShards(String shardsFilePath) {
    try (InputStream stream =
        Channels.newInputStream(
            FileSystems.open(FileSystems.matchNewResource(shardsFilePath, false)))) {

      String result = IOUtils.toString(stream, StandardCharsets.UTF_8);
      Type listType = new TypeToken<List<Map<String, String>>>() {}.getType();
      List<Map<String, String>> shardConfigs =
          new GsonBuilder()
              .setFieldNamingPolicy(FieldNamingPolicy.IDENTITY)
              .create()
              .fromJson(result, listType);

      // Gson returns null for empty/blank input; fail with a clear message instead of a raw
      // NullPointerException in the loop below.
      if (shardConfigs == null) {
        throw new RuntimeException(
            "Spanner shard config file '" + shardsFilePath + "' is empty or not a JSON array");
      }

      List<Shard> shards = new ArrayList<>(shardConfigs.size());
      for (Map<String, String> config : shardConfigs) {
        // logicalShardId is optional and defaults to the empty string.
        String logicalShardId = config.getOrDefault("logicalShardId", "");
        String projectId = config.get("projectId");
        String instanceId = config.get("instanceId");
        String databaseId = config.get("databaseId");
        if (projectId == null || instanceId == null || databaseId == null) {
          throw new RuntimeException(
              "SpannerShard config at '"
                  + shardsFilePath
                  + "' is missing one or more required fields: projectId, instanceId, databaseId");
        }
        shards.add(new SpannerShard(logicalShardId, projectId, instanceId, databaseId));
        LOG.info(
            "Loaded SpannerShard: logicalShardId={}, project={}, instance={}, database={}",
            logicalShardId,
            projectId,
            instanceId,
            databaseId);
      }

      // Deterministic ordering so downstream stages see a stable shard sequence.
      shards.sort(Comparator.comparing(Shard::getLogicalShardId));
      LOG.info("Read {} Spanner shard(s) from {}", shards.size(), shardsFilePath);
      return shards;

    } catch (IOException e) {
      throw new RuntimeException("Failed to read Spanner shard config file: " + shardsFilePath, e);
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,5 +22,6 @@ public enum SourceDatabaseType {
CASSANDRA,
ORACLE,
SQLSERVER,
SPANNER,
// Add more database types as needed
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.spanner.sourceddl;

import com.google.cloud.spanner.BatchClient;
import com.google.cloud.spanner.BatchReadOnlyTransaction;
import com.google.cloud.spanner.TimestampBound;
import com.google.cloud.teleport.v2.spanner.ddl.Column;
import com.google.cloud.teleport.v2.spanner.ddl.Ddl;
import com.google.cloud.teleport.v2.spanner.ddl.IndexColumn;
import com.google.cloud.teleport.v2.spanner.ddl.InformationSchemaScanner;
import com.google.cloud.teleport.v2.spanner.ddl.Table;
import com.google.cloud.teleport.v2.spanner.type.Type;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.beam.sdk.io.gcp.spanner.SpannerAccessor;
import org.apache.beam.sdk.io.gcp.spanner.SpannerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Scans a Cloud Spanner database's information schema and converts it into a {@link SourceSchema}.
*
* <p>Uses the existing {@link InformationSchemaScanner} to read the target Spanner DDL and then
* maps each {@link Table} and {@link Column} into the {@link SourceTable}/{@link SourceColumn}
* model so that the rest of the reverse-replication pipeline can treat Spanner as just another
* source type.
*/
public class SpannerInformationSchemaScanner implements SourceSchemaScanner {

  private static final Logger LOG = LoggerFactory.getLogger(SpannerInformationSchemaScanner.class);

  private final SpannerConfig spannerConfig;
  private final SourceDatabaseType sourceType = SourceDatabaseType.SPANNER;

  public SpannerInformationSchemaScanner(SpannerConfig spannerConfig) {
    this.spannerConfig = spannerConfig;
  }

  /**
   * Scans the configured Spanner database's information schema.
   *
   * @return the database schema mapped into the {@link SourceSchema} model
   */
  @Override
  public SourceSchema scan() {
    SpannerAccessor accessor = SpannerAccessor.getOrCreate(spannerConfig);
    try {
      BatchClient batchClient = accessor.getBatchClient();
      // BatchReadOnlyTransaction is AutoCloseable; close it promptly to release its session
      // instead of leaking it until the accessor shuts down.
      try (BatchReadOnlyTransaction txn =
          batchClient.batchReadOnlyTransaction(TimestampBound.strong())) {
        InformationSchemaScanner scanner = new InformationSchemaScanner(txn);
        Ddl ddl = scanner.scan();
        LOG.info("Scanned Spanner schema for database '{}'", spannerConfig.getDatabaseId().get());
        return convertDdlToSourceSchema(ddl);
      }
    } finally {
      accessor.close();
    }
  }

  /** Maps every table of the scanned DDL into the SourceSchema model. */
  private SourceSchema convertDdlToSourceSchema(Ddl ddl) {
    Map<String, SourceTable> tables = new HashMap<>();
    for (Table spannerTable : ddl.allTables()) {
      SourceTable sourceTable = convertTable(spannerTable);
      tables.put(sourceTable.name(), sourceTable);
    }
    return SourceSchema.builder(sourceType)
        .databaseName(spannerConfig.getDatabaseId().get())
        .tables(ImmutableMap.copyOf(tables))
        .build();
  }

  /** Converts one Spanner table (columns + primary key) into a {@link SourceTable}. */
  private SourceTable convertTable(Table spannerTable) {
    // Preserve the declared primary-key column order.
    List<String> pkColumns = new ArrayList<>();
    for (IndexColumn pk : spannerTable.primaryKeys()) {
      pkColumns.add(pk.name());
    }

    List<SourceColumn> columns = new ArrayList<>();
    for (Column col : spannerTable.columns()) {
      SourceColumn sourceCol =
          SourceColumn.builder(sourceType)
              .name(col.name())
              .type(spannerTypeToString(col.type()))
              .isNullable(!col.notNull())
              .isPrimaryKey(pkColumns.contains(col.name()))
              .isGenerated(col.isGenerated())
              // Column options are not propagated for Spanner sources.
              .columnOptions(ImmutableList.of())
              .build();
      columns.add(sourceCol);
    }

    return SourceTable.builder(sourceType)
        .name(spannerTable.name())
        .columns(ImmutableList.copyOf(columns))
        .primaryKeyColumns(ImmutableList.copyOf(pkColumns))
        // Foreign keys and secondary indexes are not mapped for Spanner sources.
        .foreignKeys(ImmutableList.of())
        .indexes(ImmutableList.of())
        .build();
  }

  /**
   * Converts a Spanner {@link Type} to a canonical type-name string used in {@link SourceColumn}.
   *
   * <p>Scalar types use the type-code enum constant's name directly (e.g. {@code INT64},
   * {@code PG_JSONB}); array types recurse on the element type.
   */
  static String spannerTypeToString(Type type) {
    switch (type.getCode()) {
      case ARRAY:
        return "ARRAY<" + spannerTypeToString(type.getArrayElementType()) + ">";
      case PG_ARRAY:
        return "PG_ARRAY<" + spannerTypeToString(type.getArrayElementType()) + ">";
      default:
        // Every scalar code's string form is exactly its enum constant name, so the previous
        // 20+ explicit cases (BOOL -> "BOOL", PG_INT8 -> "PG_INT8", ...) collapse into this.
        return type.getCode().name();
    }
  }
}
Loading
Loading