diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java
index 9cdd1a50f2..cc325b2b31 100644
--- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java
+++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java
@@ -37,7 +37,6 @@
 import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
 import com.google.cloud.teleport.v2.transforms.MongoDbEventDeadLetterQueueSanitizer;
 import com.google.cloud.teleport.v2.transforms.ProcessChangeEventFn;
-import com.google.cloud.teleport.v2.transforms.Utils;
 import com.google.cloud.teleport.v2.values.FailsafeElement;
 import com.google.common.base.Strings;
 import com.mongodb.MongoClientSettings;
@@ -58,6 +57,7 @@
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
+import java.util.function.Consumer;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
 import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
 import org.apache.beam.sdk.Pipeline;
@@ -81,7 +81,6 @@
 import org.apache.beam.sdk.values.TupleTagList;
 import org.bson.Document;
 import org.bson.UuidRepresentation;
-import org.bson.conversions.Bson;
 import org.joda.time.Duration;
 import org.joda.time.Instant;
 import org.slf4j.Logger;
@@ -1136,38 +1135,65 @@ public void finishBundle(FinishBundleContext context) {
       }
     }

-    private void processBatch(String collectionName, MultiOutputReceiver out) {
-      List<MongoDbChangeEventContext> events = bufferedEvents.get(collectionName);
-      MongoCollection<Document> collection = collectionMap.get(collectionName);
-
-      if (events.isEmpty()) {
-        return;
-      }
-
-      try {
-        // Create bulk operation
-        List<WriteModel<Document>> bulkOperations = new
ArrayList<>(events.size());
-
-      // Add operations to bulk
-      for (MongoDbChangeEventContext event : events) {
-        Object docId = event.getDocumentId();
-        Bson lookupById = eq("_id", docId);
-
-        if (event.isDeleteEvent()) {
-          // Add delete operation
-          bulkOperations.add(new DeleteOneModel<>(lookupById));
+    private record PreparedBatch(
+        List<WriteModel<Document>> bulkOperations,
+        List<MongoDbChangeEventContext> processedEvents,
+        List<FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext>>
+            failedElements) {}
+
+    private PreparedBatch prepareBatch(List<MongoDbChangeEventContext> events) {
+      List<WriteModel<Document>> bulkOperations = new ArrayList<>(events.size());
+      List<MongoDbChangeEventContext> processedEvents = new ArrayList<>();
+      List<FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext>> failedElements =
+          new ArrayList<>();
+
+      for (MongoDbChangeEventContext event : events) {
+        var documentId = event.getDocumentId();
+        var lookupById = eq("_id", documentId);
+        if (event.isDeleteEvent()) {
+          bulkOperations.add(new DeleteOneModel<>(lookupById));
+          processedEvents.add(event);
+        } else {
+          Document doc = event.getDataAsDocument();
+          if (event.hasParseError()) {
+            Exception exception = event.getParseError();
+            FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext> failedElement =
+                FailsafeElement.of(event, event);
+            failedElement.setErrorMessage(exception.getMessage());
+            failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace()));
+            failedElements.add(failedElement);
           } else {
-            // Add upsert operation
             bulkOperations.add(
-                new ReplaceOneModel<>(
-                    lookupById,
-                    Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()),
-                    new ReplaceOptions().upsert(true)));
+                new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true)));
+            processedEvents.add(event);
           }
         }
+      }
+      return new PreparedBatch(bulkOperations, processedEvents, failedElements);
+    }
+
+    private void writeBatch(
+        String collectionName,
+        List<MongoDbChangeEventContext> events,
+        MongoCollection<Document> collection,
+        Consumer<MongoDbChangeEventContext> successConsumer,
+        Consumer<FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext>>
+            failureConsumer) {
+      PreparedBatch preparedBatch = prepareBatch(events);
+
+      // Write any failed parse events to the DLQ.
+      for (var failedElement : preparedBatch.failedElements()) {
+        failureConsumer.accept(failedElement);
+      }
+
+      if (preparedBatch.bulkOperations().isEmpty()) {
+        return;
+      }
+
+      try {
         // Execute bulk write
-        BulkWriteResult result = collection.bulkWrite(bulkOperations);
+        BulkWriteResult result = collection.bulkWrite(preparedBatch.bulkOperations());
         LOG.debug(
             "Bulk write completed for collection {}: {} inserts/updates, {} deletes",
             collectionName,
@@ -1175,8 +1201,8 @@ private void processBatch(String collectionName, MultiOutputReceiver out) {
             result.getDeletedCount());

         // Output successful events
-        for (MongoDbChangeEventContext event : events) {
-          out.get(successfulWriteTag).output(event);
+        for (MongoDbChangeEventContext event : preparedBatch.processedEvents()) {
+          successConsumer.accept(event);
         }
       } catch (Exception e) {
         LOG.error(
@@ -1186,20 +1212,17 @@ private void processBatch(String collectionName, MultiOutputReceiver out) {
             e);

         // On error, output all events as failed
-        for (MongoDbChangeEventContext event : events) {
+        for (MongoDbChangeEventContext event : preparedBatch.processedEvents()) {
           FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext> failedElement =
               FailsafeElement.of(event, event);
           failedElement.setErrorMessage(e.getMessage());
           failedElement.setStacktrace(Arrays.deepToString(e.getStackTrace()));
-          out.get(failedWriteTag).output(failedElement);
+          failureConsumer.accept(failedElement);
         }
       }
-
-      // Clear the processed batch
-      events.clear();
     }

-    private void processBatchFinish(String collectionName, FinishBundleContext context) {
+    private void processBatch(String collectionName, MultiOutputReceiver out) {
       List<MongoDbChangeEventContext> events = bufferedEvents.get(collectionName);
       MongoCollection<Document> collection = collectionMap.get(collectionName);

@@ -1207,57 +1230,33 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte
         return;
       }

-      try {
-        // Create bulk operation
-        List<WriteModel<Document>> bulkOperations = new ArrayList<>(events.size());
-
-        // Add operations to bulk
-        for (MongoDbChangeEventContext
event : events) {
-          Object docId = event.getDocumentId();
-          Bson lookupById = eq("_id", docId);
-
-          if (event.isDeleteEvent()) {
-            // Add delete operation
-            bulkOperations.add(new DeleteOneModel<>(lookupById));
-          } else {
-            // Add upsert operation
-            bulkOperations.add(
-                new ReplaceOneModel<>(
-                    lookupById,
-                    Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()),
-                    new ReplaceOptions().upsert(true)));
-          }
-        }
+      writeBatch(
+          collectionName,
+          events,
+          collection,
+          event -> out.get(successfulWriteTag).output(event),
+          failedElement -> out.get(failedWriteTag).output(failedElement));

-        // Execute bulk write
-        BulkWriteResult result = collection.bulkWrite(bulkOperations);
-        LOG.debug(
-            "Bulk write completed for collection {}: {} inserts/updates, {} deletes",
-            collectionName,
-            result.getInsertedCount() + result.getModifiedCount() + result.getUpserts().size(),
-            result.getDeletedCount());
+      // Clear the processed batch
+      events.clear();
+    }

-        // Output successful events
-        for (MongoDbChangeEventContext event : events) {
-          context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE);
-        }
-      } catch (Exception e) {
-        LOG.error(
-            "Error processing backfill batch for collection {}: {}",
-            collectionName,
-            e.getMessage(),
-            e);
+    private void processBatchFinish(String collectionName, FinishBundleContext context) {
+      List<MongoDbChangeEventContext> events = bufferedEvents.get(collectionName);
+      MongoCollection<Document> collection = collectionMap.get(collectionName);

-        // On error, output all events as failed
-        for (MongoDbChangeEventContext event : events) {
-          FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext> failedElement =
-              FailsafeElement.of(event, event);
-          failedElement.setErrorMessage(e.getMessage());
-          failedElement.setStacktrace(Arrays.deepToString(e.getStackTrace()));
-          context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE);
-        }
+      if (events.isEmpty()) {
+        return;
       }
+      writeBatch(
+          collectionName,
+          events,
+          collection,
+          event ->
context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE), + failedElement -> + context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE)); + // Clear the processed batch events.clear(); } diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java index 63e1da8582..d72121901a 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java @@ -59,6 +59,8 @@ public class MongoDbChangeEventContext implements Serializable { private final Document timestampDoc; private boolean isDlqReconsumed; private int retryCount; + private transient Document parsedDocument; + private transient Exception parseError; /** Gets the change type from the event metadata. 
*/ private String getChangeType(JsonNode changeEvent) { @@ -221,6 +223,28 @@ public String getDataAsJsonString() { return jsonStringData; } + public Document getDataAsDocument() { + if (isDeleteEvent) { + return null; + } + if (parsedDocument == null && parseError == null) { + try { + parsedDocument = Utils.jsonToDocument(getDataAsJsonString(), getDocumentId()); + } catch (Exception e) { + parseError = e; + } + } + return parsedDocument; + } + + public boolean hasParseError() { + return parseError != null; + } + + public Exception getParseError() { + return parseError; + } + public Document getTimestampDoc() { return timestampDoc; } diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java index 4462fa635d..e59827eabd 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/Utils.java @@ -51,6 +51,12 @@ public static Document jsonToDocument(String jsonString, Object documentId) { "Document parsing for {} failed due to {}, try casting.", jsonString, ex.getMessage()); rawDoc = (Document) Document.parse(jsonString).get(DATA_COL); } + if (rawDoc == null) { + throw new IllegalArgumentException( + String.format( + "JSON string does not contain a valid '%s' field or it is null: %s", + DATA_COL, jsonString)); + } rawDoc.put(MongoDbChangeEventContext.DOC_ID_COL, documentId); return rawDoc; } diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java index 923f45ffc2..0957ed5d23 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java +++ 
b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/UtilsTest.java @@ -131,12 +131,47 @@ public void testIsNewerTimestamp_sameTimestamp() { @Test public void testJsonToDocument() { String jsonString = - "{\"_id\":\"{\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"}\",\"data\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"},\\\"arrayField\\\": [\\\"hello\\\",10],\\\"dateField\\\": {\\\"$date\\\": 1565546054692},\\\"dateBefore1970\\\": {\\\"$date\\\": -1577923200000},\\\"decimal128Field\\\": {\\\"$numberDecimal\\\": \\\"10.99\\\"},\\\"documentField\\\": {\\\"a\\\": \\\"hello\\\"},\\\"doubleField\\\": 10.5,\\\"infiniteNumber\\\": Infinity,\\\"int32field\\\": 10,\\\"int64Field\\\": {\\\"$numberLong\\\": \\\"50\\\"},\\\"minKeyField\\\": {\\\"$minKey\\\": 1},\\\"maxKeyField\\\": {\\\"$maxKey\\\": 1},\\\"regexField\\\": {\\\"$regex\\\": \\\"^H\\\",\\\"$options\\\": \\\"i\\\"},\\\"timestampField\\\": {\\\"$timestamp\\\": {\\\"t\\\": 1565545664,\\\"i\\\": 1}},\\\"uuid\\\": {\\\"$binary\\\": \\\"OyQRAeK7QlWMr0E2xWapYg==\\\",\\\"$type\\\": \\\"04\\\"}}\",\"_metadata_stream\":\"extended-types-test\",\"_metadata_timestamp\":1745957556,\"_metadata_read_timestamp\":1745957556,\"_metadata_dataflow_timestamp\":1745963670,\"_metadata_read_method\":\"backfill\",\"_metadata_source_type\":\"backfill\",\"_metadata_deleted\":false,\"_metadata_table\":null,\"_metadata_change_type\":\"READ\",\"_metadata_primary_keys\":null,\"_metadata_uuid\":\"2a9cf8eb-4f35-433c-899a-39921d4c8587\",\"_metadata_timestamp_seconds\":\"1745957556\",\"_metadata_timestamp_nanos\":\"184498000\",\"_metadata_source\":{\"database\":\"extended_types\",\"collection\":\"mycol\",\"change_type\":\"READ\",\"is_deleted\":false,\"primary_key\":[\"_id\"]},\"_metadata_error\":null,\"_metadata_retry_count\":116}"; + "{\"_id\":\"{\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"}\",\"data\":\"{\\\"_id\\\":" + + " {\\\"$oid\\\": 
\\\"6811235eaf8583310cb9d2e9\\\"},\\\"arrayField\\\":" + + " [\\\"hello\\\",10],\\\"dateField\\\": {\\\"$date\\\":" + + " 1565546054692},\\\"dateBefore1970\\\": {\\\"$date\\\":" + + " -1577923200000},\\\"decimal128Field\\\": {\\\"$numberDecimal\\\":" + + " \\\"10.99\\\"},\\\"documentField\\\": {\\\"a\\\":" + + " \\\"hello\\\"},\\\"doubleField\\\": 10.5,\\\"infiniteNumber\\\":" + + " Infinity,\\\"int32field\\\": 10,\\\"int64Field\\\": {\\\"$numberLong\\\":" + + " \\\"50\\\"},\\\"minKeyField\\\": {\\\"$minKey\\\": 1},\\\"maxKeyField\\\":" + + " {\\\"$maxKey\\\": 1},\\\"regexField\\\": {\\\"$regex\\\":" + + " \\\"^H\\\",\\\"$options\\\": \\\"i\\\"},\\\"timestampField\\\":" + + " {\\\"$timestamp\\\": {\\\"t\\\": 1565545664,\\\"i\\\": 1}},\\\"uuid\\\":" + + " {\\\"$binary\\\": \\\"OyQRAeK7QlWMr0E2xWapYg==\\\",\\\"$type\\\":" + + " \\\"04\\\"}}\",\"_metadata_stream\":\"extended-types-test\",\"_metadata_timestamp\":1745957556,\"_metadata_read_timestamp\":1745957556,\"_metadata_dataflow_timestamp\":1745963670,\"_metadata_read_method\":\"backfill\",\"_metadata_source_type\":\"backfill\",\"_metadata_deleted\":false,\"_metadata_table\":null,\"_metadata_change_type\":\"READ\",\"_metadata_primary_keys\":null,\"_metadata_uuid\":\"2a9cf8eb-4f35-433c-899a-39921d4c8587\",\"_metadata_timestamp_seconds\":\"1745957556\",\"_metadata_timestamp_nanos\":\"184498000\",\"_metadata_source\":{\"database\":\"extended_types\",\"collection\":\"mycol\",\"change_type\":\"READ\",\"is_deleted\":false,\"primary_key\":[\"_id\"]},\"_metadata_error\":null,\"_metadata_retry_count\":116}"; Document result = Utils.jsonToDocument(jsonString, 1L); assertTrue( result .toJson() .equals( - "{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\": \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\": {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\": {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"}, \"doubleField\": 10.5, \"infiniteNumber\": 
{\"$numberDouble\": \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\": {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\": {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}}, \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}}, \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\", \"subType\": \"04\"}}}")); + "{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\":" + + " \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\":" + + " {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\":" + + " {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"}," + + " \"doubleField\": 10.5, \"infiniteNumber\": {\"$numberDouble\":" + + " \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\":" + + " {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\":" + + " {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}}," + + " \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}}," + + " \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\"," + + " \"subType\": \"04\"}}}")); + } + + @Test(expected = IllegalArgumentException.class) + public void testJsonToDocument_missingDataField() { + String jsonString = "{\"id\": 123, \"name\": \"test\"}"; + Utils.jsonToDocument(jsonString, 1L); + } + + @Test(expected = IllegalArgumentException.class) + public void testJsonToDocument_nullDataField() { + String jsonString = "{\"data\": null}"; + Utils.jsonToDocument(jsonString, 1L); } }