feat(spanner-to-sourcedb): add Spanner-to-Spanner reverse replication support #3773
srozsnyai wants to merge 6 commits into
Conversation
Summary of Changes (Gemini Code Assist)

This pull request enables reverse replication from Spanner to Spanner by extending the existing template infrastructure. It introduces a new set of DAO and DML generation classes specifically for Spanner, ensuring that mutation data is correctly formatted and committed. A key aspect of this change is the modification of the write flow to defer target database operations until after the shadow-table transaction has committed, preventing illegal nested transaction errors.
Code Review
This pull request implements support for Cloud Spanner as a target database in the Spanner-to-SourceDb template. It introduces new components for connection management, DML generation, and data access specifically for Spanner targets. Feedback identifies critical issues including a potential data loss scenario in SourceWriterFn due to non-atomic writes, type mismatch errors in SpannerDMLGenerator when handling custom transformations or null array values, and an initialization bug in SpannerConnectionHelper that affects multi-project deployments.
```java
if (request.getCustomTransformationResponse() != null
    && request.getCustomTransformationResponse().containsKey(targetColName)) {
  Object customVal = request.getCustomTransformationResponse().get(targetColName);
  builder.set(targetColName).to(customVal == null ? (String) null : customVal.toString());
```
This line assumes that all custom transformation results should be treated as strings. If the target Spanner column is of a different type (e.g., INT64, BOOL, TIMESTAMP), calling .to(String) on the Mutation.WriteBuilder will cause a runtime IllegalArgumentException or a type mismatch error during the commit. The value should be set using a type-aware method similar to the logic in setColumnValue, based on the target column's type.
Fixed in 7a5274c. Custom-transformation values are now routed through setCustomColumnValue / appendCustomKeyComponent, which dispatch on Column.type().getCode() and call the matching typed .to(...) / Key.Builder.append(...) overload. null custom values emit a properly typed NULL via setNullValue. Added tests.
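A minimal sketch of that dispatch, assuming the Spanner Java client's `Mutation.WriteBuilder` and `Type.Code`; the helper name follows the reply, but its exact signature and the set of covered type codes in the PR may differ:

```java
import com.google.cloud.spanner.Mutation;
import com.google.cloud.spanner.Type;

// Hedged sketch of setCustomColumnValue: dispatch on the target column's
// type code and call the matching typed to(...) overload, so a null custom
// value also becomes a typed NULL. Only a few codes are shown.
static void setCustomColumnValue(
    Mutation.WriteBuilder builder, String col, Type type, Object customVal) {
  switch (type.getCode()) {
    case INT64:
      builder.set(col).to(customVal == null ? null : Long.valueOf(customVal.toString()));
      break;
    case BOOL:
      builder.set(col).to(customVal == null ? null : Boolean.valueOf(customVal.toString()));
      break;
    case TIMESTAMP:
      builder.set(col).to(
          customVal == null
              ? null
              : com.google.cloud.Timestamp.parseTimestamp(customVal.toString()));
      break;
    default:
      builder.set(col).to(customVal == null ? null : customVal.toString());
  }
}
```

Each ternary is typed by its non-null branch, so the boxed nullable `to(Long)`, `to(Boolean)`, etc. overloads are selected rather than the ambiguous `to(null)`.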
```java
if (pendingSpannerWrite.get() != null) {
  IDao sourceDao = sourceProcessor.getSourceDao(shardId);
  sourceDao.write(pendingSpannerWrite.get(), null);
}
```
Deferring the Spanner target write until after the shadow-table transaction commits introduces a critical consistency risk. If the shadow-table transaction succeeds but the subsequent sourceDao.write call fails (e.g., due to a transient network issue or target database constraint), the record will be marked as processed in the shadow table. Upon retry from the Dead Letter Queue (DLQ), the isSourceAhead check (line 270) will see that the record's timestamp matches the shadow table and skip the write, resulting in permanent data loss on the target. To ensure atomicity, the target write should be performed within the same transaction as the shadow table update. If the target is the same database, use shadowTransaction.buffer(mutation). If it is a different database, calling writeAtLeastOnce inside the readWriteTransaction block will ensure that a failure in the target write rolls back the shadow table update, allowing for a safe retry.
shadowTransaction.buffer(mutation) only works when the mutation targets the same database as the transaction. The shadow table currently lives in a separate metadata database, so buffer() cannot span them.
writeAtLeastOnce inside readWriteTransaction throws INTERNAL: Nested transactions are not supported for every record, regardless of which DatabaseClient instance is used. The same error reproduces with the Spanner JDBC driver, so JDBC is not an escape hatch. MySQL/Postgres can use this pattern only because their JDBC writes target a non-Spanner engine.
The residual risk is narrow in practice: the client retries ABORTED / UNAVAILABLE / DEADLINE_EXCEEDED internally, errors that escape are typically permanent and operator-visible, and Spanner mutations are idempotent.
IMHO the only real fix is colocating the reverse-replication metadata in the target Spanner database so tx.buffer() becomes available. That is an architectural change and will be addressed as a follow-up once there is team consensus, not as a side effect of this PR.
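For reference, the colocated follow-up would reduce to the standard single-transaction shape. This is a sketch only: `shadowTableMutation` and `targetMutation` are illustrative names, and it assumes the shadow table has been moved into the target database.

```java
import com.google.cloud.spanner.DatabaseClient;
import com.google.cloud.spanner.Mutation;

// Sketch: once the shadow table is colocated with the target database,
// both mutations can be buffered into one transaction and commit (or roll
// back) together; no nested write call is needed.
static void writeAtomically(
    DatabaseClient targetDb, Mutation shadowTableMutation, Mutation targetMutation) {
  targetDb
      .readWriteTransaction()
      .run(
          tx -> {
            tx.buffer(shadowTableMutation);
            tx.buffer(targetMutation);
            return null;
          });
}
```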
```java
private static Map<String, DatabaseClient> clientMap = new ConcurrentHashMap<>();
private static Spanner spannerService;
```
The use of static variables for clientMap and spannerService combined with the early return in init (lines 45-48) can lead to issues if shards belong to different Google Cloud projects. If init is called first for a shard in Project A, the spannerService is initialized for Project A. A subsequent call for a shard in Project B will return early because clientMap is not empty, and getConnection will later attempt to use the Project A service to access Project B, which may fail due to permission or configuration mismatches. Consider mapping spannerService by project ID or removing the static singleton pattern in favor of a more robust initialization that ensures all requested shards are processed.
Same idempotent-init pattern as JdbcConnectionHelper and CassandraConnectionHelper — all shards are processed in a single init() call, so the early return doesn't skip later shards.
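The contract described in the reply can be sketched like this (`Shard` and `createClientFor` are illustrative stand-ins for the template's actual types, not the PR's code):

```java
import com.google.cloud.spanner.DatabaseClient;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch of the idempotent-init contract: init() receives the complete shard
// list in one call, so the early return only short-circuits repeat calls and
// never skips unseen shards.
private static final Map<String, DatabaseClient> clientMap = new ConcurrentHashMap<>();

public static synchronized void init(List<Shard> shards) {
  if (!clientMap.isEmpty()) {
    return; // a previous call already processed the full shard list
  }
  for (Shard shard : shards) {
    clientMap.put(shard.getLogicalShardId(), createClientFor(shard));
  }
}
```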
```java
      builder.set(targetColName).to(Value.json(null));
      break;
    case ARRAY:
      builder.set(targetColName).to(Value.stringArray(null));
```
When setting a null value for an array column, Value.stringArray(null) is always used. This will cause a type mismatch error if the target column is an array of a different type (e.g., ARRAY<INT64>). The null value should be created using the appropriate element type, such as Value.int64Array((Iterable<Long>) null).
Fixed in a00dd66b5. setNullValue now dispatches on type.getArrayElementType().getCode() and emits the matching Value.*Array((Iterable) null) so NULL ARRAY values carry the correct element type. Added tests.
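A sketch of that dispatch against the Spanner Java client's `Value` factories; only a subset of element types is shown, and the PR's `setNullValue` may cover more codes (BYTES, NUMERIC, JSON, ...):

```java
import com.google.cloud.spanner.Type;
import com.google.cloud.spanner.Value;

// Sketch: build a typed NULL array from the column's array element type so
// the commit-time type check sees e.g. ARRAY<INT64> rather than ARRAY<STRING>.
static Value nullArrayValue(Type arrayType) {
  switch (arrayType.getArrayElementType().getCode()) {
    case INT64:
      return Value.int64Array((Iterable<Long>) null);
    case BOOL:
      return Value.boolArray((Iterable<Boolean>) null);
    case FLOAT64:
      return Value.float64Array((Iterable<Double>) null);
    case TIMESTAMP:
      return Value.timestampArray(null);
    case DATE:
      return Value.dateArray(null);
    default:
      return Value.stringArray(null);
  }
}
```

The explicit `(Iterable<Long>) null` style casts disambiguate between the primitive-array and `Iterable` overloads of the numeric factories.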
Codecov Report

❌ Patch coverage is 46.65%. Your patch check has failed because the patch coverage (46.65%) is below the target coverage (80.00%). You can increase the patch coverage or adjust the target coverage.

Additional details and impacted files

```
@@             Coverage Diff             @@
##               main    #3773     +/-  ##
===========================================
- Coverage     53.41%   53.41%   -0.01%
+ Complexity     6629     6329     -300
===========================================
  Files          1082     1091       +9
  Lines         65795    66964    +1169
  Branches       7328     7483     +155
===========================================
+ Hits          35147    35767     +620
- Misses        28288    28772     +484
- Partials       2360     2425      +65
```
Extends the spanner-to-sourcedb reverse replication template to support Cloud Spanner as a target database.
Change stream events from a source Spanner instance are converted to mutations and written to a target Spanner database, coordinated through the existing shadow-table mechanism that prevents duplicate and out-of-order writes.
Because calling any Spanner write API inside an active readWriteTransaction causes a nested-transaction error, the target write is deferred: the mutation is generated inside the shadow-table transaction (where ordering and filtering decisions are made), then committed via writeAtLeastOnce() after the shadow-table transaction completes.
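The deferred flow can be sketched as follows. This is a simplified shape assuming the Spanner Java client; `metadataDb`, `targetDb`, and the two `build*Mutation()` helpers are illustrative names, not the PR's actual identifiers.

```java
import com.google.cloud.spanner.Mutation;
import java.util.Collections;
import java.util.concurrent.atomic.AtomicReference;

// Stage the target mutation while inside the shadow-table transaction...
AtomicReference<Mutation> pendingSpannerWrite = new AtomicReference<>();

metadataDb
    .readWriteTransaction()
    .run(
        tx -> {
          // Ordering/filtering decisions happen here; issuing a target write
          // at this point would fail with "Nested transactions are not supported".
          pendingSpannerWrite.set(buildTargetMutation());
          tx.buffer(buildShadowTableMutation());
          return null;
        });

// ...then flush it only after the shadow-table transaction has committed.
Mutation staged = pendingSpannerWrite.get();
if (staged != null) {
  targetDb.writeAtLeastOnce(Collections.singletonList(staged));
}
```

Note that `writeAtLeastOnce` gives at-least-once rather than exactly-once semantics, which is why the review thread above discusses the consistency window between the two commits.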