diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java index 695d953961..9cdd1a50f2 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/templates/DataStreamMongoDBToFirestore.java @@ -536,7 +536,10 @@ private static PipelineResult runWithBackfillFirst(Options options, String conne .setFailureTag(UDF_FAILURE_TAG) .build()); - jsonRecords = udfResult.get(UDF_SUCCESS_TAG); + jsonRecords = + udfResult + .get(UDF_SUCCESS_TAG) + .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); // Handle failed UDF processing writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); @@ -723,7 +726,10 @@ private static PipelineResult runAllEventsTogether(Options options, String conne .setFailureTag(UDF_FAILURE_TAG) .build()); - jsonRecords = udfResult.get(UDF_SUCCESS_TAG); + jsonRecords = + udfResult + .get(UDF_SUCCESS_TAG) + .setCoder(FailsafeElementCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of())); // Handle failed UDF processing writeFailedJsonToDlq(options, udfResult, dlqManager, UDF_FAILURE_TAG); diff --git a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java index 347880476b..1175348657 100644 --- a/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java +++ b/v2/datastream-mongodb-to-firestore/src/main/java/com/google/cloud/teleport/v2/transforms/CreateMongoDbChangeEventContextFn.java @@ -48,7 +48,7 @@ public CreateMongoDbChangeEventContextFn(String shadowCollectionPrefix) { public void processElement(ProcessContext context, MultiOutputReceiver out) { FailsafeElement element = context.element(); try { - JsonNode jsonNode = OBJECT_MAPPER.readTree(element.getOriginalPayload()); + JsonNode jsonNode = OBJECT_MAPPER.readTree(element.getPayload()); MongoDbChangeEventContext changeEventContext = new MongoDbChangeEventContext(jsonNode, shadowCollectionPrefix); out.get(successfulCreationTag).output(changeEventContext);