Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,312 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.dofn;

import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType;
import com.google.cloud.teleport.v2.templates.model.DataGeneratorSchema;
import com.google.cloud.teleport.v2.templates.model.DataGeneratorTable;
import com.google.cloud.teleport.v2.templates.model.GeneratedRecord;
import com.google.cloud.teleport.v2.templates.model.LifecycleEvent;
import com.google.cloud.teleport.v2.templates.sink.DataWriter;
import com.google.cloud.teleport.v2.templates.sink.DataWriterFactory;
import com.google.cloud.teleport.v2.templates.utils.FailureRecord;
import com.google.cloud.teleport.v2.templates.utils.SchemaUtils;
import com.google.common.annotations.VisibleForTesting;
import java.util.List;
import java.util.function.Consumer;
import net.datafaker.Faker;
import org.apache.beam.sdk.coders.ListCoder;
import org.apache.beam.sdk.coders.SerializableCoder;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.coders.VarLongCoder;
import org.apache.beam.sdk.metrics.Metrics;
import org.apache.beam.sdk.state.MapState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.windowing.GlobalWindow;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollectionView;
import org.apache.beam.sdk.values.Row;
import org.joda.time.Instant;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Stateful {@link DoFn} that manages persistence lifecycles, state fields, and timers, delegating
* core traversal to {@link DataGeneratorEngine} and mutation batch writing to {@link
* MutationBatcher}.
*/
public class BatchAndWriteFn extends DoFn<KV<Integer, GeneratedRecord>, String> {

private static final Logger LOG = LoggerFactory.getLogger(BatchAndWriteFn.class);

private final SinkType sinkType;
private final String sinkOptionsPath;
private final int batchSize;
private final Integer jdbcPoolSize;
private final Integer updateInterval;
private final Integer deleteInterval;
private final PCollectionView<DataGeneratorSchema> schemaView;

private transient DataWriter writer;
private transient Faker faker;
private transient volatile DataGeneratorSchema schema;
private transient volatile List<String> insertTopoOrder;
Comment on lines +72 to +73
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does volatile do here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using volatile guarantees that once schema and insertTopoOrder are published by the first bundle, all subsequent bundles and timer callbacks on that worker instance will deterministically observe the fully constructed objects


private transient DataGeneratorEngine dataGeneratorEngine;
private transient MutationBatcher batcher;

@StateId("eventQueue")
private final StateSpec<MapState<Long, List<LifecycleEvent>>> eventQueueSpec =
StateSpecs.map(VarLongCoder.of(), ListCoder.of(SerializableCoder.of(LifecycleEvent.class)));

@StateId("activeTimestamps")
private final StateSpec<ValueState<List<Long>>> activeTimestampsSpec =
StateSpecs.value(ListCoder.of(VarLongCoder.of()));

@StateId("tableMapState")
private final StateSpec<MapState<String, DataGeneratorTable>> tableMapSpec =
StateSpecs.map(StringUtf8Coder.of(), SerializableCoder.of(DataGeneratorTable.class));

@StateId("insertTopoOrderState")
private final StateSpec<ValueState<List<String>>> insertTopoOrderSpec =
StateSpecs.value(ListCoder.of(StringUtf8Coder.of()));

@TimerId("eventTimer")
private final TimerSpec eventTimerSpec = TimerSpecs.timer(TimeDomain.PROCESSING_TIME);

/**
 * Creates the stateful batching/writing DoFn.
 *
 * @param sinkType target sink kind (e.g. MYSQL, SPANNER) used to build the {@link DataWriter}
 * @param sinkOptionsPath path to the sink configuration document
 * @param batchSize number of mutations accumulated before a flush
 * @param jdbcPoolSize JDBC connection pool size forwarded to the {@link MutationBatcher}
 * @param updateInterval interval controlling scheduled update events
 * @param deleteInterval interval controlling scheduled delete events
 * @param schemaView side input carrying the generated schema
 */
public BatchAndWriteFn(
SinkType sinkType,
String sinkOptionsPath,
Integer batchSize,
Integer jdbcPoolSize,
Integer updateInterval,
Integer deleteInterval,
PCollectionView<DataGeneratorSchema> schemaView) {
this.sinkType = sinkType;
this.sinkOptionsPath = sinkOptionsPath;
this.batchSize = batchSize;
this.jdbcPoolSize = jdbcPoolSize;
this.updateInterval = updateInterval;
this.deleteInterval = deleteInterval;
this.schemaView = schemaView;
}

@Setup
public void setup() {
// Drop any cached side-input snapshots; the first bundle republishes them.
schema = null;
insertTopoOrder = null;
// writer/faker may already be injected by tests; only build them when absent.
if (this.writer == null) {
this.writer = DataWriterFactory.createWriter(sinkType, sinkOptionsPath);
}
if (this.faker == null) {
this.faker = new Faker();
}
// The batcher and engine are cheap to construct, so they are rebuilt unconditionally.
batcher = new MutationBatcher(batchSize, jdbcPoolSize, writer);
dataGeneratorEngine = new DataGeneratorEngine(updateInterval, deleteInterval, faker);
}

@StartBundle
public void startBundle() {
// Reset the batcher's per-bundle accumulation before processing elements.
this.batcher.startBundle();
}

@ProcessElement
public void processElement(
ProcessContext c,
@StateId("eventQueue") MapState<Long, List<LifecycleEvent>> eventQueueState,
@StateId("activeTimestamps") ValueState<List<Long>> activeTimestamps,
@StateId("tableMapState") MapState<String, DataGeneratorTable> tableMapState,
@StateId("insertTopoOrderState") ValueState<List<String>> insertTopoOrderState,
@TimerId("eventTimer") Timer eventTimer) {

// Lazily publish the schema and topo order from the side input on first use.
ensureSchemaInitialized(c, insertTopoOrderState);

GeneratedRecord record = c.element().getValue();
String tableName = record.tableName();
Row pkValues = record.primaryKeyValues();

try {
// Delegate traversal/mutation generation to the engine; it batches via `batcher`
// and may schedule future update/delete events on `eventTimer`.
dataGeneratorEngine.processRecord(
tableName,
pkValues,
eventQueueState,
activeTimestamps,
tableMapState,
eventTimer,
schema,
batcher,
insertTopoOrder);
} catch (Exception genError) {
// A generation failure is recorded to the DLQ rather than failing the bundle.
LOG.error("Generation failed for table {}", tableName, genError);
Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc();
batcher
.getFailedRecords()
.add(
FailureRecord.toJson(
tableName, FailureRecord.OPERATION_GENERATION, pkValues, genError));
}

// Drain any DLQ entries accumulated so far to the DoFn output.
writeFailedRecords(c::output);
}

@OnTimer("eventTimer")
public void onTimer(
OnTimerContext c,
@StateId("eventQueue") MapState<Long, List<LifecycleEvent>> eventQueueState,
@StateId("activeTimestamps") ValueState<List<Long>> activeTimestamps,
@StateId("tableMapState") MapState<String, DataGeneratorTable> tableMapState,
@StateId("insertTopoOrderState") ValueState<List<String>> insertTopoOrderState,
@TimerId("eventTimer") Timer eventTimer) {

// A timer can fire on a worker that never ran processElement for this key
// (e.g. after a restart), so recover the topo order from durable state.
if (this.insertTopoOrder == null) {
this.insertTopoOrder = insertTopoOrderState.read();
}

try {
// Process events whose scheduled time has arrived; the engine may re-arm the timer.
dataGeneratorEngine.processScheduledEvents(
eventQueueState,
activeTimestamps,
tableMapState,
eventTimer,
batcher,
batcher.getFailedRecords(),
this.insertTopoOrder);
} catch (Exception timerError) {
// No single table is attributable here, hence the UNKNOWN_TABLE placeholder.
LOG.error("Scheduled events generation failed during timer processing", timerError);
Metrics.counter(BatchAndWriteFn.class, "generationFailures").inc();
batcher.getFailedRecords().add(FailureRecord.toJson("UNKNOWN_TABLE", null, null, timerError));
}

// Drain any DLQ entries accumulated so far to the DoFn output.
writeFailedRecords(c::output);
}

@FinishBundle
public void finishBundle(FinishBundleContext c) {
// Flush order is significant for referential integrity:
// inserts parent-first, then updates, then deletes child-first.
batcher.flushInsertsInTopoOrder(insertTopoOrder);
batcher.flushUpdates();
batcher.flushDeletesInReverseTopoOrder(insertTopoOrder);

// Emit any DLQ records produced by the flushes; FinishBundleContext requires an
// explicit timestamp and window, so output into the global window at "now".
List<String> pendingDlq = batcher.getFailedRecords();
if (pendingDlq != null && !pendingDlq.isEmpty()) {
Instant now = Instant.now();
for (String record : pendingDlq) {
c.output(record, now, GlobalWindow.INSTANCE);
}
batcher.clearDlq();
}
}

@Teardown
public void teardown() {
// Nothing to release if setup never built a writer (or a test never injected one).
if (writer == null) {
return;
}
try {
writer.close();
} catch (Exception closeError) {
throw new RuntimeException("Failed to close writer", closeError);
}
}

/**
 * Publishes the schema side input and derived insert topological order into the transient
 * (volatile) fields and persists the topo order to durable state for timer callbacks.
 * No-op once both fields are populated on this worker.
 */
private void ensureSchemaInitialized(
ProcessContext c, ValueState<List<String>> insertTopoOrderState) {
if (this.schema == null || this.insertTopoOrder == null) {
DataGeneratorSchema sideInputSchema = c.sideInput(schemaView);
List<String> topoOrder = SchemaUtils.buildInsertTopoOrder(sideInputSchema);
this.insertTopoOrder = topoOrder;
insertTopoOrderState.write(topoOrder);
this.schema = sideInputSchema;
}
}

private void writeFailedRecords(Consumer<String> sink) {
List<String> dlq = batcher.getFailedRecords();
if (dlq == null || dlq.isEmpty()) {
return;
}
for (String record : dlq) {
sink.accept(record);
}
batcher.clearDlq();
}
Comment on lines +242 to +251
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't get what this method is doing (I don't fully understand the Consumer function). Where are the failed records from the DLQ being written to?

Also, what is the function of the DLQ? Are users expected to re-run the DLQ, or is it more for reporting purpose?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It is outputting the failed records from the dofn, it is written to a gcs file and is only for reporting not retrying


// --- Test-only accessors for injecting/inspecting transient collaborators ---

@VisibleForTesting
DataWriter getWriter() {
return writer;
}

@VisibleForTesting
void setWriter(DataWriter writer) {
this.writer = writer;
}

@VisibleForTesting
Faker getFaker() {
return faker;
}

@VisibleForTesting
void setFaker(Faker faker) {
this.faker = faker;
}

@VisibleForTesting
DataGeneratorSchema getSchema() {
return schema;
}

@VisibleForTesting
void setSchema(DataGeneratorSchema schema) {
this.schema = schema;
}

@VisibleForTesting
List<String> getInsertTopoOrder() {
return insertTopoOrder;
}

@VisibleForTesting
void setInsertTopoOrder(List<String> insertTopoOrder) {
this.insertTopoOrder = insertTopoOrder;
}

@VisibleForTesting
DataGeneratorEngine getDataGeneratorEngine() {
return dataGeneratorEngine;
}

@VisibleForTesting
void setDataGeneratorEngine(DataGeneratorEngine dataGeneratorEngine) {
this.dataGeneratorEngine = dataGeneratorEngine;
}

@VisibleForTesting
MutationBatcher getBatcher() {
return batcher;
}

@VisibleForTesting
void setBatcher(MutationBatcher batcher) {
this.batcher = batcher;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.model;

import com.google.auto.value.AutoValue;
import java.io.Serializable;
import org.apache.beam.sdk.values.Row;

/** Type-safe container wrapping table names and primary key values. */
@AutoValue
public abstract class GeneratedRecord implements Serializable {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The more Serializable you introduce now the harder it will be to properly clean it up later...just keep it in mind.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand, but i have retained it in classes which use beam row, because my analysis suggests that it might require custom coder and solving it for 1 class will help me solve for all others so I would like to take them up at once

public abstract String tableName();

public abstract Row primaryKeyValues();

public static GeneratedRecord create(String tableName, Row primaryKeyValues) {
return new AutoValue_GeneratedRecord(tableName, primaryKeyValues);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright (C) 2026 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may not
* use this file except in compliance with the License. You may obtain a copy of
* the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
package com.google.cloud.teleport.v2.templates.sink;

import com.google.cloud.teleport.v2.templates.CdcDataGeneratorOptions.SinkType;
import com.google.cloud.teleport.v2.templates.mysql.MySqlDataWriter;
import com.google.cloud.teleport.v2.templates.spanner.SpannerDataWriter;

/**
* Factory class for creating {@link DataWriter} instances based on the configured {@link SinkType}.
*/
public class DataWriterFactory {

// Static factory only; never instantiated.
private DataWriterFactory() {}

/**
 * Creates a {@link DataWriter} for the specified sink type and configuration path.
 *
 * @param type the sink type to create a writer for
 * @param configPath the path to the sink configuration document
 * @return a new {@link DataWriter} instance
 * @throws IllegalArgumentException if the sink type is unsupported
 */
public static DataWriter createWriter(SinkType type, String configPath) {
final DataWriter created;
switch (type) {
case MYSQL:
created = new MySqlDataWriter(configPath);
break;
case SPANNER:
created = new SpannerDataWriter(configPath);
break;
default:
throw new IllegalArgumentException("Unsupported sink type: " + type);
}
return created;
}
}
Loading
Loading