diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpanner5kTablesLT.java b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpanner5kTablesLT.java
new file mode 100644
index 0000000000..628df5ab31
--- /dev/null
+++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpanner5kTablesLT.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.loadtesting;
+
+import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
+import com.google.cloud.teleport.metadata.TemplateLoadTest;
+import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager;
+import org.apache.beam.it.gcp.datastream.JDBCSource;
+import org.apache.beam.it.gcp.datastream.MySQLSource;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Load test for {@link DataStreamToSpanner} template with 5000 tables. */
+@Category({TemplateLoadTest.class, SkipDirectRunnerTest.class})
+@TemplateLoadTest(DataStreamToSpanner.class)
+@RunWith(JUnit4.class)
+public class DataStreamToSpanner5kTablesLT extends DataStreamToSpannerLTBase {
+
+  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner5kTablesLT.class);
+  private static final int NUM_TABLES = 5000;
+  private CloudMySQLResourceManager jdbcResourceManager;
+  private Instant startTime;
+
+  @Before
+  public void setUp() throws IOException {
+    super.setUp();
+    LOG.info("Began Setup for 5K Table test");
+    startTime = Instant.now();
+
+    setUpResourceManagers(null, true);
+    jdbcResourceManager = CloudMySQLResourceManager.builder(testName).build();
+  }
+
+  @After
+  public void cleanUp() throws IOException {
+    ResourceManagerUtils.cleanResources(jdbcResourceManager);
+    super.cleanUp();
+    LOG.info(
+        "CleanupCompleted for 5K Table test. Test took {}",
+        Duration.between(startTime, Instant.now()));
+  }
+
+  @Test
+  public void test5kTablesCDC() throws Exception {
+    // 1. Create 5k tables in MySQL
+    LOG.info("Creating master table in MySQL...");
+    try (Connection conn = getJdbcConnection(jdbcResourceManager);
+        Statement stmt = conn.createStatement()) {
+      stmt.execute("CREATE TABLE table_1 (id BIGINT NOT NULL PRIMARY KEY)");
+    }
+
+    LOG.info("Creating remaining {} tables in MySQL...", NUM_TABLES - 1);
+    try (Connection conn = getJdbcConnection(jdbcResourceManager);
+        Statement stmt = conn.createStatement()) {
+      for (int j = 2; j <= NUM_TABLES; j++) {
+        String mySqlDdl = String.format("CREATE TABLE table_%d LIKE table_1", j);
+        stmt.addBatch(mySqlDdl);
+      }
+      stmt.executeBatch();
+    }
+
+    // 2. Create 5k tables in Spanner
+    LOG.info("Creating 5k tables in Spanner...");
+    List<String> spannerDdls = new ArrayList<>();
+    for (int i = 1; i <= NUM_TABLES; i++) {
+      spannerDdls.add(
+          String.format("CREATE TABLE table_%d (id INT64 NOT NULL) PRIMARY KEY(id)", i));
+    }
+    spannerResourceManager.executeDdlStatements(spannerDdls);
+
+    // 3. Setup Datastream source
+    List<String> tableNames = new ArrayList<>();
+    for (int i = 1; i <= NUM_TABLES; i++) {
+      tableNames.add("table_" + i);
+    }
+    java.util.Map<String, List<String>> allowedTables =
+        java.util.Map.of(jdbcResourceManager.getDatabaseName(), tableNames);
+
+    JDBCSource mySQLSource =
+        new MySQLSource.Builder(
+                jdbcResourceManager.getHost(),
+                jdbcResourceManager.getUsername(),
+                jdbcResourceManager.getPassword(),
+                jdbcResourceManager.getPort())
+            .setAllowedTables(allowedTables)
+            .build();
+
+    HashMap<String, Integer> tables = new HashMap<>();
+    for (int i = 1; i <= NUM_TABLES; i++) {
+      tables.put("table_" + i, 1);
+    }
+
+    // 4. Run Load Test with CDC callback
+    runLoadTest(
+        tables,
+        mySQLSource,
+        new HashMap<>(),
+        new HashMap<>(),
+        () -> {
+          LOG.info("Inserting 1 row into all {} tables in MySQL (CDC mode)...", NUM_TABLES);
+          try (Connection conn = getJdbcConnection(jdbcResourceManager);
+              Statement stmt = conn.createStatement()) {
+            for (int i = 1; i <= NUM_TABLES; i++) {
+              String sql = String.format("INSERT INTO table_%d (id) VALUES (%d)", i, i);
+              stmt.addBatch(sql);
+            }
+            stmt.executeBatch();
+          } catch (SQLException e) {
+            throw new RuntimeException("Failed to insert CDC data", e);
+          }
+        });
+  }
+
+  private static Connection getJdbcConnection(CloudMySQLResourceManager mySQLResourceManager)
+      throws SQLException {
+    return DriverManager.getConnection(
+        mySQLResourceManager.getUri(),
+        mySQLResourceManager.getUsername(),
+        mySQLResourceManager.getPassword());
+  }
+}
diff --git a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpannerLTBase.java
b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpannerLTBase.java
index 4381ee8110..720cdfa83f 100644
--- a/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpannerLTBase.java
+++ b/v2/datastream-to-spanner/src/test/java/com/google/cloud/teleport/v2/templates/loadtesting/DataStreamToSpannerLTBase.java
@@ -28,6 +28,7 @@
 import com.google.cloud.datastream.v1.DestinationConfig;
 import com.google.cloud.datastream.v1.SourceConfig;
 import com.google.cloud.datastream.v1.Stream;
+import com.google.common.base.MoreObjects;
 import com.google.common.io.Resources;
 import com.google.pubsub.v1.SubscriptionName;
 import com.google.pubsub.v1.TopicName;
@@ -35,14 +36,21 @@
 import java.nio.charset.StandardCharsets;
 import java.text.ParseException;
 import java.time.Duration;
+import java.util.ArrayList;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.function.Supplier;
 import org.apache.beam.it.common.PipelineLauncher;
 import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
 import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
 import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
-import org.apache.beam.it.conditions.ConditionCheck;
 import org.apache.beam.it.gcp.TemplateLoadTestBase;
 import org.apache.beam.it.gcp.datastream.DatastreamResourceManager;
 import org.apache.beam.it.gcp.datastream.JDBCSource;
@@ -50,18 +58,22 @@
 import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
 import org.apache.beam.it.gcp.secretmanager.SecretManagerResourceManager;
 import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
-import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
 import org.apache.beam.it.gcp.storage.GcsResourceManager;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import org.junit.After;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Base class for DataStreamToSpanner Load tests. It provides helper functions related to
  * environment setup and assertConditions.
  */
 public class DataStreamToSpannerLTBase extends TemplateLoadTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpannerLTBase.class);
   protected static final String SPEC_PATH =
-      "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner";
+      MoreObjects.firstNonNull(
+          TestProperties.specPath(),
+          "gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner");
   protected String testRootDir;
   protected final int maxWorkers = 100;
   protected final int numWorkers = 50;
@@ -72,6 +84,16 @@
   public DatastreamResourceManager datastreamResourceManager;
   protected SecretManagerResourceManager secretClient;
 
+  /** Inclusive [min, max] bound on a table's expected Spanner row count. */
+  public static class RowRange {
+    public final int min;
+    public final int max;
+
+    public RowRange(int min, int max) {
+      this.min = min;
+      this.max = max;
+    }
+  }
+
   public void setUpResourceManagers(String spannerDdlResource) throws IOException {
     setUpResourceManagers(spannerDdlResource, false);
   }
@@ -96,10 +118,14 @@ public void setUpResourceManagers(String spannerDdlResource, boolean separateSha
             .build();
     gcsResourceManager = createSpannerLTGcsResourceManager();
 
-    datastreamResourceManager =
+    DatastreamResourceManager.Builder datastreamBuilder =
         DatastreamResourceManager.builder(testName, project, region)
-            .setCredentialsProvider(CREDENTIALS_PROVIDER)
-            .build();
+            .setCredentialsProvider(CREDENTIALS_PROVIDER);
+    if (System.getProperty("privateConnectivity") != null) {
+      datastreamBuilder.setPrivateConnectivity(System.getProperty("privateConnectivity"));
+    }
+
+    datastreamResourceManager = datastreamBuilder.build();
     secretClient = SecretManagerResourceManager.builder(project, CREDENTIALS_PROVIDER).build();
 
     // Initialize shadowTableSpannerResourceManager only if separateShadowTableDb is true
@@ -127,6 +153,16 @@ public void runLoadTest(
       HashMap<String, String> templateParameters,
       HashMap<String, String> environmentOptions)
       throws IOException, ParseException, InterruptedException {
+    runLoadTest(tables, mySQLSource, templateParameters, environmentOptions, null);
+  }
+
+  public void runLoadTest(
+      HashMap<String, Integer> tables,
+      JDBCSource mySQLSource,
+      HashMap<String, String> templateParameters,
+      HashMap<String, String> environmentOptions,
+      Runnable afterLaunchCallback)
+      throws IOException, ParseException, InterruptedException {
     // TestClassName/runId/TestMethodName/cdc
     String gcsPrefix =
         String.join("/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "cdc"});
@@ -185,6 +221,7 @@ public void runLoadTest(
     params.putAll(templateParameters);
 
     LaunchConfig.Builder options = LaunchConfig.builder(getClass().getSimpleName(), SPEC_PATH);
+    options.addEnvironment("maxWorkers", maxWorkers).addEnvironment("numWorkers", numWorkers);
 
     // Set all environment options
@@ -195,20 +232,18 @@ public void runLoadTest(
     PipelineLauncher.LaunchInfo jobInfo = pipelineLauncher.launch(project, region, options.build());
     assertThatPipeline(jobInfo).isRunning();
 
-    ConditionCheck[] checks = new ConditionCheck[tables.size()];
-    int iterationCount = 0;
-    for (Map.Entry<String, Integer> entry : tables.entrySet()) {
-      checks[iterationCount] =
-          SpannerRowsCheck.builder(spannerResourceManager, entry.getKey())
-              .setMinRows(entry.getValue())
-              .setMaxRows(entry.getValue())
-              .build();
-      iterationCount++;
+    if (afterLaunchCallback != null) {
+      afterLaunchCallback.run();
     }
 
+    HashMap<String, RowRange> tableRanges = new HashMap<>();
+    for (Map.Entry<String, Integer> entry : tables.entrySet()) {
+      tableRanges.put(entry.getKey(), new RowRange(entry.getValue(), entry.getValue()));
+    }
+    Supplier<Boolean> condition = () -> checkAllTablesRowCounts(tableRanges);
     PipelineOperator.Result result =
         pipelineOperator.waitForCondition(
-            createConfig(jobInfo, Duration.ofHours(4), Duration.ofMinutes(5)), checks);
+            createConfig(jobInfo, Duration.ofHours(4), Duration.ofMinutes(5)), condition);
 
     // Assert Conditions
     assertThatResult(result).meetsConditions();
@@ -224,6 +259,49 @@ public void runLoadTest(
     exportMetricsToBigQuery(jobInfo, metrics);
   }
 
+  /**
+   * Checks, in parallel, that every table's Spanner row count lies within its expected
+   * {@link RowRange}. Returns true only when all tables are in range; any lookup failure
+   * counts as a failed check so the surrounding waitForCondition keeps polling.
+   */
+  private boolean checkAllTablesRowCounts(HashMap<String, RowRange> tables) {
+    ExecutorService executor = Executors.newFixedThreadPool(20);
+    try {
+      List<Callable<Boolean>> tasks = new ArrayList<>();
+      for (Map.Entry<String, RowRange> entry : tables.entrySet()) {
+        tasks.add(
+            () -> {
+              try {
+                long rowCount = spannerResourceManager.getRowCount(entry.getKey());
+                RowRange range = entry.getValue();
+                return rowCount >= range.min && rowCount <= range.max;
+              } catch (Exception e) {
+                // A transient read failure just means the condition is not met yet.
+                return false;
+              }
+            });
+      }
+
+      List<Future<Boolean>> futures = executor.invokeAll(tasks);
+      boolean allPassed = true;
+      for (Future<Boolean> future : futures) {
+        if (!future.get()) {
+          allPassed = false;
+        }
+      }
+      return allPassed;
+    } catch (InterruptedException e) {
+      // Restore the interrupt status so callers up the stack can observe it.
+      Thread.currentThread().interrupt();
+      LOG.warn("Interrupted while checking row counts in Spanner", e);
+    } catch (Exception e) {
+      LOG.warn("Error checking row count in Spanner", e);
+    } finally {
+      executor.shutdown();
+    }
+    return false;
+  }
+
   public void getResourceManagerMetrics(Map<String, Double> metrics) {
     pubsubResourceManager.collectMetrics(metrics);
     spannerResourceManager.collectMetrics(metrics);
@@ -293,6 +371,9 @@ public Stream createDatastreamResources(
    */
   public void createSpannerDDL(SpannerResourceManager spannerResourceManager, String resourceName)
       throws IOException {
+    if (resourceName == null) {
+      return;
+    }
     String ddl =
         String.join(
             " ",
             Resources.readLines(Resources.getResource(resourceName), StandardCharsets.UTF_8));