diff --git a/.github/workflows/spanner-load-tests.yml b/.github/workflows/spanner-load-tests.yml
index bca53402e2..fc3d1f82e1 100644
--- a/.github/workflows/spanner-load-tests.yml
+++ b/.github/workflows/spanner-load-tests.yml
@@ -32,7 +32,7 @@ permissions: write-all
 jobs:
   load_tests:
     name: Spanner Dataflow Templates Load tests
-    timeout-minutes: 1440 # 1 day
+    timeout-minutes: 2100 # 35 hours
     # Run on any runner that matches all the specified runs-on values.
     runs-on: [ self-hosted, perf ]
     steps:
@@ -57,6 +57,7 @@ jobs:
           --lt-export-project="cloud-teleport-testing" \
           --lt-export-dataset="performance_tests" \
           --lt-export-table="template_performance_metrics" \
+          --test="${{ github.event.inputs.specific_test }}"
     - name: Upload Load Tests Report
       uses: actions/upload-artifact@v7
       if: always() # always run even if the previous step fails
diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/CloudSqlShardOrchestrator.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestrator.java
similarity index 91%
rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/CloudSqlShardOrchestrator.java
rename to it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestrator.java
index 32e8853e77..6e1704f104 100644
--- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/CloudSqlShardOrchestrator.java
+++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestrator.java
@@ -13,7 +13,7 @@
  * License for the specific language governing permissions and limitations under
  * the License.
  */
-package com.google.cloud.teleport.v2.templates.loadtesting;
+package org.apache.beam.it.gcp.cloudsql;

 import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
 import com.google.api.client.googleapis.json.GoogleJsonResponseException;
@@ -27,7 +27,6 @@
 import com.google.auth.http.HttpCredentialsAdapter;
 import com.google.auth.oauth2.GoogleCredentials;
 import com.google.cloud.storage.BlobInfo;
-import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
 import com.google.common.annotations.VisibleForTesting;
 import java.io.IOException;
 import java.security.GeneralSecurityException;
@@ -39,9 +38,6 @@
 import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import org.apache.beam.it.gcp.artifacts.GcsArtifact;
-import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager;
-import org.apache.beam.it.gcp.cloudsql.CloudPostgresResourceManager;
-import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
 import org.apache.beam.it.gcp.storage.GcsResourceManager;
 import org.json.JSONArray;
 import org.json.JSONObject;
@@ -72,7 +68,12 @@ public class CloudSqlShardOrchestrator {
   public static final String MYSQL_8_0 = "MYSQL_8_0";
   public static final String POSTGRES_14 = "POSTGRES_14";

-  protected final SQLDialect sqlDialect;
+  public enum DatabaseType {
+    MYSQL,
+    POSTGRESQL
+  }
+
+  protected final DatabaseType databaseType;
   protected final String dbVersion;
   protected final int port;
   protected final String project;
@@ -80,21 +81,22 @@ public class CloudSqlShardOrchestrator {
   protected final String username;
   protected final String password;
   protected final GcsResourceManager gcsResourceManager;
-  protected final Map<String, CloudSqlResourceManager> managers;
+  public final Map<String, CloudSqlResourceManager> managers;
   protected final Map<String, String> instanceIpMap;
-  protected Map<String, List<String>> requestedShardMap;
+  public Map<String, List<String>>
requestedShardMap; protected final SQLAdmin sqlAdmin; /** * Constructs a new orchestrator for the specified database dialect. * - * @param dbType The dialect of the source database (e.g., MYSQL, POSTGRESQL). + * @param dbType The type of the source database (MYSQL or POSTGRESQL). + * @param dbVersion The version of the database. * @param project The GCP project ID. * @param region The GCP region for Cloud SQL instances. * @param gcsResourceManager The GCS resource manager for uploading configuration artifacts. */ public CloudSqlShardOrchestrator( - SQLDialect dbType, + DatabaseType dbType, String dbVersion, String project, String region, @@ -106,7 +108,7 @@ public CloudSqlShardOrchestrator( region, gcsResourceManager, System.getProperty( - "cloudProxyUsername", (dbType == SQLDialect.MYSQL) ? "root" : "postgres"), + "cloudProxyUsername", (dbType == DatabaseType.MYSQL) ? "root" : "postgres"), System.getProperty("cloudProxyPassword", ""), null); } @@ -114,7 +116,8 @@ public CloudSqlShardOrchestrator( /** * Constructs a new orchestrator with explicit credentials. * - * @param sqlDialect The dialect of the source database. + * @param databaseType The type of the source database. + * @param dbVersion The version of the database. * @param project The GCP project ID. * @param region The GCP region. * @param gcsResourceManager The GCS resource manager. @@ -123,7 +126,7 @@ public CloudSqlShardOrchestrator( * @param credentials The GCP credentials to use. */ public CloudSqlShardOrchestrator( - SQLDialect sqlDialect, + DatabaseType databaseType, String dbVersion, String project, String region, @@ -131,8 +134,8 @@ public CloudSqlShardOrchestrator( String username, String password, GoogleCredentials credentials) { - checkVersionCompatibility(sqlDialect, dbVersion); - this.sqlDialect = sqlDialect; + checkVersionCompatibility(databaseType, dbVersion); + this.databaseType = databaseType; this.dbVersion = dbVersion; this.project = project; this.region = region; @@ -158,14 +161,14 @@ public CloudSqlShardOrchestrator( LOG.error("Exception while initializing SQL Admin", e); throw new RuntimeException("Failed to initialize SQLAdmin client", e); } - port = (sqlDialect == SQLDialect.MYSQL) ? 3306 : 5432; + port = (databaseType == DatabaseType.MYSQL) ? 3306 : 5432; } @VisibleForTesting - protected static void checkVersionCompatibility(SQLDialect sqlDialect, String dbVersion) { + protected static void checkVersionCompatibility(DatabaseType databaseType, String dbVersion) { Preconditions.checkArgument( - (sqlDialect == SQLDialect.MYSQL && dbVersion.toLowerCase().startsWith("mysql")) - || (sqlDialect == SQLDialect.POSTGRESQL + (databaseType == DatabaseType.MYSQL && dbVersion.toLowerCase().startsWith("mysql")) + || (databaseType == DatabaseType.POSTGRESQL && dbVersion.toLowerCase().startsWith("postgres"))); } @@ -265,7 +268,7 @@ protected void updateUserPassword(String instanceName) throws IOException, Inter User user = new User().setName(username).setPassword(password); // MySQL requires a '%' host for connections from any IP within the VPC. - if (sqlDialect == SQLDialect.MYSQL) { + if (databaseType == DatabaseType.MYSQL) { user.setHost("%"); } @@ -273,7 +276,7 @@ protected void updateUserPassword(String instanceName) throws IOException, Inter // These are passed as query parameters in the Update request. 
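    // (request.setName(...) and, for MySQL, request.setHost("%") populate those parameters;
    // the User object passed in the request body carries the new password.)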
SQLAdmin.Users.Update request = sqlAdmin.users().update(project, instanceName, user); request.setName(username); - if (sqlDialect == SQLDialect.MYSQL) { + if (databaseType == DatabaseType.MYSQL) { // In MySQL, host is part of the primary key for a user. '%' allows the user to connect from // any VPC IP. request.setHost("%"); @@ -319,7 +322,9 @@ protected String ensureInstanceAndGetIp(String instanceName) protected void createPhysicalInstance(String instanceName) throws IOException, InterruptedException { - String tier = sqlDialect == SQLDialect.MYSQL ? "db-n1-standard-2" : "db-custom-2-7680"; + String defaultTier = + databaseType == DatabaseType.MYSQL ? "db-n1-standard-2" : "db-custom-2-7680"; + String tier = System.getProperty("cloudSqlInstanceTier", defaultTier); DatabaseInstance instance = new DatabaseInstance() .setName(instanceName) @@ -370,19 +375,19 @@ protected void waitForOperation(Operation operation) throws IOException, Interru protected CloudSqlResourceManager createManager(String instanceName) { String ip = instanceIpMap.get(instanceName); - if (sqlDialect == SQLDialect.MYSQL) { + if (databaseType == DatabaseType.MYSQL) { return (CloudSqlResourceManager) CloudMySQLResourceManager.builder(instanceName) .maybeUseStaticInstance(ip, port, username, password) .build(); - } else if (sqlDialect == SQLDialect.POSTGRESQL) { + } else if (databaseType == DatabaseType.POSTGRESQL) { return (CloudSqlResourceManager) CloudPostgresResourceManager.builder(instanceName) .maybeUseStaticInstance(ip, port, username, password) .setDatabaseName("postgres") .build(); } else { - throw new IllegalArgumentException("Unsupported database type: " + sqlDialect); + throw new IllegalArgumentException("Unsupported database type: " + databaseType); } } @@ -400,6 +405,11 @@ protected void createLogicalDatabases() { CloudSqlResourceManager manager = createManager(instanceName); managers.put(instanceName, manager); for (String dbName : dbNames) { + try { + manager.runSQLUpdate("DROP DATABASE IF EXISTS " + dbName); + } catch (Exception e) { + LOG.warn("Failed to drop pre-existing database {} if it existed", dbName, e); + } manager.createDatabase(dbName); } })); diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/ShardOrchestrationException.java b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/ShardOrchestrationException.java similarity index 93% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/ShardOrchestrationException.java rename to it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/ShardOrchestrationException.java index 8f6f469946..659c9763f5 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/ShardOrchestrationException.java +++ b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/cloudsql/ShardOrchestrationException.java @@ -13,7 +13,7 @@ * License for the specific language governing permissions and limitations under * the License. */ -package com.google.cloud.teleport.v2.templates.loadtesting; +package org.apache.beam.it.gcp.cloudsql; /** Exception thrown when Cloud SQL shard orchestration fails. 
*/ public class ShardOrchestrationException extends RuntimeException { diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/CloudSqlShardOrchestratorTest.java b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestratorTest.java similarity index 89% rename from v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/CloudSqlShardOrchestratorTest.java rename to it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestratorTest.java index 48a40db735..29b348c896 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/CloudSqlShardOrchestratorTest.java +++ b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/cloudsql/CloudSqlShardOrchestratorTest.java @@ -13,11 +13,11 @@ * License for the specific language governing permissions and limitations under * the License. */ -package com.google.cloud.teleport.v2.templates.loadtesting; +package org.apache.beam.it.gcp.cloudsql; -import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.MYSQL_8_0; -import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.POSTGRES_14; import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.MYSQL_8_0; +import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.POSTGRES_14; import static org.junit.Assert.assertThrows; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; @@ -41,7 +41,6 @@ import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.storage.Blob; import com.google.cloud.storage.BlobInfo; -import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect; import com.google.common.util.concurrent.MoreExecutors; import java.io.IOException; import java.util.Arrays; @@ -50,8 +49,7 @@ import java.util.Map; import java.util.concurrent.ExecutorService; import org.apache.beam.it.gcp.artifacts.GcsArtifact; -import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager; -import org.apache.beam.it.gcp.cloudsql.CloudPostgresResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.DatabaseType; import org.apache.beam.it.gcp.storage.GcsResourceManager; import org.junit.Before; import org.junit.Test; @@ -88,13 +86,13 @@ public class CloudSqlShardOrchestratorTest { /** Helper subclass to inject mock SQLAdmin and synchronous executor. 
*/ private static class TestCloudSqlShardOrchestrator extends CloudSqlShardOrchestrator { public TestCloudSqlShardOrchestrator( - SQLDialect sqlDialect, + DatabaseType databaseType, String dbVersion, String project, String region, GcsResourceManager gcsResourceManager, SQLAdmin sqlAdmin) { - super(sqlDialect, dbVersion, project, region, gcsResourceManager); + super(databaseType, dbVersion, project, region, gcsResourceManager); // Overwrite the real sqlAdmin created in super constructor try { java.lang.reflect.Field field = @@ -138,7 +136,7 @@ public void setUp() { public void testInitialize_provisionsAndSetsUpCorrectly() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); // Mock Stage 1: Physical Provisioning DatabaseInstance instance = @@ -189,7 +187,7 @@ public void testInitialize_provisionsAndSetsUpCorrectly() throws Exception { public void testInitialize_createsInstance_whenMissing() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); // Mock 404 for first call, then 200 for refresh when(sqlAdmin.instances().get(PROJECT_ID, INSTANCE_NAME).execute()) @@ -228,7 +226,7 @@ public void testInitialize_createsInstance_whenMissing() throws Exception { public void testCleanup_delegatesToManagers() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); // Mock successful initialization to populate managers map when(sqlAdmin.instances().get(PROJECT_ID, INSTANCE_NAME).execute()) @@ -272,7 +270,7 @@ public void testConstructor_withDefaultCredentials() throws Exception { CloudSqlShardOrchestrator orchestrator = new CloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager); assertThat(orchestrator.project).isEqualTo(PROJECT_ID); } @@ -282,7 +280,7 @@ public void testConstructor_withDefaultCredentials() throws Exception { public void testInitialize_provisionsAndSetsUpPostgresCorrectly() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.POSTGRESQL, POSTGRES_14, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.POSTGRESQL, POSTGRES_14, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); DatabaseInstance instance = new DatabaseInstance() @@ -318,20 +316,19 @@ public void testInitialize_provisionsAndSetsUpPostgresCorrectly() throws Excepti public void testInitialize_throwsOnFailure() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); when(sqlAdmin.instances().get(anyString(), anyString()).execute()) .thenThrow(new IOException("API Error")); - assertThrows( - ShardOrchestrationException.class, () -> orchestrator.initialize(shardMap, "shards.json")); + 
assertThrows(RuntimeException.class, () -> orchestrator.initialize(shardMap, "shards.json")); } @Test public void testExecuteWithRetries_retriesOn409() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); com.google.api.client.googleapis.services.AbstractGoogleClientRequest mockRequest = @@ -355,7 +352,7 @@ public void testExecuteWithRetries_retriesOn409() throws Exception { public void testWaitForOperation_throwsOnOpError() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); Operation op = mock(Operation.class, Answers.RETURNS_DEEP_STUBS); when(op.getName()).thenReturn("op-error"); @@ -373,7 +370,7 @@ public void testWaitForOperation_throwsOnOpError() throws Exception { public void testCleanup_handlesDropDatabaseError() throws Exception { TestCloudSqlShardOrchestrator orchestrator = new TestCloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); + DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin); // Mock successful initialization DatabaseInstance instance = @@ -411,13 +408,14 @@ public void testCleanup_handlesDropDatabaseError() throws Exception { public void testVersionCompatability() { assertThrows( IllegalArgumentException.class, - () -> CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.MYSQL, POSTGRES_14)); + () -> CloudSqlShardOrchestrator.checkVersionCompatibility(DatabaseType.MYSQL, POSTGRES_14)); assertThrows( IllegalArgumentException.class, () -> - CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.POSTGRESQL, MYSQL_8_0)); + CloudSqlShardOrchestrator.checkVersionCompatibility( + DatabaseType.POSTGRESQL, MYSQL_8_0)); - CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.MYSQL, MYSQL_8_0); - CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.POSTGRESQL, POSTGRES_14); + CloudSqlShardOrchestrator.checkVersionCompatibility(DatabaseType.MYSQL, MYSQL_8_0); + CloudSqlShardOrchestrator.checkVersionCompatibility(DatabaseType.POSTGRESQL, POSTGRES_14); } } diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLMultiSharded1024ShardsLT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLMultiSharded1024ShardsLT.java index 2119674c6a..cd5681ebc9 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLMultiSharded1024ShardsLT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/MySQLMultiSharded1024ShardsLT.java @@ -15,8 +15,8 @@ */ package com.google.cloud.teleport.v2.templates.loadtesting; -import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.MYSQL_8_0; import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.MYSQL_8_0; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; import com.google.cloud.spanner.Struct; @@ -45,6 +45,8 @@ import org.apache.beam.it.common.utils.ResourceManagerUtils; import 
org.apache.beam.it.gcp.artifacts.GcsArtifact; import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.DatabaseType; import org.apache.beam.it.gcp.spanner.SpannerResourceManager; import org.json.JSONArray; import org.json.JSONObject; @@ -101,7 +103,7 @@ public void setUp() throws IOException { orchestrator = new CloudSqlShardOrchestrator( - SQLDialect.MYSQL, MYSQL_8_0, project, region, gcsResourceManager); + DatabaseType.MYSQL, MYSQL_8_0, project, region, gcsResourceManager); } @After diff --git a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLMultiSharded1024ShardsLT.java b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLMultiSharded1024ShardsLT.java index 85ea94a535..1ade4d59ea 100644 --- a/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLMultiSharded1024ShardsLT.java +++ b/v2/sourcedb-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/PostgreSQLMultiSharded1024ShardsLT.java @@ -15,8 +15,8 @@ */ package com.google.cloud.teleport.v2.templates.loadtesting; -import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.POSTGRES_14; import static com.google.common.truth.Truth.assertThat; +import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.POSTGRES_14; import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; import com.google.cloud.spanner.Struct; @@ -45,6 +45,8 @@ import org.apache.beam.it.common.utils.ResourceManagerUtils; import org.apache.beam.it.gcp.artifacts.GcsArtifact; import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.DatabaseType; import org.apache.beam.it.gcp.spanner.SpannerResourceManager; import org.json.JSONArray; import org.json.JSONObject; @@ -102,7 +104,7 @@ public void setUp() throws IOException { orchestrator = new CloudSqlShardOrchestrator( - SQLDialect.POSTGRESQL, POSTGRES_14, project, region, gcsResourceManager); + DatabaseType.POSTGRESQL, POSTGRES_14, project, region, gcsResourceManager); } @After diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLTBase.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLTBase.java index 7a2c0c6946..3c30c74348 100644 --- a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLTBase.java +++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLTBase.java @@ -32,6 +32,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Random; import org.apache.beam.it.common.PipelineLauncher; import org.apache.beam.it.common.PipelineLauncher.LaunchConfig; import org.apache.beam.it.common.PipelineLauncher.LaunchInfo; @@ -39,6 +40,7 @@ import org.apache.beam.it.common.utils.IORedirectUtil; import org.apache.beam.it.common.utils.ResourceManagerUtils; import org.apache.beam.it.gcp.TemplateLoadTestBase; +import org.apache.beam.it.gcp.TestConstants; import org.apache.beam.it.gcp.artifacts.utils.ArtifactUtils; import org.apache.beam.it.gcp.pubsub.PubsubResourceManager; import 
org.apache.beam.it.gcp.spanner.SpannerResourceManager;
@@ -152,10 +154,57 @@ public SpannerResourceManager createSpannerDatabase(String spannerDdlResourceFil
   }

   public SpannerResourceManager createSpannerMetadataDatabase() throws IOException {
-    SpannerResourceManager spannerMetadataResourceManager =
-        SpannerResourceManager.builder("rr-meta-" + testName, project, region)
-            .maybeUseStaticInstance()
-            .build();
+    String metadataInstanceId = System.getProperty("spannerMetadataInstanceId");
+    SpannerResourceManager.Builder builder =
+        SpannerResourceManager.builder("rr-meta-" + testName, project, region);
+
+    if (metadataInstanceId != null && !metadataInstanceId.isEmpty()) {
+      builder.setInstanceId(metadataInstanceId).useStaticInstance();
+    } else {
+      builder.maybeUseStaticInstance();
+    }
+
+    SpannerResourceManager spannerMetadataResourceManager = builder.build();
+
+    // Collision Detection and Auto-Avoidance
+    if (spannerResourceManager != null
+        && spannerMetadataResourceManager
+            .getInstanceId()
+            .equals(spannerResourceManager.getInstanceId())) {
+
+      String spannerInstanceId =
+          System.getProperty("spannerInstanceId"); // check if it was a user-defined instance
+      boolean isTestProject =
+          java.util.Objects.equals(project, "cloud-teleport-testing")
+              || java.util.Objects.equals(project, "span-cloud-teleport-testing");
+      boolean shouldPickRandomInstance =
+          com.google.common.base.Strings.isNullOrEmpty(spannerInstanceId)
+              || java.util.Objects.equals(spannerInstanceId, "teleport");
+
+      if (isTestProject && shouldPickRandomInstance) {
+        List<String> staticInstanceList = new ArrayList<>(TestConstants.SPANNER_TEST_INSTANCES);
+        // Avoid picking the same instance
+        staticInstanceList.remove(spannerResourceManager.getInstanceId());
+        if (!staticInstanceList.isEmpty()) {
+          String newMetadataInstanceId =
+              staticInstanceList.get(new Random().nextInt(staticInstanceList.size()));
+          LOG.info(
+              "Spanner collision detected. Re-selecting metadata instance to: {}",
+              newMetadataInstanceId);
+          spannerMetadataResourceManager =
+              SpannerResourceManager.builder("rr-meta-" + testName, project, region)
+                  .setInstanceId(newMetadataInstanceId)
+                  .useStaticInstance()
+                  .build();
+        }
+      } else {
+        LOG.warn(
+            "WARNING: Both primary and metadata Spanner resource managers are configured to use the same instance: {}. "
+                + "To isolate resources, consider specifying '-DspannerInstanceId' and '-DspannerMetadataInstanceId' separately.",
+            spannerResourceManager.getInstanceId());
+      }
+    }
+
     String dummy = "CREATE TABLE IF NOT EXISTS t1(id INT64 ) primary key(id)";
     spannerMetadataResourceManager.executeDdlStatement(dummy);
     return spannerMetadataResourceManager;
diff --git a/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLargeBacklogLT.java b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLargeBacklogLT.java
new file mode 100644
index 0000000000..163b337629
--- /dev/null
+++ b/v2/spanner-to-sourcedb/src/test/java/com/google/cloud/teleport/v2/templates/SpannerToSourceDbLargeBacklogLT.java
@@ -0,0 +1,620 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License.
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates; + +import static org.apache.beam.it.common.TestProperties.getProperty; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline; +import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.spanner.Instance; +import com.google.cloud.spanner.InstanceAdminClient; +import com.google.cloud.spanner.InstanceId; +import com.google.cloud.spanner.InstanceInfo; +import com.google.cloud.spanner.InstanceInfo.InstanceField; +import com.google.cloud.spanner.Spanner; +import com.google.cloud.spanner.SpannerOptions; +import com.google.cloud.teleport.metadata.TemplateLoadTest; +import com.google.cloud.teleport.v2.spanner.migrations.shard.Shard; +import com.google.common.base.MoreObjects; +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; +import java.io.IOException; +import java.text.ParseException; +import java.time.Duration; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.apache.beam.it.common.PipelineLauncher; +import org.apache.beam.it.common.PipelineOperator; +import org.apache.beam.it.common.TestProperties; +import org.apache.beam.it.conditions.ConditionCheck; +import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator; +import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.DatabaseType; +import org.apache.beam.it.gcp.dataflow.ClassicTemplateClient; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Performance / Load test for {@link SpannerToSourceDb} template. + * + *
<p>Objective: Validate that the Reverse Replication pipeline can successfully process a massive
+ * backlog of 1 billion rows (1 terabyte of data) generated and imported into Spanner.
+ */
+@Category(TemplateLoadTest.class)
+@TemplateLoadTest(SpannerToSourceDb.class)
+@RunWith(JUnit4.class)
+public class SpannerToSourceDbLargeBacklogLT extends SpannerToSourceDbLTBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(SpannerToSourceDbLargeBacklogLT.class);
+
+  private static final String TEMPLATE_SPEC_PATH =
+      MoreObjects.firstNonNull(
+          TestProperties.specPath(), "gs://dataflow-templates/latest/flex/Spanner_to_SourceDb");
+
+  private static final String SPANNER_DDL_RESOURCE =
+      "SpannerToSourceDbLargeBacklogLT/spanner-schema.sql";
+  private static final String SESSION_FILE_RESOURCE =
+      "SpannerToSourceDbLargeBacklogLT/session.json";
+  private static final String TABLE = "MigrationLoadTest";
+
+  private static final String DEFAULT_PHYSICAL_SHARD_1 = "nokill-high-resources-backlog-shard1";
+  private static final String DEFAULT_PHYSICAL_SHARD_2 = "nokill-high-resources-backlog-shard2";
+  private static final String DEFAULT_SPANNER_SCALE_NODES = "25";
+  private static final String DEFAULT_AVRO_INPUT_DIR =
+      "gs://nokill-spanner-to-sourcedb-load/data/avro/";
+  private static final String DEFAULT_EXPECTED_SPANNER_COUNT = "1000000000";
+  private static final String DEFAULT_IMPORT_TIMEOUT_MINUTES = "120";
+  private static final String DEFAULT_SPANNER_DOWNSCALE_NODES = "5";
+  private static final String DEFAULT_METADATA_SCALE_NODES = "20";
+  private static final String DEFAULT_REVERSE_TIMEOUT_MINUTES = "600";
+  private static final String DEFAULT_MAX_SHARD_CONNECTIONS = "2000";
+  private static final String DEFAULT_NUM_WORKERS = "200";
+  private static final String DEFAULT_MAX_WORKERS = "200";
+  private static final String DEFAULT_MACHINE_TYPE = "n2-highmem-8";
+  private static final String DEFAULT_EXPECTED_SHARD_COUNT = "250000000";
+  private static final String DEFAULT_METRIC_THRESHOLD = "1000000000";
+  private static final String DEFAULT_VERIFICATION_TIMEOUT_MINUTES = "30";
+
+  private CloudSqlShardOrchestrator orchestrator;
+  private CloudSqlResourceManager manager1;
+  private CloudSqlResourceManager manager2;
+  private Integer originalSpannerNodeCount = null;
+  private Integer originalSpannerMetadataNodeCount = null;
+
+  @Before
+  public void setup() throws IOException {
+    setupResourceManagers(SPANNER_DDL_RESOURCE, SESSION_FILE_RESOURCE);
+
+    orchestrator =
+        new CloudSqlShardOrchestrator(
+            DatabaseType.MYSQL,
+            CloudSqlShardOrchestrator.MYSQL_8_0,
+            project,
+            region,
+            gcsResourceManager);
+
+    // The CloudSQL setup consists of 2 physical shards with 2 logical shards each
+    String physicalShard1 =
+        getProperty("physicalShard1", DEFAULT_PHYSICAL_SHARD_1, TestProperties.Type.PROPERTY);
+    String physicalShard2 =
+        getProperty("physicalShard2", DEFAULT_PHYSICAL_SHARD_2, TestProperties.Type.PROPERTY);
+
+    Map<String, List<String>> shardMap = new HashMap<>();
+    shardMap.put(physicalShard1, List.of("shard0", "shard1"));
+    shardMap.put(physicalShard2, List.of("shard2", "shard3"));
+
+    // Initialize the physical instances (reusing existing ones) and logical schemas
+    orchestrator.initialize(shardMap, "orchestrator_shards_bulk.json");
+    manager1 = (CloudSqlResourceManager) orchestrator.managers.get(physicalShard1);
+    manager2 = (CloudSqlResourceManager) orchestrator.managers.get(physicalShard2);
+
+    LOG.info("Creating logical schemas on MySQL shards...");
+    createLogicalTableSchema(manager1, "shard0");
+    createLogicalTableSchema(manager1, "shard1");
+    createLogicalTableSchema(manager2, "shard2");
+    createLogicalTableSchema(manager2, "shard3");
+
+    LOG.info("Generating and uploading sharding configuration to GCS...");
+    createAndUploadShardConfigToGcs();
+
+    // Store original node counts for cleanup
+    originalSpannerNodeCount = getSpannerNodeCount(spannerResourceManager.getInstanceId());
+    originalSpannerMetadataNodeCount =
+        getSpannerNodeCount(spannerMetadataResourceManager.getInstanceId());
+  }
+
+  @After
+  public void tearDown() {
+    LOG.info("Cleaning up resources...");
+
+    // Reset Spanner instance to its original node count if it was modified
+    if (originalSpannerNodeCount != null && spannerResourceManager != null) {
+      try {
+        updateSpannerNodeCount(spannerResourceManager.getInstanceId(), originalSpannerNodeCount);
+      } catch (Exception e) {
+        LOG.warn("Failed to reset Spanner node count during teardown: ", e);
+      }
+    }
+    // Reset Spanner Metadata instance to its original node count if it was modified
+    if (originalSpannerMetadataNodeCount != null && spannerMetadataResourceManager != null) {
+      try {
+        updateSpannerNodeCount(
+            spannerMetadataResourceManager.getInstanceId(), originalSpannerMetadataNodeCount);
+      } catch (Exception e) {
+        LOG.warn("Failed to reset Spanner Metadata node count during teardown: ", e);
+      }
+    }
+
+    cleanupResourceManagers();
+    if (orchestrator != null) {
+      orchestrator.cleanup();
+    }
+  }
+
+  @Test
+  public void reverseReplicationBacklogLoadTest()
+      throws IOException, ParseException, InterruptedException {
+
+    // -------------------------------------------------------------
+    // PHASE 1: Import 1 billion rows to Spanner
+    // -------------------------------------------------------------
+    LOG.info("PHASE 1: Import 1 billion rows to Spanner");
+
+    // Record the UTC start timestamp before the import begins (it serves as the start
+    // timestamp for the reverse replication job)
+    String startTimestamp = java.time.Instant.now().toString();
+    LOG.info("Recorded UTC start timestamp for Reverse Replication Job: {}", startTimestamp);
+
+    // Node count taken from manual test results available in go/reverse-backlog-manual-tests
+    int scaleNodes =
+        Integer.parseInt(
+            getProperty(
+                "spannerScaleNodes", DEFAULT_SPANNER_SCALE_NODES, TestProperties.Type.PROPERTY));
+    updateSpannerNodeCount(spannerResourceManager.getInstanceId(), scaleNodes);
+
+    // Verify scale-up - it is critical that the Spanner instance is scaled up, otherwise the
+    // pipeline might run out of resources or face bottleneck issues.
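+    // Note: updateSpannerNodeCount (defined below) blocks on the instance update operation via
+    // OperationFuture.get() and retries with exponential backoff, so the node count read here
+    // reflects the completed resize rather than an in-flight operation.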
+    int currentNodeCount = getSpannerNodeCount(spannerResourceManager.getInstanceId());
+    assertEquals(
+        "Spanner instance node count mismatch after scale-up", scaleNodes, currentNodeCount);
+
+    // Run Avro Import with complete dataset (1 billion rows)
+    String avroInputDir =
+        getProperty("avroInputDir", DEFAULT_AVRO_INPUT_DIR, TestProperties.Type.PROPERTY);
+    long expectedSpannerCount =
+        Long.parseLong(
+            getProperty(
+                "expectedSpannerCount",
+                DEFAULT_EXPECTED_SPANNER_COUNT,
+                TestProperties.Type.PROPERTY));
+    int importTimeoutMinutes =
+        Integer.parseInt(
+            getProperty(
+                "importTimeoutMinutes",
+                DEFAULT_IMPORT_TIMEOUT_MINUTES,
+                TestProperties.Type.PROPERTY));
+
+    // Ensure avroInputDir ends with a trailing slash for the classic import template
+    if (!avroInputDir.endsWith("/")) {
+      avroInputDir = avroInputDir + "/";
+    }
+
+    LOG.info("Avro Input Directory: {}", avroInputDir);
+    LOG.info("Expected Spanner count: {}", expectedSpannerCount);
+
+    PipelineLauncher.LaunchInfo importJobInfo = launchImportJob(avroInputDir);
+    assertThatPipeline(importJobInfo).isRunning();
+
+    PipelineOperator.Result importResult =
+        pipelineOperator.waitUntilDone(
+            createConfig(importJobInfo, Duration.ofMinutes(importTimeoutMinutes)));
+    assertThatResult(importResult).isLaunchFinished();
+
+    long spannerCount = spannerResourceManager.getRowCount(TABLE);
+    assertEquals("Spanner database row count mismatch", expectedSpannerCount, spannerCount);
+    LOG.info("Import Phase successful! Imported {} rows.", spannerCount);
+
+    // -------------------------------------------------------------
+    // PHASE 2: Reverse Replication E2E Verification
+    // -------------------------------------------------------------
+    // Downscale main Spanner instance to 5 nodes and upscale metadata Spanner instance to 20 nodes
+    // (go/reverse-backlog-manual-tests)
+    int spannerDownscaleNodes =
+        Integer.parseInt(
+            getProperty(
+                "spannerDownscaleNodes",
+                DEFAULT_SPANNER_DOWNSCALE_NODES,
+                TestProperties.Type.PROPERTY));
+    int metadataScaleNodes =
+        Integer.parseInt(
+            getProperty(
+                "metadataScaleNodes", DEFAULT_METADATA_SCALE_NODES, TestProperties.Type.PROPERTY));
+
+    LOG.info(
+        "Downscaling main Spanner instance to {} nodes and upscaling metadata instance to {} nodes before starting replication...",
+        spannerDownscaleNodes,
+        metadataScaleNodes);
+    updateSpannerNodeCount(spannerResourceManager.getInstanceId(), spannerDownscaleNodes);
+    updateSpannerNodeCount(spannerMetadataResourceManager.getInstanceId(), metadataScaleNodes);
+
+    int reverseTimeoutMinutes =
+        Integer.parseInt(
+            getProperty(
+                "reverseTimeoutMinutes",
+                DEFAULT_REVERSE_TIMEOUT_MINUTES,
+                TestProperties.Type.PROPERTY));
+    int maxShardConnections =
+        Integer.parseInt(
+            getProperty(
+                "maxShardConnections",
+                DEFAULT_MAX_SHARD_CONNECTIONS,
+                TestProperties.Type.PROPERTY));
+    int numWorkers =
+        Integer.parseInt(
+            getProperty("numWorkers", DEFAULT_NUM_WORKERS, TestProperties.Type.PROPERTY));
+    int maxWorkers =
+        Integer.parseInt(
+            getProperty("maxWorkers", DEFAULT_MAX_WORKERS, TestProperties.Type.PROPERTY));
+    String machineType =
+        getProperty("machineType", DEFAULT_MACHINE_TYPE, TestProperties.Type.PROPERTY);
+
+    PipelineLauncher.LaunchInfo reverseJobInfo =
+        launchReverseReplicationJob(
+            startTimestamp, numWorkers, maxWorkers, machineType, maxShardConnections);
+    assertThatPipeline(reverseJobInfo).isRunning();
+
+    // This is a long-running test (7-8 hours) and we don't expect the SQL count queries to pass
+    // the assertion for the first few hours (when the
+    // replication backlog is being processed).
+    // Asserting on direct database counts during early stages would be highly inefficient and
+    // would create unnecessary resource contention on the database shards. Thus, we poll the
+    // "success_record_count" metric and only start asserting on SQL counts once the metric
+    // threshold is met.
+    // Note that the above metric can NOT act as a reliable source of truth for the success of a
+    // replication pipeline. It is ONLY used as an indicator to start the SQL count verification.
+
+    long expectedShardCount =
+        Long.parseLong(
+            getProperty(
+                "expectedShardCount", DEFAULT_EXPECTED_SHARD_COUNT, TestProperties.Type.PROPERTY));
+    long metricThreshold =
+        Long.parseLong(
+            getProperty("metricThreshold", DEFAULT_METRIC_THRESHOLD, TestProperties.Type.PROPERTY));
+
+    long startTimeMillis = System.currentTimeMillis();
+    int numShards = 4;
+
+    ConditionCheck successRecordsCheck =
+        new ConditionCheck() {
+          @Override
+          protected String getDescription() {
+            return String.format(
+                "Check if Dataflow metric success_record_count reaches %d", metricThreshold);
+          }
+
+          @Override
+          protected CheckResult check() {
+            try {
+              Double successRecordsCount =
+                  pipelineLauncher.getMetric(
+                      project, region, reverseJobInfo.jobId(), "success_record_count");
+              long polledCount = successRecordsCount != null ? successRecordsCount.longValue() : 0;
+
+              LOG.info("--- PIPELINE PROGRESS UPDATE ---");
+              LOG.info(
+                  "Time Elapsed: {} minutes / {} minutes",
+                  (System.currentTimeMillis() - startTimeMillis) / 60000,
+                  reverseTimeoutMinutes);
+              LOG.info(
+                  "Polled success_record_count: {} / Target: {}", polledCount, metricThreshold);
+              LOG.info("---------------------------------");
+
+              if (polledCount >= metricThreshold) {
+                return new CheckResult(true, String.format("Threshold reached: %d", polledCount));
+              }
+              return new CheckResult(
+                  false, String.format("Current progress: %d rows", polledCount));
+            } catch (Exception e) {
+              return new CheckResult(false, "Failed to retrieve job metrics: " + e.getMessage());
+            }
+          }
+        };
+
+    PipelineOperator.Result result =
+        pipelineOperator.waitForCondition(
+            createConfig(
+                reverseJobInfo,
+                Duration.ofMinutes(reverseTimeoutMinutes), // total timeout
+                Duration.ofMinutes(15)), // Poll every 15 minutes. Since the test runs for 7-8
+            // hours, 15-minute intervals print exactly 4 logs per hour, avoiding log clutter
+            // and unnecessary API calls.
+            successRecordsCheck);
+
+    assertThatResult(result).meetsConditions();
+
+    // Verify database parity on MySQL shards with a retry loop to handle minor replication
+    // synchronization lag
+    LOG.info(
+        "Replication threshold reached. Verifying logical database row counts on CloudSQL with a catch-up retry loop...");
+    long verificationStartTime = System.currentTimeMillis();
+    int verificationTimeoutMinutes =
+        Integer.parseInt(
+            getProperty(
+                "verificationTimeoutMinutes",
+                DEFAULT_VERIFICATION_TIMEOUT_MINUTES,
+                TestProperties.Type.PROPERTY));
+    long verificationTimeoutMs =
+        verificationTimeoutMinutes
+            * 60
+            * 1000; // Configurable minutes timeout for final parity catch-up
+    boolean parityAchieved = false;
+    long count0 = 0, count1 = 0, count2 = 0, count3 = 0;
+
+    // We execute the logical database row counts using a parallel thread pool instead of running
+    // them sequentially.
+    // At 1-billion row scale, executing SELECT COUNT(*) on the 4 massive MySQL database shards
+    // sequentially takes around 12 minutes per iteration, causing the catch-up verification
+    // timeout to be exhausted after only two or three iterations. Running in parallel reduces
+    // the query round-trip time to that of the slowest single shard query (~3 minutes),
+    // ensuring the loop has ample opportunities to poll and catch up.
+    ExecutorService executor = Executors.newFixedThreadPool(4);
+    try {
+      while (System.currentTimeMillis() - verificationStartTime < verificationTimeoutMs) {
+        CompletableFuture<Long> f0 =
+            CompletableFuture.supplyAsync(
+                () -> getLogicalDatabaseRowCount(manager1, "shard0"), executor);
+        CompletableFuture<Long> f1 =
+            CompletableFuture.supplyAsync(
+                () -> getLogicalDatabaseRowCount(manager1, "shard1"), executor);
+        CompletableFuture<Long> f2 =
+            CompletableFuture.supplyAsync(
+                () -> getLogicalDatabaseRowCount(manager2, "shard2"), executor);
+        CompletableFuture<Long> f3 =
+            CompletableFuture.supplyAsync(
+                () -> getLogicalDatabaseRowCount(manager2, "shard3"), executor);
+
+        // Block and wait for all parallel database count queries to resolve and return their
+        // values - necessary for the subsequent if block to compute correctly
+        count0 = f0.join();
+        count1 = f1.join();
+        count2 = f2.join();
+        count3 = f3.join();
+
+        LOG.info(
+            "Polled replicated row counts: shard0={}, shard1={}, shard2={}, shard3={} (Target: {})",
+            count0,
+            count1,
+            count2,
+            count3,
+            expectedShardCount);
+
+        if (count0 == expectedShardCount
+            && count1 == expectedShardCount
+            && count2 == expectedShardCount
+            && count3 == expectedShardCount) {
+          parityAchieved = true;
+          break;
+        }
+
+        LOG.info(
+            "Database counts have not reached exact target parity yet. Retrying in 1 minute...");
+        Thread.sleep(60000); // Polling retry interval of 1 minute
+      }
+    } finally {
+      executor.shutdown();
+    }
+
+    assertTrue(
+        String.format(
+            "Logical database row count mismatch after replication verification timeout. Replicated: shard0=%d, shard1=%d, shard2=%d, shard3=%d (Expected: %d each)",
+            count0, count1, count2, count3, expectedShardCount),
+        parityAchieved);
+
+    LOG.info("All sharded replication backlog counts successfully verified. Cancelling job...");
+    PipelineOperator.Result cancelResult =
+        pipelineOperator.cancelJobAndFinish(createConfig(reverseJobInfo, Duration.ofMinutes(20)));
+    assertThatResult(cancelResult).isLaunchFinished();
+    exportMetrics(reverseJobInfo, numShards);
+  }
+
+  private void createLogicalTableSchema(CloudSqlResourceManager manager, String dbName) {
+    manager.runSQLUpdate(
+        "CREATE TABLE IF NOT EXISTS "
+            + dbName
+            + "."
+            + TABLE
+            + " ("
+            + "Id VARCHAR(36) NOT NULL,"
+            + "Payload LONGTEXT NOT NULL,"
+            + "PRIMARY KEY (Id)"
+            + ") ENGINE=InnoDB");
+  }
+
+  private void createAndUploadShardConfigToGcs() throws IOException {
+    JsonArray ja = new JsonArray();
+    ja.add(createShardConfig("shard_0", "shard0", manager1));
+    ja.add(createShardConfig("shard_1", "shard1", manager1));
+    ja.add(createShardConfig("shard_2", "shard2", manager2));
+    ja.add(createShardConfig("shard_3", "shard3", manager2));
+
+    String shardFileContents = ja.toString();
+    LOG.info("Shard file contents: {}", shardFileContents);
+    gcsResourceManager.createArtifact(SOURCE_SHARDS_FILE_NAME, shardFileContents);
+  }
+
+  private JsonObject createShardConfig(
+      String logicalShardId, String dbName, CloudSqlResourceManager manager) {
+    Shard shard = new Shard();
+    shard.setLogicalShardId(logicalShardId);
+    shard.setUser(manager.getUsername());
+    shard.setHost(manager.getHost());
+    shard.setPassword(manager.getPassword());
+    shard.setPort(String.valueOf(manager.getPort()));
+    shard.setDbName(dbName);
+    JsonObject jsObj = (JsonObject) new Gson().toJsonTree(shard).getAsJsonObject();
+    jsObj.remove("secretManagerUri");
+    return jsObj;
+  }
+
+  private PipelineLauncher.LaunchInfo launchImportJob(String inputDir) throws IOException {
+    ClassicTemplateClient classicClient = ClassicTemplateClient.builder(CREDENTIALS).build();
+
+    Map<String, String> params = new HashMap<>();
+    params.put("instanceId", spannerResourceManager.getInstanceId());
+    params.put("databaseId", spannerResourceManager.getDatabaseId());
+    params.put("inputDir", inputDir);
+
+    PipelineLauncher.LaunchConfig options =
+        PipelineLauncher.LaunchConfig.builder(
+                "spanner-avro-import-backlog",
+                "gs://dataflow-templates/latest/GCS_Avro_to_Cloud_Spanner")
+            .setParameters(params)
+            .addEnvironment("numWorkers", 80)
+            .addEnvironment("maxWorkers", 120)
+            .addEnvironment("machineType", "n2-highmem-8")
+            .build();
+
+    return classicClient.launch(project, region, options);
+  }
+
+  private PipelineLauncher.LaunchInfo launchReverseReplicationJob(
+      String startTimestamp,
+      int numWorkers,
+      int maxWorkers,
+      String machineType,
+      int maxShardConnections)
+      throws IOException {
+
+    Map<String, String> params = new HashMap<>();
+    params.put("changeStreamName", "MigrationStream");
+    params.put("instanceId", spannerResourceManager.getInstanceId());
+    params.put("databaseId", spannerResourceManager.getDatabaseId());
+    params.put("spannerProjectId", project);
+    params.put("metadataInstance", spannerMetadataResourceManager.getInstanceId());
+    params.put("metadataDatabase", spannerMetadataResourceManager.getDatabaseId());
+    params.put("sourceShardsFilePath", getGcsPath(SOURCE_SHARDS_FILE_NAME, gcsResourceManager));
+    params.put("deadLetterQueueDirectory", getGcsPath("dlq", gcsResourceManager));
+    params.put("startTimestamp", startTimestamp);
+    params.put("maxShardConnections", String.valueOf(maxShardConnections));
+    params.put("sessionFilePath", getGcsPath(SESSION_FILE_NAME, gcsResourceManager));
+    params.put("workerMachineType", machineType);
+
+    PipelineLauncher.LaunchConfig.Builder options =
+        PipelineLauncher.LaunchConfig.builder(getClass().getSimpleName(), TEMPLATE_SPEC_PATH);
+    options
+        .addEnvironment("maxWorkers", maxWorkers)
+        .addEnvironment("numWorkers", numWorkers)
+        .addEnvironment("machineType", machineType)
+        .addEnvironment(
+            "additionalExperiments", java.util.Collections.singletonList("use_runner_v2"));
+
+    options.setParameters(params);
+    return pipelineLauncher.launch(project, region, options.build());
+  }
+
+  private long getLogicalDatabaseRowCount(CloudSqlResourceManager manager, String dbName) {
+    // We use the InnoDB Optimizer hint 'SET_VAR(innodb_parallel_read_threads=94)' to explicitly
+    // instruct MySQL to leverage parallel read threads for the COUNT(*) operation. On massive
+    // shards containing 250 million rows, this utilizes multiple CPU cores of the high-resource
+    // Cloud SQL instance, accelerating the full-table scan from over 10 minutes down to 2 minutes.
+    String query =
+        "SELECT /*+ SET_VAR(innodb_parallel_read_threads=94) */ COUNT(*) FROM "
+            + dbName
+            + "."
+            + TABLE;
+    List<Map<String, Object>> result = manager.runSQLQuery(query);
+    if (result != null && !result.isEmpty()) {
+      Map<String, Object> row = result.get(0);
+      for (Object val : row.values()) {
+        if (val instanceof Number) {
+          return ((Number) val).longValue();
+        }
+      }
+    }
+    return 0;
+  }
+
+  public void updateSpannerNodeCount(String instanceId, int nodeCount) {
+    SpannerOptions options = SpannerOptions.newBuilder().setProjectId(project).build();
+    try (Spanner spanner = options.getService()) {
+      InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient();
+
+      InstanceInfo instanceInfo =
+          InstanceInfo.newBuilder(InstanceId.of(project, instanceId))
+              .setNodeCount(nodeCount)
+              .build();
+
+      int maxRetries = 3;
+      long backoffMs = 10000; // 10 seconds initial backoff
+      for (int attempt = 1; attempt <= maxRetries; attempt++) {
+        try {
+          LOG.info(
+              "Updating Spanner instance {} node count to {}... (Attempt {}/{})",
+              instanceId,
+              nodeCount,
+              attempt,
+              maxRetries);
+          instanceAdminClient.updateInstance(instanceInfo, InstanceField.NODE_COUNT).get();
+          LOG.info(
+              "Successfully updated Spanner instance {} node count to {}.", instanceId, nodeCount);
+          return;
+        } catch (Exception e) {
+          if (attempt == maxRetries) {
+            throw e;
+          }
+          LOG.warn(
+              "Failed to update Spanner instance node count on attempt {}.
Retrying in {} ms...", + attempt, + backoffMs, + e); + Thread.sleep(backoffMs); + backoffMs *= 2; // Exponential backoff + } + } + } catch (Exception e) { + LOG.error("Failed to update Spanner instance node count after retries.", e); + throw new RuntimeException("Failed to update Spanner node count", e); + } + } + + public int getSpannerNodeCount(String instanceId) { + SpannerOptions options = SpannerOptions.newBuilder().setProjectId(project).build(); + try (Spanner spanner = options.getService()) { + InstanceAdminClient instanceAdminClient = spanner.getInstanceAdminClient(); + Instance instance = instanceAdminClient.getInstance(instanceId); + return instance.getNodeCount(); + } catch (Exception e) { + LOG.error("Failed to retrieve Spanner instance node count.", e); + throw new RuntimeException("Failed to get Spanner node count", e); + } + } +} diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbLargeBacklogLT/session.json b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbLargeBacklogLT/session.json new file mode 100644 index 0000000000..f8035b77a0 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbLargeBacklogLT/session.json @@ -0,0 +1,313 @@ +{ + "SpSchema": { + "t1": { + "Name": "MigrationLoadTest", + "ColIds": [ + "c2", + "c3", + "c4" + ], + "ShardIdColumn": "c4", + "ColDefs": { + "c2": { + "Name": "Id", + "T": { + "Name": "STRING", + "Len": 36, + "IsArray": false + }, + "NotNull": true, + "Comment": "From: Id varchar(36)", + "Id": "c2", + "AutoGen": { + "Name": "", + "GenerationType": "", + "IdentityOptions": { + "SkipRangeMin": "", + "SkipRangeMax": "", + "StartCounterWith": "" + } + }, + "DefaultValue": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + } + }, + "GeneratedColumn": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + }, + "Type": "" + }, + "Opts": null + }, + "c3": { + "Name": "Payload", + "T": { + "Name": "STRING", + "Len": 9223372036854775807, + "IsArray": false + }, + "NotNull": true, + "Comment": "From: Payload longtext(4294967295)", + "Id": "c3", + "AutoGen": { + "Name": "", + "GenerationType": "", + "IdentityOptions": { + "SkipRangeMin": "", + "SkipRangeMax": "", + "StartCounterWith": "" + } + }, + "DefaultValue": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + } + }, + "GeneratedColumn": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + }, + "Type": "" + }, + "Opts": null + }, + "c4": { + "Name": "migration_shard_id", + "T": { + "Name": "STRING", + "Len": 50, + "IsArray": false + }, + "NotNull": false, + "Comment": "", + "Id": "c4", + "AutoGen": { + "Name": "", + "GenerationType": "", + "IdentityOptions": { + "SkipRangeMin": "", + "SkipRangeMax": "", + "StartCounterWith": "" + } + }, + "DefaultValue": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + } + }, + "GeneratedColumn": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + }, + "Type": "" + }, + "Opts": null + } + }, + "PrimaryKeys": [ + { + "ColId": "c2", + "Desc": false, + "Order": 2 + }, + { + "ColId": "c4", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "Indexes": null, + "ParentTable": { + "Id": "", + "OnDelete": "", + "InterleaveType": "" + }, + "CheckConstraints": null, + "Comment": "Spanner schema for source table MigrationLoadTest", + "Id": "t1" + } + }, + "SyntheticPKeys": {}, + "SrcSchema": { + "t1": { + "Name": "MigrationLoadTest", + "Schema": 
"shard0", + "ColIds": [ + "c2", + "c3" + ], + "ColDefs": { + "c2": { + "Name": "Id", + "Type": { + "Name": "varchar", + "Mods": [ + 36 + ], + "ArrayBounds": null + }, + "NotNull": true, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c2", + "AutoGen": { + "Name": "", + "GenerationType": "", + "IdentityOptions": { + "SkipRangeMin": "", + "SkipRangeMax": "", + "StartCounterWith": "" + } + }, + "DefaultValue": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + } + }, + "GeneratedColumn": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + }, + "Type": "" + } + }, + "c3": { + "Name": "Payload", + "Type": { + "Name": "longtext", + "Mods": [ + 4294967295 + ], + "ArrayBounds": null + }, + "NotNull": true, + "Ignored": { + "Check": false, + "Identity": false, + "Default": false, + "Exclusion": false, + "ForeignKey": false, + "AutoIncrement": false + }, + "Id": "c3", + "AutoGen": { + "Name": "", + "GenerationType": "", + "IdentityOptions": { + "SkipRangeMin": "", + "SkipRangeMax": "", + "StartCounterWith": "" + } + }, + "DefaultValue": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + } + }, + "GeneratedColumn": { + "IsPresent": false, + "Value": { + "ExpressionId": "", + "Statement": "" + }, + "Type": "" + } + } + }, + "PrimaryKeys": [ + { + "ColId": "c2", + "Desc": false, + "Order": 1 + } + ], + "ForeignKeys": null, + "CheckConstraints": null, + "Indexes": null, + "Id": "t1" + } + }, + "SchemaIssues": { + "t1": { + "ColumnLevelIssues": { + "c4": [ + 29 + ] + }, + "TableLevelIssues": null + } + }, + "InvalidCheckExp": null, + "ToSpanner": { + "MigrationLoadTest": { + "Name": "MigrationLoadTest", + "Cols": { + "Id": "Id", + "Payload": "Payload" + } + } + }, + "Location": {}, + "TimezoneOffset": "+00:00", + "SpDialect": "google_standard_sql", + "UniquePKey": {}, + "Rules": [ + { + "Id": "r5", + "Name": "r5", + "Type": "add_shard_id_primary_key", + "ObjectType": "", + "AssociatedObjects": "All Tables", + "Enabled": true, + "Data": { + "AddedAtTheStart": true + }, + "AddedOn": null + } + ], + "IsSharded": true, + "SpRegion": "", + "ResourceValidation": false, + "UI": true, + "SpSequences": {}, + "SrcSequences": {}, + "SpProjectId": "span-cloud-ck-testing-external", + "SpInstanceId": "ea-functional-tests", + "Source": "mysql", + "DatabaseOptions": { + "DbName": "", + "DefaultTimezone": "" + }, + "DefaultIdentityOptions": { + "SkipRangeMin": "", + "SkipRangeMax": "", + "StartCounterWith": "" + } +} \ No newline at end of file diff --git a/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbLargeBacklogLT/spanner-schema.sql b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbLargeBacklogLT/spanner-schema.sql new file mode 100644 index 0000000000..5e36104ff9 --- /dev/null +++ b/v2/spanner-to-sourcedb/src/test/resources/SpannerToSourceDbLargeBacklogLT/spanner-schema.sql @@ -0,0 +1,12 @@ +CREATE TABLE MigrationLoadTest ( + Id STRING(36) NOT NULL, + Payload STRING(MAX) NOT NULL, + migration_shard_id STRING(50), +) PRIMARY KEY (migration_shard_id, Id); + +CREATE CHANGE STREAM MigrationStream +FOR MigrationLoadTest +OPTIONS ( + retention_period = '7d', + value_capture_type = 'OLD_AND_NEW_VALUES' +);