From cebfe8f28430b99a8cc589a3ea9cc3cb237fbf3d Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Tue, 12 May 2026 00:56:34 +0530 Subject: [PATCH 1/5] Adding data generator engine in cdc data generator --- .../templates/dofn/DataGeneratorEngine.java | 535 ++++++++++ .../v2/templates/dofn/RowAssembler.java | 14 + .../v2/templates/model/LifecycleEvent.java | 48 + .../v2/templates/utils/Constants.java | 4 + .../v2/templates/utils/SchemaUtils.java | 147 ++- .../dofn/DataGeneratorEngineTest.java | 934 ++++++++++++++++++ .../v2/templates/utils/SchemaUtilsTest.java | 270 +++++ 7 files changed, 1945 insertions(+), 7 deletions(-) create mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java create mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/LifecycleEvent.java create mode 100644 v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngineTest.java diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java new file mode 100644 index 0000000000..262a6a8a71 --- /dev/null +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java @@ -0,0 +1,535 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.dofn; + +import com.google.cloud.teleport.v2.templates.model.DataGeneratorColumn; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorForeignKey; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; +import com.google.cloud.teleport.v2.templates.model.LifecycleEvent; +import com.google.cloud.teleport.v2.templates.utils.Constants; +import com.google.cloud.teleport.v2.templates.utils.DataGeneratorUtils; +import com.google.cloud.teleport.v2.templates.utils.FailureRecord; +import com.google.cloud.teleport.v2.templates.utils.SchemaUtils; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import net.datafaker.Faker; +import org.apache.beam.sdk.metrics.Counter; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Orchestrates schema tree traversal and synthesized value updates, scheduling lifecycle timer + * events to match operational distribution patterns. 
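+ *
+ * <p>Inserts are buffered into the {@link MutationBatcher} immediately; the matching UPDATE and
+ * DELETE lifecycle events are stored in keyed state under second-aligned timestamps and replayed
+ * by {@code processScheduledEvents} when the event timer fires.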
+ */ +public class DataGeneratorEngine { + private static final Logger LOG = LoggerFactory.getLogger(DataGeneratorEngine.class); + + private final Integer updateInterval; + private final Integer deleteInterval; + private final Faker faker; + + private transient volatile DataGeneratorSchema schema; + private transient volatile List insertTopoOrder; + + private final Counter insertsGenerated = + Metrics.counter(DataGeneratorEngine.class, "insertsGenerated"); + private final Counter updatesGenerated = + Metrics.counter(DataGeneratorEngine.class, "updatesGenerated"); + private final Counter deletesGenerated = + Metrics.counter(DataGeneratorEngine.class, "deletesGenerated"); + private final Counter unresolvableFkChildrenDropped = + Metrics.counter(DataGeneratorEngine.class, "unresolvableFkChildrenDropped"); + + public DataGeneratorEngine(Integer updateInterval, Integer deleteInterval, Faker faker) { + this.updateInterval = updateInterval; + this.deleteInterval = deleteInterval; + this.faker = faker; + } + + /** + * Processes an initial root record from the data generator source, initializing topological + * orders, buffering the row, and cascading down to child tables. + */ + public void processRecord( + String tableName, + Row row, + MapState> eventQueueState, + ValueState> activeTimestamps, + MapState tableMapState, + Timer eventTimer, + DataGeneratorSchema loadedSchema, + MutationBatcher batcher) { + + this.schema = loadedSchema; + if (this.insertTopoOrder == null) { + this.insertTopoOrder = SchemaUtils.buildInsertTopoOrder(loadedSchema); + } + + DataGeneratorTable table = schema.tables().get(tableName); + + if (table == null) { + Metrics.counter(DataGeneratorEngine.class, "tableNotFound_" + tableName).inc(); + return; + } + + tableMapState.put(tableName, table); + + processTable( + table, + row, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + /* forcedDeleteTimestamp= */ 0L, + /* earliestAncestorDelete= */ Long.MAX_VALUE, + new HashMap<>(), + batcher); + } + + /** + * Processes all scheduled future lifecycle events (updates and deletes) for active timestamps up + * to the current wall-clock time. + */ + public void processScheduledEvents( + MapState> eventQueueState, + ValueState> activeTimestamps, + MapState tableMapState, + Timer eventTimer, + MutationBatcher batcher, + List pendingDlq) { + + List timestamps = activeTimestamps.read(); + if (timestamps == null || timestamps.isEmpty()) { + return; + } + + long now = System.currentTimeMillis(); + int firstFutureIdx = 0; + for (Long ts : timestamps) { + if (ts > now) { + break; + } + List events = eventQueueState.get(ts).read(); + if (events != null) { + for (LifecycleEvent event : events) { + try { + processEvent(event, tableMapState, batcher); + } catch (Exception genError) { + LOG.error( + "Lifecycle event generation failed for table {} ({})", + event.tableName, + event.type, + genError); + Metrics.counter(DataGeneratorEngine.class, "generationFailures").inc(); + pendingDlq.add(FailureRecord.toJson(event.tableName, event.type, null, genError)); + } + } + eventQueueState.remove(ts); + } + firstFutureIdx++; + } + + timestamps = new ArrayList<>(timestamps.subList(firstFutureIdx, timestamps.size())); + activeTimestamps.write(timestamps); + if (!timestamps.isEmpty()) { + eventTimer.set(Instant.ofEpochMilli(timestamps.get(0))); + } + } + + /** + * Assembles and buffers an insert row for the specified table, calculates lifecycle mutation + * schedules, and cascades generation to its child tables. 
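+ *
+ * <p>If this row or any ancestor has a scheduled delete, the update interval is compressed so
+ * that every scheduled update lands at least one delete interval before the earliest delete,
+ * keeping updates from being issued against an already-deleted row.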
+ */ + private void processTable( + DataGeneratorTable table, + Row row, + MapState> eventQueueState, + ValueState> activeTimestamps, + MapState tableMapState, + Timer eventTimer, + long forcedDeleteTimestamp, + long earliestAncestorDelete, + Map ancestorRows, + MutationBatcher batcher) { + + String tableName = table.name(); + tableMapState.put(tableName, table); + + Row fullRow = RowAssembler.completeRow(table, row, faker); + String shardId = + fullRow.getSchema().hasField(Constants.SHARD_ID_COLUMN_NAME) + ? fullRow.getString(Constants.SHARD_ID_COLUMN_NAME) + : ""; + insertsGenerated.inc(); + LinkedHashMap pkMap; + try { + pkMap = RowAssembler.pkValuesOf(fullRow, table); + } catch (IllegalArgumentException e) { + LOG.error("Primary key validation failed for table {}", tableName, e); + if (batcher != null && batcher.getFailedRecords() != null) { + batcher + .getFailedRecords() + .add(FailureRecord.toJson(tableName, Constants.MUTATION_INSERT, fullRow, e)); + } + return; + } + + Row reducedRow = null; + if (!pkMap.isEmpty()) { + reducedRow = RowAssembler.createReducedRow(fullRow, table); + } + + batcher.bufferRow( + tableName, fullRow, Constants.MUTATION_INSERT, table, shardId, insertTopoOrder); + + long now = System.currentTimeMillis(); + long deleteTimestamp = 0L; + int numUpdates = 0; + long upInterval = this.updateInterval; + long delInterval = this.deleteInterval; + + if (!pkMap.isEmpty()) { + int tableInsertQps = table.insertQps(); + int tableUpdateQps = table.updateQps(); + int tableDeleteQps = table.deleteQps(); + + numUpdates = calculateNumUpdates(tableInsertQps, tableUpdateQps); + double deleteRatio = tableInsertQps > 0 ? (double) tableDeleteQps / tableInsertQps : 0.0; + + if (forcedDeleteTimestamp > 0) { + deleteTimestamp = forcedDeleteTimestamp; + } else if (ThreadLocalRandom.current().nextDouble() < deleteRatio) { + deleteTimestamp = now + upInterval * numUpdates + delInterval; + } + + long myDeleteBound = deleteTimestamp > 0 ? deleteTimestamp : Long.MAX_VALUE; + long effectiveDeleteBound = Math.min(myDeleteBound, earliestAncestorDelete); + if (effectiveDeleteBound < Long.MAX_VALUE && numUpdates > 0) { + long budget = effectiveDeleteBound - now - delInterval; + if (upInterval * numUpdates > budget) { + if (budget < 0) { + upInterval = 0; + } else { + upInterval = budget / numUpdates; + } + } + } + } + + Map updatedAncestorRows = new HashMap<>(ancestorRows); + updatedAncestorRows.put(tableName, fullRow); + + long childEarliestAncestorDelete = + deleteTimestamp > 0 + ? 
Math.min(earliestAncestorDelete, deleteTimestamp) + : earliestAncestorDelete; + + if (table.childTables() != null) { + for (String childTableName : table.childTables()) { + DataGeneratorTable childTable = schema.tables().get(childTableName); + if (childTable == null) { + Metrics.counter(DataGeneratorEngine.class, "childTableNotFound_" + childTableName).inc(); + continue; + } + generateAndWriteChildren( + table, + fullRow, + childTable, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + deleteTimestamp, + childEarliestAncestorDelete, + updatedAncestorRows, + batcher); + } + } + + if (!pkMap.isEmpty()) { + for (int i = 1; i <= numUpdates; i++) { + scheduleEvent( + now + upInterval * i, + new LifecycleEvent(pkMap, Constants.MUTATION_UPDATE, tableName, reducedRow), + eventQueueState, + activeTimestamps, + eventTimer); + } + if (deleteTimestamp > 0) { + scheduleEvent( + deleteTimestamp, + new LifecycleEvent(pkMap, Constants.MUTATION_DELETE, tableName, reducedRow), + eventQueueState, + activeTimestamps, + eventTimer); + } + } + } + + /** + * Cascades top-down row generation to child tables based on QPS ratios, inheriting referenced + * foreign key and interleaved parent columns. + */ + private void generateAndWriteChildren( + DataGeneratorTable parentTable, + Row parentRow, + DataGeneratorTable childTable, + MapState> eventQueueState, + ValueState> activeTimestamps, + MapState tableMapState, + Timer eventTimer, + long forcedDeleteTimestamp, + long earliestAncestorDelete, + Map ancestorRows, + MutationBatcher batcher) { + + int numChildren = calculateNumChildren(parentTable.insertQps(), childTable.insertQps()); + + for (int i = 0; i < numChildren; i++) { + Row childRow = generateChildRow(parentRow, childTable, ancestorRows); + if (childRow == null) { + unresolvableFkChildrenDropped.inc(); + batcher + .getFailedRecords() + .add( + FailureRecord.toJson( + childTable.name(), + FailureRecord.OPERATION_GENERATION, + null, + new IllegalArgumentException( + String.format( + "Cannot resolve structural dependency (FK/Interleave) for table: %s", + childTable.name())))); + break; + } + processTable( + childTable, + childRow, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + forcedDeleteTimestamp, + earliestAncestorDelete, + ancestorRows, + batcher); + } + } + + /** + * Synthesizes a single child row, resolving foreign keys and interleaved primary keys from the + * ancestor chain. 
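+ *
+ * <p>Returns {@code null} when a foreign key or interleaved parent cannot be resolved from the
+ * ancestor chain; the caller reports the failure to the DLQ and stops fanning out further
+ * children for that parent.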
+ */ + private Row generateChildRow( + Row parentRow, DataGeneratorTable childTable, Map ancestorRows) { + + Map columnValues = new HashMap<>(); + + if (childTable.foreignKeys() != null && !childTable.foreignKeys().isEmpty()) { + for (DataGeneratorForeignKey fk : childTable.foreignKeys()) { + Row source = ancestorRows.get(fk.referencedTable()); + if (source == null) { + LOG.warn( + "Cannot resolve FK {} from {} -> {}: target table is not in the ancestor chain.", + fk.name(), + childTable.name(), + fk.referencedTable()); + return null; + } + for (int i = 0; i < fk.keyColumns().size(); i++) { + String refCol = fk.referencedColumns().get(i); + if (!source.getSchema().hasField(refCol)) { + LOG.warn( + "Foreign key constraint '{}' references missing column '{}' on table '{}'.", + fk.name(), + refCol, + fk.referencedTable()); + return null; + } + columnValues.put(fk.keyColumns().get(i), source.getValue(refCol)); + } + } + } + + if (childTable.interleavedInTable() != null) { + String interleavedParentName = childTable.interleavedInTable(); + Row interleavedParentRow = ancestorRows.get(interleavedParentName); + DataGeneratorTable interleavedParentTable = + schema != null && schema.tables() != null + ? schema.tables().get(interleavedParentName) + : null; + if (interleavedParentRow == null || interleavedParentTable == null) { + LOG.warn( + "Cannot resolve interleaved parent table '{}' for child '{}': parent is not in the ancestor chain or schema.", + interleavedParentName, + childTable.name()); + return null; + } + for (String pk : interleavedParentTable.primaryKeys()) { + if (!interleavedParentRow.getSchema().hasField(pk)) { + LOG.warn( + "Interleaved child table '{}' references missing primary key column '{}' on parent '{}'.", + childTable.name(), + pk, + interleavedParentName); + return null; + } + Object val = interleavedParentRow.getValue(pk); + if (val != null) { + columnValues.put(pk, val); + } + } + } + + Schema.Builder schemaBuilder = Schema.builder(); + List values = new ArrayList<>(); + + for (DataGeneratorColumn col : childTable.columns()) { + if (col.isSkipped()) { + continue; + } + Object val; + if (columnValues.containsKey(col.name())) { + val = columnValues.get(col.name()); + } else { + val = DataGeneratorUtils.generateValue(col, faker); + } + schemaBuilder.addField( + Schema.Field.of(col.name(), DataGeneratorUtils.mapToBeamFieldType(col.logicalType()))); + values.add(val); + } + + String shardId = + parentRow.getSchema().hasField(Constants.SHARD_ID_COLUMN_NAME) + ? parentRow.getString(Constants.SHARD_ID_COLUMN_NAME) + : ""; + schemaBuilder.addField( + Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)); + values.add(shardId); + return Row.withSchema(schemaBuilder.build()).addValues(values).build(); + } + + /** + * Persists a future lifecycle event into the state queue, keeping the active timestamp list + * sorted via binary search. 
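+ *
+ * <p>Timestamps are snapped down to whole seconds, so events that fall within the same second
+ * share a single state entry; a new timestamp is inserted at its binary-search insertion point,
+ * keeping the list sorted without a full re-sort.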
+ */ + private void scheduleEvent( + long timestamp, + LifecycleEvent event, + MapState> eventQueueState, + ValueState> activeTimestamps, + Timer eventTimer) { + + long snappedTimestamp = (timestamp / 1000) * 1000; + + List events = eventQueueState.get(snappedTimestamp).read(); + if (events == null) { + events = new ArrayList<>(); + } + events.add(event); + eventQueueState.put(snappedTimestamp, events); + + List timestamps = activeTimestamps.read(); + if (timestamps == null) { + timestamps = new ArrayList<>(); + } + int idx = Collections.binarySearch(timestamps, snappedTimestamp); + if (idx < 0) { + timestamps.add(-(idx + 1), snappedTimestamp); + activeTimestamps.write(timestamps); + } + eventTimer.set(Instant.ofEpochMilli(timestamps.get(0))); + } + + /** + * Executes a scheduled update or delete event by assembling the mutated row and buffering it into + * the batcher. + */ + private void processEvent( + LifecycleEvent event, + MapState tableMapState, + MutationBatcher batcher) { + + DataGeneratorTable table = tableMapState.get(event.tableName).read(); + if (table == null) { + return; + } + + Row originalRow = event.reducedRow; + String shardId = + (originalRow != null && originalRow.getSchema().hasField(Constants.SHARD_ID_COLUMN_NAME)) + ? originalRow.getString(Constants.SHARD_ID_COLUMN_NAME) + : ""; + + if (Constants.MUTATION_UPDATE.equals(event.type)) { + Row updateRow = RowAssembler.generateUpdateRow(event.pkValues, table, originalRow, faker); + batcher.bufferRow( + event.tableName, updateRow, Constants.MUTATION_UPDATE, table, shardId, insertTopoOrder); + updatesGenerated.inc(); + } else if (Constants.MUTATION_DELETE.equals(event.type)) { + Row deleteRow = RowAssembler.generateDeleteRow(event.pkValues, table); + batcher.bufferRow( + event.tableName, deleteRow, Constants.MUTATION_DELETE, table, shardId, insertTopoOrder); + deletesGenerated.inc(); + } + } + + /** + * Probabilistically calculates the number of updates to generate for a single insert record based + * on QPS ratios. + */ + private int calculateNumUpdates(int insertQps, int updateQps) { + if (insertQps <= 0 || updateQps <= 0) { + return 0; + } + double ratio = (double) updateQps / insertQps; + int count = (int) ratio; + if (ThreadLocalRandom.current().nextDouble() < (ratio - count)) { + count++; + } + return count; + } + + /** + * Probabilistically calculates the number of child records to fan out per parent record based on + * QPS ratios. 
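+ *
+ * <p>For example, a parent insert QPS of 10 and a child insert QPS of 25 give a ratio of 2.5:
+ * two child rows are always generated and a third is added with probability 0.5, so the
+ * configured ratio holds in expectation.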
+ */ + private int calculateNumChildren(int parentInsertQps, int childInsertQps) { + if (parentInsertQps <= 0 || childInsertQps <= 0) { + return 0; + } + double ratio = (double) childInsertQps / parentInsertQps; + int count = (int) ratio; + if (faker.random().nextDouble() < (ratio - count)) { + count++; + } + return count; + } +} diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/RowAssembler.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/RowAssembler.java index a86834820f..8dbb7ce86b 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/RowAssembler.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/RowAssembler.java @@ -224,4 +224,18 @@ static Row completeRow(DataGeneratorTable table, Row partialRow, Faker faker) { return Row.withSchema(schemaBuilder.build()).addValues(values).build(); } + + static LinkedHashMap pkValuesOf(Row row, DataGeneratorTable table) { + LinkedHashMap pk = new LinkedHashMap<>(); + for (String pkCol : table.primaryKeys()) { + if (!row.getSchema().hasField(pkCol)) { + throw new IllegalArgumentException( + String.format( + "Required Primary Key column '%s' missing from row schema for table '%s'", + pkCol, table.name())); + } + pk.put(pkCol, row.getValue(pkCol)); + } + return pk; + } } diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/LifecycleEvent.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/LifecycleEvent.java new file mode 100644 index 0000000000..ad2907f2af --- /dev/null +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/LifecycleEvent.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.model; + +import java.io.Serializable; +import java.util.LinkedHashMap; +import org.apache.beam.sdk.coders.DefaultCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.values.Row; + +/** + * A lifecycle event (UPDATE or DELETE) scheduled against a previously-inserted row. + * + *
<p>
The primary key is carried as an ordered map of {@code (columnName -> value)} so that both + * composite and non-integer PKs round-trip correctly. The map is a {@link LinkedHashMap} so + * iteration order matches the declared PK column order. + */ +@DefaultCoder(SerializableCoder.class) +public class LifecycleEvent implements Serializable { + + private static final long serialVersionUID = 1L; + + public LinkedHashMap pkValues; + public String type; + public String tableName; + public Row reducedRow; + + public LifecycleEvent( + LinkedHashMap pkValues, String type, String tableName, Row reducedRow) { + this.pkValues = pkValues; + this.type = type; + this.tableName = tableName; + this.reducedRow = reducedRow; + } +} diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/Constants.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/Constants.java index e1818d2cae..25902a5c75 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/Constants.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/Constants.java @@ -44,4 +44,8 @@ public final class Constants { public static final String SINK_TYPE_MYSQL = "MYSQL"; public static final String SINK_TYPE_SPANNER = "SPANNER"; + + public static final String MUTATION_INSERT = "INSERT"; + public static final String MUTATION_UPDATE = "UPDATE"; + public static final String MUTATION_DELETE = "DELETE"; } diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java index e41022abae..199a7b037d 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java @@ -21,14 +21,20 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; +import java.util.Collection; +import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.stream.Collectors; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** Utilities for manipulating {@link DataGeneratorSchema}. */ public class SchemaUtils { + private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); /** * Constructs a Directed Acyclic Graph (DAG) of tables in the schema. Identifies parent-child @@ -72,13 +78,13 @@ public static DataGeneratorSchema generateSchemaDAG(DataGeneratorSchema schema) continue; // No parents for this table } - // Sort Parents by QPS - parents.sort(java.util.Comparator.comparingInt(DataGeneratorTable::insertQps)); + // Sort Parents Topologically to respect physical FK relations + List sortedParents = sortTopologically(parents, tableMap); // Chain the Parents: P1 -> P2 -> ... 
-> Pn -> Child - for (int i = 0; i < parents.size() - 1; i++) { - String currentParentName = parents.get(i).name(); - String nextParentName = parents.get(i + 1).name(); + for (int i = 0; i < sortedParents.size() - 1; i++) { + String currentParentName = sortedParents.get(i).name(); + String nextParentName = sortedParents.get(i + 1).name(); // Avoid adding duplicate dependencies if a table is part of multiple chains List currentChildren = parentToSequenceChild.computeIfAbsent(currentParentName, k -> new ArrayList<>()); @@ -89,7 +95,7 @@ public static DataGeneratorSchema generateSchemaDAG(DataGeneratorSchema schema) } // Link the last parent in the chain to the child table - String lastParentName = parents.get(parents.size() - 1).name(); + String lastParentName = sortedParents.get(sortedParents.size() - 1).name(); List lastParentChildren = parentToSequenceChild.computeIfAbsent(lastParentName, k -> new ArrayList<>()); if (!lastParentChildren.contains(childName)) { @@ -106,16 +112,143 @@ public static DataGeneratorSchema generateSchemaDAG(DataGeneratorSchema schema) parentToSequenceChild.getOrDefault(tableName, ImmutableList.of()); boolean isRoot = !hasSequenceParent.contains(tableName); + boolean hasAncestorDelete = + hasPhysicalAncestorWithDeleteQps(table, tableMap, new HashSet<>()); + Integer finalDeleteQps = hasAncestorDelete ? Integer.valueOf(0) : table.deleteQps(); + newTablesBuilder.put( tableName, table.toBuilder() .childTables( ImmutableList.copyOf( - sequenceChildren)) // These are tables to generate AFTER this one + sequenceChildren)) // These are tables to generate data AFTER this one .isRoot(isRoot) + .deleteQps(finalDeleteQps) .build()); + + if (isRoot) { + LOG.info("Identified Root table: {}", tableName); + } + if (!sequenceChildren.isEmpty()) { + LOG.info("Table {} triggers children: {}", tableName, sequenceChildren); + } } return DataGeneratorSchema.builder().tables(newTablesBuilder.buildOrThrow()).build(); } + + /** + * Checks if any physical ancestor (via interleaving or foreign keys) has delete QPS configured. + * Used to suppress child table delete generation to prevent double deletion conflicts. + */ + private static boolean hasPhysicalAncestorWithDeleteQps( + DataGeneratorTable table, Map tableMap, Set visited) { + if (table == null || !visited.add(table.name())) { + return false; // Cycle or null table reached + } + + // 1. Check Interleaved Parent + if (table.interleavedInTable() != null) { + DataGeneratorTable p = tableMap.get(table.interleavedInTable()); + if (p != null) { + if (p.deleteQps() != null && p.deleteQps() > 0) { + return true; + } + if (hasPhysicalAncestorWithDeleteQps(p, tableMap, visited)) { + return true; + } + } + } + + // 2. Check Foreign Key Parents + if (table.foreignKeys() != null) { + for (DataGeneratorForeignKey fk : table.foreignKeys()) { + DataGeneratorTable p = tableMap.get(fk.referencedTable()); + if (p != null) { + if (p.deleteQps() != null && p.deleteQps() > 0) { + return true; + } + if (hasPhysicalAncestorWithDeleteQps(p, tableMap, visited)) { + return true; + } + } + } + } + return false; + } + + /** + * Sorts a collection of tables topologically, ensuring parent/referenced tables are listed before + * their dependent child tables. 
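+ *
+ * <p>Dependencies are the interleave parent plus every foreign-key referenced table that is also
+ * part of the input collection; the initial root/QPS/name ordering only provides deterministic
+ * tie-breaking and never overrides a dependency edge.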
+ */ + public static List sortTopologically( + Collection tables, Map allTables) { + List sortedInput = new ArrayList<>(tables); + // Deterministic initial sort prioritizing roots and higher QPS tables + sortedInput.sort( + Comparator.comparing( + (DataGeneratorTable t) -> t.isRoot() != null && t.isRoot(), + Comparator.reverseOrder()) + .thenComparingInt(t -> t.insertQps() != null ? t.insertQps() : 0) + .thenComparing(DataGeneratorTable::name)); + + List sorted = new ArrayList<>(); + Set visited = new HashSet<>(); + Set visiting = new HashSet<>(); + + for (DataGeneratorTable table : sortedInput) { + if (!visited.contains(table.name())) { + visitTopologically(table, allTables, visited, visiting, sorted, sortedInput); + } + } + return sorted; + } + + /** + * Recursive depth-first search helper for topological sorting. Detects circular dependencies via + * the 'visiting' tracking set. + */ + private static void visitTopologically( + DataGeneratorTable table, + Map allTables, + Set visited, + Set visiting, + List sorted, + List subset) { + visiting.add(table.name()); + + // Collect all physical dependencies (interleaved + FK parents) + List parentNames = new ArrayList<>(); + if (table.interleavedInTable() != null) { + parentNames.add(table.interleavedInTable()); + } + for (DataGeneratorForeignKey fk : table.foreignKeys()) { + parentNames.add(fk.referencedTable()); + } + + // Recursively visit dependencies first + for (String refTable : parentNames) { + if (subset.stream().anyMatch(t -> t.name().equals(refTable))) { + DataGeneratorTable parent = allTables.get(refTable); + if (parent != null && !visited.contains(refTable)) { + if (visiting.contains(refTable)) { + throw new IllegalStateException( + "Circular dependency detected in schema involving table: " + refTable); + } + visitTopologically(parent, allTables, visited, visiting, sorted, subset); + } + } + } + + visiting.remove(table.name()); + visited.add(table.name()); + sorted.add(table); + } + + /** Builds the global insertion order across all tables in the schema for pipeline execution. */ + public static List buildInsertTopoOrder(DataGeneratorSchema schema) { + List sortedTables = + sortTopologically(schema.tables().values(), schema.tables()); + return sortedTables.stream().map(DataGeneratorTable::name).collect(Collectors.toList()); + } } diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngineTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngineTest.java new file mode 100644 index 0000000000..b5b652d368 --- /dev/null +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngineTest.java @@ -0,0 +1,934 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.templates.dofn; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.Mockito.atLeastOnce; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.verifyNoInteractions; +import static org.mockito.Mockito.when; + +import com.google.cloud.teleport.v2.templates.model.DataGeneratorColumn; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorForeignKey; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; +import com.google.cloud.teleport.v2.templates.model.LifecycleEvent; +import com.google.cloud.teleport.v2.templates.model.LogicalType; +import com.google.cloud.teleport.v2.templates.utils.Constants; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Collections; +import java.util.LinkedHashMap; +import java.util.List; +import net.datafaker.Faker; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.ReadableState; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.values.Row; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; +import org.mockito.ArgumentMatchers; + +@RunWith(JUnit4.class) +public class DataGeneratorEngineTest { + + @Test + public void testEngine_initializesCorrectly() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + assertNotNull("Engine instance should be successfully initialized", engine); + } + + @Test + public void testProcessRecord_throwsExceptionWhenFkColumnCasingMismatches() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns( + ImmutableList.of( + DataGeneratorColumn.builder() + .name("deptCode") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .build())) + .primaryKeys(ImmutableList.of("deptCode")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + + DataGeneratorTable childTable = + DataGeneratorTable.builder() + .name("Child") + .columns( + ImmutableList.of( + DataGeneratorColumn.builder() + .name("DeptCode") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .build())) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_dept") + .keyColumns(ImmutableList.of("DeptCode")) + .referencedTable("Parent") + .referencedColumns(ImmutableList.of("DeptCode")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(false) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTable)) + .build(); + + Row mockRow = mock(Row.class); + Schema parentSchema = + Schema.builder().addField(Schema.Field.of("deptCode", 
Schema.FieldType.STRING)).build(); + when(mockRow.getSchema()).thenReturn(parentSchema); + when(mockRow.getValue("deptCode")).thenReturn("HR"); + + MutationBatcher batcher = mock(MutationBatcher.class); + List dlq = new ArrayList<>(); + when(batcher.getFailedRecords()).thenReturn(dlq); + + engine.processRecord( + "Parent", + mockRow, + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mock(Timer.class), + schema, + batcher); + + assertEquals("Should have exactly 1 record in the DLQ", 1, dlq.size()); + String dlqPayload = dlq.get(0); + assertTrue( + "DLQ payload should target the Child table", dlqPayload.contains("\"table\":\"Child\"")); + assertTrue( + "DLQ payload should state dependency error", + dlqPayload.contains("Cannot resolve structural dependency")); + } + + @Test + public void testProcessRecord_tableNotFound() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + DataGeneratorSchema schema = DataGeneratorSchema.builder().tables(ImmutableMap.of()).build(); + + Row mockRow = mock(Row.class); + MutationBatcher batcher = mock(MutationBatcher.class); + MapState> eventQueueState = mock(MapState.class); + ValueState> activeTimestamps = mock(ValueState.class); + MapState tableMapState = mock(MapState.class); + + engine.processRecord( + "UnknownTable", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + mock(Timer.class), + schema, + batcher); + + verifyNoInteractions(batcher); + verifyNoInteractions(eventQueueState); + verifyNoInteractions(activeTimestamps); + verifyNoInteractions(tableMapState); + } + + @Test + public void testProcessRecord_endToEndLifecycle() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorColumn pkCol = + DataGeneratorColumn.builder() + .name("id") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isPrimaryKey(true) + .isGenerated(false) + .build(); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(20) + .deleteQps(20) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + + DataGeneratorTable childTable = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .interleavedInTable("Parent") + .insertQps(10) + .updateQps(20) + .deleteQps(20) + .isRoot(false) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTable)) + .build(); + + Schema rowSchema = + Schema.builder() + .addField(Schema.Field.of("id", Schema.FieldType.STRING)) + .addField(Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)) + .build(); + + Row mockRow = mock(Row.class); + when(mockRow.getSchema()).thenReturn(rowSchema); + when(mockRow.getValue("id")).thenReturn("pk123"); + when(mockRow.getString(Constants.SHARD_ID_COLUMN_NAME)).thenReturn("shard1"); + + MapState> eventQueueState = mock(MapState.class); + ReadableState> readableEvents = mock(ReadableState.class); + when(eventQueueState.get(ArgumentMatchers.anyLong())).thenReturn(readableEvents); + + List capturedEvents = new ArrayList<>(); + when(readableEvents.read()).thenReturn(capturedEvents); + + ValueState> activeTimestamps = mock(ValueState.class); + 
List timestampsList = new ArrayList<>(); + when(activeTimestamps.read()).thenReturn(timestampsList); + + MapState tableMapState = mock(MapState.class); + ReadableState readableTable = mock(ReadableState.class); + when(tableMapState.get(ArgumentMatchers.anyString())).thenReturn(readableTable); + when(readableTable.read()).thenReturn(parentTable); + + MutationBatcher batcher = mock(MutationBatcher.class); + when(batcher.getFailedRecords()).thenReturn(new ArrayList<>()); + + Timer eventTimer = mock(Timer.class); + + engine.processRecord( + "Parent", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + schema, + batcher); + + assertFalse( + "Active timestamps should be scheduled after processing parent record", + timestampsList.isEmpty()); + + LinkedHashMap pkMap = new LinkedHashMap<>(); + pkMap.put("id", "pk123"); + capturedEvents.add(new LifecycleEvent(pkMap, Constants.MUTATION_UPDATE, "Parent", mockRow)); + capturedEvents.add(new LifecycleEvent(pkMap, Constants.MUTATION_DELETE, "Parent", mockRow)); + + timestampsList.add(0, System.currentTimeMillis() - 5000L); + + List dlq = new ArrayList<>(); + engine.processScheduledEvents( + eventQueueState, activeTimestamps, tableMapState, eventTimer, batcher, dlq); + + assertTrue("DLQ should remain empty on successful lifecycle execution", dlq.isEmpty()); + verify(eventQueueState, atLeastOnce()).remove(ArgumentMatchers.anyLong()); + + verify(batcher, atLeastOnce()) + .bufferRow( + ArgumentMatchers.eq("Parent"), + ArgumentMatchers.any(Row.class), + ArgumentMatchers.eq(Constants.MUTATION_INSERT), + ArgumentMatchers.any(DataGeneratorTable.class), + ArgumentMatchers.anyString(), + ArgumentMatchers.any()); + verify(batcher, atLeastOnce()) + .bufferRow( + ArgumentMatchers.eq("Parent"), + ArgumentMatchers.any(Row.class), + ArgumentMatchers.eq(Constants.MUTATION_UPDATE), + ArgumentMatchers.any(DataGeneratorTable.class), + ArgumentMatchers.anyString(), + ArgumentMatchers.any()); + verify(batcher, atLeastOnce()) + .bufferRow( + ArgumentMatchers.eq("Parent"), + ArgumentMatchers.any(Row.class), + ArgumentMatchers.eq(Constants.MUTATION_DELETE), + ArgumentMatchers.any(DataGeneratorTable.class), + ArgumentMatchers.anyString(), + ArgumentMatchers.any()); + } + + @Test + public void testProcessRecord_success_compositeKeysWithDifferentNames() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorColumn parentCol1 = + DataGeneratorColumn.builder() + .name("parentId1") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .build(); + DataGeneratorColumn parentCol2 = + DataGeneratorColumn.builder() + .name("parentId2") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .build(); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of(parentCol1, parentCol2)) + .primaryKeys(ImmutableList.of("parentId1", "parentId2")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + + DataGeneratorColumn childCol1 = + DataGeneratorColumn.builder() + .name("childFkId1") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .build(); + DataGeneratorColumn childCol2 = + DataGeneratorColumn.builder() + .name("childFkId2") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .build(); + + DataGeneratorTable childTable = 
+ DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of(childCol1, childCol2)) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_composite_different_names") + .keyColumns(ImmutableList.of("childFkId1", "childFkId2")) + .referencedTable("Parent") + .referencedColumns(ImmutableList.of("parentId1", "parentId2")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(false) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTable)) + .build(); + + Schema rowSchema = + Schema.builder() + .addField(Schema.Field.of("parentId1", Schema.FieldType.STRING)) + .addField(Schema.Field.of("parentId2", Schema.FieldType.STRING)) + .addField(Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)) + .build(); + + Row mockRow = mock(Row.class); + when(mockRow.getSchema()).thenReturn(rowSchema); + when(mockRow.getValue("parentId1")).thenReturn("parentVal1"); + when(mockRow.getValue("parentId2")).thenReturn("parentVal2"); + when(mockRow.getString(Constants.SHARD_ID_COLUMN_NAME)).thenReturn("shard1"); + + MapState> eventQueueState = mock(MapState.class); + ValueState> activeTimestamps = mock(ValueState.class); + MapState tableMapState = mock(MapState.class); + ReadableState readableTable = mock(ReadableState.class); + when(tableMapState.get(ArgumentMatchers.anyString())).thenReturn(readableTable); + when(readableTable.read()).thenReturn(childTable); + + MutationBatcher batcher = mock(MutationBatcher.class); + List dlq = new ArrayList<>(); + when(batcher.getFailedRecords()).thenReturn(dlq); + + engine.processRecord( + "Parent", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + mock(Timer.class), + schema, + batcher); + + assertTrue( + "DLQ should be empty for schema with composite columns of different names", dlq.isEmpty()); + + ArgumentCaptor rowCaptor = ArgumentCaptor.forClass(Row.class); + verify(batcher, times(2)) + .bufferRow( + ArgumentMatchers.anyString(), + rowCaptor.capture(), + ArgumentMatchers.eq(Constants.MUTATION_INSERT), + ArgumentMatchers.any(DataGeneratorTable.class), + ArgumentMatchers.anyString(), + ArgumentMatchers.anyList()); + + List capturedRows = rowCaptor.getAllValues(); + assertEquals("Should buffer exactly 2 rows (Parent + Child)", 2, capturedRows.size()); + + Row parentInserted = capturedRows.get(0); + assertEquals("parentVal1", parentInserted.getValue("parentId1")); + assertEquals("parentVal2", parentInserted.getValue("parentId2")); + + Row childInserted = capturedRows.get(1); + assertEquals("parentVal1", childInserted.getValue("childFkId1")); + assertEquals("parentVal2", childInserted.getValue("childFkId2")); + } + + @Test + public void testProcessRecord_invalidFkConfigurationsRouteToDlq() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorColumn pkCol = + DataGeneratorColumn.builder() + .name("id") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isPrimaryKey(true) + .isGenerated(false) + .build(); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + + Schema rowSchema = + 
Schema.builder() + .addField(Schema.Field.of("id", Schema.FieldType.STRING)) + .addField(Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)) + .build(); + + Row mockRow = mock(Row.class); + when(mockRow.getSchema()).thenReturn(rowSchema); + when(mockRow.getValue("id")).thenReturn("pk123"); + when(mockRow.getString(Constants.SHARD_ID_COLUMN_NAME)).thenReturn("shard1"); + + MapState> eventQueueState = mock(MapState.class); + ValueState> activeTimestamps = mock(ValueState.class); + MapState tableMapState = mock(MapState.class); + Timer eventTimer = mock(Timer.class); + + // --- Scenario A: Referenced Table Does Not Exist (Unresolvable FK) --- + DataGeneratorTable childTableUnresolvableTable = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_missing") + .keyColumns(ImmutableList.of("id")) + .referencedTable("MissingAncestor") // Non-existent table name + .referencedColumns(ImmutableList.of("id")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(false) + .build(); + + DataGeneratorSchema schemaA = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTableUnresolvableTable)) + .build(); + + MutationBatcher batcherA = mock(MutationBatcher.class); + List dlqA = new ArrayList<>(); + when(batcherA.getFailedRecords()).thenReturn(dlqA); + + engine.processRecord( + "Parent", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + schemaA, + batcherA); + + assertEquals( + "Should have exactly 1 record in DLQ for missing table constraint", 1, dlqA.size()); + String dlqPayloadA = dlqA.get(0); + assertTrue( + "DLQ payload should target Child table", dlqPayloadA.contains("\"table\":\"Child\"")); + assertTrue( + "DLQ payload should show dependency resolution issue", + dlqPayloadA.contains("Cannot resolve structural dependency")); + + // --- Scenario B: Referenced Column Does Not Exist (Missing Column) --- + DataGeneratorTable childTableMissingColumn = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_missing_col") + .keyColumns(ImmutableList.of("id")) + .referencedTable("Parent") + .referencedColumns( + ImmutableList.of("nonexistent_col")) // Non-existent column name + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(false) + .build(); + + DataGeneratorSchema schemaB = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTableMissingColumn)) + .build(); + + MutationBatcher batcherB = mock(MutationBatcher.class); + List dlqB = new ArrayList<>(); + when(batcherB.getFailedRecords()).thenReturn(dlqB); + + engine.processRecord( + "Parent", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + schemaB, + batcherB); + + assertEquals("Should have exactly 1 record in DLQ for missing parent column", 1, dlqB.size()); + String dlqPayloadB = dlqB.get(0); + assertTrue( + "DLQ payload should target Child table", dlqPayloadB.contains("\"table\":\"Child\"")); + assertTrue( + "DLQ payload should show dependency resolution issue", + dlqPayloadB.contains("Cannot resolve structural dependency")); + } + + @Test + public void 
testProcessScheduledEvents_catchesErrorAndRoutesToDlq() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + MapState> eventQueueState = mock(MapState.class); + ReadableState> readableEvents = mock(ReadableState.class); + when(eventQueueState.get(ArgumentMatchers.anyLong())).thenReturn(readableEvents); + + List events = new ArrayList<>(); + events.add( + new LifecycleEvent(new LinkedHashMap(), "INVALID_TYPE", "Parent", null)); + when(readableEvents.read()).thenReturn(events); + + ValueState> activeTimestamps = mock(ValueState.class); + when(activeTimestamps.read()) + .thenReturn(Collections.singletonList(System.currentTimeMillis() - 1000L)); + + MapState tableMapState = mock(MapState.class); + ReadableState readableTable = mock(ReadableState.class); + when(tableMapState.get(ArgumentMatchers.anyString())).thenReturn(readableTable); + when(readableTable.read()).thenThrow(new RuntimeException("Forced generation error")); + + MutationBatcher batcher = mock(MutationBatcher.class); + List dlq = new ArrayList<>(); + when(batcher.getFailedRecords()).thenReturn(dlq); + + engine.processScheduledEvents( + eventQueueState, activeTimestamps, tableMapState, mock(Timer.class), batcher, dlq); + + assertEquals("Failed scheduled execution must output to the pending DLQ list", 1, dlq.size()); + String dlqPayload = dlq.get(0); + assertTrue( + "DLQ must capture target table details", dlqPayload.contains("\"table\":\"Parent\"")); + assertTrue( + "DLQ must log operation type", dlqPayload.contains("\"operation\":\"INVALID_TYPE\"")); + assertTrue( + "DLQ must contain stack trace/exception details", + dlqPayload.contains("Forced generation error")); + } + + @Test + public void testProcessScheduledEvents_success() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorColumn pkCol = + DataGeneratorColumn.builder() + .name("id") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .isPrimaryKey(true) + .build(); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .childTables(ImmutableList.of()) + .build(); + + Schema rowSchema = + Schema.builder() + .addField(Schema.Field.of("id", Schema.FieldType.STRING)) + .addField(Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)) + .build(); + + Row mockRow = mock(Row.class); + when(mockRow.getSchema()).thenReturn(rowSchema); + when(mockRow.getValue("id")).thenReturn("pk123"); + when(mockRow.getString(Constants.SHARD_ID_COLUMN_NAME)).thenReturn("shard1"); + + MapState> eventQueueState = mock(MapState.class); + ReadableState> readableEvents = mock(ReadableState.class); + when(eventQueueState.get(ArgumentMatchers.anyLong())).thenReturn(readableEvents); + + LinkedHashMap pkMap = new LinkedHashMap<>(); + pkMap.put("id", "pk123"); + + List events = new ArrayList<>(); + events.add(new LifecycleEvent(pkMap, Constants.MUTATION_UPDATE, "Parent", mockRow)); + events.add(new LifecycleEvent(pkMap, Constants.MUTATION_DELETE, "Parent", mockRow)); + when(readableEvents.read()).thenReturn(events); + + ValueState> activeTimestamps = mock(ValueState.class); + List pastList = new ArrayList<>(); + pastList.add(System.currentTimeMillis() - 5000L); + when(activeTimestamps.read()).thenReturn(pastList); + + MapState tableMapState = 
mock(MapState.class); + ReadableState readableTable = mock(ReadableState.class); + when(tableMapState.get(ArgumentMatchers.anyString())).thenReturn(readableTable); + when(readableTable.read()).thenReturn(parentTable); + + MutationBatcher batcher = mock(MutationBatcher.class); + List dlq = new ArrayList<>(); + when(batcher.getFailedRecords()).thenReturn(new ArrayList<>()); + + engine.processScheduledEvents( + eventQueueState, activeTimestamps, tableMapState, mock(Timer.class), batcher, dlq); + + assertTrue("DLQ should be empty on successful scheduled events processing", dlq.isEmpty()); + verify(eventQueueState, atLeastOnce()).remove(ArgumentMatchers.anyLong()); + + ArgumentCaptor updateCaptor = ArgumentCaptor.forClass(Row.class); + verify(batcher) + .bufferRow( + ArgumentMatchers.eq("Parent"), + updateCaptor.capture(), + ArgumentMatchers.eq(Constants.MUTATION_UPDATE), + ArgumentMatchers.any(DataGeneratorTable.class), + ArgumentMatchers.anyString(), + ArgumentMatchers.any()); + assertEquals("pk123", updateCaptor.getValue().getValue("id")); + + ArgumentCaptor deleteCaptor = ArgumentCaptor.forClass(Row.class); + verify(batcher) + .bufferRow( + ArgumentMatchers.eq("Parent"), + deleteCaptor.capture(), + ArgumentMatchers.eq(Constants.MUTATION_DELETE), + ArgumentMatchers.any(DataGeneratorTable.class), + ArgumentMatchers.anyString(), + ArgumentMatchers.any()); + assertEquals("pk123", deleteCaptor.getValue().getValue("id")); + } + + @Test + public void testProcessRecord_missingInterleavedParentRoutesToDlq() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorColumn pkCol = + DataGeneratorColumn.builder() + .name("id") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .isPrimaryKey(true) + .build(); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + + DataGeneratorTable childTable = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .interleavedInTable("MissingAncestor") + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(false) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTable)) + .build(); + + Schema rowSchema = + Schema.builder() + .addField(Schema.Field.of("id", Schema.FieldType.STRING)) + .addField(Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)) + .build(); + + Row mockRow = mock(Row.class); + when(mockRow.getSchema()).thenReturn(rowSchema); + when(mockRow.getValue("id")).thenReturn("pk123"); + when(mockRow.getString(Constants.SHARD_ID_COLUMN_NAME)).thenReturn("shard1"); + + MapState> eventQueueState = mock(MapState.class); + ValueState> activeTimestamps = mock(ValueState.class); + MapState tableMapState = mock(MapState.class); + Timer eventTimer = mock(Timer.class); + + MutationBatcher batcher = mock(MutationBatcher.class); + List dlq = new ArrayList<>(); + when(batcher.getFailedRecords()).thenReturn(dlq); + + engine.processRecord( + "Parent", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + schema, + batcher); + + 
assertEquals( + "Should have exactly 1 record in DLQ for missing interleaved parent mapping", + 1, + dlq.size()); + String dlqPayload = dlq.get(0); + assertTrue("DLQ payload should target Child table", dlqPayload.contains("\"table\":\"Child\"")); + assertTrue( + "DLQ payload should show dependency resolution issue", + dlqPayload.contains("Cannot resolve structural dependency")); + } + + @Test + public void testProcessScheduledEvents_emptyActiveTimestamps() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + ValueState> activeTimestamps = mock(ValueState.class); + when(activeTimestamps.read()).thenReturn(Collections.emptyList()); + + MapState> eventQueueState = mock(MapState.class); + MapState tableMapState = mock(MapState.class); + + engine.processScheduledEvents( + eventQueueState, + activeTimestamps, + tableMapState, + mock(Timer.class), + mock(MutationBatcher.class), + new ArrayList<>()); + + verifyNoInteractions(eventQueueState); + verifyNoInteractions(tableMapState); + } + + @Test + public void testProcessScheduledEvents_futureTimestampBreaksLoop() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + ValueState> activeTimestamps = mock(ValueState.class); + when(activeTimestamps.read()) + .thenReturn(Collections.singletonList(System.currentTimeMillis() + 100000L)); + + MapState> eventQueueState = mock(MapState.class); + MapState tableMapState = mock(MapState.class); + + engine.processScheduledEvents( + eventQueueState, + activeTimestamps, + tableMapState, + mock(Timer.class), + mock(MutationBatcher.class), + new ArrayList<>()); + + verifyNoInteractions(eventQueueState); + verifyNoInteractions(tableMapState); + } + + @Test + public void testGenerateChildRow_missingInterleavedPkColumnOnParent() { + DataGeneratorEngine engine = new DataGeneratorEngine(5000, 5000, new Faker()); + + DataGeneratorColumn pkCol = + DataGeneratorColumn.builder() + .name("id") + .logicalType(LogicalType.STRING) + .isNullable(false) + .isGenerated(false) + .isPrimaryKey(true) + .build(); + + DataGeneratorTable parentTable = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + + DataGeneratorTable childTable = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of(pkCol)) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .interleavedInTable("Parent") + .insertQps(10) + .updateQps(0) + .deleteQps(0) + .isRoot(false) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parentTable, "Child", childTable)) + .build(); + + Schema rowSchemaWithoutId = + Schema.builder() + .addField(Schema.Field.of("other_col", Schema.FieldType.STRING)) + .addField(Schema.Field.of(Constants.SHARD_ID_COLUMN_NAME, Schema.FieldType.STRING)) + .build(); + + Row mockRow = mock(Row.class); + when(mockRow.getSchema()).thenReturn(rowSchemaWithoutId); + when(mockRow.getString(Constants.SHARD_ID_COLUMN_NAME)).thenReturn("shard1"); + + MapState> eventQueueState = mock(MapState.class); + ValueState> activeTimestamps = mock(ValueState.class); + MapState tableMapState = mock(MapState.class); + Timer eventTimer = mock(Timer.class); + + MutationBatcher batcher = mock(MutationBatcher.class); + List 
dlq = new ArrayList<>(); + when(batcher.getFailedRecords()).thenReturn(dlq); + + engine.processRecord( + "Parent", + mockRow, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + schema, + batcher); + + assertEquals( + "Should have exactly 1 record in DLQ for missing source interleaved columns", + 1, + dlq.size()); + String dlqPayload = dlq.get(0); + assertTrue( + "DLQ payload should target Parent table", dlqPayload.contains("\"table\":\"Parent\"")); + assertTrue( + "DLQ payload should show dependency resolution issue", + dlqPayload.contains("Required Primary Key column")); + } +} diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java index 7a0df63bdd..09f246bf0c 100644 --- a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java @@ -24,6 +24,7 @@ import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; +import java.util.List; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -376,4 +377,273 @@ public void testDAGConstructionGrandChild() { assertEquals(0, dagSchema.tables().get("C2").childTables().size()); assertEquals(0, dagSchema.tables().get("GC1").childTables().size()); } + + @Test + public void testDAGConstructionMultiParentChainComplex() { + DataGeneratorTable departments = + DataGeneratorTable.builder() + .name("Departments") + .insertQps(10) + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .build(); + + DataGeneratorTable employees = + DataGeneratorTable.builder() + .name("EmployeeAssignments") + .insertQps(100) + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_emp_dept") + .keyColumns(ImmutableList.of("DeptCode")) + .referencedTable("Departments") + .referencedColumns(ImmutableList.of("DeptCode")) + .build())) + .uniqueKeys(ImmutableList.of()) + .build(); + + DataGeneratorTable projects = + DataGeneratorTable.builder() + .name("Projects") + .insertQps(50) + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_proj_dept") + .keyColumns(ImmutableList.of("DeptCode")) + .referencedTable("Departments") + .referencedColumns(ImmutableList.of("DeptCode")) + .build(), + DataGeneratorForeignKey.builder() + .name("fk_proj_emp") + .keyColumns(ImmutableList.of("EmpId")) + .referencedTable("EmployeeAssignments") + .referencedColumns(ImmutableList.of("EmpId")) + .build())) + .uniqueKeys(ImmutableList.of()) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables( + ImmutableMap.of( + "Departments", + departments, + "EmployeeAssignments", + employees, + "Projects", + projects)) + .build(); + + DataGeneratorSchema dagSchema = SchemaUtils.generateSchemaDAG(schema); + + DataGeneratorTable newDept = dagSchema.tables().get("Departments"); + DataGeneratorTable newEmp = dagSchema.tables().get("EmployeeAssignments"); + DataGeneratorTable newProj = dagSchema.tables().get("Projects"); + + assertTrue(newDept.isRoot()); 
// Departments should be root! + assertFalse(newEmp.isRoot()); + assertFalse(newProj.isRoot()); + + // Departments should have EmployeeAssignments as child + assertEquals(1, newDept.childTables().size()); + assertEquals("EmployeeAssignments", newDept.childTables().get(0)); + + // EmployeeAssignments should have Projects as child + assertEquals(1, newEmp.childTables().size()); + assertEquals("Projects", newEmp.childTables().get(0)); + } + + @Test + public void testBuildInsertTopoOrderRootsBeforeChildren() { + DataGeneratorTable parent = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(1) + .isRoot(true) + .childTables(ImmutableList.of("Child")) + .build(); + DataGeneratorTable child = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(1) + .isRoot(false) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Parent", parent, "Child", child)) + .build(); + + List order = SchemaUtils.buildInsertTopoOrder(schema); + assertEquals(2, order.size()); + assertEquals("Parent", order.get(0)); + assertEquals("Child", order.get(1)); + } + + @Test + public void testBuildInsertTopoOrderMultipleRootsSortedByName() { + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables( + ImmutableMap.of( + "B", + DataGeneratorTable.builder() + .name("B") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(1) + .isRoot(true) + .build(), + "A", + DataGeneratorTable.builder() + .name("A") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(1) + .isRoot(true) + .build(), + "C", + DataGeneratorTable.builder() + .name("C") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(1) + .isRoot(true) + .build())) + .build(); + + List order = SchemaUtils.buildInsertTopoOrder(schema); + assertEquals(3, order.size()); + assertEquals("A", order.get(0)); + assertEquals("B", order.get(1)); + assertEquals("C", order.get(2)); + } + + @Test + public void testSetSchemaDAG_overridesChildDeleteQpsWhenAncestorHasDeletes() { + // Grandparent (deleteQps = 5) -> Parent (deleteQps = 0) -> Child (deleteQps = 10) + DataGeneratorTable grandparent = + DataGeneratorTable.builder() + .name("Grandparent") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .deleteQps(5) + .build(); + + DataGeneratorTable parent = + DataGeneratorTable.builder() + .name("Parent") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_gp") + .keyColumns(ImmutableList.of("gpId")) + .referencedTable("Grandparent") + .referencedColumns(ImmutableList.of("id")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .deleteQps(0) + .build(); + + DataGeneratorTable child = + DataGeneratorTable.builder() + .name("Child") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + 
DataGeneratorForeignKey.builder() + .name("fk_parent") + .keyColumns(ImmutableList.of("parentId")) + .referencedTable("Parent") + .referencedColumns(ImmutableList.of("id")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .deleteQps(10) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder() + .tables(ImmutableMap.of("Grandparent", grandparent, "Parent", parent, "Child", child)) + .build(); + + DataGeneratorSchema dagSchema = SchemaUtils.generateSchemaDAG(schema); + + assertEquals(Integer.valueOf(5), dagSchema.tables().get("Grandparent").deleteQps()); + assertEquals(Integer.valueOf(0), dagSchema.tables().get("Parent").deleteQps()); + assertEquals( + Integer.valueOf(0), + dagSchema.tables().get("Child").deleteQps()); // Overwritten due to grandparent! + } + + @Test(expected = IllegalStateException.class) + public void testCircularDependencyThrowsException() { + DataGeneratorTable t1 = + DataGeneratorTable.builder() + .name("T1") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_t1_t2") + .keyColumns(ImmutableList.of("t2Id")) + .referencedTable("T2") + .referencedColumns(ImmutableList.of("id")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .build(); + + DataGeneratorTable t2 = + DataGeneratorTable.builder() + .name("T2") + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_t2_t1") + .keyColumns(ImmutableList.of("t1Id")) + .referencedTable("T1") + .referencedColumns(ImmutableList.of("id")) + .build())) + .uniqueKeys(ImmutableList.of()) + .insertQps(10) + .build(); + + DataGeneratorSchema schema = + DataGeneratorSchema.builder().tables(ImmutableMap.of("T1", t1, "T2", t2)).build(); + + SchemaUtils.buildInsertTopoOrder(schema); + } } From eea9de363a5f3680ced5030f6ec480d6281f368b Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Tue, 12 May 2026 12:25:31 +0530 Subject: [PATCH 2/5] fixed dag creation --- .../templates/dofn/DataGeneratorEngine.java | 20 +- .../v2/templates/utils/SchemaUtils.java | 268 +++++++++--------- .../v2/templates/utils/SchemaUtilsTest.java | 63 +++- 3 files changed, 196 insertions(+), 155 deletions(-) diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java index 262a6a8a71..e95a30c890 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/DataGeneratorEngine.java @@ -100,7 +100,7 @@ public void processRecord( tableMapState.put(tableName, table); - processTable( + generateAndBufferInsertWithLifecycle( table, row, eventQueueState, @@ -140,7 +140,7 @@ public void processScheduledEvents( if (events != null) { for (LifecycleEvent event : events) { try { - processEvent(event, tableMapState, batcher); + executeScheduledLifecycleMutation(event, tableMapState, batcher); } catch (Exception genError) { LOG.error( "Lifecycle event generation failed for table {} ({})", @@ -167,7 +167,7 @@ public void processScheduledEvents( * Assembles and buffers an insert row for the specified table, calculates lifecycle mutation * schedules, and cascades generation to its child tables. 
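 * Processing happens in four phases: the row is completed and buffered as an insert, lifecycle
 * timing bounds for updates and deletes are calculated, generation fans out to the child tables,
 * and the remaining lifecycle mutations are enqueued in state for the event timer.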
*/ - private void processTable( + private void generateAndBufferInsertWithLifecycle( DataGeneratorTable table, Row row, MapState> eventQueueState, @@ -182,6 +182,7 @@ private void processTable( String tableName = table.name(); tableMapState.put(tableName, table); + // 1. Complete Row & Buffer Insert Mutation Row fullRow = RowAssembler.completeRow(table, row, faker); String shardId = fullRow.getSchema().hasField(Constants.SHARD_ID_COLUMN_NAME) @@ -209,6 +210,7 @@ private void processTable( batcher.bufferRow( tableName, fullRow, Constants.MUTATION_INSERT, table, shardId, insertTopoOrder); + // 2. Calculate Lifecycle Timing Bounds (Updates & Deletes) long now = System.currentTimeMillis(); long deleteTimestamp = 0L; int numUpdates = 0; @@ -243,6 +245,7 @@ private void processTable( } } + // 3. Cascade Generation Fan-Out to Child Tables Map updatedAncestorRows = new HashMap<>(ancestorRows); updatedAncestorRows.put(tableName, fullRow); @@ -273,9 +276,10 @@ private void processTable( } } + // 4. Enqueue Future Lifecycle Mutations in State if (!pkMap.isEmpty()) { for (int i = 1; i <= numUpdates; i++) { - scheduleEvent( + enqueueLifecycleEvent( now + upInterval * i, new LifecycleEvent(pkMap, Constants.MUTATION_UPDATE, tableName, reducedRow), eventQueueState, @@ -283,7 +287,7 @@ private void processTable( eventTimer); } if (deleteTimestamp > 0) { - scheduleEvent( + enqueueLifecycleEvent( deleteTimestamp, new LifecycleEvent(pkMap, Constants.MUTATION_DELETE, tableName, reducedRow), eventQueueState, @@ -329,7 +333,7 @@ private void generateAndWriteChildren( childTable.name())))); break; } - processTable( + generateAndBufferInsertWithLifecycle( childTable, childRow, eventQueueState, @@ -440,7 +444,7 @@ private Row generateChildRow( * Persists a future lifecycle event into the state queue, keeping the active timestamp list * sorted via binary search. */ - private void scheduleEvent( + private void enqueueLifecycleEvent( long timestamp, LifecycleEvent event, MapState> eventQueueState, @@ -472,7 +476,7 @@ private void scheduleEvent( * Executes a scheduled update or delete event by assembling the mutated row and buffering it into * the batcher. */ - private void processEvent( + private void executeScheduledLifecycleMutation( LifecycleEvent event, MapState tableMapState, MutationBatcher batcher) { diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java index 199a7b037d..077d7ad65c 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtils.java @@ -21,12 +21,12 @@ import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; -import java.util.Collection; import java.util.Comparator; import java.util.HashMap; import java.util.HashSet; import java.util.List; import java.util.Map; +import java.util.PriorityQueue; import java.util.Set; import java.util.stream.Collectors; import org.slf4j.Logger; @@ -37,81 +37,61 @@ public class SchemaUtils { private static final Logger LOG = LoggerFactory.getLogger(SchemaUtils.class); /** - * Constructs a Directed Acyclic Graph (DAG) of tables in the schema. Identifies parent-child - * relationships based on Foreign Keys and Interleaving. Handles multiple parents by selecting the - * one with the *least* QPS. 
Populates the `children` list for each table and sets `isRoot` - * accordingly. - * - *

Note: Circular dependencies are not supported right now. - * - * @param schema The input schema. - * @return A new schema with DAG information populated. + * Constructs a Directed Acyclic Graph (DAG) of tables in the schema. Merges parallel parent + * tracks into sequential execution chains sorted by QPS. */ public static DataGeneratorSchema generateSchemaDAG(DataGeneratorSchema schema) { Map tableMap = schema.tables(); Map> parentToSequenceChild = new HashMap<>(); Set hasSequenceParent = new HashSet<>(); - // 1. Build Dependency Chains for Each Table - for (DataGeneratorTable childTable : tableMap.values()) { - String childName = childTable.name(); - - // Collect Parents (Interleaved and FK) - List parents = new ArrayList<>(); - - if (childTable.interleavedInTable() != null) { - String parentName = childTable.interleavedInTable(); - DataGeneratorTable parentTable = tableMap.get(parentName); - if (parentTable != null) { - parents.add(parentTable); - } - } + // 1. Generate a single, authoritative global topological order sorted by QPS + List globalOrder = sortGloballyTopologically(tableMap); - for (DataGeneratorForeignKey fk : childTable.foreignKeys()) { - DataGeneratorTable parentTable = tableMap.get(fk.referencedTable()); - if (parentTable != null && !parents.contains(parentTable)) { - parents.add(parentTable); - } - } + // 2. Build Sequential Execution Chains using sub-sequences of the global order + for (DataGeneratorTable childTable : tableMap.values()) { + Set uniqueParents = new HashSet<>(); + // Gather all recursive upstream ancestors (interleaving + foreign keys) + collectAncestors(childTable, tableMap, uniqueParents); - if (parents.isEmpty()) { - continue; // No parents for this table + if (uniqueParents.isEmpty()) { + continue; // Standalone table with no dependencies } - // Sort Parents Topologically to respect physical FK relations - List sortedParents = sortTopologically(parents, tableMap); - - // Chain the Parents: P1 -> P2 -> ... -> Pn -> Child - for (int i = 0; i < sortedParents.size() - 1; i++) { - String currentParentName = sortedParents.get(i).name(); - String nextParentName = sortedParents.get(i + 1).name(); - // Avoid adding duplicate dependencies if a table is part of multiple chains - List currentChildren = - parentToSequenceChild.computeIfAbsent(currentParentName, k -> new ArrayList<>()); - if (!currentChildren.contains(nextParentName)) { - currentChildren.add(nextParentName); + Set lineageNames = + uniqueParents.stream().map(DataGeneratorTable::name).collect(Collectors.toSet()); + lineageNames.add(childTable.name()); + + // Filter the master global order to keep only this table's specific lineage + List executionChain = + globalOrder.stream() + .filter(t -> lineageNames.contains(t.name())) + .collect(Collectors.toList()); + + // Link the sorted lineage chain together consecutively (P1 -> P2 -> ... 
-> Child) + for (int i = 0; i < executionChain.size() - 1; i++) { + String currentTable = executionChain.get(i).name(); + String nextTable = executionChain.get(i + 1).name(); + + List sequenceChildren = + parentToSequenceChild.computeIfAbsent(currentTable, k -> new ArrayList<>()); + if (!sequenceChildren.contains(nextTable)) { + sequenceChildren.add(nextTable); } - hasSequenceParent.add(nextParentName); + hasSequenceParent.add(nextTable); } - - // Link the last parent in the chain to the child table - String lastParentName = sortedParents.get(sortedParents.size() - 1).name(); - List lastParentChildren = - parentToSequenceChild.computeIfAbsent(lastParentName, k -> new ArrayList<>()); - if (!lastParentChildren.contains(childName)) { - lastParentChildren.add(childName); - } - hasSequenceParent.add(childName); } - // 2. Update Tables with Sequence Children and isRoot + // 3. Construct Final Table Definitions ImmutableMap.Builder newTablesBuilder = ImmutableMap.builder(); for (DataGeneratorTable table : tableMap.values()) { String tableName = table.name(); List sequenceChildren = parentToSequenceChild.getOrDefault(tableName, ImmutableList.of()); + // A table is a root if it is not triggered as a sequence child by any other table boolean isRoot = !hasSequenceParent.contains(tableName); + // Prevent double deletion conflicts by suppressing delete generation on child tables boolean hasAncestorDelete = hasPhysicalAncestorWithDeleteQps(table, tableMap, new HashSet<>()); Integer finalDeleteQps = hasAncestorDelete ? Integer.valueOf(0) : table.deleteQps(); @@ -119,9 +99,7 @@ public static DataGeneratorSchema generateSchemaDAG(DataGeneratorSchema schema) newTablesBuilder.put( tableName, table.toBuilder() - .childTables( - ImmutableList.copyOf( - sequenceChildren)) // These are tables to generate data AFTER this one + .childTables(ImmutableList.copyOf(sequenceChildren)) .isRoot(isRoot) .deleteQps(finalDeleteQps) .build()); @@ -138,16 +116,107 @@ public static DataGeneratorSchema generateSchemaDAG(DataGeneratorSchema schema) } /** - * Checks if any physical ancestor (via interleaving or foreign keys) has delete QPS configured. - * Used to suppress child table delete generation to prevent double deletion conflicts. + * Standard Kahn's Algorithm for Global Topological Sorting. Uses a PriorityQueue to ensure that + * when multiple parent tables are unblocked, the one with the lowest insertQps is selected first. 
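+ * For example, if tables B and C both reference only the root table A and C has a lower
+ * insertQps than B, the resulting order is [A, C, B].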
+ */ + private static List sortGloballyTopologically( + Map tableMap) { + Map inDegree = new HashMap<>(); + Map> adjacencyList = new HashMap<>(); + + for (String tableName : tableMap.keySet()) { + inDegree.put(tableName, 0); + adjacencyList.put(tableName, new ArrayList<>()); + } + + // Map physical dependencies (interleaving and FKs) into graph directed edges + for (DataGeneratorTable table : tableMap.values()) { + String child = table.name(); + List parents = new ArrayList<>(); + if (table.interleavedInTable() != null) { + parents.add(table.interleavedInTable()); + } + if (table.foreignKeys() != null) { + for (DataGeneratorForeignKey fk : table.foreignKeys()) { + parents.add(fk.referencedTable()); + } + } + + for (String parent : parents) { + if (tableMap.containsKey(parent)) { + adjacencyList.get(parent).add(child); + inDegree.put(child, inDegree.get(child) + 1); + } + } + } + + // Priority Queue sorts prioritizing roots, then ascending by QPS, then alphabetically by name + PriorityQueue queue = + new PriorityQueue<>( + Comparator.comparing( + (DataGeneratorTable t) -> t.isRoot() != null && t.isRoot(), + Comparator.reverseOrder()) + .thenComparingInt( + (DataGeneratorTable t) -> t.insertQps() != null ? t.insertQps() : 0) + .thenComparing(DataGeneratorTable::name)); + + for (Map.Entry entry : inDegree.entrySet()) { + if (entry.getValue() == 0) { + queue.add(tableMap.get(entry.getKey())); + } + } + + List globalOrder = new ArrayList<>(); + while (!queue.isEmpty()) { + DataGeneratorTable current = queue.poll(); + globalOrder.add(current); + + for (String neighbor : adjacencyList.get(current.name())) { + int updatedInDegree = inDegree.get(neighbor) - 1; + inDegree.put(neighbor, updatedInDegree); + if (updatedInDegree == 0) { + queue.add(tableMap.get(neighbor)); + } + } + } + + if (globalOrder.size() != tableMap.size()) { + throw new IllegalStateException("Circular dependency detected in schema layout."); + } + + return globalOrder; + } + + /** Recursively collects all physical upstream ancestor tables (via interleaving or FKs). */ + private static void collectAncestors( + DataGeneratorTable table, + Map tableMap, + Set uniqueParents) { + if (table.interleavedInTable() != null) { + DataGeneratorTable parent = tableMap.get(table.interleavedInTable()); + if (parent != null && uniqueParents.add(parent)) { + collectAncestors(parent, tableMap, uniqueParents); + } + } + if (table.foreignKeys() != null) { + for (DataGeneratorForeignKey fk : table.foreignKeys()) { + DataGeneratorTable parent = tableMap.get(fk.referencedTable()); + if (parent != null && uniqueParents.add(parent)) { + collectAncestors(parent, tableMap, uniqueParents); + } + } + } + } + + /** + * Checks if any physical ancestor (interleaved or foreign key) has delete QPS configured. Used to + * suppress child table delete generation to prevent double deletion conflicts. */ private static boolean hasPhysicalAncestorWithDeleteQps( DataGeneratorTable table, Map tableMap, Set visited) { if (table == null || !visited.add(table.name())) { - return false; // Cycle or null table reached + return false; } - - // 1. Check Interleaved Parent if (table.interleavedInTable() != null) { DataGeneratorTable p = tableMap.get(table.interleavedInTable()); if (p != null) { @@ -159,8 +228,6 @@ private static boolean hasPhysicalAncestorWithDeleteQps( } } } - - // 2. 
Check Foreign Key Parents if (table.foreignKeys() != null) { for (DataGeneratorForeignKey fk : table.foreignKeys()) { DataGeneratorTable p = tableMap.get(fk.referencedTable()); @@ -177,78 +244,9 @@ private static boolean hasPhysicalAncestorWithDeleteQps( return false; } - /** - * Sorts a collection of tables topologically, ensuring parent/referenced tables are listed before - * their dependent child tables. - */ - public static List sortTopologically( - Collection tables, Map allTables) { - List sortedInput = new ArrayList<>(tables); - // Deterministic initial sort prioritizing roots and higher QPS tables - sortedInput.sort( - Comparator.comparing( - (DataGeneratorTable t) -> t.isRoot() != null && t.isRoot(), - Comparator.reverseOrder()) - .thenComparingInt(t -> t.insertQps() != null ? t.insertQps() : 0) - .thenComparing(DataGeneratorTable::name)); - - List sorted = new ArrayList<>(); - Set visited = new HashSet<>(); - Set visiting = new HashSet<>(); - - for (DataGeneratorTable table : sortedInput) { - if (!visited.contains(table.name())) { - visitTopologically(table, allTables, visited, visiting, sorted, sortedInput); - } - } - return sorted; - } - - /** - * Recursive depth-first search helper for topological sorting. Detects circular dependencies via - * the 'visiting' tracking set. - */ - private static void visitTopologically( - DataGeneratorTable table, - Map allTables, - Set visited, - Set visiting, - List sorted, - List subset) { - visiting.add(table.name()); - - // Collect all physical dependencies (interleaved + FK parents) - List parentNames = new ArrayList<>(); - if (table.interleavedInTable() != null) { - parentNames.add(table.interleavedInTable()); - } - for (DataGeneratorForeignKey fk : table.foreignKeys()) { - parentNames.add(fk.referencedTable()); - } - - // Recursively visit dependencies first - for (String refTable : parentNames) { - if (subset.stream().anyMatch(t -> t.name().equals(refTable))) { - DataGeneratorTable parent = allTables.get(refTable); - if (parent != null && !visited.contains(refTable)) { - if (visiting.contains(refTable)) { - throw new IllegalStateException( - "Circular dependency detected in schema involving table: " + refTable); - } - visitTopologically(parent, allTables, visited, visiting, sorted, subset); - } - } - } - - visiting.remove(table.name()); - visited.add(table.name()); - sorted.add(table); - } - /** Builds the global insertion order across all tables in the schema for pipeline execution. 
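 * Parent and referenced tables always precede their dependents; a Child table referencing Parent,
 * for instance, yields ["Parent", "Child"].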
*/ public static List buildInsertTopoOrder(DataGeneratorSchema schema) { - List sortedTables = - sortTopologically(schema.tables().values(), schema.tables()); + List sortedTables = sortGloballyTopologically(schema.tables()); return sortedTables.stream().map(DataGeneratorTable::name).collect(Collectors.toList()); } } diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java index 09f246bf0c..ccadb2622f 100644 --- a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/utils/SchemaUtilsTest.java @@ -380,13 +380,40 @@ public void testDAGConstructionGrandChild() { @Test public void testDAGConstructionMultiParentChainComplex() { + DataGeneratorTable organizations = + DataGeneratorTable.builder() + .name("Organizations") + .insertQps(500) + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .build(); + + DataGeneratorTable xyz = + DataGeneratorTable.builder() + .name("xyz") + .insertQps(400) + .columns(ImmutableList.of()) + .primaryKeys(ImmutableList.of()) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .build(); + DataGeneratorTable departments = DataGeneratorTable.builder() .name("Departments") - .insertQps(10) + .insertQps(200) .columns(ImmutableList.of()) .primaryKeys(ImmutableList.of()) - .foreignKeys(ImmutableList.of()) + .foreignKeys( + ImmutableList.of( + DataGeneratorForeignKey.builder() + .name("fk_dept_org") + .keyColumns(ImmutableList.of("OrgId")) + .referencedTable("Organizations") + .referencedColumns(ImmutableList.of("OrgId")) + .build())) .uniqueKeys(ImmutableList.of()) .build(); @@ -399,10 +426,10 @@ public void testDAGConstructionMultiParentChainComplex() { .foreignKeys( ImmutableList.of( DataGeneratorForeignKey.builder() - .name("fk_emp_dept") - .keyColumns(ImmutableList.of("DeptCode")) - .referencedTable("Departments") - .referencedColumns(ImmutableList.of("DeptCode")) + .name("fk_emp_xyz") + .keyColumns(ImmutableList.of("xyzId")) + .referencedTable("xyz") + .referencedColumns(ImmutableList.of("xyzId")) .build())) .uniqueKeys(ImmutableList.of()) .build(); @@ -434,6 +461,10 @@ public void testDAGConstructionMultiParentChainComplex() { DataGeneratorSchema.builder() .tables( ImmutableMap.of( + "Organizations", + organizations, + "xyz", + xyz, "Departments", departments, "EmployeeAssignments", @@ -444,21 +475,29 @@ public void testDAGConstructionMultiParentChainComplex() { DataGeneratorSchema dagSchema = SchemaUtils.generateSchemaDAG(schema); + DataGeneratorTable newOrg = dagSchema.tables().get("Organizations"); + DataGeneratorTable newXyz = dagSchema.tables().get("xyz"); DataGeneratorTable newDept = dagSchema.tables().get("Departments"); DataGeneratorTable newEmp = dagSchema.tables().get("EmployeeAssignments"); DataGeneratorTable newProj = dagSchema.tables().get("Projects"); - assertTrue(newDept.isRoot()); // Departments should be root! 
+ assertTrue(newXyz.isRoot()); assertFalse(newEmp.isRoot()); + assertFalse(newOrg.isRoot()); + assertFalse(newDept.isRoot()); assertFalse(newProj.isRoot()); - // Departments should have EmployeeAssignments as child - assertEquals(1, newDept.childTables().size()); - assertEquals("EmployeeAssignments", newDept.childTables().get(0)); + assertEquals(1, newXyz.childTables().size()); + assertEquals("EmployeeAssignments", newXyz.childTables().get(0)); - // EmployeeAssignments should have Projects as child assertEquals(1, newEmp.childTables().size()); - assertEquals("Projects", newEmp.childTables().get(0)); + assertEquals("Organizations", newEmp.childTables().get(0)); + + assertEquals(1, newOrg.childTables().size()); + assertEquals("Departments", newOrg.childTables().get(0)); + + assertEquals(1, newDept.childTables().size()); + assertEquals("Projects", newDept.childTables().get(0)); } @Test From 407eda948f378e241478467f632f39db18dc3440 Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Tue, 12 May 2026 13:39:47 +0530 Subject: [PATCH 3/5] batch and write code --- .../v2/templates/dofn/BatchAndWriteFn.java | 255 ++++++++++++++++++ .../v2/templates/model/GeneratedRecord.java | 32 +++ .../templates/transforms/BatchAndWrite.java | 93 +++++++ .../templates/dofn/BatchAndWriteFnTest.java | 2 + .../transforms/BatchAndWriteTest.java | 2 + 5 files changed, 384 insertions(+) create mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java create mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/GeneratedRecord.java create mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java create mode 100644 v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java create mode 100644 v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java new file mode 100644 index 0000000000..6ece91a6e9 --- /dev/null +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java @@ -0,0 +1,255 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.templates.dofn; + +import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; +import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; +import com.google.cloud.teleport.v2.templates.model.LifecycleEvent; +import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter; +import com.google.cloud.teleport.v2.templates.sink.DataWriter; +import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter; +import com.google.cloud.teleport.v2.templates.utils.FailureRecord; +import com.google.cloud.teleport.v2.templates.utils.SchemaUtils; +import com.google.common.annotations.VisibleForTesting; +import java.util.List; +import java.util.function.Consumer; +import net.datafaker.Faker; +import org.apache.beam.sdk.coders.ListCoder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.coders.VarLongCoder; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.StateSpec; +import org.apache.beam.sdk.state.StateSpecs; +import org.apache.beam.sdk.state.TimeDomain; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.TimerSpec; +import org.apache.beam.sdk.state.TimerSpecs; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Stateful {@link DoFn} that manages persistence lifecycles, state fields, and timers, delegating + * core traversal to {@link DataGeneratorEngine} and mutation batch writing to {@link + * MutationBatcher}. 
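+ * Keyed state tracks the scheduled lifecycle event queue, the sorted list of active event
+ * timestamps, and the table definitions, while a processing-time timer replays events once they
+ * fall due.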
+ */ +public class BatchAndWriteFn extends DoFn, String> { + + private static final Logger LOG = LoggerFactory.getLogger(BatchAndWriteFn.class); + + private final SinkType sinkType; + private final String sinkOptionsPath; + private final int batchSize; + private final Integer jdbcPoolSize; + private final Integer updateInterval; + private final Integer deleteInterval; + private final PCollectionView schemaView; + + private transient DataWriter writer; + private transient Faker faker; + private transient volatile DataGeneratorSchema schema; + private transient volatile List insertTopoOrder; + + private transient DataGeneratorEngine dataGeneratorEngine; + private transient MutationBatcher batcher; + + @StateId("eventQueue") + private final StateSpec>> eventQueueSpec = + StateSpecs.map(VarLongCoder.of(), ListCoder.of(SerializableCoder.of(LifecycleEvent.class))); + + @StateId("activeTimestamps") + private final StateSpec>> activeTimestampsSpec = + StateSpecs.value(ListCoder.of(VarLongCoder.of())); + + @StateId("tableMapState") + private final StateSpec> tableMapSpec = + StateSpecs.map(StringUtf8Coder.of(), SerializableCoder.of(DataGeneratorTable.class)); + + @TimerId("eventTimer") + private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); + + public BatchAndWriteFn( + SinkType sinkType, + String sinkOptionsPath, + Integer batchSize, + Integer jdbcPoolSize, + Integer updateInterval, + Integer deleteInterval, + PCollectionView schemaView) { + this.sinkType = sinkType; + this.sinkOptionsPath = sinkOptionsPath; + this.batchSize = batchSize; + this.jdbcPoolSize = jdbcPoolSize; + this.updateInterval = updateInterval; + this.deleteInterval = deleteInterval; + this.schemaView = schemaView; + } + + @Setup + public void setup() { + this.schema = null; + this.insertTopoOrder = null; + if (writer == null) { + writer = createWriter(sinkType, sinkOptionsPath); + } + if (faker == null) { + faker = new Faker(); + } + + this.batcher = new MutationBatcher(batchSize, jdbcPoolSize, writer); + this.dataGeneratorEngine = new DataGeneratorEngine(updateInterval, deleteInterval, faker); + } + + @VisibleForTesting + protected DataWriter createWriter(SinkType type, String configPath) { + switch (type) { + case MYSQL: + return new MySqlDataWriter(configPath); + case SPANNER: + return new SpannerDataWriter(configPath); + default: + throw new IllegalArgumentException("Unsupported sink type: " + type); + } + } + + @StartBundle + public void startBundle() { + this.batcher.startBundle(); + } + + @ProcessElement + public void processElement( + ProcessContext c, + @StateId("eventQueue") MapState> eventQueueState, + @StateId("activeTimestamps") ValueState> activeTimestamps, + @StateId("tableMapState") MapState tableMapState, + @TimerId("eventTimer") Timer eventTimer) { + + ensureSchemaInitialized(c); + + GeneratedRecord record = c.element().getValue(); + String tableName = record.tableName(); + Row pkValues = record.primaryKeyValues(); + + try { + dataGeneratorEngine.processRecord( + tableName, + pkValues, + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + schema, + batcher); + } catch (Exception genError) { + LOG.error("Generation failed for table {}", tableName, genError); + Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc(); + batcher + .getFailedRecords() + .add( + FailureRecord.toJson( + tableName, FailureRecord.OPERATION_GENERATION, pkValues, genError)); + } + + writeFailedRecords(c::output); + } + + @OnTimer("eventTimer") + public void onTimer( + 
OnTimerContext c, + @StateId("eventQueue") MapState> eventQueueState, + @StateId("activeTimestamps") ValueState> activeTimestamps, + @StateId("tableMapState") MapState tableMapState, + @TimerId("eventTimer") Timer eventTimer) { + + try { + dataGeneratorEngine.processScheduledEvents( + eventQueueState, + activeTimestamps, + tableMapState, + eventTimer, + batcher, + batcher.getFailedRecords()); + } catch (Exception timerError) { + LOG.error("Scheduled events generation failed during timer processing", timerError); + Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc(); + batcher + .getFailedRecords() + .add( + FailureRecord.toJson( + "UNKNOWN_TABLE", FailureRecord.OPERATION_GENERATION, null, timerError)); + } + + writeFailedRecords(c::output); + } + + @FinishBundle + public void finishBundle(FinishBundleContext c) { + batcher.flushInsertsInTopoOrder(insertTopoOrder); + batcher.flushUpdates(); + batcher.flushDeletesInReverseTopoOrder(insertTopoOrder); + + List pendingDlq = batcher.getFailedRecords(); + if (pendingDlq != null && !pendingDlq.isEmpty()) { + Instant now = Instant.now(); + for (String record : pendingDlq) { + c.output(record, now, GlobalWindow.INSTANCE); + } + batcher.clearDlq(); + } + } + + @Teardown + public void teardown() { + if (writer != null) { + try { + writer.close(); + } catch (Exception e) { + throw new RuntimeException("Failed to close writer", e); + } + } + } + + private void ensureSchemaInitialized(ProcessContext c) { + if (schema != null) { + return; + } + DataGeneratorSchema loaded = c.sideInput(schemaView); + this.insertTopoOrder = SchemaUtils.buildInsertTopoOrder(loaded); + this.schema = loaded; + } + + private void writeFailedRecords(Consumer sink) { + List dlq = batcher.getFailedRecords(); + if (dlq == null || dlq.isEmpty()) { + return; + } + for (String record : dlq) { + sink.accept(record); + } + batcher.clearDlq(); + } +} diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/GeneratedRecord.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/GeneratedRecord.java new file mode 100644 index 0000000000..198ce0abe6 --- /dev/null +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/model/GeneratedRecord.java @@ -0,0 +1,32 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.model; + +import com.google.auto.value.AutoValue; +import java.io.Serializable; +import org.apache.beam.sdk.values.Row; + +/** Type-safe container wrapping table names and primary key values. 
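+ * Instances are created via {@code GeneratedRecord.create(tableName, primaryKeyValues)}.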
*/ +@AutoValue +public abstract class GeneratedRecord implements Serializable { + public abstract String tableName(); + + public abstract Row primaryKeyValues(); + + public static GeneratedRecord create(String tableName, Row primaryKeyValues) { + return new AutoValue_GeneratedRecord(tableName, primaryKeyValues); + } +} diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java new file mode 100644 index 0000000000..43715177d6 --- /dev/null +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java @@ -0,0 +1,93 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.transforms; + +import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; +import com.google.cloud.teleport.v2.templates.dofn.BatchAndWriteFn; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; +import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; +import com.google.cloud.teleport.v2.templates.utils.FailureRecord; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.transforms.PTransform; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; + +/** + * {@link PTransform} that takes a stream of partially-generated keyed rows, completes them, + * recursively materialises child rows from the schema DAG, batches them by {@code (table, shard, + * op)}, and writes to the sink configured by {@code sinkType}/{@code sinkOptionsPath}. + * + *

Output is a {@code PCollection} of JSON-encoded {@link FailureRecord}s — one per row + * that could not be generated or that the sink rejected. The output is empty when no failures + * occur. + */ +public class BatchAndWrite + extends PTransform>, PCollection> { + + private final SinkType sinkType; + private final String sinkOptionsPath; + private final Integer batchSize; + private final Integer jdbcPoolSize; + private final Integer updateInterval; + private final Integer deleteInterval; + private final PCollectionView schemaView; + + /** + * @param sinkType which sink writer the underlying {@link BatchAndWriteFn} should instantiate + * @param sinkOptionsPath path/URI to the sink-specific configuration document + * @param batchSize maximum rows buffered per {@code (table, shard, op)} before flush; + * @param jdbcPoolSize connection pool size limit per MySQL shard node + * @param updateInterval custom UPDATE interval in seconds for lifecycle events + * @param deleteInterval custom trailing DELETE interval in seconds for lifecycle events + * @param schemaView side input carrying the {@link DataGeneratorSchema} + */ + public BatchAndWrite( + SinkType sinkType, + String sinkOptionsPath, + Integer batchSize, + Integer jdbcPoolSize, + Integer updateInterval, + Integer deleteInterval, + PCollectionView schemaView) { + this.sinkType = sinkType; + this.sinkOptionsPath = sinkOptionsPath; + this.batchSize = batchSize; + this.jdbcPoolSize = jdbcPoolSize; + this.updateInterval = updateInterval; + this.deleteInterval = deleteInterval; + this.schemaView = schemaView; + } + + @Override + public PCollection expand(PCollection> input) { + return input + .apply( + "BatchAndWriteFn", + ParDo.of( + new BatchAndWriteFn( + sinkType, + sinkOptionsPath, + batchSize, + jdbcPoolSize, + updateInterval, + deleteInterval, + schemaView)) + .withSideInputs(schemaView)) + .setCoder(StringUtf8Coder.of()); + } +} diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java new file mode 100644 index 0000000000..6f94eb7074 --- /dev/null +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java @@ -0,0 +1,2 @@ +package com.google.cloud.teleport.v2.templates.dofn;public class BatchAndWriteFnTest { +} diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java new file mode 100644 index 0000000000..551d065d12 --- /dev/null +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java @@ -0,0 +1,2 @@ +package com.google.cloud.teleport.v2.templates.transforms;public class BatchAndWriteTest { +} From 51e1dac265910d9135659301086e8e47bbe79cbc Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Wed, 13 May 2026 20:33:21 +0530 Subject: [PATCH 4/5] adding uts --- .../v2/templates/dofn/BatchAndWriteFn.java | 84 +++- .../templates/dofn/BatchAndWriteFnTest.java | 453 +++++++++++++++++- .../transforms/BatchAndWriteTest.java | 67 ++- 3 files changed, 597 insertions(+), 7 deletions(-) diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java index 
6ece91a6e9..d6a3c71a42 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java @@ -88,6 +88,10 @@ public class BatchAndWriteFn extends DoFn, String> private final StateSpec> tableMapSpec = StateSpecs.map(StringUtf8Coder.of(), SerializableCoder.of(DataGeneratorTable.class)); + @StateId("insertTopoOrderState") + private final StateSpec>> insertTopoOrderSpec = + StateSpecs.value(ListCoder.of(StringUtf8Coder.of())); + @TimerId("eventTimer") private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME); @@ -146,9 +150,10 @@ public void processElement( @StateId("eventQueue") MapState> eventQueueState, @StateId("activeTimestamps") ValueState> activeTimestamps, @StateId("tableMapState") MapState tableMapState, + @StateId("insertTopoOrderState") ValueState> insertTopoOrderState, @TimerId("eventTimer") Timer eventTimer) { - ensureSchemaInitialized(c); + ensureSchemaInitialized(c, insertTopoOrderState); GeneratedRecord record = c.element().getValue(); String tableName = record.tableName(); @@ -163,7 +168,8 @@ public void processElement( tableMapState, eventTimer, schema, - batcher); + batcher, + insertTopoOrder); } catch (Exception genError) { LOG.error("Generation failed for table {}", tableName, genError); Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc(); @@ -183,8 +189,13 @@ public void onTimer( @StateId("eventQueue") MapState> eventQueueState, @StateId("activeTimestamps") ValueState> activeTimestamps, @StateId("tableMapState") MapState tableMapState, + @StateId("insertTopoOrderState") ValueState> insertTopoOrderState, @TimerId("eventTimer") Timer eventTimer) { + if (this.insertTopoOrder == null) { + this.insertTopoOrder = insertTopoOrderState.read(); + } + try { dataGeneratorEngine.processScheduledEvents( eventQueueState, @@ -192,7 +203,8 @@ public void onTimer( tableMapState, eventTimer, batcher, - batcher.getFailedRecords()); + batcher.getFailedRecords(), + this.insertTopoOrder); } catch (Exception timerError) { LOG.error("Scheduled events generation failed during timer processing", timerError); Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc(); @@ -233,12 +245,14 @@ public void teardown() { } } - private void ensureSchemaInitialized(ProcessContext c) { - if (schema != null) { + private void ensureSchemaInitialized( + ProcessContext c, ValueState> insertTopoOrderState) { + if (schema != null && insertTopoOrder != null) { return; } DataGeneratorSchema loaded = c.sideInput(schemaView); this.insertTopoOrder = SchemaUtils.buildInsertTopoOrder(loaded); + insertTopoOrderState.write(this.insertTopoOrder); this.schema = loaded; } @@ -252,4 +266,64 @@ private void writeFailedRecords(Consumer sink) { } batcher.clearDlq(); } + + @VisibleForTesting + DataWriter getWriter() { + return writer; + } + + @VisibleForTesting + void setWriter(DataWriter writer) { + this.writer = writer; + } + + @VisibleForTesting + Faker getFaker() { + return faker; + } + + @VisibleForTesting + void setFaker(Faker faker) { + this.faker = faker; + } + + @VisibleForTesting + DataGeneratorSchema getSchema() { + return schema; + } + + @VisibleForTesting + void setSchema(DataGeneratorSchema schema) { + this.schema = schema; + } + + @VisibleForTesting + List getInsertTopoOrder() { + return insertTopoOrder; + } + + @VisibleForTesting + void setInsertTopoOrder(List insertTopoOrder) { + this.insertTopoOrder = 
insertTopoOrder; + } + + @VisibleForTesting + DataGeneratorEngine getDataGeneratorEngine() { + return dataGeneratorEngine; + } + + @VisibleForTesting + void setDataGeneratorEngine(DataGeneratorEngine dataGeneratorEngine) { + this.dataGeneratorEngine = dataGeneratorEngine; + } + + @VisibleForTesting + MutationBatcher getBatcher() { + return batcher; + } + + @VisibleForTesting + void setBatcher(MutationBatcher batcher) { + this.batcher = batcher; + } } diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java index 6f94eb7074..f147613de7 100644 --- a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java @@ -1,2 +1,453 @@ -package com.google.cloud.teleport.v2.templates.dofn;public class BatchAndWriteFnTest { +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.dofn; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyInt; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorColumn; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; +import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; +import com.google.cloud.teleport.v2.templates.model.LogicalType; +import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter; +import com.google.cloud.teleport.v2.templates.sink.DataWriter; +import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; +import net.datafaker.Faker; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.state.MapState; +import org.apache.beam.sdk.state.Timer; +import org.apache.beam.sdk.state.ValueState; +import org.apache.beam.sdk.transforms.DoFn.FinishBundleContext; +import org.apache.beam.sdk.transforms.DoFn.OnTimerContext; +import org.apache.beam.sdk.transforms.DoFn.ProcessContext; +import org.apache.beam.sdk.transforms.windowing.GlobalWindow; +import org.apache.beam.sdk.values.KV; +import 
org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.Row; +import org.joda.time.Instant; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Comprehensive unit tests covering code paths and lifecycle of {@link BatchAndWriteFn}. */ +@RunWith(JUnit4.class) +@SuppressWarnings("unchecked") +public class BatchAndWriteFnTest { + + @Test + public void constructor_nonPositiveBatchSize_fallsBackToDefault() { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 0, null, 10, 10, mock(PCollectionView.class)); + assertNotNull(fn); + } + + @Test + public void testSetup_initializesDefaultWriterAndFaker() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + fn.setup(); + + assertNotNull(fn.getWriter()); + assertNotNull(fn.getFaker()); + assertNotNull(fn.getBatcher()); + assertNotNull(fn.getDataGeneratorEngine()); + } + + @Test + public void testSetup_retainsInjectedWriterAndFaker() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + DataWriter mockWriter = mock(DataWriter.class); + Faker mockFaker = mock(Faker.class); + + fn.setWriter(mockWriter); + fn.setFaker(mockFaker); + + fn.setup(); + + assertEquals(mockWriter, fn.getWriter()); + assertEquals(mockFaker, fn.getFaker()); + } + + @Test + public void testCreateWriter_mySql() { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.MYSQL, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + DataWriter writer = fn.createWriter(SinkType.MYSQL, "{}"); + assertTrue(writer instanceof MySqlDataWriter); + } + + @Test + public void testCreateWriter_spanner() { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + DataWriter writer = fn.createWriter(SinkType.SPANNER, "{}"); + assertTrue(writer instanceof SpannerDataWriter); + } + + @Test(expected = NullPointerException.class) + public void testCreateWriter_unsupportedThrowsException() { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + // Passing null or an invalid type not in switch will trigger default case / exception + fn.createWriter((SinkType) null, "{}"); + } + + @Test + public void testStartBundle_callsBatcherStartBundle() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + MutationBatcher mockBatcher = mock(MutationBatcher.class); + fn.setBatcher(mockBatcher); + + fn.startBundle(); + + verify(mockBatcher).startBundle(); + } + + @Test + public void testProcessElement_initializesSchemaWhenNull() throws Exception { + DataGeneratorTable users = simpleUsersTable(); + DataGeneratorSchema schema = + DataGeneratorSchema.builder().tables(ImmutableMap.of(users.name(), users)).build(); + + PCollectionView schemaView = mock(PCollectionView.class); + ProcessContext c = mock(ProcessContext.class); + when(c.sideInput(schemaView)).thenReturn(schema); + + Schema rowSchema = Schema.builder().addInt64Field("id").build(); + Row row = Row.withSchema(rowSchema).addValue(1L).build(); + when(c.element()).thenReturn(KV.of(0, GeneratedRecord.create("Users", row))); + + BatchAndWriteFn fn = new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, schemaView); + DataWriter mockWriter = mock(DataWriter.class); + fn.setWriter(mockWriter); + fn.setup(); + 
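+    // setup() keeps the injected mock writer and wires the MutationBatcher and engine around it.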
fn.startBundle(); + + ValueState> mockInsertTopoOrderState = mock(ValueState.class); + + fn.processElement( + c, + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mockInsertTopoOrderState, + mock(Timer.class)); + + verify(c).sideInput(schemaView); + verify(mockInsertTopoOrderState).write(any(List.class)); + verify(mockWriter).insert(any(), eq(users), any(), anyInt()); + } + + @Test + public void testProcessElement_skipsSchemaInitializationWhenAlreadyLoaded() throws Exception { + DataGeneratorTable users = simpleUsersTable(); + DataGeneratorSchema schema = + DataGeneratorSchema.builder().tables(ImmutableMap.of(users.name(), users)).build(); + + PCollectionView schemaView = mock(PCollectionView.class); + ProcessContext c = mock(ProcessContext.class); + when(c.sideInput(schemaView)).thenReturn(schema); + + Schema rowSchema = Schema.builder().addInt64Field("id").build(); + Row row = Row.withSchema(rowSchema).addValue(1L).build(); + when(c.element()).thenReturn(KV.of(0, GeneratedRecord.create("Users", row))); + + BatchAndWriteFn fn = new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, schemaView); + DataWriter mockWriter = mock(DataWriter.class); + fn.setWriter(mockWriter); + fn.setup(); + fn.startBundle(); + + // Pre-populate schema and insertTopoOrder + fn.setSchema(schema); + fn.setInsertTopoOrder(ImmutableList.of("Users")); + + ValueState> mockInsertTopoOrderState = mock(ValueState.class); + + fn.processElement( + c, + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mockInsertTopoOrderState, + mock(Timer.class)); + + // verify sideInput was never called since schema was already initialized + verify(c, never()).sideInput(any()); + verify(mockInsertTopoOrderState, never()).write(any()); + verify(mockWriter).insert(any(), eq(users), any(), anyInt()); + } + + @Test + public void testProcessElement_normalExecution_flushesDlqWhenPresent() throws Exception { + DataGeneratorTable users = simpleUsersTable(); + DataGeneratorSchema schema = + DataGeneratorSchema.builder().tables(ImmutableMap.of(users.name(), users)).build(); + + PCollectionView schemaView = mock(PCollectionView.class); + ProcessContext c = mock(ProcessContext.class); + when(c.sideInput(schemaView)).thenReturn(schema); + + Schema rowSchema = Schema.builder().addInt64Field("id").build(); + Row row = Row.withSchema(rowSchema).addValue(1L).build(); + when(c.element()).thenReturn(KV.of(0, GeneratedRecord.create("Users", row))); + + BatchAndWriteFn fn = new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, schemaView); + DataWriter mockWriter = mock(DataWriter.class); + fn.setWriter(mockWriter); + fn.setup(); + fn.startBundle(); + + MutationBatcher mockBatcher = mock(MutationBatcher.class); + List dlq = new ArrayList<>(); + dlq.add("dlq_record_1"); + when(mockBatcher.getFailedRecords()).thenReturn(dlq); + fn.setBatcher(mockBatcher); + + fn.processElement( + c, + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mock(ValueState.class), + mock(Timer.class)); + + verify(c).output(eq("dlq_record_1")); + verify(mockBatcher).clearDlq(); + } + + @Test + public void testProcessElement_engineFailure_catchesAndOutputsToDlq() throws Exception { + DataGeneratorTable users = simpleUsersTable(); + DataGeneratorSchema schema = + DataGeneratorSchema.builder().tables(ImmutableMap.of(users.name(), users)).build(); + + PCollectionView schemaView = mock(PCollectionView.class); + ProcessContext c = mock(ProcessContext.class); + 
when(c.sideInput(schemaView)).thenReturn(schema); + + Schema rowSchema = Schema.builder().addInt64Field("id").build(); + Row row = Row.withSchema(rowSchema).addValue(1L).build(); + when(c.element()).thenReturn(KV.of(0, GeneratedRecord.create("Users", row))); + + BatchAndWriteFn fn = new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, schemaView); + DataWriter mockWriter = mock(DataWriter.class); + doThrow(new RuntimeException("simulated sink failure")) + .when(mockWriter) + .insert(any(), any(), any(), anyInt()); + fn.setWriter(mockWriter); + fn.setup(); + fn.startBundle(); + + fn.processElement( + c, + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mock(ValueState.class), + mock(Timer.class)); + + verify(c).output(any(String.class)); + } + + @Test + public void testOnTimer_restoresInsertTopoOrderFromStateWhenNull() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + fn.setWriter(mock(DataWriter.class)); + fn.setup(); + fn.startBundle(); + + fn.setDataGeneratorEngine(mock(DataGeneratorEngine.class)); + // ensure insertTopoOrder is null in memory + fn.setInsertTopoOrder(null); + + ValueState> mockInsertTopoOrderState = mock(ValueState.class); + when(mockInsertTopoOrderState.read()).thenReturn(ImmutableList.of("TableA", "TableB")); + + fn.onTimer( + mock(OnTimerContext.class), + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mockInsertTopoOrderState, + mock(Timer.class)); + + verify(mockInsertTopoOrderState).read(); + assertEquals(ImmutableList.of("TableA", "TableB"), fn.getInsertTopoOrder()); + } + + @Test + public void testOnTimer_skipsStateReadWhenInsertTopoOrderIsPresent() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + fn.setWriter(mock(DataWriter.class)); + fn.setup(); + fn.startBundle(); + + fn.setDataGeneratorEngine(mock(DataGeneratorEngine.class)); + // Pre-populate insertTopoOrder in memory + fn.setInsertTopoOrder(ImmutableList.of("TableA")); + + ValueState> mockInsertTopoOrderState = mock(ValueState.class); + + fn.onTimer( + mock(OnTimerContext.class), + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mockInsertTopoOrderState, + mock(Timer.class)); + + verify(mockInsertTopoOrderState, never()).read(); + } + + @Test + public void testOnTimer_exceptionRoutesToDlq() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + fn.setWriter(mock(DataWriter.class)); + fn.setup(); + fn.startBundle(); + + DataGeneratorEngine mockEngine = mock(DataGeneratorEngine.class); + doThrow(new RuntimeException("timer failure")) + .when(mockEngine) + .processScheduledEvents(any(), any(), any(), any(), any(), any(), any()); + fn.setDataGeneratorEngine(mockEngine); + + OnTimerContext c = mock(OnTimerContext.class); + + fn.onTimer( + c, + mock(MapState.class), + mock(ValueState.class), + mock(MapState.class), + mock(ValueState.class), + mock(Timer.class)); + + verify(c).output(any(String.class)); + } + + @Test + public void testFinishBundle_flushesBatcherAndEmitsPendingDlq() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + fn.setWriter(mock(DataWriter.class)); + fn.setup(); + fn.startBundle(); + + List topoOrder = ImmutableList.of("TableA", "TableB"); + 
fn.setInsertTopoOrder(topoOrder); + + MutationBatcher mockBatcher = mock(MutationBatcher.class); + when(mockBatcher.getFailedRecords()).thenReturn(Arrays.asList("dlq_record_1")); + fn.setBatcher(mockBatcher); + + FinishBundleContext context = mock(FinishBundleContext.class); + + fn.finishBundle(context); + + verify(mockBatcher).flushInsertsInTopoOrder(eq(topoOrder)); + verify(mockBatcher).flushUpdates(); + verify(mockBatcher).flushDeletesInReverseTopoOrder(eq(topoOrder)); + verify(context).output(eq("dlq_record_1"), any(Instant.class), eq(GlobalWindow.INSTANCE)); + verify(mockBatcher).clearDlq(); + } + + @Test + public void testTeardown_closesWriterSuccessfully() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + DataWriter mockWriter = mock(DataWriter.class); + fn.setWriter(mockWriter); + + fn.teardown(); + + verify(mockWriter).close(); + } + + @Test + public void testTeardown_nullWriterDoesNothing() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + fn.setWriter(null); + + fn.teardown(); + } + + @Test(expected = RuntimeException.class) + public void testTeardown_writerCloseThrowsException() throws Exception { + BatchAndWriteFn fn = + new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); + DataWriter mockWriter = mock(DataWriter.class); + doThrow(new RuntimeException("simulated close error")).when(mockWriter).close(); + fn.setWriter(mockWriter); + + fn.teardown(); + } + + // =========================================================================== + // Helpers + // =========================================================================== + + private static DataGeneratorTable simpleUsersTable() { + return DataGeneratorTable.builder() + .name("Users") + .columns(ImmutableList.of(intColumn("id"))) + .primaryKeys(ImmutableList.of("id")) + .foreignKeys(ImmutableList.of()) + .uniqueKeys(ImmutableList.of()) + .insertQps(1) + .updateQps(0) + .deleteQps(0) + .isRoot(true) + .recordsPerTick(1.0) + .build(); + } + + private static DataGeneratorColumn intColumn(String name) { + return DataGeneratorColumn.builder() + .name(name) + .logicalType(LogicalType.INT64) + .isPrimaryKey(false) + .isNullable(false) + .isSkipped(false) + .isGenerated(false) + .size(null) + .precision(null) + .scale(null) + .build(); + } } diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java index 551d065d12..e1e653f38c 100644 --- a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java @@ -1,2 +1,67 @@ -package com.google.cloud.teleport.v2.templates.transforms;public class BatchAndWriteTest { +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. 
You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.transforms; + +import static org.junit.Assert.assertEquals; + +import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; +import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; +import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; +import com.google.common.collect.ImmutableMap; +import org.apache.beam.sdk.coders.StringUtf8Coder; +import org.apache.beam.sdk.schemas.Schema; +import org.apache.beam.sdk.testing.TestPipeline; +import org.apache.beam.sdk.transforms.Create; +import org.apache.beam.sdk.transforms.View; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.apache.beam.sdk.values.PCollectionView; +import org.apache.beam.sdk.values.Row; +import org.junit.Rule; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Comprehensive unit tests for {@link BatchAndWrite}. */ +@RunWith(JUnit4.class) +public class BatchAndWriteTest { + + @Rule public final transient TestPipeline pipeline = TestPipeline.create(); + + @Test + public void testBatchAndWriteExpansion_spannerSink() { + DataGeneratorSchema schema = DataGeneratorSchema.builder().tables(ImmutableMap.of()).build(); + + PCollectionView schemaView = + pipeline.apply("CreateSchema", Create.of(schema)).apply("AsSingleton", View.asSingleton()); + + Schema rowSchema = Schema.builder().addInt64Field("id").build(); + Row row = Row.withSchema(rowSchema).addValues(123L).build(); + GeneratedRecord record = GeneratedRecord.create("SampleTable", row); + + PCollection> input = + pipeline.apply("CreateInput", Create.of(KV.of(0, record))); + + BatchAndWrite transform = + new BatchAndWrite(SinkType.SPANNER, "{}", 100, 10, 5000, 10000, schemaView); + + PCollection output = input.apply("BatchAndWriteTransformSpanner", transform); + assertEquals(StringUtf8Coder.of(), output.getCoder()); + + // Run the pipeline to verify valid expansion and setup + pipeline.run(); + } } From f4a73ec3616cf398d7bd4a14582d6edfb0a3f722 Mon Sep 17 00:00:00 2001 From: Shreya Khajanchi Date: Thu, 14 May 2026 12:02:57 +0530 Subject: [PATCH 5/5] addressing comments --- .../v2/templates/dofn/BatchAndWriteFn.java | 23 +---- .../v2/templates/sink/DataWriterFactory.java | 47 ++++++++++ .../templates/transforms/BatchAndWrite.java | 93 ------------------- .../templates/dofn/BatchAndWriteFnTest.java | 27 ------ .../templates/sink/DataWriterFactoryTest.java | 50 ++++++++++ .../transforms/BatchAndWriteTest.java | 67 ------------- 6 files changed, 100 insertions(+), 207 deletions(-) create mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactory.java delete mode 100644 v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java create mode 100644 v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactoryTest.java delete mode 100644 v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java diff 
--git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java index d6a3c71a42..e74e991f1a 100644 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFn.java @@ -20,9 +20,8 @@ import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; import com.google.cloud.teleport.v2.templates.model.LifecycleEvent; -import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter; import com.google.cloud.teleport.v2.templates.sink.DataWriter; -import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter; +import com.google.cloud.teleport.v2.templates.sink.DataWriterFactory; import com.google.cloud.teleport.v2.templates.utils.FailureRecord; import com.google.cloud.teleport.v2.templates.utils.SchemaUtils; import com.google.common.annotations.VisibleForTesting; @@ -117,7 +116,7 @@ public void setup() { this.schema = null; this.insertTopoOrder = null; if (writer == null) { - writer = createWriter(sinkType, sinkOptionsPath); + writer = DataWriterFactory.createWriter(sinkType, sinkOptionsPath); } if (faker == null) { faker = new Faker(); @@ -127,18 +126,6 @@ public void setup() { this.dataGeneratorEngine = new DataGeneratorEngine(updateInterval, deleteInterval, faker); } - @VisibleForTesting - protected DataWriter createWriter(SinkType type, String configPath) { - switch (type) { - case MYSQL: - return new MySqlDataWriter(configPath); - case SPANNER: - return new SpannerDataWriter(configPath); - default: - throw new IllegalArgumentException("Unsupported sink type: " + type); - } - } - @StartBundle public void startBundle() { this.batcher.startBundle(); @@ -208,11 +195,7 @@ public void onTimer( } catch (Exception timerError) { LOG.error("Scheduled events generation failed during timer processing", timerError); Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc(); - batcher - .getFailedRecords() - .add( - FailureRecord.toJson( - "UNKNOWN_TABLE", FailureRecord.OPERATION_GENERATION, null, timerError)); + batcher.getFailedRecords().add(FailureRecord.toJson("UNKNOWN_TABLE", null, null, timerError)); } writeFailedRecords(c::output); diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactory.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactory.java new file mode 100644 index 0000000000..77f4768373 --- /dev/null +++ b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactory.java @@ -0,0 +1,47 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. 
+ */ +package com.google.cloud.teleport.v2.templates.sink; + +import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; +import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter; +import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter; + +/** + * Factory class for creating {@link DataWriter} instances based on the configured {@link SinkType}. + */ +public class DataWriterFactory { + + private DataWriterFactory() {} + + /** + * Creates a {@link DataWriter} for the specified sink type and configuration path. + * + * @param type the sink type to create a writer for + * @param configPath the path to the sink configuration document + * @return a new {@link DataWriter} instance + * @throws IllegalArgumentException if the sink type is unsupported + */ + public static DataWriter createWriter(SinkType type, String configPath) { + switch (type) { + case MYSQL: + return new MySqlDataWriter(configPath); + case SPANNER: + return new SpannerDataWriter(configPath); + default: + throw new IllegalArgumentException("Unsupported sink type: " + type); + } + } +} diff --git a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java b/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java deleted file mode 100644 index 43715177d6..0000000000 --- a/v2/cdc-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWrite.java +++ /dev/null @@ -1,93 +0,0 @@ -/* - * Copyright (C) 2026 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.templates.transforms; - -import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; -import com.google.cloud.teleport.v2.templates.dofn.BatchAndWriteFn; -import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; -import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; -import com.google.cloud.teleport.v2.templates.utils.FailureRecord; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.transforms.PTransform; -import org.apache.beam.sdk.transforms.ParDo; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; - -/** - * {@link PTransform} that takes a stream of partially-generated keyed rows, completes them, - * recursively materialises child rows from the schema DAG, batches them by {@code (table, shard, - * op)}, and writes to the sink configured by {@code sinkType}/{@code sinkOptionsPath}. - * - *
Output is a {@code PCollection} of JSON-encoded {@link FailureRecord}s — one per row - * that could not be generated or that the sink rejected. The output is empty when no failures - * occur. - */ -public class BatchAndWrite - extends PTransform>, PCollection> { - - private final SinkType sinkType; - private final String sinkOptionsPath; - private final Integer batchSize; - private final Integer jdbcPoolSize; - private final Integer updateInterval; - private final Integer deleteInterval; - private final PCollectionView schemaView; - - /** - * @param sinkType which sink writer the underlying {@link BatchAndWriteFn} should instantiate - * @param sinkOptionsPath path/URI to the sink-specific configuration document - * @param batchSize maximum rows buffered per {@code (table, shard, op)} before flush; - * @param jdbcPoolSize connection pool size limit per MySQL shard node - * @param updateInterval custom UPDATE interval in seconds for lifecycle events - * @param deleteInterval custom trailing DELETE interval in seconds for lifecycle events - * @param schemaView side input carrying the {@link DataGeneratorSchema} - */ - public BatchAndWrite( - SinkType sinkType, - String sinkOptionsPath, - Integer batchSize, - Integer jdbcPoolSize, - Integer updateInterval, - Integer deleteInterval, - PCollectionView schemaView) { - this.sinkType = sinkType; - this.sinkOptionsPath = sinkOptionsPath; - this.batchSize = batchSize; - this.jdbcPoolSize = jdbcPoolSize; - this.updateInterval = updateInterval; - this.deleteInterval = deleteInterval; - this.schemaView = schemaView; - } - - @Override - public PCollection expand(PCollection> input) { - return input - .apply( - "BatchAndWriteFn", - ParDo.of( - new BatchAndWriteFn( - sinkType, - sinkOptionsPath, - batchSize, - jdbcPoolSize, - updateInterval, - deleteInterval, - schemaView)) - .withSideInputs(schemaView)) - .setCoder(StringUtf8Coder.of()); - } -} diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java index f147613de7..4cbbd6ce18 100644 --- a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/dofn/BatchAndWriteFnTest.java @@ -17,7 +17,6 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyInt; import static org.mockito.ArgumentMatchers.eq; @@ -33,9 +32,7 @@ import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable; import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; import com.google.cloud.teleport.v2.templates.model.LogicalType; -import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter; import com.google.cloud.teleport.v2.templates.sink.DataWriter; -import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; import java.util.ArrayList; @@ -98,30 +95,6 @@ public void testSetup_retainsInjectedWriterAndFaker() throws Exception { assertEquals(mockFaker, fn.getFaker()); } - @Test - public void testCreateWriter_mySql() { - BatchAndWriteFn fn = - new BatchAndWriteFn(SinkType.MYSQL, "{}", 1, null, 10, 10, mock(PCollectionView.class)); - 
DataWriter writer = fn.createWriter(SinkType.MYSQL, "{}"); - assertTrue(writer instanceof MySqlDataWriter); - } - - @Test - public void testCreateWriter_spanner() { - BatchAndWriteFn fn = - new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); - DataWriter writer = fn.createWriter(SinkType.SPANNER, "{}"); - assertTrue(writer instanceof SpannerDataWriter); - } - - @Test(expected = NullPointerException.class) - public void testCreateWriter_unsupportedThrowsException() { - BatchAndWriteFn fn = - new BatchAndWriteFn(SinkType.SPANNER, "{}", 1, null, 10, 10, mock(PCollectionView.class)); - // Passing null or an invalid type not in switch will trigger default case / exception - fn.createWriter((SinkType) null, "{}"); - } - @Test public void testStartBundle_callsBatcherStartBundle() throws Exception { BatchAndWriteFn fn = diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactoryTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactoryTest.java new file mode 100644 index 0000000000..68d26dcc2d --- /dev/null +++ b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/sink/DataWriterFactoryTest.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.sink; + +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; +import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter; +import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** Comprehensive unit tests for {@link DataWriterFactory}. 
*/ +@RunWith(JUnit4.class) +public class DataWriterFactoryTest { + + @Test + public void testCreateWriter_mySql() { + DataWriter writer = DataWriterFactory.createWriter(SinkType.MYSQL, "{}"); + assertNotNull(writer); + assertTrue(writer instanceof MySqlDataWriter); + } + + @Test + public void testCreateWriter_spanner() { + DataWriter writer = DataWriterFactory.createWriter(SinkType.SPANNER, "{}"); + assertNotNull(writer); + assertTrue(writer instanceof SpannerDataWriter); + } + + @Test(expected = NullPointerException.class) + public void testCreateWriter_unsupportedThrowsException() { + DataWriterFactory.createWriter(null, "{}"); + } +} diff --git a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java b/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java deleted file mode 100644 index e1e653f38c..0000000000 --- a/v2/cdc-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/transforms/BatchAndWriteTest.java +++ /dev/null @@ -1,67 +0,0 @@ -/* - * Copyright (C) 2026 Google LLC - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. - */ -package com.google.cloud.teleport.v2.templates.transforms; - -import static org.junit.Assert.assertEquals; - -import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType; -import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema; -import com.google.cloud.teleport.v2.templates.model.GeneratedRecord; -import com.google.common.collect.ImmutableMap; -import org.apache.beam.sdk.coders.StringUtf8Coder; -import org.apache.beam.sdk.schemas.Schema; -import org.apache.beam.sdk.testing.TestPipeline; -import org.apache.beam.sdk.transforms.Create; -import org.apache.beam.sdk.transforms.View; -import org.apache.beam.sdk.values.KV; -import org.apache.beam.sdk.values.PCollection; -import org.apache.beam.sdk.values.PCollectionView; -import org.apache.beam.sdk.values.Row; -import org.junit.Rule; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; - -/** Comprehensive unit tests for {@link BatchAndWrite}. 
*/ -@RunWith(JUnit4.class) -public class BatchAndWriteTest { - - @Rule public final transient TestPipeline pipeline = TestPipeline.create(); - - @Test - public void testBatchAndWriteExpansion_spannerSink() { - DataGeneratorSchema schema = DataGeneratorSchema.builder().tables(ImmutableMap.of()).build(); - - PCollectionView schemaView = - pipeline.apply("CreateSchema", Create.of(schema)).apply("AsSingleton", View.asSingleton()); - - Schema rowSchema = Schema.builder().addInt64Field("id").build(); - Row row = Row.withSchema(rowSchema).addValues(123L).build(); - GeneratedRecord record = GeneratedRecord.create("SampleTable", row); - - PCollection> input = - pipeline.apply("CreateInput", Create.of(KV.of(0, record))); - - BatchAndWrite transform = - new BatchAndWrite(SinkType.SPANNER, "{}", 100, 10, 5000, 10000, schemaView); - - PCollection output = input.apply("BatchAndWriteTransformSpanner", transform); - assertEquals(StringUtf8Coder.of(), output.getCoder()); - - // Run the pipeline to verify valid expansion and setup - pipeline.run(); - } -}