diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index 4362df9d2b..3689359e83 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -86,6 +86,8 @@ import org.bson.Document; import org.bson.UuidRepresentation; import org.bson.conversions.Bson; +import org.bson.json.JsonMode; +import org.bson.json.JsonWriterSettings; import org.joda.time.Duration; import org.joda.time.Instant; import org.slf4j.Logger; @@ -132,8 +134,10 @@ public class DataStreamMongoDBToFirestore { private static final Logger LOG = LoggerFactory.getLogger(DataStreamMongoDBToFirestore.class); - private static final TupleTag> UDF_SUCCESS_TAG = new TupleTag<>(); - private static final TupleTag> UDF_FAILURE_TAG = new TupleTag<>(); + + private static final TupleTag> UDF_SUCCESS_TAG = new TupleTag<>(); + private static final TupleTag> UDF_FAILURE_TAG = new TupleTag<>(); + private static final String AVRO_SUFFIX = "avro"; private static final String JSON_SUFFIX = "json"; public static final Set MAPPER_IGNORE_FIELDS = @@ -525,29 +529,7 @@ private static PipelineResult runWithBackfillFirst(Options options, String conne ingestAndNormalizeJson(options, dlqManager, pipeline) .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - // Optional Stage 1.5: Apply Javascript UDF for JSON transformation - if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) { - LOG.info("Applying Javascript UDF for JSON transformation"); - PCollectionTuple udfResult = - jsonRecords.apply( - "Run UDF", - FailsafeJavascriptUdf.newBuilder() - .setFileSystemPath(options.getJavascriptTextTransformGcsPath()) - .setFunctionName(options.getJavascriptTextTransformFunctionName()) - .setReloadIntervalMinutes( - options.getJavascriptTextTransformReloadIntervalMinutes()) - .setSuccessTag(UDF_SUCCESS_TAG) - .setFailureTag(UDF_FAILURE_TAG) - .build()); - jsonRecords = - udfResult - .get(UDF_SUCCESS_TAG) - .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - // Handle failed UDF processing - writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); - } // Stage 2: Create MongoDbChangeEventContext objects LOG.info("Creating MongoDbChangeEventContext objects"); @@ -574,11 +556,77 @@ private static PipelineResult runWithBackfillFirst(Options options, String conne dlqManager, CreateMongoDbChangeEventContextFn.failedCreationTag); + PCollection successfulContexts = + changeEventContexts.get(CreateMongoDbChangeEventContextFn.successfulCreationTag); + + if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) { + LOG.info("Applying Javascript UDF for Document transformation after context creation"); + + // Split the stream into deletes and non-deletes before UDF. + // Delete events produce null docJson and would be dropped by ExtractUdfInputFn if not handled. + // We bypass UDF for deletes to avoid sending empty payloads to UDF. + PCollectionTuple udfPreparation = + successfulContexts.apply( + "Prepare UDF Input", + ParDo.of(new ExtractUdfInputFn()) + .withOutputTags( + ExtractUdfInputFn.UDF_INPUT_TAG, + TupleTagList.of(ExtractUdfInputFn.DELETES_TAG))); + + // The String in FailsafeElement is the JSON representation of the document data extracted by ExtractUdfInputFn. + PCollection> udfInput = + udfPreparation.get(ExtractUdfInputFn.UDF_INPUT_TAG); + PCollection deletes = + udfPreparation.get(ExtractUdfInputFn.DELETES_TAG); + + // Apply the JavaScript UDF to the JSON payload extracted from the document. + PCollectionTuple udfResult = + udfInput.apply( + "Run UDF on Document", + FailsafeJavascriptUdf.newBuilder() + .setFileSystemPath(options.getJavascriptTextTransformGcsPath()) + .setFunctionName(options.getJavascriptTextTransformFunctionName()) + .setReloadIntervalMinutes( + options.getJavascriptTextTransformReloadIntervalMinutes()) + .setSuccessTag(UDF_SUCCESS_TAG) + .setFailureTag(UDF_FAILURE_TAG) + .build()); + + // After successful UDF execution, we update the MongoDbChangeEventContext + // with the modified JSON string so that subsequent stages use the transformed data. + + TupleTag parseSuccessTag = new TupleTag() {}; + TupleTag> parseFailureTag = new TupleTag>() {}; + + PCollectionTuple mergeResult = + udfResult + .get(UDF_SUCCESS_TAG) + .setCoder( + FailsafeElementCoder.of( + SerializableCoder.of(MongoDbChangeEventContext.class), + StringUtf8Coder.of())) + .apply( + "Merge UDF Result", + ParDo.of(new MergeUdfResultFn(parseFailureTag, options.getShadowCollectionPrefix())) + .withOutputTags(parseSuccessTag, TupleTagList.of(parseFailureTag))); + + successfulContexts = + PCollectionList.of(deletes) + .and(mergeResult.get(parseSuccessTag)) + .apply("Merge Deletes and UDF Results", Flatten.pCollections()); + + // Handle failed UDF processing (both execution and parse failures) + PCollection> executionFailures = udfResult.get(UDF_FAILURE_TAG); + PCollection> parseFailures = mergeResult.get(parseFailureTag); + + writeFailedUDFToDlq(options, executionFailures, dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_execution_failures/", "tmp_udf_execution_failed"); + writeFailedUDFToDlq(options, parseFailures, dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_parse_failures/", "tmp_udf_parse_failed"); + } + // Stage 3: Split events into backfill and CDC streams LOG.info("Splitting events into backfill and CDC streams"); PCollectionTuple splitEvents = - changeEventContexts - .get(CreateMongoDbChangeEventContextFn.successfulCreationTag) + successfulContexts .apply( "Split Backfill and CDC", ParDo.of(new SplitBackfillAndCdcEventsFn()) @@ -746,30 +794,7 @@ private static PipelineResult runAllEventsTogether(Options options, String conne ingestAndNormalizeJson(options, dlqManager, pipeline) .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - /* - * Optional Stage 1.5: Apply Javascript UDF to transform JSON strings - */ - if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) { - PCollectionTuple udfResult = - jsonRecords.apply( - "Run UDF", - FailsafeJavascriptUdf.newBuilder() - .setFileSystemPath(options.getJavascriptTextTransformGcsPath()) - .setFunctionName(options.getJavascriptTextTransformFunctionName()) - .setReloadIntervalMinutes( - options.getJavascriptTextTransformReloadIntervalMinutes()) - .setSuccessTag(UDF_SUCCESS_TAG) - .setFailureTag(UDF_FAILURE_TAG) - .build()); - jsonRecords = - udfResult - .get(UDF_SUCCESS_TAG) - .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); - - // Handle failed UDF processing - writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); - } LOG.info("Stage 1: Completed ingestion of data from GCS"); @@ -803,11 +828,76 @@ private static PipelineResult runAllEventsTogether(Options options, String conne dlqManager, CreateMongoDbChangeEventContextFn.failedCreationTag); + PCollection successfulContexts = + changeEventContexts.get(CreateMongoDbChangeEventContextFn.successfulCreationTag); + + if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) { + LOG.info("Applying Javascript UDF for Document transformation after context creation"); + + // Split the stream into deletes and non-deletes before UDF. + // Delete events produce null docJson and would be dropped by ExtractUdfInputFn if not handled. + // We bypass UDF for deletes to avoid sending empty payloads to UDF. + PCollectionTuple udfPreparation = + successfulContexts.apply( + "Prepare UDF Input", + ParDo.of(new ExtractUdfInputFn()) + .withOutputTags( + ExtractUdfInputFn.UDF_INPUT_TAG, + TupleTagList.of(ExtractUdfInputFn.DELETES_TAG))); + + PCollection> udfInput = + udfPreparation.get(ExtractUdfInputFn.UDF_INPUT_TAG); + PCollection deletes = + udfPreparation.get(ExtractUdfInputFn.DELETES_TAG); + + // Apply the JavaScript UDF to the JSON payload extracted from the document. + PCollectionTuple udfResult = + udfInput.apply( + "Run UDF on Document", + FailsafeJavascriptUdf.newBuilder() + .setFileSystemPath(options.getJavascriptTextTransformGcsPath()) + .setFunctionName(options.getJavascriptTextTransformFunctionName()) + .setReloadIntervalMinutes( + options.getJavascriptTextTransformReloadIntervalMinutes()) + .setSuccessTag(UDF_SUCCESS_TAG) + .setFailureTag(UDF_FAILURE_TAG) + .build()); + + // After successful UDF execution, we update the MongoDbChangeEventContext + // with the modified JSON string so that subsequent stages use the transformed data. + + TupleTag parseSuccessTag = new TupleTag() {}; + TupleTag> parseFailureTag = new TupleTag>() {}; + + PCollectionTuple mergeResult = + udfResult + .get(UDF_SUCCESS_TAG) + .setCoder( + FailsafeElementCoder.of( + SerializableCoder.of(MongoDbChangeEventContext.class), + StringUtf8Coder.of())) + .apply( + "Merge UDF Result", + ParDo.of(new MergeUdfResultFn(parseFailureTag, options.getShadowCollectionPrefix())) + .withOutputTags(parseSuccessTag, TupleTagList.of(parseFailureTag))); + + successfulContexts = + PCollectionList.of(deletes) + .and(mergeResult.get(parseSuccessTag)) + .apply("Merge Deletes and UDF Results", Flatten.pCollections()); + + // Handle failed UDF processing (both execution and parse failures) + PCollection> executionFailures = udfResult.get(UDF_FAILURE_TAG); + PCollection> parseFailures = mergeResult.get(parseFailureTag); + + writeFailedUDFToDlq(options, executionFailures, dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_execution_failures/", "tmp_udf_execution_failed"); + writeFailedUDFToDlq(options, parseFailures, dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_parse_failures/", "tmp_udf_parse_failed"); + } + /* Stage 3: Iterate through the success events and write with transactions */ LOG.info("Stage 3: Processing change events and writing to the destination database"); PCollectionTuple writeResult = - changeEventContexts - .get(CreateMongoDbChangeEventContextFn.successfulCreationTag) + successfulContexts .setCoder(SerializableCoder.of(MongoDbChangeEventContext.class)) .apply( "Transactional write events", @@ -1040,6 +1130,32 @@ private static void writeFailedEventsToDlq( LOG.info("DLQ setup completed for failed MongoDB event processing"); } + static void writeFailedUDFToDlq( + Options options, + PCollection> failures, + String dlqDirectory, + String tmpDirectoryName) { + + failures + .apply( + "Map UDF Failures to String", + ParDo.of( + new DoFn, String>() { + @ProcessElement + public void processElement(ProcessContext c) { + c.output(c.element().toString()); + } + })) + .setCoder(StringUtf8Coder.of()) + .apply( + "Write Failed UDF To DLQ", + DLQWriteTransform.WriteDLQ.newBuilder() + .withDlqDirectory(dlqDirectory) + .withTmpDirectory(options.getDeadLetterQueueDirectory() + "/" + tmpDirectoryName + "/") + .setIncludePaneInfo(true) + .build()); + } + private static void writeSevereEventsToDlq( Options options, PCollectionTuple results, @@ -1114,6 +1230,79 @@ private boolean isNonDlqBackfillEvent(MongoDbChangeEventContext event) { } } + /** + * DoFn to extract document data for UDF input. + * It reads the document data as a JSON string from the context. + * If the document data is null (which happens for delete events), + * it routes the event to a side output to bypass the UDF. + */ + public static class ExtractUdfInputFn + extends DoFn> { + + private static final Logger LOG = LoggerFactory.getLogger(ExtractUdfInputFn.class); + + // Tags for splitting output: UDF input and bypass (deletes) + public static final TupleTag> UDF_INPUT_TAG = new TupleTag>("udfInput") {}; + public static final TupleTag DELETES_TAG = new TupleTag("deletes") {}; + + @ProcessElement + public void processElement(ProcessContext c, MultiOutputReceiver out) { + MongoDbChangeEventContext context = c.element(); + try { + String docJson = context.getDocumentDataAsJsonString(); + if (docJson != null) { + out.get(UDF_INPUT_TAG).output(FailsafeElement.of(context, docJson)); + } else { + // Delete events have null docJson. We route them to side output to bypass UDF. + out.get(DELETES_TAG).output(context); + } + } catch (Exception e) { + LOG.error("Failed to extract document for UDF: {}", e.getMessage()); + } + } + } + + /** + * DoFn to parse UDF output and merge it back into the full event JSON. + * If parsing fails, the event is routed to a side output for DLQ processing. + */ + public static class MergeUdfResultFn extends DoFn, MongoDbChangeEventContext> { + private final TupleTag> parseFailureTag; + private final String shadowCollectionPrefix; + + public MergeUdfResultFn(TupleTag> parseFailureTag, String shadowCollectionPrefix) { + this.parseFailureTag = parseFailureTag; + this.shadowCollectionPrefix = shadowCollectionPrefix; + } + + @ProcessElement + public void processElement(ProcessContext c, MultiOutputReceiver out) { + FailsafeElement element = c.element(); + MongoDbChangeEventContext ctx = element.getOriginalPayload(); + try { + // 1. Parse UDF output to verify it is valid JSON + org.bson.Document udfData = org.bson.Document.parse(element.getPayload()); + + // 2. Merge back into full event + org.bson.Document fullEvent = org.bson.Document.parse(ctx.getDataAsJsonString()); + fullEvent.put("data", udfData); + + // Create a new context to avoid mutating the input element + MongoDbChangeEventContext newCtx = new MongoDbChangeEventContext(ctx.getChangeEvent(), shadowCollectionPrefix); + newCtx.setModifiedJsonStringData(fullEvent.toJson(JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build())); + + c.output(newCtx); + } catch (Exception e) { + LOG.error("Failed to merge UDF result: ", e); + // Output to failure tag if parsing or merging fails + out.get(parseFailureTag).output( + FailsafeElement.of(ctx, element.getPayload()) + .setErrorMessage(e.getMessage()) + .setStacktrace(com.google.common.base.Throwables.getStackTraceAsString(e))); + } + } + } + /** DoFn to process backfill events using MongoDB bulk writes. */ public static class ProcessBackfillEventFn extends DoFn { @@ -1248,7 +1437,7 @@ private void processBatch(String collectionName, MultiOutputReceiver out) { bulkOperations.add( new ReplaceOneModel<>( lookupById, - Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()), + Utils.jsonToDocument(event.getModifiedJsonStringData(), event.getDocumentId()), new ReplaceOptions().upsert(true))); } } @@ -1341,7 +1530,7 @@ private void processBatchFinish(String collectionName, FinishBundleContext conte bulkOperations.add( new ReplaceOneModel<>( lookupById, - Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()), + Utils.jsonToDocument(event.getModifiedJsonStringData(), event.getDocumentId()), new ReplaceOptions().upsert(true))); } } diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java index 6e83da5140..9a455d18a0 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContext.java @@ -59,6 +59,8 @@ public class MongoDbChangeEventContext implements Serializable { private final Document timestampDoc; private boolean isDlqReconsumed; private int retryCount; + // Stores the modified JSON string after UDF transformation. + private String modifiedJsonStringData; /** Gets the change type from the event metadata. */ private String getChangeType(JsonNode changeEvent) { @@ -237,6 +239,30 @@ public int getRetryCount() { return retryCount; } + /** + * Returns the document data as a JSON string. + */ + public String getDocumentDataAsJsonString() throws JsonProcessingException { + JsonNode eventNode = this.getChangeEvent(); + JsonNode dataNode = eventNode.get("data"); + return dataNode != null ? OBJECT_MAPPER.writeValueAsString(dataNode) : null; + } + + /** + * Returns the modified JSON string data if it was updated by a UDF, + * otherwise falls back to the original jsonStringData. + */ + public String getModifiedJsonStringData() { + return modifiedJsonStringData != null ? modifiedJsonStringData : jsonStringData; + } + + /** + * Sets the modified JSON string data after UDF transformation. + */ + public void setModifiedJsonStringData(String modifiedJsonStringData) { + this.modifiedJsonStringData = modifiedJsonStringData; + } + /** * Override toString() to provide a proper JSON representation of this object. This ensures that * when the object is serialized to a string, it produces valid JSON. @@ -252,7 +278,13 @@ public String toString() { // Add other important fields jsonNode.put("dataCollection", this.dataCollection); jsonNode.put("shadowCollection", this.shadowCollection); - jsonNode.put("documentId", Utils.documentIdToString(this.documentId)); + if (this.documentId instanceof ObjectId) { + ObjectNode oidNode = OBJECT_MAPPER.createObjectNode(); + oidNode.put("$oid", this.documentId.toString()); + jsonNode.set("documentId", oidNode); + } else { + jsonNode.put("documentId", Utils.documentIdToString(this.documentId)); + } jsonNode.put("isDeleteEvent", this.isDeleteEvent); // Convert timestamp document to JSON @@ -265,6 +297,7 @@ public String toString() { jsonNode.put(DatastreamConstants.IS_DLQ_RECONSUMED, this.isDlqReconsumed); jsonNode.put(DatastreamConstants.RETRY_COUNT, this.retryCount); + jsonNode.put("modifiedJsonStringData", this.modifiedJsonStringData); return OBJECT_MAPPER.writeValueAsString(jsonNode); } catch (JsonProcessingException e) { @@ -284,10 +317,30 @@ public String toString() { } } - public boolean equals(Object other) { - if (other instanceof MongoDbChangeEventContext) { - return Objects.equals(this.toString(), other.toString()); + @Override + public boolean equals(Object o) { + if (this == o) { + return true; } - return false; + if (o == null || getClass() != o.getClass()) { + return false; + } + MongoDbChangeEventContext that = (MongoDbChangeEventContext) o; + return isDeleteEvent == that.isDeleteEvent && + isDlqReconsumed == that.isDlqReconsumed && + retryCount == that.retryCount && + Objects.equals(changeEvent, that.changeEvent) && + Objects.equals(dataCollection, that.dataCollection) && + Objects.equals(shadowCollection, that.shadowCollection) && + Objects.equals(documentId, that.documentId) && + Objects.equals(timestampDoc, that.timestampDoc) && + Objects.equals(modifiedJsonStringData, that.modifiedJsonStringData); + } + + @Override + public int hashCode() { + return Objects.hash(changeEvent, dataCollection, shadowCollection, documentId, + isDeleteEvent, timestampDoc, isDlqReconsumed, retryCount, + modifiedJsonStringData); } } diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java index a52f96f9e3..0ab48a1d6b 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java @@ -17,9 +17,12 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants; import com.google.cloud.teleport.v2.templates.datastream.MongoDbChangeEventContext; import com.google.cloud.teleport.v2.values.FailsafeElement; import com.google.common.base.Throwables; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.values.TupleTag; import org.slf4j.Logger; @@ -32,6 +35,9 @@ public class CreateMongoDbChangeEventContextFn private static final Logger LOG = LoggerFactory.getLogger(CreateMongoDbChangeEventContextFn.class); private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + + private static final Counter DELETION_AFTER_UPDATE_COUNTER = + Metrics.counter(CreateMongoDbChangeEventContextFn.class, "deletion-after-update"); public static TupleTag successfulCreationTag = new TupleTag<>("successfulCreation"); @@ -49,6 +55,19 @@ public void processElement(ProcessContext context, MultiOutputReceiver out) { FailsafeElement element = context.element(); try { JsonNode jsonNode = OBJECT_MAPPER.readTree(element.getOriginalPayload()); + + // Check for update with null data + String changeType = ""; + if (jsonNode.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)) { + changeType = jsonNode.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY).asText(); + } + JsonNode dataNode = jsonNode.get("data"); + if ("UPDATE".equalsIgnoreCase(changeType) && (dataNode == null || dataNode.isNull())) { + DELETION_AFTER_UPDATE_COUNTER.inc(); + LOG.debug("Ignoring update event with null data for document ID: {}", jsonNode.get(DatastreamConstants.MONGODB_DOCUMENT_ID)); + return; // Ignore the event + } + MongoDbChangeEventContext changeEventContext = new MongoDbChangeEventContext(jsonNode, shadowCollectionPrefix); out.get(successfulCreationTag).output(changeEventContext); diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestoreTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestoreTest.java index ced4fbc76f..0363f234f3 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestoreTest.java +++ b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestoreTest.java @@ -17,8 +17,30 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThrows; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.teleport.v2.cdc.dlq.DeadLetterQueueManager; +import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; +import com.google.cloud.teleport.v2.templates.datastream.MongoDbChangeEventContext; +import com.google.cloud.teleport.v2.transforms.CreateMongoDbChangeEventContextFn; +import com.google.cloud.teleport.v2.values.FailsafeElement; +import java.util.List; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTag; +import org.apache.beam.sdk.values.TupleTagList; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -26,6 +48,13 @@ @RunWith(JUnit4.class) public final class DataStreamMongoDBToFirestoreTest { + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + private static final TupleTag PARSE_SUCCESS_TAG = new TupleTag() {}; + private static final TupleTag> PARSE_FAILURE_TAG = new TupleTag>() {}; + private static final TupleTag> UDF_EXECUTION_FAILURE_TAG = new TupleTag>() {}; + private static final java.util.List TEST_OUTPUT = new java.util.ArrayList<>(); + @Test public void inputArgs_inputFilePattern() { String[] args = new String[] {"--inputFilePattern=gs://test-bkt/"}; @@ -132,4 +161,237 @@ public void inputArgs_javascriptTextTransformFunctionName() { assertEquals(functionName, "myTransform"); } + + @Test + public void testPipeline_udfModifiesData() throws Exception { + org.apache.beam.sdk.coders.CoderRegistry coderRegistry = pipeline.getCoderRegistry(); + com.google.cloud.teleport.v2.coders.FailsafeElementCoder failsafeCoder = com.google.cloud.teleport.v2.coders.FailsafeElementCoder.of(org.apache.beam.sdk.coders.StringUtf8Coder.of(), org.apache.beam.sdk.coders.StringUtf8Coder.of()); + coderRegistry.registerCoderForType( + failsafeCoder.getEncodedTypeDescriptor(), + failsafeCoder); + + TEST_OUTPUT.clear(); + + // 1. Create temp JS file + java.nio.file.Path tempJs = java.nio.file.Files.createTempFile("transform", ".js"); + java.nio.file.Files.writeString(tempJs, + "function myTransform(inJson) {\n" + + " var obj = JSON.parse(inJson);\n" + + " obj.transformed = true;\n" + + " return JSON.stringify(obj);\n" + + "}"); + + String insertJson = "{\"_metadata_source\":{\"collection\":\"test_collection\"},\"_id\":\"\\\"645c9a7e7b8b1a0e9c0f8b3a\\\"\",\"data\":{\"myDate\":{\"$date\":\"2023-05-11T15:00:00Z\"},\"myNaN\":{\"$numberDouble\":\"NaN\"},\"myInf\":{\"$numberDouble\":\"Infinity\"}},\"_metadata_timestamp_seconds\":1683782270,\"_metadata_timestamp_nanos\":123456789}"; + + FailsafeElement failsafeElement = FailsafeElement.of(insertJson, insertJson); + PCollection> input = pipeline.apply( + Create.of(java.util.Arrays.asList(failsafeElement)) + .withCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))); + + PCollectionTuple contexts = input.apply( + "Create Contexts", + ParDo.of(new CreateMongoDbChangeEventContextFn("shadow_")) + .withOutputTags( + CreateMongoDbChangeEventContextFn.successfulCreationTag, + TupleTagList.of(CreateMongoDbChangeEventContextFn.failedCreationTag))); + contexts.get(CreateMongoDbChangeEventContextFn.failedCreationTag).setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + + PCollection successfulContexts = contexts.get(CreateMongoDbChangeEventContextFn.successfulCreationTag); + + PCollectionTuple udfPreparation = successfulContexts.apply( + "Prepare UDF Input", + ParDo.of(new DataStreamMongoDBToFirestore.ExtractUdfInputFn()) + .withOutputTags( + DataStreamMongoDBToFirestore.ExtractUdfInputFn.UDF_INPUT_TAG, + TupleTagList.of(DataStreamMongoDBToFirestore.ExtractUdfInputFn.DELETES_TAG))); + + PCollection> udfInput = udfPreparation.get(DataStreamMongoDBToFirestore.ExtractUdfInputFn.UDF_INPUT_TAG); + udfInput.setCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of())); + + PCollection> udfSuccess = udfInput.apply( + "Simulate Modifying UDF", + ParDo.of(new SimulateModifyingUdfFn())); + + org.apache.beam.sdk.values.PCollectionView> udfView = + udfSuccess.apply(org.apache.beam.sdk.transforms.View.asSingleton()); + + PCollection output = successfulContexts.apply( + "Merge UDF Result", + ParDo.of(new MergeWithSideInputFn(udfView, "shadow_")) + .withSideInputs(udfView)); + + pipeline.run(); + + assertEquals(1, TEST_OUTPUT.size()); + MongoDbChangeEventContext ctx = TEST_OUTPUT.get(0); + String modifiedJson = ctx.getModifiedJsonStringData(); + System.out.println("Test assertion: modifiedJson=" + modifiedJson); + org.junit.Assert.assertTrue(modifiedJson.contains("\"transformed\": true")); + org.junit.Assert.assertTrue(modifiedJson.contains("\"$date\"")); + org.junit.Assert.assertTrue(modifiedJson.contains("\"$numberDouble\": \"NaN\"")); + org.junit.Assert.assertTrue(modifiedJson.contains("\"$numberDouble\": \"Infinity\"")); + + java.nio.file.Files.delete(tempJs); + } + + @Test + public void testPipeline_udfInvalidJsonOutput() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + String insertJson = "{\"_metadata_source\":{\"collection\":\"col\"},\"_id\":\"\\\"id1\\\"\",\"data\":{\"field\":\"val\"},\"_metadata_timestamp_seconds\":123,\"_metadata_timestamp_nanos\":456,\"op\":\"i\"}"; + MongoDbChangeEventContext event = new MongoDbChangeEventContext(mapper.readTree(insertJson), "shadow_"); + + List events = java.util.Arrays.asList(event); + PCollection input = pipeline.apply(Create.of(events)); + + PCollectionTuple udfPreparation = input.apply( + "Prepare UDF Input", + ParDo.of(new DataStreamMongoDBToFirestore.ExtractUdfInputFn()) + .withOutputTags( + DataStreamMongoDBToFirestore.ExtractUdfInputFn.UDF_INPUT_TAG, + TupleTagList.of(DataStreamMongoDBToFirestore.ExtractUdfInputFn.DELETES_TAG))); + + PCollection> udfInput = udfPreparation.get(DataStreamMongoDBToFirestore.ExtractUdfInputFn.UDF_INPUT_TAG); + udfInput.setCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of())); + + PCollection> udfSuccess = udfInput.apply( + "Simulate Invalid UDF Output", + ParDo.of(new SimulateInvalidUdfOutputFn())); + + PCollectionTuple mergeResult = udfSuccess.apply( + "Merge UDF Result", + ParDo.of(new DataStreamMongoDBToFirestore.MergeUdfResultFn(PARSE_FAILURE_TAG, "shadow_")) + .withOutputTags(PARSE_SUCCESS_TAG, TupleTagList.of(PARSE_FAILURE_TAG))); + mergeResult.get(PARSE_FAILURE_TAG).setCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of())); + + PAssert.that(mergeResult.get(PARSE_FAILURE_TAG)) + .satisfies( + collection -> { + FailsafeElement result = collection.iterator().next(); + assertEquals("invalid json", result.getPayload()); + return null; + }); + + pipeline.run(); + } + + @Test + public void testPipeline_udfExecutionFailure() throws Exception { + ObjectMapper mapper = new ObjectMapper(); + String insertJson = "{\"_metadata_source\":{\"collection\":\"col\"},\"_id\":\"\\\"id1\\\"\",\"data\":{\"field\":\"val\"},\"_metadata_timestamp_seconds\":123,\"_metadata_timestamp_nanos\":456,\"op\":\"i\"}"; + MongoDbChangeEventContext event = new MongoDbChangeEventContext(mapper.readTree(insertJson), "shadow_"); + + List events = java.util.Arrays.asList(event); + PCollection input = pipeline.apply(Create.of(events)); + + PCollectionTuple udfPreparation = input.apply( + "Prepare UDF Input", + ParDo.of(new DataStreamMongoDBToFirestore.ExtractUdfInputFn()) + .withOutputTags( + DataStreamMongoDBToFirestore.ExtractUdfInputFn.UDF_INPUT_TAG, + TupleTagList.of(DataStreamMongoDBToFirestore.ExtractUdfInputFn.DELETES_TAG))); + + PCollection> udfInput = udfPreparation.get(DataStreamMongoDBToFirestore.ExtractUdfInputFn.UDF_INPUT_TAG); + udfInput.setCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of())); + + TupleTag> successTag = new TupleTag<>(); + + PCollectionTuple udfResult = udfInput.apply( + "Simulate Failing UDF", + ParDo.of(new SimulateFailingUdfFn(UDF_EXECUTION_FAILURE_TAG)) + .withOutputTags(successTag, TupleTagList.of(UDF_EXECUTION_FAILURE_TAG))); + + udfResult.get(successTag).setCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of())); + udfResult.get(UDF_EXECUTION_FAILURE_TAG).setCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of())); + + PAssert.that(udfResult.get(UDF_EXECUTION_FAILURE_TAG)) + .satisfies( + collection -> { + FailsafeElement result = collection.iterator().next(); + assertEquals("udf execution failed", result.getPayload()); + assertEquals("udf error", result.getErrorMessage()); + return null; + }); + + pipeline.run(); + } + + @Test + public void testWriteFailedUDFToDlq_usesSevereDlq() { + DataStreamMongoDBToFirestore.Options options = mock(DataStreamMongoDBToFirestore.Options.class); + DeadLetterQueueManager dlqManager = mock(DeadLetterQueueManager.class); + + when(options.getDeadLetterQueueDirectory()).thenReturn("/tmp/dlq"); + when(dlqManager.getSevereDlqDirectoryWithDateTime()).thenReturn("/tmp/dlq/severe/2026/05/07"); + + FailsafeElement failureElement = FailsafeElement.of(null, "invalid json"); + PCollection> failures = pipeline.apply(Create.of(java.util.Arrays.asList(failureElement)) + .withCoder(FailsafeElementCoder.of(SerializableCoder.of(MongoDbChangeEventContext.class), StringUtf8Coder.of()))); + + DataStreamMongoDBToFirestore.writeFailedUDFToDlq(options, failures, dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_execution_failures/", "tmp_udf_execution_failed"); + + verify(dlqManager).getSevereDlqDirectoryWithDateTime(); + pipeline.run(); + } + + private static class SimulateModifyingUdfFn extends DoFn, FailsafeElement> { + @ProcessElement + public void processElement(ProcessContext c) { + FailsafeElement el = c.element(); + String modifiedPayload = "{\"myDate\":{\"$date\":\"2023-05-11T15:00:00Z\"},\"myNaN\":{\"$numberDouble\":\"NaN\"},\"myInf\":{\"$numberDouble\":\"Infinity\"},\"transformed\":true}"; + c.output(FailsafeElement.of(el.getOriginalPayload(), modifiedPayload)); + } + } + + private static class SimulateInvalidUdfOutputFn extends DoFn, FailsafeElement> { + @ProcessElement + public void processElement(ProcessContext c) { + FailsafeElement el = c.element(); + c.output(FailsafeElement.of(el.getOriginalPayload(), "invalid json")); + } + } + + private static class SimulateFailingUdfFn extends DoFn, FailsafeElement> { + private final TupleTag> failureTag; + + public SimulateFailingUdfFn(TupleTag> failureTag) { + this.failureTag = failureTag; + } + + @ProcessElement + public void processElement(ProcessContext c, MultiOutputReceiver out) { + FailsafeElement el = c.element(); + out.get(failureTag).output(FailsafeElement.of(el.getOriginalPayload(), "udf execution failed").setErrorMessage("udf error")); + } + } + + + + private static class MergeWithSideInputFn extends DoFn { + private final org.apache.beam.sdk.values.PCollectionView> udfView; + private final String shadowCollectionPrefix; + + public MergeWithSideInputFn(org.apache.beam.sdk.values.PCollectionView> udfView, String shadowCollectionPrefix) { + this.udfView = udfView; + this.shadowCollectionPrefix = shadowCollectionPrefix; + } + + @ProcessElement + public void processElement(ProcessContext c) { + MongoDbChangeEventContext ctx = c.element(); + FailsafeElement element = c.sideInput(udfView); + try { + org.bson.Document udfData = org.bson.Document.parse(element.getPayload()); + org.bson.Document fullEvent = org.bson.Document.parse(ctx.getDataAsJsonString()); + fullEvent.put("data", udfData); + + MongoDbChangeEventContext newCtx = new MongoDbChangeEventContext(ctx.getChangeEvent(), shadowCollectionPrefix); + newCtx.setModifiedJsonStringData(fullEvent.toJson(org.bson.json.JsonWriterSettings.builder().outputMode(org.bson.json.JsonMode.EXTENDED).build())); + + TEST_OUTPUT.add(newCtx); + c.output(newCtx); + } catch (Exception e) { + throw new RuntimeException(e); + } + } + } } diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/ProcessBackfillEventFnTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/ProcessBackfillEventFnTest.java index 7a6f2a4e56..879230798d 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/ProcessBackfillEventFnTest.java +++ b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/ProcessBackfillEventFnTest.java @@ -91,8 +91,8 @@ public void testProcessBatch_partialFailure() { when(event2.getDataCollection()).thenReturn(COLLECTION_NAME); when(event1.getDocumentId()).thenReturn("id1"); when(event2.getDocumentId()).thenReturn("id2"); - when(event1.getDataAsJsonString()).thenReturn("{\"data\": {\"_id\":\"id1\"}}"); - when(event2.getDataAsJsonString()).thenReturn("{\"data\": {\"_id\":\"id2\"}}"); + when(event1.getModifiedJsonStringData()).thenReturn("{\"data\": {\"_id\":\"id1\"}}"); + when(event2.getModifiedJsonStringData()).thenReturn("{\"data\": {\"_id\":\"id2\"}}"); when(mockContext.element()).thenReturn(event1).thenReturn(event2); @@ -139,8 +139,8 @@ public void testProcessBatch_nonPermanentFailure() { when(event2.getDataCollection()).thenReturn(COLLECTION_NAME); when(event1.getDocumentId()).thenReturn("id1"); when(event2.getDocumentId()).thenReturn("id2"); - when(event1.getDataAsJsonString()).thenReturn("{\"data\": {\"_id\":\"id1\"}}"); - when(event2.getDataAsJsonString()).thenReturn("{\"data\": {\"_id\":\"id2\"}}"); + when(event1.getModifiedJsonStringData()).thenReturn("{\"data\": {\"_id\":\"id1\"}}"); + when(event2.getModifiedJsonStringData()).thenReturn("{\"data\": {\"_id\":\"id2\"}}"); when(mockContext.element()).thenReturn(event1).thenReturn(event2); diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContextTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContextTest.java index 732fe081da..5b2d181358 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContextTest.java +++ b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/templates/datastream/MongoDbChangeEventContextTest.java @@ -67,7 +67,9 @@ public void setUp() throws JsonProcessingException { "_id": "{\\\"$oid\\\": \\\"645c9a7e7b8b1a0e9c0f8b3a\\\"}", "data": { "field1": "value1", - "field2": 123 + "field2": 123, + "myNaN": {"$numberDouble": "NaN"}, + "myInf": {"$numberDouble": "Infinity"} }, "_metadata_timestamp_seconds": 1683782270, "_metadata_timestamp_nanos": 123456789, @@ -544,17 +546,49 @@ public void testGetTimestampDoc() throws JsonProcessingException { public void testToString() throws JsonProcessingException { MongoDbChangeEventContext context = new MongoDbChangeEventContext(insertEvent, SHADOW_PREFIX); String jsonString = context.toString(); - String expectedJson = - "{\"changeEvent\":{\"_metadata_source\":{\"collection\":\"test_collection\"},\"_id\":\"{\\\"$oid\\\": \\\"645c9a7e7b8b1a0e9c0f8b3a\\\"}\",\"data\":{\"field1\":\"value1\",\"field2\":123},\"_metadata_timestamp_seconds\":1683782270,\"_metadata_timestamp_nanos\":123456789,\"op\":\"i\"}," + "{\"changeEvent\":{\"_metadata_source\":{\"collection\":\"test_collection\"},\"_id\":\"{\\\"$oid\\\": \\\"645c9a7e7b8b1a0e9c0f8b3a\\\"}\",\"data\":{\"field1\":\"value1\",\"field2\":123,\"myNaN\":{\"$numberDouble\":\"NaN\"},\"myInf\":{\"$numberDouble\":\"Infinity\"}},\"_metadata_timestamp_seconds\":1683782270,\"_metadata_timestamp_nanos\":123456789,\"op\":\"i\"}," + "\"dataCollection\":\"test_collection\"," + "\"shadowCollection\":\"shadow_test_collection\"," + "\"documentId\":{\"$oid\":\"645c9a7e7b8b1a0e9c0f8b3a\"}," + "\"isDeleteEvent\":false," + "\"timestamp\":{\"seconds\":1683782270,\"nanos\":123456789}," + "\"isDlqReconsumed\":false," - + "\"_metadata_retry_count\":0}"; + + "\"_metadata_retry_count\":0," + + "\"modifiedJsonStringData\":null}"; assertEquals(jsonString, expectedJson); } + + @Test + public void testToStringWithModifiedData() throws JsonProcessingException { + MongoDbChangeEventContext context = new MongoDbChangeEventContext(insertEvent, SHADOW_PREFIX); + context.setModifiedJsonStringData("{\"myNaN\":{\"$numberDouble\":\"NaN\"},\"myInf\":{\"$numberDouble\":\"Infinity\"}}"); + String jsonString = context.toString(); + assertTrue(jsonString.contains("\"modifiedJsonStringData\":\"{\\\"myNaN\\\":{\\\"$numberDouble\\\":\\\"NaN\\\"},\\\"myInf\\\":{\\\"$numberDouble\\\":\\\"Infinity\\\"}}\"")); + } + + @Test + public void testEqualsAndHashCode() throws JsonProcessingException { + MongoDbChangeEventContext context1 = new MongoDbChangeEventContext(insertEvent, SHADOW_PREFIX); + MongoDbChangeEventContext context2 = new MongoDbChangeEventContext(insertEvent, SHADOW_PREFIX); + + // Reflexivity + assertTrue(context1.equals(context1)); + + // Symmetry + assertTrue(context1.equals(context2)); + assertTrue(context2.equals(context1)); + + // HashCode consistency + assertEquals(context1.hashCode(), context2.hashCode()); + + // Inequality + MongoDbChangeEventContext context3 = new MongoDbChangeEventContext(deleteEvent, SHADOW_PREFIX); + assertFalse(context1.equals(context3)); + + // Inequality with modified data + context2.setModifiedJsonStringData("{\"modified\":true}"); + assertFalse(context1.equals(context2)); + } } diff --git a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFnTest.java b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFnTest.java index bcb83bda0b..39e010c8f1 100644 --- a/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFnTest.java +++ b/v2/datastream-mongodb-to-firestore/src/test/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFnTest.java @@ -23,13 +23,27 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.cloud.teleport.v2.coders.FailsafeElementCoder; import com.google.cloud.teleport.v2.templates.datastream.MongoDbChangeEventContext; import com.google.cloud.teleport.v2.values.FailsafeElement; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.metrics.MetricNameFilter; +import org.apache.beam.sdk.metrics.MetricQueryResults; +import org.apache.beam.sdk.metrics.MetricsFilter; +import org.apache.beam.sdk.testing.PAssert; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; import org.apache.beam.sdk.transforms.DoFn; import org.apache.beam.sdk.transforms.DoFn.MultiOutputReceiver; import org.apache.beam.sdk.transforms.DoFn.OutputReceiver; import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.PCollectionTuple; +import org.apache.beam.sdk.values.TupleTagList; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -42,6 +56,8 @@ public class CreateMongoDbChangeEventContextFnTest { private static final String SHADOW_PREFIX = "shadow_"; private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + private CreateMongoDbChangeEventContextFn createFn; private ProcessContext mockContext; private MultiOutputReceiver mockReceiver; @@ -102,6 +118,30 @@ public void testProcessElementSuccess() { assertEquals("645c9a7e7b8b1a0e9c0f8b3a", actualContext.getDocumentId().toString()); } + @Test + public void testProcessElementIgnoreUpdateWithNullData() throws Exception { + String updateWithNullDataPayload = + """ + { + "_metadata_source": { + "collection": "test_collection" + }, + "_id": "{\\\"$oid\\\": \\\"645c9a7e7b8b1a0e9c0f8b3a\\\"}", + "data": null, + "_metadata_timestamp_seconds": 1683782270, + "_metadata_timestamp_nanos": 123456789, + "_metadata_change_type": "UPDATE" + }"""; + FailsafeElement updateElement = FailsafeElement.of(updateWithNullDataPayload, updateWithNullDataPayload); + when(mockContext.element()).thenReturn(updateElement); + + createFn.processElement(mockContext, mockReceiver); + + // Verify that it does NOT call output on success receiver or failure receiver + verify(mockSuccessReceiver, times(0)).output(org.mockito.Mockito.any()); + verify(mockFailureReceiver, times(0)).output(org.mockito.Mockito.any()); + } + @Test public void testProcessElementFailureInvalidJson() throws Exception { when(mockContext.element()).thenReturn(failureElement); @@ -113,6 +153,49 @@ public void testProcessElementFailureInvalidJson() throws Exception { verify(mockReceiver).get(CreateMongoDbChangeEventContextFn.failedCreationTag); verify(mockFailureReceiver, times(1)).output(failureCaptor.capture()); - assertEquals(failureElement, failureCaptor.getValue()); + FailsafeElement result = failureCaptor.getValue(); + assertEquals(failureElement, result); + org.junit.Assert.assertNotNull(result.getErrorMessage()); + org.junit.Assert.assertNotNull(result.getStacktrace()); + org.junit.Assert.assertTrue(result.getErrorMessage().contains("Unrecognized token 'invalid'")); + } + + @Test + public void testProcessElementIgnoreUpdateWithNullData_incrementsCounter() { + String updateWithNullDataPayload = + """ + { + "_metadata_source": { + "collection": "test_collection" + }, + "_id": "{\\\"$oid\\\": \\\"645c9a7e7b8b1a0e9c0f8b3a\\\"}", + "data": null, + "_metadata_timestamp_seconds": 1683782270, + "_metadata_timestamp_nanos": 123456789, + "_metadata_change_type": "UPDATE" + }"""; + + FailsafeElement element = FailsafeElement.of(updateWithNullDataPayload, updateWithNullDataPayload); + + PCollectionTuple results = pipeline.apply(Create.of(element).withCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()))) + .apply(ParDo.of(createFn) + .withOutputTags(CreateMongoDbChangeEventContextFn.successfulCreationTag, + TupleTagList.of(CreateMongoDbChangeEventContextFn.failedCreationTag))); + + results.get(CreateMongoDbChangeEventContextFn.successfulCreationTag).setCoder(SerializableCoder.of(MongoDbChangeEventContext.class)); + results.get(CreateMongoDbChangeEventContextFn.failedCreationTag).setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + + PAssert.that(results.get(CreateMongoDbChangeEventContextFn.successfulCreationTag)).empty(); + PAssert.that(results.get(CreateMongoDbChangeEventContextFn.failedCreationTag)).empty(); + + PipelineResult result = pipeline.run(); + + MetricQueryResults metrics = result.metrics().queryMetrics( + MetricsFilter.builder() + .addNameFilter(MetricNameFilter.named(CreateMongoDbChangeEventContextFn.class, "deletion-after-update")) + .build()); + + long count = metrics.getCounters().iterator().next().getCommitted(); + assertEquals(1L, count); } }