Batch and Write transform code #3797
Conversation
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed. This pull request implements the core batching and writing logic for the CDC data generator. It introduces a stateful DoFn to manage record processing, batching, and sink interaction, along with a PTransform to integrate this logic into the broader pipeline. The changes also include a new data model for records and unit tests to validate the new components.
Code Review
This pull request introduces the BatchAndWrite transform and its corresponding DoFn, BatchAndWriteFn, which manage stateful data generation and writing to MySQL and Spanner sinks. The changes include the GeneratedRecord data model and comprehensive unit tests. Review feedback highlights potential NullPointerException risks in the BatchAndWriteFn constructor due to auto-unboxing of nullable Integer parameters and suggests providing default values. Additionally, a test case was identified as misleading because it fails to assert the specific fallback behavior it claims to verify.
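The auto-unboxing risk flagged above can be reproduced in isolation. The sketch below is a minimal illustration, not the PR's actual constructor: the field name `batchSize` and the default value 100 are made up for the example.

```java
// Sketch of the auto-unboxing hazard: assigning a null Integer to an int
// throws NullPointerException. Guarding with a default avoids it.
class UnboxingDemo {
  private final int batchSize;

  // If the nullable option is unset, writing "this.batchSize = batchSize;"
  // directly would unbox null and throw NullPointerException.
  UnboxingDemo(Integer batchSize) {
    this.batchSize = (batchSize != null) ? batchSize : 100; // explicit default
  }

  int batchSize() {
    return batchSize;
  }
}
```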
Codecov Report

❌ Patch coverage details and impacted files:

@@ Coverage Diff @@
## main #3797 +/- ##
============================================
+ Coverage 53.62% 53.79% +0.17%
- Complexity 6323 6368 +45
============================================
Files 1087 1090 +3
Lines 66762 66917 +155
Branches 7476 7485 +9
============================================
+ Hits 35801 36001 +200
+ Misses 28534 28483 -51
- Partials 2427 2433 +6
```java
private transient volatile DataGeneratorSchema schema;
private transient volatile List<String> insertTopoOrder;
```
Using volatile guarantees that once schema and insertTopoOrder are published by the first bundle, all subsequent bundles and timer callbacks on that worker instance will deterministically observe the fully constructed objects.
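The safe-publication guarantee described above can be sketched with plain Java. This is a minimal illustration of the lazy-init-behind-volatile (double-checked locking) pattern, assuming that is how the fields are initialized; `List<String>` stands in for the pipeline's own types, and the table names are invented.

```java
import java.util.List;

// Lazy initialization behind a transient volatile field: the volatile
// write at the end of construction publishes the fully built object,
// so later readers never see a partially constructed value.
class LazySchemaHolder {
  private transient volatile List<String> insertTopoOrder;

  List<String> insertTopoOrder() {
    List<String> local = insertTopoOrder; // single volatile read
    if (local == null) {
      synchronized (this) {
        local = insertTopoOrder;
        if (local == null) {
          // In reality this would be an expensive schema/topo-sort build.
          local = List.of("parent_table", "child_table");
          insertTopoOrder = local; // volatile write publishes the object
        }
      }
    }
    return local;
  }
}
```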
```java
protected DataWriter createWriter(SinkType type, String configPath) {
  switch (type) {
    case MYSQL:
      return new MySqlDataWriter(configPath);
    case SPANNER:
      return new SpannerDataWriter(configPath);
    default:
      throw new IllegalArgumentException("Unsupported sink type: " + type);
  }
}
```
Consider moving this out to a factory for cleaner separation. In the future, constructing writers may become more complex, and the separation will help.
Makes sense, will move.
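A minimal sketch of the factory extraction being suggested, assuming the construction logic simply moves behind a dedicated class. The `DataWriter` interface, `SinkType` enum, and lambda bodies here are stand-ins, since the real `MySqlDataWriter` and `SpannerDataWriter` are not shown in this diff.

```java
// Stand-in for the real writer interface; only a single write method.
interface DataWriter {
  void write(String record);
}

enum SinkType { MYSQL, SPANNER }

// The factory owns sink-specific construction, so BatchAndWriteFn no
// longer needs to switch on SinkType itself. As construction grows
// (connection pooling, credentials, retries), only this class changes.
final class DataWriterFactory {
  static DataWriter create(SinkType type, String configPath) {
    switch (type) {
      case MYSQL:
        return record -> System.out.println("mysql <- " + record); // new MySqlDataWriter(configPath)
      case SPANNER:
        return record -> System.out.println("spanner <- " + record); // new SpannerDataWriter(configPath)
      default:
        throw new IllegalArgumentException("Unsupported sink type: " + type);
    }
  }
}
```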
```java
FailureRecord.toJson(
    "UNKNOWN_TABLE", FailureRecord.OPERATION_GENERATION, null, timerError));
```
Why is this always UNKNOWN_TABLE?
All event-specific and table-specific errors are caught inside DataGeneratorEngine. Any exception that escapes to the outer catch (Exception timerError) block in BatchAndWriteFn.java represents a system failure, so it has no associated table name.
```java
private void writeFailedRecords(Consumer<String> sink) {
  List<String> dlq = batcher.getFailedRecords();
  if (dlq == null || dlq.isEmpty()) {
    return;
  }
  for (String record : dlq) {
    sink.accept(record);
  }
  batcher.clearDlq();
}
```
I didn't get what this method is doing (I don't fully understand the Consumer parameter). Where are the failed records from the DLQ being written to?
Also, what is the function of the DLQ? Are users expected to re-run the DLQ, or is it more for reporting purposes?
It outputs the failed records from the DoFn; they are written to a GCS file and are used only for reporting, not retrying.
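The `Consumer<String>` mechanics asked about above can be shown in isolation: the method never decides where records go; the caller passes the destination in. This is a minimal sketch with an in-memory list standing in for the DoFn's output receiver and the eventual GCS sink; the sample record and class names are invented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Demonstrates the inversion of control: writeFailedRecords only iterates
// and hands each record to whatever sink the caller supplies.
class DlqDemo {
  private final List<String> failed =
      new ArrayList<>(List.of("{\"table\":\"t1\",\"error\":\"timeout\"}"));

  List<String> getFailedRecords() {
    return failed;
  }

  void writeFailedRecords(Consumer<String> sink) {
    for (String record : getFailedRecords()) {
      sink.accept(record); // in the DoFn this would be the output receiver
    }
    failed.clear(); // analogous to batcher.clearDlq()
  }
}
```

A caller collects the records simply by passing a method reference, e.g. `demo.writeFailedRecords(collected::add)`.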
```java
/** Type-safe container wrapping table names and primary key values. */
@AutoValue
public abstract class GeneratedRecord implements Serializable {
```
The more Serializable you introduce now, the harder it will be to properly clean up later... just keep it in mind.
I understand, but I have retained it in classes that use Beam Row, because my analysis suggests it might require a custom coder. Solving it for one class will help me solve it for all the others, so I would like to take them up at once.
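For readers unfamiliar with why the class implements Serializable: Beam must be able to move these records between workers. Below is a plain-Java stand-in (without the AutoValue machinery) showing a serialization round trip; the field names are inferred from the class comment, not taken from the diff.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

// Immutable Serializable value class, analogous to what @AutoValue
// generates for GeneratedRecord.
final class GeneratedRecordSketch implements Serializable {
  private static final long serialVersionUID = 1L;
  final String tableName;
  final String primaryKey;

  GeneratedRecordSketch(String tableName, String primaryKey) {
    this.tableName = tableName;
    this.primaryKey = primaryKey;
  }

  // Serialize and deserialize, as a runner would when shuffling records.
  static GeneratedRecordSketch roundTrip(GeneratedRecordSketch in) {
    try {
      ByteArrayOutputStream bytes = new ByteArrayOutputStream();
      try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
        out.writeObject(in);
      }
      try (ObjectInputStream oin =
          new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
        return (GeneratedRecordSketch) oin.readObject();
      }
    } catch (Exception e) {
      throw new RuntimeException(e);
    }
  }
}
```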
```java
public class BatchAndWrite
    extends PTransform<PCollection<KV<Integer, GeneratedRecord>>, PCollection<String>> {
```
This seems to only contain the BatchAndWriteFn. Given the tightly coupled, near-identical naming (BatchAndWrite and BatchAndWriteFn), do we need this transform at all? Can we wire the DoFn directly? Is there some future extensibility use case?