Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,152 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.loadtesting;

import com.google.cloud.teleport.metadata.SkipDirectRunnerTest;
import com.google.cloud.teleport.metadata.TemplateLoadTest;
import com.google.cloud.teleport.v2.templates.DataStreamToSpanner;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager;
import org.apache.beam.it.gcp.datastream.JDBCSource;
import org.apache.beam.it.gcp.datastream.MySQLSource;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** Load test for {@link DataStreamToSpanner} template with 5000 tables. */
@Category({TemplateLoadTest.class, SkipDirectRunnerTest.class})
@TemplateLoadTest(DataStreamToSpanner.class)
@RunWith(JUnit4.class)
public class DataStreamToSpanner5kTablesLT extends DataStreamToSpannerLTBase {

private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpanner5kTablesLT.class);
private static final int NUM_TABLES = 5000;
private CloudMySQLResourceManager jdbcResourceManager;
private Instant startTime;

/**
 * Prepares all test resources before each run: base-class setup, resource managers
 * (with shadow tables in a separate database), and the CloudSQL MySQL instance.
 * Also records the wall-clock start time for duration reporting in {@code cleanUp}.
 */
@Before
public void setUp() throws IOException {
  super.setUp();
  startTime = Instant.now();
  LOG.info("Began Setup for 5K Table test");
  // No Spanner DDL resource file; tables are created programmatically in the test.
  setUpResourceManagers(null, true);
  jdbcResourceManager = CloudMySQLResourceManager.builder(testName).build();
}

/**
 * Tears down per-test resources: the MySQL resource manager first, then the
 * base-class resources, and finally logs the total test duration.
 */
@After
public void cleanUp() throws IOException {
  ResourceManagerUtils.cleanResources(jdbcResourceManager);
  super.cleanUp();
  Duration elapsed = Duration.between(startTime, Instant.now());
  LOG.info("CleanupCompleted for 5K Table test. Test took {}", elapsed);
}

@Test
public void test5kTablesCDC() throws Exception {
// 1. Create 5k tables in MySQL
LOG.info("Creating master table in MySQL...");
try (Connection conn = getJdbcConnection(jdbcResourceManager);
Statement stmt = conn.createStatement()) {
stmt.execute("CREATE TABLE table_1 (id BIGINT NOT NULL PRIMARY KEY)");
}

LOG.info("Creating remaining {} tables in MySQL...", NUM_TABLES - 1);
try (Connection conn = getJdbcConnection(jdbcResourceManager);
Statement stmt = conn.createStatement()) {
for (int j = 2; j <= NUM_TABLES; j++) {
String mySqlDdl = String.format("CREATE TABLE table_%d LIKE table_1", j);
stmt.addBatch(mySqlDdl);
}
stmt.executeBatch();
}
Comment on lines +86 to +91
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Executing a batch of 5,000 CREATE TABLE statements in MySQL might exceed the max_allowed_packet size or lead to long-running transactions that could time out. It is recommended to partition the batch into smaller chunks (e.g., 500 statements per batch).

      for (int j = 2; j <= NUM_TABLES; j++) {
        String mySqlDdl = String.format("CREATE TABLE table_%d LIKE table_1", j);
        stmt.addBatch(mySqlDdl);
        if (j % 500 == 0) {
          stmt.executeBatch();
        }
      }
      stmt.executeBatch();


// 2. Create 5k tables in Spanner
LOG.info("Creating 5k tables in Spanner...");
List<String> spannerDdls = new ArrayList<>();
for (int i = 1; i <= NUM_TABLES; i++) {
spannerDdls.add(
String.format("CREATE TABLE table_%d (id INT64 NOT NULL) PRIMARY KEY(id)", i));
}
spannerResourceManager.executeDdlStatements(spannerDdls);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

Executing 5,000 DDL statements in a single call to executeDdlStatements is likely to exceed Cloud Spanner's limits for a single request or cause timeouts. It is recommended to batch DDL statements into smaller groups (e.g., 100 statements per call).

Suggested change
spannerResourceManager.executeDdlStatements(spannerDdls);
for (int i = 0; i < spannerDdls.size(); i += 100) {
spannerResourceManager.executeDdlStatements(
spannerDdls.subList(i, Math.min(i + 100, spannerDdls.size())));
}


// 3. Setup Datastream source
List<String> tableNames = new ArrayList<>();
for (int i = 1; i <= NUM_TABLES; i++) {
tableNames.add("table_" + i);
}
java.util.Map<String, List<String>> allowedTables =
java.util.Map.of(jdbcResourceManager.getDatabaseName(), tableNames);

JDBCSource mySQLSource =
new MySQLSource.Builder(
jdbcResourceManager.getHost(),
jdbcResourceManager.getUsername(),
jdbcResourceManager.getPassword(),
jdbcResourceManager.getPort())
.setAllowedTables(allowedTables)
.build();

HashMap<String, Integer> tables = new HashMap<>();
for (int i = 1; i <= NUM_TABLES; i++) {
tables.put("table_" + i, 1);
}

// 4. Run Load Test with CDC callback
runLoadTest(
tables,
mySQLSource,
new HashMap<>(),
new HashMap<>(),
() -> {
LOG.info("Inserting 1 row into all {} tables in MySQL (CDC mode)...", NUM_TABLES);
try (Connection conn = getJdbcConnection(jdbcResourceManager);
Statement stmt = conn.createStatement()) {
for (int i = 1; i <= NUM_TABLES; i++) {
String sql = String.format("INSERT INTO table_%d (id) VALUES (%d)", i, i);
stmt.addBatch(sql);
}
stmt.executeBatch();
Comment on lines +134 to +138
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the table creation batch, inserting 5,000 rows in a single batch might encounter limits. It is safer to execute the batch in smaller increments.

            for (int i = 1; i <= NUM_TABLES; i++) {
              String sql = String.format("INSERT INTO table_%d (id) VALUES (%d)", i, i);
              stmt.addBatch(sql);
              if (i % 500 == 0) {
                stmt.executeBatch();
              }
            }
            stmt.executeBatch();

} catch (SQLException e) {
throw new RuntimeException("Failed to insert CDC data", e);
}
});
}

/**
 * Opens a direct JDBC connection to the MySQL instance managed by the given
 * resource manager. The caller is responsible for closing the connection.
 */
private static Connection getJdbcConnection(CloudMySQLResourceManager mySQLResourceManager)
    throws SQLException {
  String url = mySQLResourceManager.getUri();
  String user = mySQLResourceManager.getUsername();
  String password = mySQLResourceManager.getPassword();
  return DriverManager.getConnection(url, user, password);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -28,40 +28,52 @@
import com.google.cloud.datastream.v1.DestinationConfig;
import com.google.cloud.datastream.v1.SourceConfig;
import com.google.cloud.datastream.v1.Stream;
import com.google.common.base.MoreObjects;
import com.google.common.io.Resources;
import com.google.pubsub.v1.SubscriptionName;
import com.google.pubsub.v1.TopicName;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.text.ParseException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Supplier;
import org.apache.beam.it.common.PipelineLauncher;
import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
import org.apache.beam.it.common.PipelineOperator;
import org.apache.beam.it.common.TestProperties;
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.conditions.ConditionCheck;
import org.apache.beam.it.gcp.TemplateLoadTestBase;
import org.apache.beam.it.gcp.datastream.DatastreamResourceManager;
import org.apache.beam.it.gcp.datastream.JDBCSource;
import org.apache.beam.it.gcp.datastream.MySQLSource;
import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
import org.apache.beam.it.gcp.secretmanager.SecretManagerResourceManager;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.apache.beam.it.gcp.spanner.conditions.SpannerRowsCheck;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
import org.junit.After;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Base class for DataStreamToSpanner Load tests. It provides helper functions related to
* environment setup and assertConditions.
*/
public class DataStreamToSpannerLTBase extends TemplateLoadTestBase {
private static final Logger LOG = LoggerFactory.getLogger(DataStreamToSpannerLTBase.class);
protected static final String SPEC_PATH =
"gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner";
MoreObjects.firstNonNull(
TestProperties.specPath(),
"gs://dataflow-templates/latest/flex/Cloud_Datastream_to_Spanner");
protected String testRootDir;
protected final int maxWorkers = 100;
protected final int numWorkers = 50;
Expand All @@ -72,6 +84,16 @@ public class DataStreamToSpannerLTBase extends TemplateLoadTestBase {
public DatastreamResourceManager datastreamResourceManager;
protected SecretManagerResourceManager secretClient;

/**
 * Immutable inclusive [min, max] bound on the number of rows expected in a table.
 * Used by row-count condition checks to decide whether replication has converged.
 */
public static class RowRange {
  // Minimum acceptable row count (inclusive).
  public final int min;
  // Maximum acceptable row count (inclusive).
  public final int max;

  public RowRange(int min, int max) {
    this.min = min;
    this.max = max;
  }
}

/**
 * Convenience overload that sets up resource managers with shadow tables kept in the
 * main Spanner database (separateShadowTableDb = false).
 *
 * @param spannerDdlResource classpath resource containing Spanner DDL, or null for none
 * @throws IOException if resource setup fails
 */
public void setUpResourceManagers(String spannerDdlResource) throws IOException {
  setUpResourceManagers(spannerDdlResource, false);
}
Expand All @@ -96,10 +118,14 @@ public void setUpResourceManagers(String spannerDdlResource, boolean separateSha
.build();

gcsResourceManager = createSpannerLTGcsResourceManager();
datastreamResourceManager =
DatastreamResourceManager.Builder datastreamBuilder =
DatastreamResourceManager.builder(testName, project, region)
.setCredentialsProvider(CREDENTIALS_PROVIDER)
.build();
.setCredentialsProvider(CREDENTIALS_PROVIDER);
if (System.getProperty("privateConnectivity") != null) {
datastreamBuilder.setPrivateConnectivity(System.getProperty("privateConnectivity"));
}

datastreamResourceManager = datastreamBuilder.build();
secretClient = SecretManagerResourceManager.builder(project, CREDENTIALS_PROVIDER).build();

// Initialize shadowTableSpannerResourceManager only if separateShadowTableDb is true
Expand Down Expand Up @@ -127,6 +153,16 @@ public void runLoadTest(
HashMap<String, String> templateParameters,
HashMap<String, Object> environmentOptions)
throws IOException, ParseException, InterruptedException {
runLoadTest(tables, mySQLSource, templateParameters, environmentOptions, null);
}

public void runLoadTest(
HashMap<String, Integer> tables,
JDBCSource mySQLSource,
HashMap<String, String> templateParameters,
HashMap<String, Object> environmentOptions,
Runnable afterLaunchCallback)
throws IOException, ParseException, InterruptedException {
// TestClassName/runId/TestMethodName/cdc
String gcsPrefix =
String.join("/", new String[] {testRootDir, gcsResourceManager.runId(), testName, "cdc"});
Expand Down Expand Up @@ -185,6 +221,7 @@ public void runLoadTest(
params.putAll(templateParameters);

LaunchConfig.Builder options = LaunchConfig.builder(getClass().getSimpleName(), SPEC_PATH);

options.addEnvironment("maxWorkers", maxWorkers).addEnvironment("numWorkers", numWorkers);

// Set all environment options
Expand All @@ -195,20 +232,18 @@ public void runLoadTest(
PipelineLauncher.LaunchInfo jobInfo = pipelineLauncher.launch(project, region, options.build());
assertThatPipeline(jobInfo).isRunning();

ConditionCheck[] checks = new ConditionCheck[tables.size()];
int iterationCount = 0;
for (Map.Entry<String, Integer> entry : tables.entrySet()) {
checks[iterationCount] =
SpannerRowsCheck.builder(spannerResourceManager, entry.getKey())
.setMinRows(entry.getValue())
.setMaxRows(entry.getValue())
.build();
iterationCount++;
if (afterLaunchCallback != null) {
afterLaunchCallback.run();
}

HashMap<String, RowRange> tableRanges = new HashMap<>();
for (Map.Entry<String, Integer> entry : tables.entrySet()) {
tableRanges.put(entry.getKey(), new RowRange(entry.getValue(), entry.getValue()));
}
Supplier<Boolean> condition = () -> checkAllTablesRowCounts(tableRanges);
PipelineOperator.Result result =
pipelineOperator.waitForCondition(
createConfig(jobInfo, Duration.ofHours(4), Duration.ofMinutes(5)), checks);
createConfig(jobInfo, Duration.ofHours(4), Duration.ofMinutes(5)), condition);

// Assert Conditions
assertThatResult(result).meetsConditions();
Expand All @@ -224,6 +259,39 @@ public void runLoadTest(
exportMetricsToBigQuery(jobInfo, metrics);
}

/**
 * Checks, in parallel, that every table's current Spanner row count falls within its
 * expected {@link RowRange}. Runs on the common fork-join pool via parallelStream
 * rather than allocating a fresh ExecutorService on each invocation, since this
 * method is polled repeatedly by the pipeline operator's waitForCondition loop.
 *
 * @param tables map of table name to the inclusive row-count range expected for it
 * @return true iff every table's row count is within its range
 */
private boolean checkAllTablesRowCounts(HashMap<String, RowRange> tables) {
  return tables.entrySet().parallelStream()
      .allMatch(
          entry -> {
            try {
              long rowCount = spannerResourceManager.getRowCount(entry.getKey());
              RowRange range = entry.getValue();
              return rowCount >= range.min && rowCount <= range.max;
            } catch (Exception e) {
              // Treat read failures as "condition not yet met" so the outer wait
              // loop retries, but log them so persistent errors are visible.
              LOG.warn(
                  "Error checking row count for table {}: {}", entry.getKey(), e.getMessage());
              return false;
            }
          });
}
Comment on lines +262 to +293
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The current implementation of checkAllTablesRowCounts creates a new ExecutorService with 20 threads on every invocation. Since this method is called periodically by waitForCondition, this is inefficient and can lead to resource exhaustion. Additionally, the broad catch block masks potential persistent errors. Using a parallelStream is a more idiomatic and efficient way to perform these checks in parallel using the common pool.

  private boolean checkAllTablesRowCounts(HashMap<String, RowRange> tables) {
    return tables.entrySet().parallelStream().allMatch(entry -> {
      try {
        long rowCount = spannerResourceManager.getRowCount(entry.getKey());
        RowRange range = entry.getValue();
        return rowCount >= range.min && rowCount <= range.max;
      } catch (Exception e) {
        LOG.warn("Error checking row count for table {}: {}", entry.getKey(), e.getMessage());
        return false;
      }
    });
  }
References
  1. It is acceptable to use streams with lambdas that wrap checked exceptions (e.g., IOException) into a RuntimeException, instead of refactoring to a traditional for-loop for more direct exception handling.


public void getResourceManagerMetrics(Map<String, Double> metrics) {
pubsubResourceManager.collectMetrics(metrics);
spannerResourceManager.collectMetrics(metrics);
Expand Down Expand Up @@ -293,6 +361,9 @@ public Stream createDatastreamResources(
*/
public void createSpannerDDL(SpannerResourceManager spannerResourceManager, String resourceName)
throws IOException {
if (resourceName == null) {
return;
}
String ddl =
String.join(
" ", Resources.readLines(Resources.getResource(resourceName), StandardCharsets.UTF_8));
Expand Down
Loading