Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .github/workflows/spanner-load-tests.yml
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ permissions: write-all
jobs:
load_tests:
name: Spanner Dataflow Templates Load tests
timeout-minutes: 1440 # 1 day
timeout-minutes: 2100 # 35 hours
# Run on any runner that matches all the specified runs-on values.
runs-on: [ self-hosted, perf ]
steps:
Expand All @@ -57,6 +57,7 @@ jobs:
--lt-export-project="cloud-teleport-testing" \
--lt-export-dataset="performance_tests" \
--lt-export-table="template_performance_metrics" \
--test="${{ github.event.inputs.specific_test }}"
- name: Upload Load Tests Report
uses: actions/upload-artifact@v7
if: always() # always run even if the previous step fails
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.loadtesting;
package org.apache.beam.it.gcp.cloudsql;

import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
import com.google.api.client.googleapis.json.GoogleJsonResponseException;
Expand All @@ -27,7 +27,6 @@
import com.google.auth.http.HttpCredentialsAdapter;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
import com.google.common.annotations.VisibleForTesting;
import java.io.IOException;
import java.security.GeneralSecurityException;
Expand All @@ -39,9 +38,6 @@
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import org.apache.beam.it.gcp.artifacts.GcsArtifact;
import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager;
import org.apache.beam.it.gcp.cloudsql.CloudPostgresResourceManager;
import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -72,29 +68,35 @@ public class CloudSqlShardOrchestrator {
public static final String MYSQL_8_0 = "MYSQL_8_0";
public static final String POSTGRES_14 = "POSTGRES_14";

protected final SQLDialect sqlDialect;
public enum DatabaseType {
MYSQL,
POSTGRESQL
}

protected final DatabaseType databaseType;
protected final String dbVersion;
protected final int port;
protected final String project;
protected final String region;
protected final String username;
protected final String password;
protected final GcsResourceManager gcsResourceManager;
protected final Map<String, CloudSqlResourceManager> managers;
public final Map<String, CloudSqlResourceManager> managers;
protected final Map<String, String> instanceIpMap;
protected Map<String, List<String>> requestedShardMap;
public Map<String, List<String>> requestedShardMap;
protected final SQLAdmin sqlAdmin;

/**
* Constructs a new orchestrator for the specified database dialect.
*
* @param dbType The dialect of the source database (e.g., MYSQL, POSTGRESQL).
* @param dbType The type of the source database (MYSQL or POSTGRESQL).
* @param dbVersion The version of the database.
* @param project The GCP project ID.
* @param region The GCP region for Cloud SQL instances.
* @param gcsResourceManager The GCS resource manager for uploading configuration artifacts.
*/
public CloudSqlShardOrchestrator(
SQLDialect dbType,
DatabaseType dbType,
String dbVersion,
String project,
String region,
Expand All @@ -106,15 +108,16 @@ public CloudSqlShardOrchestrator(
region,
gcsResourceManager,
System.getProperty(
"cloudProxyUsername", (dbType == SQLDialect.MYSQL) ? "root" : "postgres"),
"cloudProxyUsername", (dbType == DatabaseType.MYSQL) ? "root" : "postgres"),
System.getProperty("cloudProxyPassword", ""),
null);
}

/**
* Constructs a new orchestrator with explicit credentials.
*
* @param sqlDialect The dialect of the source database.
* @param databaseType The type of the source database.
* @param dbVersion The version of the database.
* @param project The GCP project ID.
* @param region The GCP region.
* @param gcsResourceManager The GCS resource manager.
Expand All @@ -123,16 +126,16 @@ public CloudSqlShardOrchestrator(
* @param credentials The GCP credentials to use.
*/
public CloudSqlShardOrchestrator(
SQLDialect sqlDialect,
DatabaseType databaseType,
String dbVersion,
String project,
String region,
GcsResourceManager gcsResourceManager,
String username,
String password,
GoogleCredentials credentials) {
checkVersionCompatibility(sqlDialect, dbVersion);
this.sqlDialect = sqlDialect;
checkVersionCompatibility(databaseType, dbVersion);
this.databaseType = databaseType;
this.dbVersion = dbVersion;
this.project = project;
this.region = region;
Expand All @@ -158,14 +161,14 @@ public CloudSqlShardOrchestrator(
LOG.error("Exception while initializing SQL Admin", e);
throw new RuntimeException("Failed to initialize SQLAdmin client", e);
}
port = (sqlDialect == SQLDialect.MYSQL) ? 3306 : 5432;
port = (databaseType == DatabaseType.MYSQL) ? 3306 : 5432;
}

@VisibleForTesting
protected static void checkVersionCompatibility(SQLDialect sqlDialect, String dbVersion) {
protected static void checkVersionCompatibility(DatabaseType databaseType, String dbVersion) {
Preconditions.checkArgument(
(sqlDialect == SQLDialect.MYSQL && dbVersion.toLowerCase().startsWith("mysql"))
|| (sqlDialect == SQLDialect.POSTGRESQL
(databaseType == DatabaseType.MYSQL && dbVersion.toLowerCase().startsWith("mysql"))
|| (databaseType == DatabaseType.POSTGRESQL
&& dbVersion.toLowerCase().startsWith("postgres")));
}

Expand Down Expand Up @@ -265,15 +268,15 @@ protected void updateUserPassword(String instanceName) throws IOException, Inter
User user = new User().setName(username).setPassword(password);

// MySQL requires a '%' host for connections from any IP within the VPC.
if (sqlDialect == SQLDialect.MYSQL) {
if (databaseType == DatabaseType.MYSQL) {
user.setHost("%");
}

// MySQL requires name and host to identify the user. PostgreSQL only needs the name.
// These are passed as query parameters in the Update request.
SQLAdmin.Users.Update request = sqlAdmin.users().update(project, instanceName, user);
request.setName(username);
if (sqlDialect == SQLDialect.MYSQL) {
if (databaseType == DatabaseType.MYSQL) {
// In MySQL, host is part of the primary key for a user. '%' allows the user to connect from
// any VPC IP.
request.setHost("%");
Expand Down Expand Up @@ -319,7 +322,9 @@ protected String ensureInstanceAndGetIp(String instanceName)

protected void createPhysicalInstance(String instanceName)
throws IOException, InterruptedException {
String tier = sqlDialect == SQLDialect.MYSQL ? "db-n1-standard-2" : "db-custom-2-7680";
String defaultTier =
databaseType == DatabaseType.MYSQL ? "db-n1-standard-2" : "db-custom-2-7680";
String tier = System.getProperty("cloudSqlInstanceTier", defaultTier);
DatabaseInstance instance =
new DatabaseInstance()
.setName(instanceName)
Expand Down Expand Up @@ -370,19 +375,19 @@ protected void waitForOperation(Operation operation) throws IOException, Interru

protected CloudSqlResourceManager createManager(String instanceName) {
String ip = instanceIpMap.get(instanceName);
if (sqlDialect == SQLDialect.MYSQL) {
if (databaseType == DatabaseType.MYSQL) {
return (CloudSqlResourceManager)
CloudMySQLResourceManager.builder(instanceName)
.maybeUseStaticInstance(ip, port, username, password)
.build();
} else if (sqlDialect == SQLDialect.POSTGRESQL) {
} else if (databaseType == DatabaseType.POSTGRESQL) {
return (CloudSqlResourceManager)
CloudPostgresResourceManager.builder(instanceName)
.maybeUseStaticInstance(ip, port, username, password)
.setDatabaseName("postgres")
.build();
} else {
throw new IllegalArgumentException("Unsupported database type: " + sqlDialect);
throw new IllegalArgumentException("Unsupported database type: " + databaseType);
}
}

Expand All @@ -400,6 +405,11 @@ protected void createLogicalDatabases() {
CloudSqlResourceManager manager = createManager(instanceName);
managers.put(instanceName, manager);
for (String dbName : dbNames) {
try {
manager.runSQLUpdate("DROP DATABASE IF EXISTS " + dbName);
} catch (Exception e) {
LOG.warn("Failed to drop pre-existing database {} if it existed", dbName, e);
}
manager.createDatabase(dbName);
}
}));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.loadtesting;
package org.apache.beam.it.gcp.cloudsql;

/** Exception thrown when Cloud SQL shard orchestration fails. */
public class ShardOrchestrationException extends RuntimeException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.loadtesting;
package org.apache.beam.it.gcp.cloudsql;

import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.MYSQL_8_0;
import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.POSTGRES_14;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.MYSQL_8_0;
import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.POSTGRES_14;
import static org.junit.Assert.assertThrows;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyInt;
Expand All @@ -41,7 +41,6 @@
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.storage.Blob;
import com.google.cloud.storage.BlobInfo;
import com.google.cloud.teleport.v2.source.reader.io.jdbc.iowrapper.config.SQLDialect;
import com.google.common.util.concurrent.MoreExecutors;
import java.io.IOException;
import java.util.Arrays;
Expand All @@ -50,8 +49,7 @@
import java.util.Map;
import java.util.concurrent.ExecutorService;
import org.apache.beam.it.gcp.artifacts.GcsArtifact;
import org.apache.beam.it.gcp.cloudsql.CloudMySQLResourceManager;
import org.apache.beam.it.gcp.cloudsql.CloudPostgresResourceManager;
import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.DatabaseType;
import org.apache.beam.it.gcp.storage.GcsResourceManager;
import org.junit.Before;
import org.junit.Test;
Expand Down Expand Up @@ -88,13 +86,13 @@ public class CloudSqlShardOrchestratorTest {
/** Helper subclass to inject mock SQLAdmin and synchronous executor. */
private static class TestCloudSqlShardOrchestrator extends CloudSqlShardOrchestrator {
public TestCloudSqlShardOrchestrator(
SQLDialect sqlDialect,
DatabaseType databaseType,
String dbVersion,
String project,
String region,
GcsResourceManager gcsResourceManager,
SQLAdmin sqlAdmin) {
super(sqlDialect, dbVersion, project, region, gcsResourceManager);
super(databaseType, dbVersion, project, region, gcsResourceManager);
// Overwrite the real sqlAdmin created in super constructor
try {
java.lang.reflect.Field field =
Expand Down Expand Up @@ -138,7 +136,7 @@ public void setUp() {
public void testInitialize_provisionsAndSetsUpCorrectly() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

// Mock Stage 1: Physical Provisioning
DatabaseInstance instance =
Expand Down Expand Up @@ -189,7 +187,7 @@ public void testInitialize_provisionsAndSetsUpCorrectly() throws Exception {
public void testInitialize_createsInstance_whenMissing() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

// Mock 404 for first call, then 200 for refresh
when(sqlAdmin.instances().get(PROJECT_ID, INSTANCE_NAME).execute())
Expand Down Expand Up @@ -228,7 +226,7 @@ public void testInitialize_createsInstance_whenMissing() throws Exception {
public void testCleanup_delegatesToManagers() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

// Mock successful initialization to populate managers map
when(sqlAdmin.instances().get(PROJECT_ID, INSTANCE_NAME).execute())
Expand Down Expand Up @@ -272,7 +270,7 @@ public void testConstructor_withDefaultCredentials() throws Exception {

CloudSqlShardOrchestrator orchestrator =
new CloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager);

assertThat(orchestrator.project).isEqualTo(PROJECT_ID);
}
Expand All @@ -282,7 +280,7 @@ public void testConstructor_withDefaultCredentials() throws Exception {
public void testInitialize_provisionsAndSetsUpPostgresCorrectly() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.POSTGRESQL, POSTGRES_14, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.POSTGRESQL, POSTGRES_14, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

DatabaseInstance instance =
new DatabaseInstance()
Expand Down Expand Up @@ -318,20 +316,19 @@ public void testInitialize_provisionsAndSetsUpPostgresCorrectly() throws Excepti
public void testInitialize_throwsOnFailure() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

when(sqlAdmin.instances().get(anyString(), anyString()).execute())
.thenThrow(new IOException("API Error"));

assertThrows(
ShardOrchestrationException.class, () -> orchestrator.initialize(shardMap, "shards.json"));
assertThrows(RuntimeException.class, () -> orchestrator.initialize(shardMap, "shards.json"));
}

@Test
public void testExecuteWithRetries_retriesOn409() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

com.google.api.client.googleapis.services.AbstractGoogleClientRequest<DatabaseInstance>
mockRequest =
Expand All @@ -355,7 +352,7 @@ public void testExecuteWithRetries_retriesOn409() throws Exception {
public void testWaitForOperation_throwsOnOpError() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

Operation op = mock(Operation.class, Answers.RETURNS_DEEP_STUBS);
when(op.getName()).thenReturn("op-error");
Expand All @@ -373,7 +370,7 @@ public void testWaitForOperation_throwsOnOpError() throws Exception {
public void testCleanup_handlesDropDatabaseError() throws Exception {
TestCloudSqlShardOrchestrator orchestrator =
new TestCloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);
DatabaseType.MYSQL, MYSQL_8_0, PROJECT_ID, REGION, gcsResourceManager, sqlAdmin);

// Mock successful initialization
DatabaseInstance instance =
Expand Down Expand Up @@ -411,13 +408,14 @@ public void testCleanup_handlesDropDatabaseError() throws Exception {
public void testVersionCompatability() {
assertThrows(
IllegalArgumentException.class,
() -> CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.MYSQL, POSTGRES_14));
() -> CloudSqlShardOrchestrator.checkVersionCompatibility(DatabaseType.MYSQL, POSTGRES_14));
assertThrows(
IllegalArgumentException.class,
() ->
CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.POSTGRESQL, MYSQL_8_0));
CloudSqlShardOrchestrator.checkVersionCompatibility(
DatabaseType.POSTGRESQL, MYSQL_8_0));

CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.MYSQL, MYSQL_8_0);
CloudSqlShardOrchestrator.checkVersionCompatibility(SQLDialect.POSTGRESQL, POSTGRES_14);
CloudSqlShardOrchestrator.checkVersionCompatibility(DatabaseType.MYSQL, MYSQL_8_0);
CloudSqlShardOrchestrator.checkVersionCompatibility(DatabaseType.POSTGRESQL, POSTGRES_14);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@
*/
package com.google.cloud.teleport.v2.templates.loadtesting;

import static com.google.cloud.teleport.v2.templates.loadtesting.CloudSqlShardOrchestrator.MYSQL_8_0;
import static com.google.common.truth.Truth.assertThat;
import static org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.MYSQL_8_0;
import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;

import com.google.cloud.spanner.Struct;
Expand Down Expand Up @@ -45,6 +45,8 @@
import org.apache.beam.it.common.utils.ResourceManagerUtils;
import org.apache.beam.it.gcp.artifacts.GcsArtifact;
import org.apache.beam.it.gcp.cloudsql.CloudSqlResourceManager;
import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator;
import org.apache.beam.it.gcp.cloudsql.CloudSqlShardOrchestrator.DatabaseType;
import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
import org.json.JSONArray;
import org.json.JSONObject;
Expand Down Expand Up @@ -101,7 +103,7 @@ public void setUp() throws IOException {

orchestrator =
new CloudSqlShardOrchestrator(
SQLDialect.MYSQL, MYSQL_8_0, project, region, gcsResourceManager);
DatabaseType.MYSQL, MYSQL_8_0, project, region, gcsResourceManager);
}

@After
Expand Down
Loading
Loading