Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import com.google.cloud.teleport.v2.transforms.JavascriptTextTransformer.JavascriptTextTransformerOptions;
import com.google.cloud.teleport.v2.transforms.MongoDbEventDeadLetterQueueSanitizer;
import com.google.cloud.teleport.v2.transforms.ProcessChangeEventFn;
import com.google.cloud.teleport.v2.transforms.Utils;
import com.google.cloud.teleport.v2.values.FailsafeElement;
import com.google.common.base.Strings;
import com.mongodb.MongoClientSettings;
Expand All @@ -58,6 +57,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
import org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
import org.apache.beam.sdk.Pipeline;
Expand All @@ -81,7 +81,6 @@
import org.apache.beam.sdk.values.TupleTagList;
import org.bson.Document;
import org.bson.UuidRepresentation;
import org.bson.conversions.Bson;
import org.joda.time.Duration;
import org.joda.time.Instant;
import org.slf4j.Logger;
Expand Down Expand Up @@ -1136,47 +1135,74 @@ public void finishBundle(FinishBundleContext context) {
}
}

private void processBatch(String collectionName, MultiOutputReceiver out) {
List<MongoDbChangeEventContext> events = bufferedEvents.get(collectionName);
MongoCollection<Document> collection = collectionMap.get(collectionName);

if (events.isEmpty()) {
return;
}

try {
// Create bulk operation
List<WriteModel<Document>> bulkOperations = new ArrayList<>(events.size());

// Add operations to bulk
for (MongoDbChangeEventContext event : events) {
Object docId = event.getDocumentId();
Bson lookupById = eq("_id", docId);

if (event.isDeleteEvent()) {
// Add delete operation
bulkOperations.add(new DeleteOneModel<>(lookupById));
private record PreparedBatch(
List<WriteModel<Document>> bulkOperations,
List<MongoDbChangeEventContext> processedEvents,
List<FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext>>
failedElements) {}

private PreparedBatch prepareBatch(List<MongoDbChangeEventContext> events) {
List<WriteModel<Document>> bulkOperations = new ArrayList<>(events.size());
List<MongoDbChangeEventContext> processedEvents = new ArrayList<>();
List<FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext>> failedElements =
new ArrayList<>();

for (MongoDbChangeEventContext event : events) {
var documentId = event.getDocumentId();
var lookupById = eq("_id", documentId);
if (event.isDeleteEvent()) {
bulkOperations.add(new DeleteOneModel<>(lookupById));
processedEvents.add(event);
} else {
Document doc = event.getDataAsDocument();
if (event.hasParseError()) {
Exception exception = event.getParseError();
FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext> failedElement =
FailsafeElement.of(event, event);
failedElement.setErrorMessage(exception.getMessage());
failedElement.setStacktrace(Arrays.deepToString(exception.getStackTrace()));
failedElements.add(failedElement);
} else {
// Add upsert operation
bulkOperations.add(
new ReplaceOneModel<>(
lookupById,
Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()),
new ReplaceOptions().upsert(true)));
new ReplaceOneModel<>(lookupById, doc, new ReplaceOptions().upsert(true)));
processedEvents.add(event);
}
}
}
Comment thread
le-michael marked this conversation as resolved.
return new PreparedBatch(bulkOperations, processedEvents, failedElements);
}

private void writeBatch(
String collectionName,
List<MongoDbChangeEventContext> events,
MongoCollection<Document> collection,
Consumer<MongoDbChangeEventContext> successConsumer,
Consumer<FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext>>
failureConsumer) {

PreparedBatch preparedBatch = prepareBatch(events);

// Write any failed parse events to the DLQ.
for (var failedElement : preparedBatch.failedElements()) {
failureConsumer.accept(failedElement);
}

if (preparedBatch.bulkOperations().isEmpty()) {
return;
}

try {
// Execute bulk write
BulkWriteResult result = collection.bulkWrite(bulkOperations);
BulkWriteResult result = collection.bulkWrite(preparedBatch.bulkOperations());
LOG.debug(
"Bulk write completed for collection {}: {} inserts/updates, {} deletes",
collectionName,
result.getInsertedCount() + result.getModifiedCount() + result.getUpserts().size(),
result.getDeletedCount());

// Output successful events
for (MongoDbChangeEventContext event : events) {
out.get(successfulWriteTag).output(event);
for (MongoDbChangeEventContext event : preparedBatch.processedEvents()) {
successConsumer.accept(event);
}
} catch (Exception e) {
LOG.error(
Expand All @@ -1186,78 +1212,51 @@ private void processBatch(String collectionName, MultiOutputReceiver out) {
e);

// On error, output all events as failed
for (MongoDbChangeEventContext event : events) {
for (MongoDbChangeEventContext event : preparedBatch.processedEvents()) {
FailsafeElement<MongoDbChangeEventContext, MongoDbChangeEventContext> failedElement =
FailsafeElement.of(event, event);
failedElement.setErrorMessage(e.getMessage());
failedElement.setStacktrace(Arrays.deepToString(e.getStackTrace()));
out.get(failedWriteTag).output(failedElement);
failureConsumer.accept(failedElement);
}
}

// Clear the processed batch
events.clear();
}

private void processBatchFinish(String collectionName, FinishBundleContext context) {
private void processBatch(String collectionName, MultiOutputReceiver out) {
List<MongoDbChangeEventContext> events = bufferedEvents.get(collectionName);
MongoCollection<Document> collection = collectionMap.get(collectionName);

if (events.isEmpty()) {
return;
}

try {
// Create bulk operation
List<WriteModel<Document>> bulkOperations = new ArrayList<>(events.size());

// Add operations to bulk
for (MongoDbChangeEventContext event : events) {
Object docId = event.getDocumentId();
Bson lookupById = eq("_id", docId);

if (event.isDeleteEvent()) {
// Add delete operation
bulkOperations.add(new DeleteOneModel<>(lookupById));
} else {
// Add upsert operation
bulkOperations.add(
new ReplaceOneModel<>(
lookupById,
Utils.jsonToDocument(event.getDataAsJsonString(), event.getDocumentId()),
new ReplaceOptions().upsert(true)));
}
}
writeBatch(
collectionName,
events,
collection,
event -> out.get(successfulWriteTag).output(event),
failedElement -> out.get(failedWriteTag).output(failedElement));

// Execute bulk write
BulkWriteResult result = collection.bulkWrite(bulkOperations);
LOG.debug(
"Bulk write completed for collection {}: {} inserts/updates, {} deletes",
collectionName,
result.getInsertedCount() + result.getModifiedCount() + result.getUpserts().size(),
result.getDeletedCount());
// Clear the processed batch
events.clear();
}

// Output successful events
for (MongoDbChangeEventContext event : events) {
context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE);
}
} catch (Exception e) {
LOG.error(
"Error processing backfill batch for collection {}: {}",
collectionName,
e.getMessage(),
e);
/**
 * Flushes the buffered events for {@code collectionName} at bundle finish, emitting successes
 * and failures through the {@code FinishBundleContext} (global window, current timestamp), then
 * clears the buffer.
 *
 * @param collectionName key into the per-collection event buffer and collection map
 * @param context bundle-finish context used to emit to the success/failure tags
 */
private void processBatchFinish(String collectionName, FinishBundleContext context) {
  List<MongoDbChangeEventContext> events = bufferedEvents.get(collectionName);
  MongoCollection<Document> collection = collectionMap.get(collectionName);

  if (events.isEmpty()) {
    return;
  }

  writeBatch(
      collectionName,
      events,
      collection,
      event -> context.output(successfulWriteTag, event, Instant.now(), GlobalWindow.INSTANCE),
      failedElement ->
          context.output(failedWriteTag, failedElement, Instant.now(), GlobalWindow.INSTANCE));

  // Clear the processed batch
  events.clear();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ public class MongoDbChangeEventContext implements Serializable {
private final Document timestampDoc;
private boolean isDlqReconsumed;
private int retryCount;
private transient Document parsedDocument;
private transient Exception parseError;

/** Gets the change type from the event metadata. */
private String getChangeType(JsonNode changeEvent) {
Expand Down Expand Up @@ -221,6 +223,28 @@ public String getDataAsJsonString() {
return jsonStringData;
}

/**
 * Lazily parses this event's JSON payload into a {@code Document}, caching the result.
 *
 * <p>Returns {@code null} for delete events and for events whose payload failed to parse; in the
 * failure case the exception is remembered and exposed via {@code hasParseError()} /
 * {@code getParseError()} so the parse is not retried. The cache fields are transient, so the
 * parse is redone after deserialization.
 */
public Document getDataAsDocument() {
  if (isDeleteEvent) {
    // Delete events carry no document payload.
    return null;
  }
  boolean alreadyAttempted = parsedDocument != null || parseError != null;
  if (!alreadyAttempted) {
    try {
      parsedDocument = Utils.jsonToDocument(getDataAsJsonString(), getDocumentId());
    } catch (Exception e) {
      // Remember the failure so later calls return quickly and callers can inspect it.
      parseError = e;
    }
  }
  return parsedDocument;
}

/** Returns {@code true} if lazy parsing of this event's payload recorded a failure. */
public boolean hasParseError() {
  return parseError != null;
}

/** Returns the exception recorded by a failed payload parse, or {@code null} if none. */
public Exception getParseError() {
  return parseError;
}

/** Returns the {@code timestampDoc} captured for this change event. */
public Document getTimestampDoc() {
  return timestampDoc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,12 @@ public static Document jsonToDocument(String jsonString, Object documentId) {
"Document parsing for {} failed due to {}, try casting.", jsonString, ex.getMessage());
rawDoc = (Document) Document.parse(jsonString).get(DATA_COL);
}
if (rawDoc == null) {
throw new IllegalArgumentException(
String.format(
"JSON string does not contain a valid '%s' field or it is null: %s",
DATA_COL, jsonString));
}
rawDoc.put(MongoDbChangeEventContext.DOC_ID_COL, documentId);
return rawDoc;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,12 +131,47 @@ public void testIsNewerTimestamp_sameTimestamp() {
@Test
public void testJsonToDocument() {
String jsonString =
"{\"_id\":\"{\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"}\",\"data\":\"{\\\"_id\\\": {\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"},\\\"arrayField\\\": [\\\"hello\\\",10],\\\"dateField\\\": {\\\"$date\\\": 1565546054692},\\\"dateBefore1970\\\": {\\\"$date\\\": -1577923200000},\\\"decimal128Field\\\": {\\\"$numberDecimal\\\": \\\"10.99\\\"},\\\"documentField\\\": {\\\"a\\\": \\\"hello\\\"},\\\"doubleField\\\": 10.5,\\\"infiniteNumber\\\": Infinity,\\\"int32field\\\": 10,\\\"int64Field\\\": {\\\"$numberLong\\\": \\\"50\\\"},\\\"minKeyField\\\": {\\\"$minKey\\\": 1},\\\"maxKeyField\\\": {\\\"$maxKey\\\": 1},\\\"regexField\\\": {\\\"$regex\\\": \\\"^H\\\",\\\"$options\\\": \\\"i\\\"},\\\"timestampField\\\": {\\\"$timestamp\\\": {\\\"t\\\": 1565545664,\\\"i\\\": 1}},\\\"uuid\\\": {\\\"$binary\\\": \\\"OyQRAeK7QlWMr0E2xWapYg==\\\",\\\"$type\\\": \\\"04\\\"}}\",\"_metadata_stream\":\"extended-types-test\",\"_metadata_timestamp\":1745957556,\"_metadata_read_timestamp\":1745957556,\"_metadata_dataflow_timestamp\":1745963670,\"_metadata_read_method\":\"backfill\",\"_metadata_source_type\":\"backfill\",\"_metadata_deleted\":false,\"_metadata_table\":null,\"_metadata_change_type\":\"READ\",\"_metadata_primary_keys\":null,\"_metadata_uuid\":\"2a9cf8eb-4f35-433c-899a-39921d4c8587\",\"_metadata_timestamp_seconds\":\"1745957556\",\"_metadata_timestamp_nanos\":\"184498000\",\"_metadata_source\":{\"database\":\"extended_types\",\"collection\":\"mycol\",\"change_type\":\"READ\",\"is_deleted\":false,\"primary_key\":[\"_id\"]},\"_metadata_error\":null,\"_metadata_retry_count\":116}";
"{\"_id\":\"{\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"}\",\"data\":\"{\\\"_id\\\":"
+ " {\\\"$oid\\\": \\\"6811235eaf8583310cb9d2e9\\\"},\\\"arrayField\\\":"
+ " [\\\"hello\\\",10],\\\"dateField\\\": {\\\"$date\\\":"
+ " 1565546054692},\\\"dateBefore1970\\\": {\\\"$date\\\":"
+ " -1577923200000},\\\"decimal128Field\\\": {\\\"$numberDecimal\\\":"
+ " \\\"10.99\\\"},\\\"documentField\\\": {\\\"a\\\":"
+ " \\\"hello\\\"},\\\"doubleField\\\": 10.5,\\\"infiniteNumber\\\":"
+ " Infinity,\\\"int32field\\\": 10,\\\"int64Field\\\": {\\\"$numberLong\\\":"
+ " \\\"50\\\"},\\\"minKeyField\\\": {\\\"$minKey\\\": 1},\\\"maxKeyField\\\":"
+ " {\\\"$maxKey\\\": 1},\\\"regexField\\\": {\\\"$regex\\\":"
+ " \\\"^H\\\",\\\"$options\\\": \\\"i\\\"},\\\"timestampField\\\":"
+ " {\\\"$timestamp\\\": {\\\"t\\\": 1565545664,\\\"i\\\": 1}},\\\"uuid\\\":"
+ " {\\\"$binary\\\": \\\"OyQRAeK7QlWMr0E2xWapYg==\\\",\\\"$type\\\":"
+ " \\\"04\\\"}}\",\"_metadata_stream\":\"extended-types-test\",\"_metadata_timestamp\":1745957556,\"_metadata_read_timestamp\":1745957556,\"_metadata_dataflow_timestamp\":1745963670,\"_metadata_read_method\":\"backfill\",\"_metadata_source_type\":\"backfill\",\"_metadata_deleted\":false,\"_metadata_table\":null,\"_metadata_change_type\":\"READ\",\"_metadata_primary_keys\":null,\"_metadata_uuid\":\"2a9cf8eb-4f35-433c-899a-39921d4c8587\",\"_metadata_timestamp_seconds\":\"1745957556\",\"_metadata_timestamp_nanos\":\"184498000\",\"_metadata_source\":{\"database\":\"extended_types\",\"collection\":\"mycol\",\"change_type\":\"READ\",\"is_deleted\":false,\"primary_key\":[\"_id\"]},\"_metadata_error\":null,\"_metadata_retry_count\":116}";
Document result = Utils.jsonToDocument(jsonString, 1L);
assertTrue(
result
.toJson()
.equals(
"{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\": \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\": {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\": {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"}, \"doubleField\": 10.5, \"infiniteNumber\": {\"$numberDouble\": \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\": {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\": {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}}, \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}}, \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\", \"subType\": \"04\"}}}"));
"{\"_id\": 1, \"arrayField\": [\"hello\", 10], \"dateField\": {\"$date\":"
+ " \"2019-08-11T17:54:14.692Z\"}, \"dateBefore1970\": {\"$date\":"
+ " {\"$numberLong\": \"-1577923200000\"}}, \"decimal128Field\":"
+ " {\"$numberDecimal\": \"10.99\"}, \"documentField\": {\"a\": \"hello\"},"
+ " \"doubleField\": 10.5, \"infiniteNumber\": {\"$numberDouble\":"
+ " \"Infinity\"}, \"int32field\": 10, \"int64Field\": 50, \"minKeyField\":"
+ " {\"$minKey\": 1}, \"maxKeyField\": {\"$maxKey\": 1}, \"regexField\":"
+ " {\"$regularExpression\": {\"pattern\": \"^H\", \"options\": \"i\"}},"
+ " \"timestampField\": {\"$timestamp\": {\"t\": 1565545664, \"i\": 1}},"
+ " \"uuid\": {\"$binary\": {\"base64\": \"OyQRAeK7QlWMr0E2xWapYg==\","
+ " \"subType\": \"04\"}}}"));
}

/** {@code jsonToDocument} must reject payloads that have no {@code data} field at all. */
@Test(expected = IllegalArgumentException.class)
public void testJsonToDocument_missingDataField() {
  Utils.jsonToDocument("{\"id\": 123, \"name\": \"test\"}", 1L);
}

/** {@code jsonToDocument} must reject payloads whose {@code data} field is JSON null. */
@Test(expected = IllegalArgumentException.class)
public void testJsonToDocument_nullDataField() {
  Utils.jsonToDocument("{\"data\": null}", 1L);
}
}
Loading