From 42cbeb5c105a52b185b1d7be338daa07cdbb9691 Mon Sep 17 00:00:00 2001 From: Michael Le Date: Thu, 30 Apr 2026 12:01:33 -0400 Subject: [PATCH 1/4] Write failed JSON to Document parsing to DLQ and catch NPE exceptions --- .../DataStreamMongoDBToFirestore.java | 107 +++++++++++------- .../datastream/MongoDbChangeEventContext.java | 24 ++++ .../cloud/teleport/v2/transforms/Utils.java | 4 + .../teleport/v2/transforms/UtilsTest.java | 12 ++ 4 files changed, 107 insertions(+), 40 deletions(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index 9cdd1a50f2..e9bee5876b 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -1144,27 +1144,41 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { return; } - try { - // Create bulk operation - List> bulkOperations = new ArrayList<>(events.size()); - - // Add operations to bulk - for (MongoDbChangeEventContext event : events) { - Object docId = event.getDocumentId(); - Bson lookupById = eq("_id", docId); - - if (event.isDeleteEvent()) { - // Add delete operation - bulkOperations.add(new DeleteOneModel<>(lookupById)); + List> bulkOperations = new ArrayList<>(events.size()); + List processedEvents = new ArrayList<>(); + List> failedElements = new ArrayList<>(); + + for (MongoDbChangeEventContext event : events) { + var documentId = event.getDocumentId(); + var lookupById = eq("_id", documentId); + if (event.isDeleteEvent()) { + bulkOperations.add(new DeleteOneModel<>(lookupById)); + processedEvents.add(event); + } else { + Document doc = event.getDataAsDocument(); + if (event.hasParseError()) { + Exception exception = event.getParseError(); + FailsafeElement failedElement = FailsafeElement.of(event, event); + failedElement.setException(exception); + failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace())); + failedElements.add(failedElement); } else { - // Add upsert operation - bulkOperations.add( - new ReplaceOneModel<>( - lookupById, - Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()), - new ReplaceOptions().upsert(true))); + bulkOperations.add(new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true))); + processedEvents.add(event); } } + } + + // Write any failed parse events to the DLQ. + for (var failedElement : failedElements) { + out.get(failedWriteTag).output(failedElement); + } + + if (bulkOperations.isEmpty()) { + return; + } + + try { // Execute bulk write BulkWriteResult result = collection.bulkWrite(bulkOperations); @@ -1175,7 +1189,7 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { result.getDeletedCount()); // Output successful events - for (MongoDbChangeEventContext event : events) { + for (MongoDbChangeEventContext event : processedEvents) { out.get(successfulWriteTag).output(event); } } catch (Exception e) { @@ -1186,7 +1200,7 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { e); // On error, output all events as failed - for (MongoDbChangeEventContext event : events) { + for (MongoDbChangeEventContext event : processedEvents) { FailsafeElement failedElement = FailsafeElement.of(event, event); failedElement.setErrorMessage(e.getMessage()); @@ -1207,28 +1221,41 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte return; } - try { - // Create bulk operation - List> bulkOperations = new ArrayList<>(events.size()); - - // Add operations to bulk - for (MongoDbChangeEventContext event : events) { - Object docId = event.getDocumentId(); - Bson lookupById = eq("_id", docId); - - if (event.isDeleteEvent()) { - // Add delete operation - bulkOperations.add(new DeleteOneModel<>(lookupById)); + List> bulkOperations = new ArrayList<>(events.size()); + List processedEvents = new ArrayList<>(); + List> failedElements = new ArrayList<>(); + + for (MongoDbChangeEventContext event : events) { + var documentId = event.getDocumentId(); + var lookupById = eq("_id", documentId); + if (event.isDeleteEvent()) { + bulkOperations.add(new DeleteOneModel<>(lookupById)); + processedEvents.add(event); + } else { + Document doc = event.getDataAsDocument(); + if (event.hasParseError()) { + Exception exception = event.getParseError(); + FailsafeElement failedElement = FailsafeElement.of(event, event); + failedElement.setException(exception); + failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace())); + failedElements.add(failedElement); } else { - // Add upsert operation - bulkOperations.add( - new ReplaceOneModel<>( - lookupById, - Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()), - new ReplaceOptions().upsert(true))); + bulkOperations.add(new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true))); + processedEvents.add(event); } } + } + + // Write any failed parse events to the DLQ. + for (var failedElement : failedElements) { + context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE); + } + if (bulkOperations.isEmpty()) { + return; + } + + try { // Execute bulk write BulkWriteResult result = collection.bulkWrite(bulkOperations); LOG.debug( @@ -1238,7 +1265,7 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte result.getDeletedCount()); // Output successful events - for (MongoDbChangeEventContext event : events) { + for (MongoDbChangeEventContext event : processedEvents) { context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE); } } catch (Exception e) { @@ -1249,7 +1276,7 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte e); // On error, output all events as failed - for (MongoDbChangeEventContext event : events) { + for (MongoDbChangeEventContext event : processedEvents) { FailsafeElement failedElement = FailsafeElement.of(event, event); failedElement.setErrorMessage(e.getMessage()); diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java index 63e1da8582..d72121901a 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java @@ -59,6 +59,8 @@ public class MongoDbChangeEventContext implements Serializable { private final Document timestampDoc; private boolean isDlqReconsumed; private int retryCount; + private transient Document parsedDocument; + private transient Exception parseError; /** Gets the change type from the event metadata. */ private String getChangeType(JsonNode changeEvent) { @@ -221,6 +223,28 @@ public String getDataAsJsonString() { return jsonStringData; } + public Document getDataAsDocument() { + if (isDeleteEvent) { + return null; + } + if (parsedDocument == null && parseError == null) { + try { + parsedDocument = Utils.jsonToDocument(getDataAsJsonString(), getDocumentId()); + } catch (Exception e) { + parseError = e; + } + } + return parsedDocument; + } + + public boolean hasParseError() { + return parseError != null; + } + + public Exception getParseError() { + return parseError; + } + public Document getTimestampDoc() { return timestampDoc; } diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java index 4462fa635d..543d611ad2 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java @@ -51,6 +51,10 @@ public static Document jsonToDocument(String jsonString, Object documentId) { "Document parsing for {} failed due to {}, try casting.", jsonString, ex.getMessage()); rawDoc = (Document) Document.parse(jsonString).get(DATA_COL); } + if (rawDoc == null) { + throw new IllegalArgumentException( + String.format("JSON string does not contain a valid '%s' field or it is null: %s", DATA_COL, jsonString)); + } rawDoc.put(MongoDbChangeEventContext.DOC_ID_COL, documentId); return rawDoc; } diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java index 923f45ffc2..cf976fb9fb 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java +++ b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java @@ -139,4 +139,16 @@ public void testJsonToDocument() { .equals( "{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\": \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\": {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\": {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"}, \"doubleField\": 10.5, \"infiniteNumber\": {\"$numberDouble\": \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\": {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\": {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}}, \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}}, \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\", \"subType\": \"04\"}}}")); } + + @Test(expected = IllegalArgumentException.class) + public void testJsonToDocument_missingDataField() { + String jsonString = "{\"id\": 123, \"name\": \"test\"}"; + Utils.jsonToDocument(jsonString, 1L); + } + + @Test(expected = IllegalArgumentException.class) + public void testJsonToDocument_nullDataField() { + String jsonString = "{\"data\": null}"; + Utils.jsonToDocument(jsonString, 1L); + } } From ea9e9eb5b5cc9746bde1bc061b72e365a3f5eb61 Mon Sep 17 00:00:00 2001 From: Michael Le Date: Thu, 30 Apr 2026 12:05:46 -0400 Subject: [PATCH 2/4] Fix build --- .../teleport/v2/templates/DataStreamMongoDBToFirestore.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index e9bee5876b..a3e553093d 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -1159,7 +1159,7 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { if (event.hasParseError()) { Exception exception = event.getParseError(); FailsafeElement failedElement = FailsafeElement.of(event, event); - failedElement.setException(exception); + failedElement.setErrorMessage(exception.getMessage()); failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace())); failedElements.add(failedElement); } else { @@ -1236,7 +1236,7 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte if (event.hasParseError()) { Exception exception = event.getParseError(); FailsafeElement failedElement = FailsafeElement.of(event, event); - failedElement.setException(exception); + failedElement.setErrorMessage(exception.getMessage()); failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace())); failedElements.add(failedElement); } else { From cccf22d759bd3daaacc5b1f3f7f18c0512375531 Mon Sep 17 00:00:00 2001 From: Michael Le Date: Thu, 30 Apr 2026 12:25:11 -0400 Subject: [PATCH 3/4] Move common logic into a couple helper functions --- .../DataStreamMongoDBToFirestore.java | 123 +++++++----------- 1 file changed, 46 insertions(+), 77 deletions(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index a3e553093d..7df95aa9a6 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -15,6 +15,8 @@ */ package com.google.cloud.teleport.v2.templates; +import java.util.function.Consumer; + import static com.mongodb.client.model.Filters.eq; import com.fasterxml.jackson.databind.JsonNode; @@ -1136,14 +1138,12 @@ public void finishBundle(FinishBundleContext context) { } } - private void processBatch(String collectionName, MultiOutputReceiver out) { - List events = bufferedEvents.get(collectionName); - MongoCollection collection = collectionMap.get(collectionName); - - if (events.isEmpty()) { - return; - } + private record PreparedBatch( + List> bulkOperations, + List processedEvents, + List> failedElements) {} + private PreparedBatch prepareBatch(List events) { List> bulkOperations = new ArrayList<>(events.size()); List processedEvents = new ArrayList<>(); List> failedElements = new ArrayList<>(); @@ -1168,20 +1168,30 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { } } } + return new PreparedBatch(bulkOperations, processedEvents, failedElements); + } + + private void writeBatch( + String collectionName, + List events, + MongoCollection collection, + Consumer successConsumer, + Consumer> failureConsumer) { + + PreparedBatch preparedBatch = prepareBatch(events); // Write any failed parse events to the DLQ. - for (var failedElement : failedElements) { - out.get(failedWriteTag).output(failedElement); + for (var failedElement : preparedBatch.failedElements()) { + failureConsumer.accept(failedElement); } - if (bulkOperations.isEmpty()) { + if (preparedBatch.bulkOperations().isEmpty()) { return; } try { - // Execute bulk write - BulkWriteResult result = collection.bulkWrite(bulkOperations); + BulkWriteResult result = collection.bulkWrite(preparedBatch.bulkOperations()); LOG.debug( "Bulk write completed for collection {}: {} inserts/updates, {} deletes", collectionName, @@ -1189,8 +1199,8 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { result.getDeletedCount()); // Output successful events - for (MongoDbChangeEventContext event : processedEvents) { - out.get(successfulWriteTag).output(event); + for (MongoDbChangeEventContext event : preparedBatch.processedEvents()) { + successConsumer.accept(event); } } catch (Exception e) { LOG.error( @@ -1200,20 +1210,17 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { e); // On error, output all events as failed - for (MongoDbChangeEventContext event : processedEvents) { + for (MongoDbChangeEventContext event : preparedBatch.processedEvents()) { FailsafeElement failedElement = FailsafeElement.of(event, event); failedElement.setErrorMessage(e.getMessage()); failedElement.setStacktrace(Arrays.deepToString(e.getStackTrace())); - out.get(failedWriteTag).output(failedElement); + failureConsumer.accept(failedElement); } } - - // Clear the processed batch - events.clear(); } - private void processBatchFinish(String collectionName, FinishBundleContext context) { + private void processBatch(String collectionName, MultiOutputReceiver out) { List events = bufferedEvents.get(collectionName); MongoCollection collection = collectionMap.get(collectionName); @@ -1221,69 +1228,31 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte return; } - List> bulkOperations = new ArrayList<>(events.size()); - List processedEvents = new ArrayList<>(); - List> failedElements = new ArrayList<>(); + writeBatch( + collectionName, + events, + collection, + event -> out.get(successfulWriteTag).output(event), + failedElement -> out.get(failedWriteTag).output(failedElement)); - for (MongoDbChangeEventContext event : events) { - var documentId = event.getDocumentId(); - var lookupById = eq("_id", documentId); - if (event.isDeleteEvent()) { - bulkOperations.add(new DeleteOneModel<>(lookupById)); - processedEvents.add(event); - } else { - Document doc = event.getDataAsDocument(); - if (event.hasParseError()) { - Exception exception = event.getParseError(); - FailsafeElement failedElement = FailsafeElement.of(event, event); - failedElement.setErrorMessage(exception.getMessage()); - failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace())); - failedElements.add(failedElement); - } else { - bulkOperations.add(new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true))); - processedEvents.add(event); - } - } - } + // Clear the processed batch + events.clear(); + } - // Write any failed parse events to the DLQ. - for (var failedElement : failedElements) { - context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE); - } + private void processBatchFinish(String collectionName, FinishBundleContext context) { + List events = bufferedEvents.get(collectionName); + MongoCollection collection = collectionMap.get(collectionName); - if (bulkOperations.isEmpty()) { + if (events.isEmpty()) { return; } - try { - // Execute bulk write - BulkWriteResult result = collection.bulkWrite(bulkOperations); - LOG.debug( - "Bulk write completed for collection {}: {} inserts/updates, {} deletes", - collectionName, - result.getInsertedCount() + result.getModifiedCount() + result.getUpserts().size(), - result.getDeletedCount()); - - // Output successful events - for (MongoDbChangeEventContext event : processedEvents) { - context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE); - } - } catch (Exception e) { - LOG.error( - "Error processing backfill batch for collection {}: {}", - collectionName, - e.getMessage(), - e); - - // On error, output all events as failed - for (MongoDbChangeEventContext event : processedEvents) { - FailsafeElement failedElement = - FailsafeElement.of(event, event); - failedElement.setErrorMessage(e.getMessage()); - failedElement.setStacktrace(Arrays.deepToString(e.getStackTrace())); - context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE); - } - } + writeBatch( + collectionName, + events, + collection, + event -> context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE), + failedElement -> context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE)); // Clear the processed batch events.clear(); From edb54467156018628c5b92158802678855c8a4ed Mon Sep 17 00:00:00 2001 From: Michael Le Date: Thu, 30 Apr 2026 16:30:46 +0000 Subject: [PATCH 4/4] Format code --- .../DataStreamMongoDBToFirestore.java | 23 +++++++++------- .../cloud/teleport/v2/transforms/Utils.java | 4 ++- .../teleport/v2/transforms/UtilsTest.java | 27 +++++++++++++++++-- 3 files changed, 41 insertions(+), 13 deletions(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index 7df95aa9a6..cc325b2b31 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -15,8 +15,6 @@ */ package com.google.cloud.teleport.v2.templates; -import java.util.function.Consumer; - import static com.mongodb.client.model.Filters.eq; import com.fasterxml.jackson.databind.JsonNode; @@ -39,7 +37,6 @@ import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions; import com.google.cloud.teleport.v2.transforms.MongoDbEventDeadLetterQueueSanitizer; import com.google.cloud.teleport.v2.transforms.ProcessChangeEventFn; -import com.google.cloud.teleport.v2.transforms.Utils; import com.google.cloud.teleport.v2.values.FailsafeElement; import com.google.common.base.Strings; import com.mongodb.MongoClientSettings; @@ -60,6 +57,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.TimeUnit; +import java.util.function.Consumer; import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions; import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions; import org.apache.beam.sdk.Pipeline; @@ -83,7 +81,6 @@ import org.apache.beam.sdk.values.TupleTagList; import org.bson.Document; import org.bson.UuidRepresentation; -import org.bson.conversions.Bson; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -1141,12 +1138,14 @@ public void finishBundle(FinishBundleContext context) { private record PreparedBatch( List> bulkOperations, List processedEvents, - List> failedElements) {} + List> + failedElements) {} private PreparedBatch prepareBatch(List events) { List> bulkOperations = new ArrayList<>(events.size()); List processedEvents = new ArrayList<>(); - List> failedElements = new ArrayList<>(); + List> failedElements = + new ArrayList<>(); for (MongoDbChangeEventContext event : events) { var documentId = event.getDocumentId(); @@ -1158,12 +1157,14 @@ private PreparedBatch prepareBatch(List events) { Document doc = event.getDataAsDocument(); if (event.hasParseError()) { Exception exception = event.getParseError(); - FailsafeElement failedElement = FailsafeElement.of(event, event); + FailsafeElement failedElement = + FailsafeElement.of(event, event); failedElement.setErrorMessage(exception.getMessage()); failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace())); failedElements.add(failedElement); } else { - bulkOperations.add(new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true))); + bulkOperations.add( + new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true))); processedEvents.add(event); } } @@ -1176,7 +1177,8 @@ private void writeBatch( List events, MongoCollection collection, Consumer successConsumer, - Consumer> failureConsumer) { + Consumer> + failureConsumer) { PreparedBatch preparedBatch = prepareBatch(events); @@ -1252,7 +1254,8 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte events, collection, event -> context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE), - failedElement -> context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE)); + failedElement -> + context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE)); // Clear the processed batch events.clear(); diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java index 543d611ad2..e59827eabd 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java @@ -53,7 +53,9 @@ public static Document jsonToDocument(String jsonString, Object documentId) { } if (rawDoc == null) { throw new IllegalArgumentException( - String.format("JSON string does not contain a valid '%s' field or it is null: %s", DATA_COL, jsonString)); + String.format( + "JSON string does not contain a valid '%s' field or it is null: %s", + DATA_COL, jsonString)); } rawDoc.put(MongoDbChangeEventContext.DOC_ID_COL, documentId); return rawDoc; diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java index cf976fb9fb..0957ed5d23 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java +++ b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java @@ -131,13 +131,36 @@ public void testIsNewerTimestamp_sameTimestamp() { @Test public void testJsonToDocument() { String jsonString = - "{\"_id\":\"{\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"}\",\"data\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"},\\\"arrayField\\\": [\\\"hello\\\",10],\\\"dateField\\\": {\\\"$date\\\": 1565546054692},\\\"dateBefore1970\\\": {\\\"$date\\\": -1577923200000},\\\"decimal128Field\\\": {\\\"$numberDecimal\\\": \\\"10.99\\\"},\\\"documentField\\\": {\\\"a\\\": \\\"hello\\\"},\\\"doubleField\\\": 10.5,\\\"infiniteNumber\\\": Infinity,\\\"int32field\\\": 10,\\\"int64Field\\\": {\\\"$numberLong\\\": \\\"50\\\"},\\\"minKeyField\\\": {\\\"$minKey\\\": 1},\\\"maxKeyField\\\": {\\\"$maxKey\\\": 1},\\\"regexField\\\": {\\\"$regex\\\": \\\"^H\\\",\\\"$options\\\": \\\"i\\\"},\\\"timestampField\\\": {\\\"$timestamp\\\": {\\\"t\\\": 1565545664,\\\"i\\\": 1}},\\\"uuid\\\": {\\\"$binary\\\": \\\"OyQRAeK7QlWMr0E2xWapYg==\\\",\\\"$type\\\": \\\"04\\\"}}\",\"_metadata_stream\":\"extended-types-test\",\"_metadata_timestamp\":1745957556,\"_metadata_read_timestamp\":1745957556,\"_metadata_dataflow_timestamp\":1745963670,\"_metadata_read_method\":\"backfill\",\"_metadata_source_type\":\"backfill\",\"_metadata_deleted\":false,\"_metadata_table\":null,\"_metadata_change_type\":\"READ\",\"_metadata_primary_keys\":null,\"_metadata_uuid\":\"2a9cf8eb-4f35-433c-899a-39921d4c8587\",\"_metadata_timestamp_seconds\":\"1745957556\",\"_metadata_timestamp_nanos\":\"184498000\",\"_metadata_source\":{\"database\":\"extended_types\",\"collection\":\"mycol\",\"change_type\":\"READ\",\"is_deleted\":false,\"primary_key\":[\"_id\"]},\"_metadata_error\":null,\"_metadata_retry_count\":116}"; + "{\"_id\":\"{\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"}\",\"data\":\"{\\\"_id\\\":" + + " {\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"},\\\"arrayField\\\":" + + " [\\\"hello\\\",10],\\\"dateField\\\": {\\\"$date\\\":" + + " 1565546054692},\\\"dateBefore1970\\\": {\\\"$date\\\":" + + " -1577923200000},\\\"decimal128Field\\\": {\\\"$numberDecimal\\\":" + + " \\\"10.99\\\"},\\\"documentField\\\": {\\\"a\\\":" + + " \\\"hello\\\"},\\\"doubleField\\\": 10.5,\\\"infiniteNumber\\\":" + + " Infinity,\\\"int32field\\\": 10,\\\"int64Field\\\": {\\\"$numberLong\\\":" + + " \\\"50\\\"},\\\"minKeyField\\\": {\\\"$minKey\\\": 1},\\\"maxKeyField\\\":" + + " {\\\"$maxKey\\\": 1},\\\"regexField\\\": {\\\"$regex\\\":" + + " \\\"^H\\\",\\\"$options\\\": \\\"i\\\"},\\\"timestampField\\\":" + + " {\\\"$timestamp\\\": {\\\"t\\\": 1565545664,\\\"i\\\": 1}},\\\"uuid\\\":" + + " {\\\"$binary\\\": \\\"OyQRAeK7QlWMr0E2xWapYg==\\\",\\\"$type\\\":" + + " \\\"04\\\"}}\",\"_metadata_stream\":\"extended-types-test\",\"_metadata_timestamp\":1745957556,\"_metadata_read_timestamp\":1745957556,\"_metadata_dataflow_timestamp\":1745963670,\"_metadata_read_method\":\"backfill\",\"_metadata_source_type\":\"backfill\",\"_metadata_deleted\":false,\"_metadata_table\":null,\"_metadata_change_type\":\"READ\",\"_metadata_primary_keys\":null,\"_metadata_uuid\":\"2a9cf8eb-4f35-433c-899a-39921d4c8587\",\"_metadata_timestamp_seconds\":\"1745957556\",\"_metadata_timestamp_nanos\":\"184498000\",\"_metadata_source\":{\"database\":\"extended_types\",\"collection\":\"mycol\",\"change_type\":\"READ\",\"is_deleted\":false,\"primary_key\":[\"_id\"]},\"_metadata_error\":null,\"_metadata_retry_count\":116}"; Document result = Utils.jsonToDocument(jsonString, 1L); assertTrue( result .toJson() .equals( - "{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\": \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\": {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\": {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"}, \"doubleField\": 10.5, \"infiniteNumber\": {\"$numberDouble\": \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\": {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\": {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}}, \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}}, \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\", \"subType\": \"04\"}}}")); + "{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\":" + + " \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\":" + + " {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\":" + + " {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"}," + + " \"doubleField\": 10.5, \"infiniteNumber\": {\"$numberDouble\":" + + " \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\":" + + " {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\":" + + " {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}}," + + " \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}}," + + " \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\"," + + " \"subType\": \"04\"}}}")); } @Test(expected = IllegalArgumentException.class)