Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class MongoDbChangeEventContext implements Serializable {
private final Document timestampDoc;
private boolean isDlqReconsumed;
private int retryCount;
// Stores the modified JSON string after UDF transformation.
private String modifiedJsonStringData;

/** Gets the change type from the event metadata. */
private String getChangeType(JsonNode changeEvent) {
Expand Down Expand Up @@ -237,6 +239,30 @@ public int getRetryCount() {
return retryCount;
}

/**
* Returns the document data as a JSON string.
*/
public String getDocumentDataAsJsonString() throws JsonProcessingException {
JsonNode eventNode = this.getChangeEvent();
JsonNode dataNode = eventNode.get("data");
return dataNode != null ? OBJECT_MAPPER.writeValueAsString(dataNode) : null;
}
Comment on lines +245 to +249
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The getDocumentDataAsJsonString method uses a standard Jackson ObjectMapper to serialize the data node. For MongoDB documents, this may lose type fidelity for BSON-specific types (e.g., $date, $numberLong, $oid) if the mapper is not specifically configured for MongoDB Extended JSON. Since the pipeline elsewhere relies on org.bson.Document and JsonMode.EXTENDED, consider using BSON utilities to extract this string to ensure the UDF receives a correctly formatted Extended JSON representation.


/**
* Returns the modified JSON string data if it was updated by a UDF,
* otherwise falls back to the original jsonStringData.
*/
public String getModifiedJsonStringData() {
return modifiedJsonStringData != null ? modifiedJsonStringData : jsonStringData;
}

/**
* Sets the modified JSON string data after UDF transformation.
*/
public void setModifiedJsonStringData(String modifiedJsonStringData) {
this.modifiedJsonStringData = modifiedJsonStringData;
}

/**
* Override toString() to provide a proper JSON representation of this object. This ensures that
* when the object is serialized to a string, it produces valid JSON.
Expand All @@ -252,7 +278,13 @@ public String toString() {
// Add other important fields
jsonNode.put("dataCollection", this.dataCollection);
jsonNode.put("shadowCollection", this.shadowCollection);
jsonNode.put("documentId", Utils.documentIdToString(this.documentId));
if (this.documentId instanceof ObjectId) {
ObjectNode oidNode = OBJECT_MAPPER.createObjectNode();
oidNode.put("$oid", this.documentId.toString());
jsonNode.set("documentId", oidNode);
} else {
jsonNode.put("documentId", Utils.documentIdToString(this.documentId));
}
jsonNode.put("isDeleteEvent", this.isDeleteEvent);

// Convert timestamp document to JSON
Expand All @@ -265,6 +297,7 @@ public String toString() {

jsonNode.put(DatastreamConstants.IS_DLQ_RECONSUMED, this.isDlqReconsumed);
jsonNode.put(DatastreamConstants.RETRY_COUNT, this.retryCount);
jsonNode.put("modifiedJsonStringData", this.modifiedJsonStringData);

return OBJECT_MAPPER.writeValueAsString(jsonNode);
} catch (JsonProcessingException e) {
Expand All @@ -284,10 +317,30 @@ public String toString() {
}
}

public boolean equals(Object other) {
if (other instanceof MongoDbChangeEventContext) {
return Objects.equals(this.toString(), other.toString());
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
return false;
if (o == null || getClass() != o.getClass()) {
return false;
}
MongoDbChangeEventContext that = (MongoDbChangeEventContext) o;
return isDeleteEvent == that.isDeleteEvent &&
isDlqReconsumed == that.isDlqReconsumed &&
retryCount == that.retryCount &&
Objects.equals(changeEvent, that.changeEvent) &&
Objects.equals(dataCollection, that.dataCollection) &&
Objects.equals(shadowCollection, that.shadowCollection) &&
Objects.equals(documentId, that.documentId) &&
Objects.equals(timestampDoc, that.timestampDoc) &&
Objects.equals(modifiedJsonStringData, that.modifiedJsonStringData);
}

@Override
public int hashCode() {
return Objects.hash(changeEvent, dataCollection, shadowCollection, documentId,
isDeleteEvent, timestampDoc, isDlqReconsumed, retryCount,
modifiedJsonStringData);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,12 @@

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.cloud.teleport.v2.templates.datastream.DatastreamConstants;
import com.google.cloud.teleport.v2.templates.datastream.MongoDbChangeEventContext;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Throwables;
import org.apache.beam.sdk.metrics.Counter;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.TupleTag;
import org.slf4j.Logger;
Expand All @@ -32,6 +35,9 @@ public class CreateMongoDbChangeEventContextFn
private static final Logger LOG =
LoggerFactory.getLogger(CreateMongoDbChangeEventContextFn.class);
private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper();

private static final Counter DELETION_AFTER_UPDATE_COUNTER =
Metrics.counter(CreateMongoDbChangeEventContextFn.class, "deletion-after-update");

public static TupleTag<MongoDbChangeEventContext> successfulCreationTag =
new TupleTag<>("successfulCreation");
Expand All @@ -49,6 +55,19 @@ public void processElement(ProcessContext context, MultiOutputReceiver out) {
FailsafeElement<String, String> element = context.element();
try {
JsonNode jsonNode = OBJECT_MAPPER.readTree(element.getOriginalPayload());

// Check for update with null data
String changeType = "";
if (jsonNode.has(DatastreamConstants.EVENT_CHANGE_TYPE_KEY)) {
changeType = jsonNode.get(DatastreamConstants.EVENT_CHANGE_TYPE_KEY).asText();
}
JsonNode dataNode = jsonNode.get("data");
if ("UPDATE".equalsIgnoreCase(changeType) && (dataNode == null || dataNode.isNull())) {
DELETION_AFTER_UPDATE_COUNTER.inc();
LOG.debug("Ignoring update event with null data for document ID: {}", jsonNode.get(DatastreamConstants.MONGODB_DOCUMENT_ID));
return; // Ignore the event
}

MongoDbChangeEventContext changeEventContext =
new MongoDbChangeEventContext(jsonNode, shadowCollectionPrefix);
out.get(successfulCreationTag).output(changeEventContext);
Expand Down
Loading