Conversation
Codecov Report

✅ All modified and coverable lines are covered by tests.

Additional details and impacted files:

    @@             Coverage Diff              @@
    ##               main    #3779      +/-   ##
    ============================================
    + Coverage     53.52%   58.17%     +4.65%
    + Complexity     6637     2768      -3869
    ============================================
      Files          1082      524       -558
      Lines         65868    31592     -34276
      Branches       7332     3437      -3895
    ============================================
    - Hits          35255    18379     -16876
    + Misses        28257    12173     -16084
    + Partials       2356     1040      -1316
Force-pushed from 3b41e8b to 588119b
Force-pushed from d0215c8 to 8e751ce
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request significantly expands the load testing capabilities of the DataStream to Spanner template, specifically targeting high-scale scenarios with 5,000 tables. It includes infrastructure improvements to the base test class to handle parallel row count checks and private connectivity configurations, alongside minor refactoring of the main processing logic to improve resource handling during execution.
Code Review
This pull request introduces a new load test for the DataStream to Spanner template designed to handle 5,000 tables and refactors the ProcessInformationSchema class to manage DDL objects as transient fields. The load test base class was also updated to support private connectivity and includes a new parallelized row count verification mechanism. Review feedback highlights several opportunities for improvement: batching DDL and DML statements in both Spanner and MySQL is recommended to avoid timeouts when dealing with 5,000 tables, and the row count verification logic should be refactored to use a parallelStream instead of manually creating an ExecutorService on every invocation to prevent resource exhaustion.
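On the transient-field refactor mentioned above: in Beam, a DoFn is serialized along with the pipeline graph, so heavyweight or non-serializable members such as a parsed DDL object are typically marked transient and rebuilt in @Setup on each worker. The sketch below is a minimal illustration of that general pattern only; ScanInformationSchemaFn and InfoSchemaDdl are hypothetical names, not the PR's actual code.

    import org.apache.beam.sdk.transforms.DoFn;

    // Hypothetical stand-in for the parsed information-schema DDL; the real
    // class in the template is not shown in this review.
    class InfoSchemaDdl {
      int tableCount() {
        return 0;
      }
    }

    class ScanInformationSchemaFn extends DoFn<String, String> {
      // transient: excluded when Beam serializes the DoFn, so a heavy or
      // non-serializable DDL object never travels with the pipeline graph.
      private transient InfoSchemaDdl ddl;

      @Setup
      public void setup() {
        // Rebuilt once per DoFn instance on the worker, after deserialization.
        ddl = new InfoSchemaDdl();
      }

      @ProcessElement
      public void processElement(ProcessContext c) {
        c.output(c.element() + " (" + ddl.tableCount() + " tables)");
      }
    }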
      spannerDdls.add(
          String.format("CREATE TABLE table_%d (id INT64 NOT NULL) PRIMARY KEY(id)", i));
    }
    spannerResourceManager.executeDdlStatements(spannerDdls);
Executing 5,000 DDL statements in a single call to executeDdlStatements is likely to exceed Cloud Spanner's limits for a single request or cause timeouts. It is recommended to batch DDL statements into smaller groups (e.g., 100 statements per call).
Suggested change:

    for (int i = 0; i < spannerDdls.size(); i += 100) {
      spannerResourceManager.executeDdlStatements(
          spannerDdls.subList(i, Math.min(i + 100, spannerDdls.size())));
    }
    for (int j = 2; j <= NUM_TABLES; j++) {
      String mySqlDdl = String.format("CREATE TABLE table_%d LIKE table_1", j);
      stmt.addBatch(mySqlDdl);
    }
    stmt.executeBatch();
Executing a batch of 5,000 CREATE TABLE statements in MySQL might exceed the max_allowed_packet size or lead to long-running transactions that could time out. It is recommended to partition the batch into smaller chunks (e.g., 500 statements per batch).
Suggested change:

    for (int j = 2; j <= NUM_TABLES; j++) {
      String mySqlDdl = String.format("CREATE TABLE table_%d LIKE table_1", j);
      stmt.addBatch(mySqlDdl);
      if (j % 500 == 0) {
        stmt.executeBatch();
      }
    }
    stmt.executeBatch();

    for (int i = 1; i <= NUM_TABLES; i++) {
      String sql = String.format("INSERT INTO table_%d (id) VALUES (%d)", i, i);
      stmt.addBatch(sql);
    }
    stmt.executeBatch();
Similar to the table creation batch, inserting 5,000 rows in a single batch might encounter limits. It is safer to execute the batch in smaller increments.
Suggested change:

    for (int i = 1; i <= NUM_TABLES; i++) {
      String sql = String.format("INSERT INTO table_%d (id) VALUES (%d)", i, i);
      stmt.addBatch(sql);
      if (i % 500 == 0) {
        stmt.executeBatch();
      }
    }
    stmt.executeBatch();
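Both batching suggestions repeat the same chunk-and-flush idiom. As a hedged aside (not code from the PR), the pattern could be factored into a small helper; JdbcBatching and addBatchChunked are hypothetical names:

    import java.sql.SQLException;
    import java.sql.Statement;
    import java.util.List;

    final class JdbcBatching {
      // Flushes the JDBC batch every chunkSize statements so no single round
      // trip exceeds server limits such as MySQL's max_allowed_packet.
      static void addBatchChunked(Statement stmt, List<String> sqls, int chunkSize)
          throws SQLException {
        int pending = 0;
        for (String sql : sqls) {
          stmt.addBatch(sql);
          if (++pending == chunkSize) {
            stmt.executeBatch();
            pending = 0;
          }
        }
        if (pending > 0) {
          stmt.executeBatch(); // flush any remainder
        }
      }
    }

The trailing executeBatch() flushes the remainder when the statement count is not a multiple of chunkSize, which is also why both suggestions above end with a final executeBatch().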
    private boolean checkAllTablesRowCounts(HashMap<String, RowRange> tables) {
      ExecutorService executor = Executors.newFixedThreadPool(20);
      try {
        List<Callable<Boolean>> tasks = new ArrayList<>();
        for (Map.Entry<String, RowRange> entry : tables.entrySet()) {
          tasks.add(
              () -> {
                try {
                  long rowCount = spannerResourceManager.getRowCount(entry.getKey());
                  RowRange range = entry.getValue();
                  return rowCount >= range.min && rowCount <= range.max;
                } catch (Exception e) {
                  return false;
                }
              });
        }

        List<Future<Boolean>> futures = executor.invokeAll(tasks);
        boolean allPassed = true;
        for (Future<Boolean> future : futures) {
          if (!future.get()) {
            allPassed = false;
          }
        }
        return allPassed;
      } catch (Exception e) {
        LOG.warn("Error checking row count in Spanner", e);
      } finally {
        executor.shutdown();
      }
      return false;
    }
The current implementation of checkAllTablesRowCounts creates a new ExecutorService with 20 threads on every invocation. Since this method is called periodically by waitForCondition, this is inefficient and can lead to resource exhaustion. Additionally, the broad catch block masks potential persistent errors. Using a parallelStream is a more idiomatic and efficient way to perform these checks in parallel using the common pool.
Suggested change:

    private boolean checkAllTablesRowCounts(HashMap<String, RowRange> tables) {
      return tables.entrySet().parallelStream()
          .allMatch(
              entry -> {
                try {
                  long rowCount = spannerResourceManager.getRowCount(entry.getKey());
                  RowRange range = entry.getValue();
                  return rowCount >= range.min && rowCount <= range.max;
                } catch (Exception e) {
                  LOG.warn(
                      "Error checking row count for table {}: {}",
                      entry.getKey(),
                      e.getMessage());
                  return false;
                }
              });
    }

References
- It is acceptable to use streams with lambdas that wrap checked exceptions (e.g., IOException) into a RuntimeException, instead of refactoring to a traditional for-loop for more direct exception handling.
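As a minimal sketch of the referenced style point (assuming nothing beyond the JDK), a checked IOException thrown inside a stream lambda can be rethrown as UncheckedIOException rather than rewriting the stream as a for-loop:

    import java.io.IOException;
    import java.io.UncheckedIOException;
    import java.nio.file.Files;
    import java.nio.file.Path;
    import java.util.List;

    final class StreamExceptionWrapping {
      // Sums the sizes of the given files. Files.size throws a checked
      // IOException, which a lambda cannot propagate directly, so it is
      // rethrown as UncheckedIOException inside the stream pipeline.
      static long totalSize(List<Path> files) {
        return files.stream()
            .mapToLong(
                p -> {
                  try {
                    return Files.size(p);
                  } catch (IOException e) {
                    throw new UncheckedIOException(e);
                  }
                })
            .sum();
      }
    }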