Move UDF after context creation and update consumers to use modified …#3784
namita-l wants to merge 1 commit into
Conversation
…JSON data in DataStreamMongoDBToFirestore
Summary of Changes

Hello, I'm Gemini Code Assist! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed.

This pull request refactors the DataStreamMongoDBToFirestore pipeline to improve the flexibility and reliability of JavaScript UDF transformations. By shifting the UDF execution to follow the creation of the event context, the pipeline can now operate on structured document data rather than raw JSON strings. Additionally, the changes introduce robust error handling for UDF failures and better management of specific event types, such as deletes and updates with null payloads.

Highlights
Code Review
This pull request refactors the JavaScript UDF application logic in the MongoDB to Firestore template, moving the transformation stage to occur after the creation of MongoDbChangeEventContext objects. Key changes include bypassing UDFs for delete events, merging transformed data back into the event context, and adding a filter for UPDATE events with null data. Feedback from the review highlights opportunities to reduce code duplication by refactoring the UDF pipeline logic into a shared method, improving DLQ usability by ensuring outputs are valid JSON, and maintaining BSON type fidelity by using specialized BSON utilities for JSON serialization.
if (!Strings.isNullOrEmpty(options.getJavascriptTextTransformGcsPath())) {
  LOG.info("Applying Javascript UDF for Document transformation after context creation");

  // Split the stream into deletes and non-deletes before UDF.
  // Delete events produce null docJson and would be dropped by ExtractUdfInputFn if not handled.
  // We bypass UDF for deletes to avoid sending empty payloads to UDF.
  PCollectionTuple udfPreparation =
      successfulContexts.apply(
          "Prepare UDF Input",
          ParDo.of(new ExtractUdfInputFn())
              .withOutputTags(
                  ExtractUdfInputFn.UDF_INPUT_TAG,
                  TupleTagList.of(ExtractUdfInputFn.DELETES_TAG)));

  // The String in FailsafeElement is the JSON representation of the document data
  // extracted by ExtractUdfInputFn.
  PCollection<FailsafeElement<MongoDbChangeEventContext, String>> udfInput =
      udfPreparation.get(ExtractUdfInputFn.UDF_INPUT_TAG);
  PCollection<MongoDbChangeEventContext> deletes =
      udfPreparation.get(ExtractUdfInputFn.DELETES_TAG);

  // Apply the JavaScript UDF to the JSON payload extracted from the document.
  PCollectionTuple udfResult =
      udfInput.apply(
          "Run UDF on Document",
          FailsafeJavascriptUdf.<MongoDbChangeEventContext>newBuilder()
              .setFileSystemPath(options.getJavascriptTextTransformGcsPath())
              .setFunctionName(options.getJavascriptTextTransformFunctionName())
              .setReloadIntervalMinutes(
                  options.getJavascriptTextTransformReloadIntervalMinutes())
              .setSuccessTag(UDF_SUCCESS_TAG)
              .setFailureTag(UDF_FAILURE_TAG)
              .build());

  // After successful UDF execution, we update the MongoDbChangeEventContext
  // with the modified JSON string so that subsequent stages use the transformed data.
  TupleTag<MongoDbChangeEventContext> parseSuccessTag =
      new TupleTag<MongoDbChangeEventContext>() {};
  TupleTag<FailsafeElement<MongoDbChangeEventContext, String>> parseFailureTag =
      new TupleTag<FailsafeElement<MongoDbChangeEventContext, String>>() {};

  PCollectionTuple mergeResult =
      udfResult
          .get(UDF_SUCCESS_TAG)
          .setCoder(
              FailsafeElementCoder.of(
                  SerializableCoder.of(MongoDbChangeEventContext.class),
                  StringUtf8Coder.of()))
          .apply(
              "Merge UDF Result",
              ParDo.of(new MergeUdfResultFn(parseFailureTag, options.getShadowCollectionPrefix()))
                  .withOutputTags(parseSuccessTag, TupleTagList.of(parseFailureTag)));

  successfulContexts =
      PCollectionList.of(deletes)
          .and(mergeResult.get(parseSuccessTag))
          .apply("Merge Deletes and UDF Results", Flatten.pCollections());

  // Handle failed UDF processing (both execution and parse failures).
  PCollection<FailsafeElement<MongoDbChangeEventContext, String>> executionFailures =
      udfResult.get(UDF_FAILURE_TAG);
  PCollection<FailsafeElement<MongoDbChangeEventContext, String>> parseFailures =
      mergeResult.get(parseFailureTag);

  writeFailedUDFToDlq(
      options,
      executionFailures,
      dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_execution_failures/",
      "tmp_udf_execution_failed");
  writeFailedUDFToDlq(
      options,
      parseFailures,
      dlqManager.getSevereDlqDirectoryWithDateTime() + "udf_parse_failures/",
      "tmp_udf_parse_failed");
}
The logic for applying the JavaScript UDF is duplicated in both runWithBackfillFirst and runAllEventsTogether. This block is substantial and involves multiple pipeline stages (splitting deletes, UDF execution, result merging, and multi-stage DLQ handling). To improve maintainability and ensure that any future changes to the UDF logic are applied consistently across both processing modes, this should be refactored into a shared private static method.
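As a sketch of what that extraction could look like (the method name, parameter list, and the `DlqManager` type name here are illustrative, not taken from the PR):

// Illustrative only: a shared helper that both runWithBackfillFirst and
// runAllEventsTogether could call, so the UDF stage lives in one place.
private static PCollection<MongoDbChangeEventContext> applyJavascriptUdfStage(
    Options options,
    DlqManager dlqManager,
    PCollection<MongoDbChangeEventContext> successfulContexts) {
  // 1. Split deletes from non-deletes (ExtractUdfInputFn).
  // 2. Run FailsafeJavascriptUdf on the non-delete JSON payloads.
  // 3. Merge UDF output back into the contexts (MergeUdfResultFn).
  // 4. Write execution and parse failures to the severe DLQ.
  // 5. Flatten deletes with merged results and return the combined PCollection.
  ...
}

Each call site would then reduce to a single assignment, and any future change to the UDF stage automatically applies to both processing modes.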
new DoFn<FailsafeElement<MongoDbChangeEventContext, String>, String>() {
  @ProcessElement
  public void processElement(ProcessContext c) {
    c.output(c.element().toString());
Using c.element().toString() for DLQ output results in a non-JSON string (typically formatted as FailsafeElement{originalPayload=..., payload=..., ...}). This makes the dead-letter queue difficult to process with automated tools. It is recommended to use a JSON serializer or a dedicated sanitizer (similar to MongoDbEventDeadLetterQueueSanitizer used in other parts of this template) to ensure the DLQ output is valid JSON.
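To illustrate the difference, here is a minimal stdlib-only sketch of emitting a JSON record for the DLQ instead of the `toString()` form. The field names (`message`, `error_message`) and the hand-rolled escaper are assumptions for illustration; the real template would use Jackson or a sanitizer like MongoDbEventDeadLetterQueueSanitizer.

```java
public class DlqJsonSketch {

  // Minimal JSON string escaper (illustration only; a real pipeline
  // would delegate serialization to Jackson or a DLQ sanitizer).
  public static String jsonEscape(String s) {
    StringBuilder sb = new StringBuilder();
    for (char c : s.toCharArray()) {
      switch (c) {
        case '"':  sb.append("\\\""); break;
        case '\\': sb.append("\\\\"); break;
        case '\n': sb.append("\\n");  break;
        default:   sb.append(c);
      }
    }
    return sb.toString();
  }

  // Builds a machine-readable DLQ record instead of the
  // FailsafeElement{originalPayload=..., payload=...} toString() form.
  public static String toDlqRecord(String originalPayload, String errorMessage) {
    return "{\"message\":\"" + jsonEscape(originalPayload)
        + "\",\"error_message\":\"" + jsonEscape(errorMessage) + "\"}";
  }

  public static void main(String[] args) {
    System.out.println(toDlqRecord("{\"_id\":1}", "udf failed"));
  }
}
```

A record shaped like this can be re-parsed by downstream DLQ reprocessing jobs, which the default `toString()` output cannot.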
public String getDocumentDataAsJsonString() throws JsonProcessingException {
  JsonNode eventNode = this.getChangeEvent();
  JsonNode dataNode = eventNode.get("data");
  return dataNode != null ? OBJECT_MAPPER.writeValueAsString(dataNode) : null;
}
The getDocumentDataAsJsonString method uses a standard Jackson ObjectMapper to serialize the data node. For MongoDB documents, this may lose type fidelity for BSON-specific types (e.g., $date, $numberLong, $oid) if the mapper is not specifically configured for MongoDB Extended JSON. Since the pipeline elsewhere relies on org.bson.Document and JsonMode.EXTENDED, consider using BSON utilities to extract this string to ensure the UDF receives a correctly formatted Extended JSON representation.
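A sketch of the suggested BSON-based alternative, assuming the `org.bson` driver is on the classpath (as it is elsewhere in the template); routing the event through `Document.parse` as shown is an illustrative shortcut, not the PR's code:

// Sketch: serialize the "data" sub-document as Extended JSON so BSON
// types like $date, $numberLong, and $oid survive the trip through the UDF.
import org.bson.Document;
import org.bson.json.JsonMode;
import org.bson.json.JsonWriterSettings;

public String getDocumentDataAsJsonString() {
  Document event = Document.parse(this.getChangeEvent().toString());
  Document data = event.get("data", Document.class);
  return data != null
      ? data.toJson(JsonWriterSettings.builder().outputMode(JsonMode.EXTENDED).build())
      : null;
}

With `JsonMode.EXTENDED`, a stored timestamp round-trips as `{"$date": ...}` rather than collapsing into a plain number or string, so the UDF sees the same representation the rest of the pipeline uses.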