From a71fa3243fa9ef242ce4587a6dca9bd0a19db6b7 Mon Sep 17 00:00:00 2001 From: Michael Le Date: Wed, 29 Apr 2026 22:29:22 -0400 Subject: [PATCH 1/3] Use getPayload instead of getOriginalPayload to process the UDF change --- .../v2/transforms/CreateMongoDbChangeEventContextFn.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java index 347880476b..1175348657 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java @@ -48,7 +48,7 @@ public CreateMongoDbChangeEventContextFn(String shadowCollectionPrefix) { public void processElement(ProcessContext context, MultiOutputReceiver out) { FailsafeElement element = context.element(); try { - JsonNode jsonNode = OBJECT_MAPPER.readTree(element.getOriginalPayload()); + JsonNode jsonNode = OBJECT_MAPPER.readTree(element.getPayload()); MongoDbChangeEventContext changeEventContext = new MongoDbChangeEventContext(jsonNode, shadowCollectionPrefix); out.get(successfulCreationTag).output(changeEventContext); From fda464c5526ec9098ddb70f1ab45bb89471da3ec Mon Sep 17 00:00:00 2001 From: Michael Le Date: Wed, 29 Apr 2026 22:43:46 -0400 Subject: [PATCH 2/3] Add coders --- .../teleport/v2/templates/DataStreamMongoDBToFirestore.java | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index 695d953961..e0485ff949 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -536,7 +536,8 @@ private static PipelineResult runWithBackfillFirst(Options options, String conne .setFailureTag(UDF_FAILURE_TAG) .build()); - jsonRecords = udfResult.get(UDF_SUCCESS_TAG); + jsonRecords = udfResult.get(UDF_SUCCESS_TAG) + .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); // Handle failed UDF processing writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); @@ -723,7 +724,8 @@ private static PipelineResult runAllEventsTogether(Options options, String conne .setFailureTag(UDF_FAILURE_TAG) .build()); - jsonRecords = udfResult.get(UDF_SUCCESS_TAG); + jsonRecords = udfResult.get(UDF_SUCCESS_TAG) + .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); // Handle failed UDF processing writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); From e058fd81b5ed74aac15c56ae3a5f58d1615ce17a Mon Sep 17 00:00:00 2001 From: Michael Le Date: Thu, 30 Apr 2026 02:44:38 +0000 Subject: [PATCH 3/3] Format code changes --- .../v2/templates/DataStreamMongoDBToFirestore.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index e0485ff949..9cdd1a50f2 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -536,8 +536,10 @@ private static PipelineResult runWithBackfillFirst(Options options, String conne .setFailureTag(UDF_FAILURE_TAG) .build()); - jsonRecords = udfResult.get(UDF_SUCCESS_TAG) - .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + jsonRecords = + udfResult + .get(UDF_SUCCESS_TAG) + .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); // Handle failed UDF processing writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); @@ -724,8 +726,10 @@ private static PipelineResult runAllEventsTogether(Options options, String conne .setFailureTag(UDF_FAILURE_TAG) .build()); - jsonRecords = udfResult.get(UDF_SUCCESS_TAG) - .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); + jsonRecords = + udfResult + .get(UDF_SUCCESS_TAG) + .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); // Handle failed UDF processing writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG);