diff --git a/v2/googlecloud-to-googlecloud/README_Bigtable_Change_Streams_to_Bigtable.md b/v2/googlecloud-to-googlecloud/README_Bigtable_Change_Streams_to_Bigtable.md
new file mode 100644
index 0000000000..578d4b5ce9
--- /dev/null
+++ b/v2/googlecloud-to-googlecloud/README_Bigtable_Change_Streams_to_Bigtable.md
@@ -0,0 +1,354 @@
+
+Bigtable Change Streams to Bigtable Replicator template
+---
+A streaming pipeline that replicates Bigtable change stream mutations to another
+Bigtable instance.
+
+
+
+:bulb: This is generated documentation based
+on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#metadata-annotations).
+Do not change this file directly.
+
+## Parameters
+
+### Required parameters
+
+* **bigtableChangeStreamAppProfile**: The Bigtable application profile ID. The application profile must use single-cluster routing and allow single-row transactions.
+* **bigtableReadInstanceId**: The source Bigtable instance ID.
+* **bigtableReadTableId**: The source Bigtable table ID.
+* **bigtableWriteInstanceId**: The ID of the Bigtable instance that contains the destination table.
+* **bigtableWriteTableId**: The ID of the Bigtable table to write to.
+* **bigtableWriteColumnFamily**: The name of the column family of the Bigtable table to write data into.
+
+### Optional parameters
+
+* **bidirectionalReplicationEnabled**: Whether bidirectional replication between Bigtable instances is enabled. When enabled, additional logic filters out mutations that were replicated from the other instance (see the sketch after this list). Defaults to: false.
+* **cbtQualifier**: Bidirectional replication source CBT qualifier to append. Defaults to: BIDIRECTIONAL_REPL_SOURCE_CBT.
+* **cbtFilterQualifier**: Bidirectional replication filter CBT qualifier to check for and ignore. Defaults to: BIDIRECTIONAL_REPL_SOURCE_CBT.
+* **dryRunEnabled**: When dry run is enabled, the pipeline does not write to Bigtable. Defaults to: false.
+* **filterGCMutations**: Filters out garbage collection Delete mutations from CBT. Defaults to: false.
+* **addRedistribute**: When set to true, redistributes change stream mutations by their row key to balance load across workers. Defaults to: false.
+* **bigtableChangeStreamMetadataInstanceId**: The Bigtable change streams metadata instance ID. Defaults to empty.
+* **bigtableChangeStreamMetadataTableTableId**: The ID of the Bigtable change streams connector metadata table. If not provided, a Bigtable change streams connector metadata table is automatically created during pipeline execution. Defaults to empty.
+* **bigtableChangeStreamCharset**: The Bigtable change streams charset name. Defaults to: UTF-8.
+* **bigtableChangeStreamStartTimestamp**: The starting timestamp (https://tools.ietf.org/html/rfc3339), inclusive, to use for reading change streams. For example, `2022-05-05T07:59:59Z`. Defaults to the timestamp of the pipeline start time.
+* **bigtableChangeStreamIgnoreColumnFamilies**: A comma-separated list of column family name changes to ignore. Defaults to empty.
+* **bigtableChangeStreamIgnoreColumns**: A comma-separated list of column name changes to ignore. Example: "cf1:col1,cf2:col2". Defaults to empty.
+* **bigtableChangeStreamName**: A unique name for the client pipeline. Lets you resume processing from the point at which a previously running pipeline stopped. Defaults to an automatically generated name. See the Dataflow job logs for the value used.
+* **bigtableChangeStreamResume**: When set to `true`, a new pipeline resumes processing from the point at which a previously running pipeline with the same `bigtableChangeStreamName` value stopped. If the pipeline with the given `bigtableChangeStreamName` value has never run, a new pipeline doesn't start. When set to `false`, a new pipeline starts. If a pipeline with the same `bigtableChangeStreamName` value has already run for the given source, a new pipeline doesn't start. Defaults to `false`.
+* **bigtableReadChangeStreamTimeoutMs**: The timeout for Bigtable ReadChangeStream requests in milliseconds.
+* **bigtableReadProjectId**: The Bigtable project ID. The default is the project for the Dataflow job.
+* **bigtableReadAppProfile**: Bigtable App Profile to use for reads. The default for this parameter is the Bigtable instance's default app profile.
+* **bigtableRpcAttemptTimeoutMs**: The timeout for each Bigtable RPC attempt in milliseconds.
+* **bigtableRpcTimeoutMs**: The total timeout for a Bigtable RPC operation in milliseconds.
+* **bigtableAdditionalRetryCodes**: The additional retry codes. For example, `RESOURCE_EXHAUSTED,DEADLINE_EXCEEDED`.
+* **bigtableWriteAppProfile**: The ID of the Bigtable application profile to use for the export. If you do not specify an app profile, Bigtable uses the default app profile (https://cloud.google.com/bigtable/docs/app-profiles#default-app-profile) of the instance.
+* **bigtableWriteProjectId**: The ID of the Google Cloud project that contains the Bigtable instance to write data to.
+* **bigtableBulkWriteLatencyTargetMs**: The latency target of Bigtable in milliseconds for latency-based throttling.
+* **bigtableBulkWriteMaxRowKeyCount**: The maximum number of row keys in a Bigtable batch write operation.
+* **bigtableBulkWriteMaxRequestSizeBytes**: The maximum bytes to include per Bigtable batch write operation.
+* **bigtableBulkWriteFlowControl**: When set to true, enables bulk write flow control, which uses the server's signal to throttle the writes. Defaults to: false.
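+
+For bidirectional replication, this template is typically run twice, once per
+direction, with `bidirectionalReplicationEnabled=true` on both jobs so that each
+job drops the mutations the other one wrote (tagged via `cbtQualifier` and
+recognized via `cbtFilterQualifier`). A minimal hypothetical sketch — the
+instance names, table name, and app profile below are placeholders, and
+`$TEMPLATE_SPEC_GCSPATH` is defined later in this README:
+
+```shell
+# Job 1: instance-a -> instance-b
+gcloud dataflow flex-template run "replicate-a-to-b" \
+  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
+  --parameters "bigtableReadInstanceId=instance-a,bigtableWriteInstanceId=instance-b" \
+  --parameters "bigtableReadTableId=my-table,bigtableWriteTableId=my-table" \
+  --parameters "bigtableWriteColumnFamily=cf,bigtableChangeStreamAppProfile=cdc-profile" \
+  --parameters "bidirectionalReplicationEnabled=true"
+
+# Job 2: the mirror job, reading from instance-b and writing to instance-a.
+```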
+
+
+
+## Getting Started
+
+### Requirements
+
+* Java 17
+* Maven
+* [gcloud CLI](https://cloud.google.com/sdk/gcloud), and execution of the
+  following commands:
+  * `gcloud auth login`
+  * `gcloud auth application-default login`
+
+:star2: Those dependencies are pre-installed if you use Google Cloud Shell!
+
+[![Open in Cloud Shell](http://gstatic.com/cloudssh/images/open-btn.svg)](https://console.cloud.google.com/cloudshell/editor?cloudshell_git_repo=https%3A%2F%2Fgithub.com%2FGoogleCloudPlatform%2FDataflowTemplates.git&cloudshell_open_in_editor=v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtable.java)
+
+### Templates Plugin
+
+This README provides instructions using
+the [Templates Plugin](https://github.com/GoogleCloudPlatform/DataflowTemplates/blob/main/contributor-docs/code-contributions.md#templates-plugin).
+
+#### Validating the Template
+
+This template has a validation command that is used to check code quality.
+
+```shell
+mvn clean install -PtemplatesValidate \
+-DskipTests -am \
+-pl v2/googlecloud-to-googlecloud
+```
+
+### Building Template
+
+This template is a Flex Template, meaning that the pipeline code will be
+containerized and the container will be executed on Dataflow.
+Please
+check [Use Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/using-flex-templates)
+and [Configure Flex Templates](https://cloud.google.com/dataflow/docs/guides/templates/configuring-flex-templates)
+for more information.
+
+#### Staging the Template
+
+If the plan is to just stage the template (i.e., make it available to use) by
+the `gcloud` command or Dataflow "Create job from template" UI,
+the `-PtemplatesStage` profile should be used:
+
+```shell
+export PROJECT=<my-project>
+export BUCKET_NAME=<bucket-name>
+export ARTIFACT_REGISTRY_REPO=<region>-docker.pkg.dev/$PROJECT/<repository>
+
+mvn clean package -PtemplatesStage \
+-DskipTests \
+-DprojectId="$PROJECT" \
+-DbucketName="$BUCKET_NAME" \
+-DartifactRegistry="$ARTIFACT_REGISTRY_REPO" \
+-DstagePrefix="templates" \
+-DtemplateName="Bigtable_Change_Streams_to_Bigtable" \
+-pl v2/googlecloud-to-googlecloud -am
+```
+
+The `-DartifactRegistry` parameter can be specified to set the artifact registry repository of the Flex Templates image.
+If not provided, it defaults to `gcr.io/<project-id>`.
+
+The command should build and save the template to Google Cloud, and then print
+the complete location on Cloud Storage:
+
+```
+Flex Template was staged! gs://<bucket-name>/templates/flex/Bigtable_Change_Streams_to_Bigtable
+```
+
+The specific path should be copied as it will be used in the following steps.
+
+#### Running the Template
+
+**Using the staged template**:
+
+You can use the path above to run the template (or share it with others for execution).
+
+To start a job with the template at any time using `gcloud`, you are going to
+need valid resources for the required parameters.
+
+Provided that, the following command line can be used:
+
+```shell
+export PROJECT=<my-project>
+export BUCKET_NAME=<bucket-name>
+export REGION=us-central1
+export TEMPLATE_SPEC_GCSPATH="gs://$BUCKET_NAME/templates/flex/Bigtable_Change_Streams_to_Bigtable"
+
+### Required
+export BIGTABLE_CHANGE_STREAM_APP_PROFILE=<bigtableChangeStreamAppProfile>
+export BIGTABLE_READ_INSTANCE_ID=<bigtableReadInstanceId>
+export BIGTABLE_READ_TABLE_ID=<bigtableReadTableId>
+export BIGTABLE_WRITE_INSTANCE_ID=<bigtableWriteInstanceId>
+export BIGTABLE_WRITE_TABLE_ID=<bigtableWriteTableId>
+export BIGTABLE_WRITE_COLUMN_FAMILY=<bigtableWriteColumnFamily>
+
+### Optional
+export BIDIRECTIONAL_REPLICATION_ENABLED=false
+export CBT_QUALIFIER=BIDIRECTIONAL_REPL_SOURCE_CBT
+export CBT_FILTER_QUALIFIER=BIDIRECTIONAL_REPL_SOURCE_CBT
+export DRY_RUN_ENABLED=false
+export FILTER_GCMUTATIONS=false
+export ADD_REDISTRIBUTE=false
+export BIGTABLE_CHANGE_STREAM_METADATA_INSTANCE_ID=""
+export BIGTABLE_CHANGE_STREAM_METADATA_TABLE_TABLE_ID=""
+export BIGTABLE_CHANGE_STREAM_CHARSET=UTF-8
+export BIGTABLE_CHANGE_STREAM_START_TIMESTAMP=""
+export BIGTABLE_CHANGE_STREAM_IGNORE_COLUMN_FAMILIES=""
+export BIGTABLE_CHANGE_STREAM_IGNORE_COLUMNS=""
+export BIGTABLE_CHANGE_STREAM_NAME=<bigtableChangeStreamName>
+export BIGTABLE_CHANGE_STREAM_RESUME=false
+export BIGTABLE_READ_CHANGE_STREAM_TIMEOUT_MS=<bigtableReadChangeStreamTimeoutMs>
+export BIGTABLE_READ_PROJECT_ID=""
+export BIGTABLE_READ_APP_PROFILE=default
+export BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS=<bigtableRpcAttemptTimeoutMs>
+export BIGTABLE_RPC_TIMEOUT_MS=<bigtableRpcTimeoutMs>
+export BIGTABLE_ADDITIONAL_RETRY_CODES=<bigtableAdditionalRetryCodes>
+export BIGTABLE_WRITE_APP_PROFILE=default
+export BIGTABLE_WRITE_PROJECT_ID=<bigtableWriteProjectId>
+export BIGTABLE_BULK_WRITE_LATENCY_TARGET_MS=<bigtableBulkWriteLatencyTargetMs>
+export BIGTABLE_BULK_WRITE_MAX_ROW_KEY_COUNT=<bigtableBulkWriteMaxRowKeyCount>
+export BIGTABLE_BULK_WRITE_MAX_REQUEST_SIZE_BYTES=<bigtableBulkWriteMaxRequestSizeBytes>
+export BIGTABLE_BULK_WRITE_FLOW_CONTROL=false
+
+gcloud dataflow flex-template run "bigtable-change-streams-to-bigtable-job" \
+  --project "$PROJECT" \
+  --region "$REGION" \
+  --template-file-gcs-location "$TEMPLATE_SPEC_GCSPATH" \
+  --parameters "bidirectionalReplicationEnabled=$BIDIRECTIONAL_REPLICATION_ENABLED" \
+  --parameters "cbtQualifier=$CBT_QUALIFIER" \
+  --parameters "cbtFilterQualifier=$CBT_FILTER_QUALIFIER" \
+  --parameters "dryRunEnabled=$DRY_RUN_ENABLED" \
+  --parameters "filterGCMutations=$FILTER_GCMUTATIONS" \
+  --parameters "addRedistribute=$ADD_REDISTRIBUTE" \
+  --parameters "bigtableChangeStreamMetadataInstanceId=$BIGTABLE_CHANGE_STREAM_METADATA_INSTANCE_ID" \
+  --parameters "bigtableChangeStreamMetadataTableTableId=$BIGTABLE_CHANGE_STREAM_METADATA_TABLE_TABLE_ID" \
+  --parameters "bigtableChangeStreamAppProfile=$BIGTABLE_CHANGE_STREAM_APP_PROFILE" \
+  --parameters "bigtableChangeStreamCharset=$BIGTABLE_CHANGE_STREAM_CHARSET" \
+  --parameters "bigtableChangeStreamStartTimestamp=$BIGTABLE_CHANGE_STREAM_START_TIMESTAMP" \
+  --parameters "bigtableChangeStreamIgnoreColumnFamilies=$BIGTABLE_CHANGE_STREAM_IGNORE_COLUMN_FAMILIES" \
+  --parameters "bigtableChangeStreamIgnoreColumns=$BIGTABLE_CHANGE_STREAM_IGNORE_COLUMNS" \
+  --parameters "bigtableChangeStreamName=$BIGTABLE_CHANGE_STREAM_NAME" \
+  --parameters "bigtableChangeStreamResume=$BIGTABLE_CHANGE_STREAM_RESUME" \
+  --parameters "bigtableReadChangeStreamTimeoutMs=$BIGTABLE_READ_CHANGE_STREAM_TIMEOUT_MS" \
+  --parameters "bigtableReadInstanceId=$BIGTABLE_READ_INSTANCE_ID" \
+  --parameters "bigtableReadTableId=$BIGTABLE_READ_TABLE_ID" \
+  --parameters "bigtableReadProjectId=$BIGTABLE_READ_PROJECT_ID" \
+  --parameters "bigtableReadAppProfile=$BIGTABLE_READ_APP_PROFILE" \
+  --parameters "bigtableRpcAttemptTimeoutMs=$BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS" \
+  --parameters "bigtableRpcTimeoutMs=$BIGTABLE_RPC_TIMEOUT_MS" \
+  --parameters "bigtableAdditionalRetryCodes=$BIGTABLE_ADDITIONAL_RETRY_CODES" \
+  --parameters "bigtableWriteInstanceId=$BIGTABLE_WRITE_INSTANCE_ID" \
+  --parameters "bigtableWriteTableId=$BIGTABLE_WRITE_TABLE_ID" \
+  --parameters "bigtableWriteColumnFamily=$BIGTABLE_WRITE_COLUMN_FAMILY" \
+  --parameters "bigtableWriteAppProfile=$BIGTABLE_WRITE_APP_PROFILE" \
+  --parameters "bigtableWriteProjectId=$BIGTABLE_WRITE_PROJECT_ID" \
+  --parameters "bigtableBulkWriteLatencyTargetMs=$BIGTABLE_BULK_WRITE_LATENCY_TARGET_MS" \
+  --parameters "bigtableBulkWriteMaxRowKeyCount=$BIGTABLE_BULK_WRITE_MAX_ROW_KEY_COUNT" \
+  --parameters "bigtableBulkWriteMaxRequestSizeBytes=$BIGTABLE_BULK_WRITE_MAX_REQUEST_SIZE_BYTES" \
+  --parameters "bigtableBulkWriteFlowControl=$BIGTABLE_BULK_WRITE_FLOW_CONTROL"
+```
+
+For more information about the command, please check:
+https://cloud.google.com/sdk/gcloud/reference/dataflow/flex-template/run
+
+**Using the plugin**:
+
+Instead of just generating the template in the folder, it is possible to stage
+and run the template in a single command. This may be useful for testing when
+changing the templates.
+
+```shell
+export PROJECT=<my-project>
+export BUCKET_NAME=<bucket-name>
+export REGION=us-central1
+
+### Required
+export BIGTABLE_CHANGE_STREAM_APP_PROFILE=<bigtableChangeStreamAppProfile>
+export BIGTABLE_READ_INSTANCE_ID=<bigtableReadInstanceId>
+export BIGTABLE_READ_TABLE_ID=<bigtableReadTableId>
+export BIGTABLE_WRITE_INSTANCE_ID=<bigtableWriteInstanceId>
+export BIGTABLE_WRITE_TABLE_ID=<bigtableWriteTableId>
+export BIGTABLE_WRITE_COLUMN_FAMILY=<bigtableWriteColumnFamily>
+
+### Optional
+export BIDIRECTIONAL_REPLICATION_ENABLED=false
+export CBT_QUALIFIER=BIDIRECTIONAL_REPL_SOURCE_CBT
+export CBT_FILTER_QUALIFIER=BIDIRECTIONAL_REPL_SOURCE_CBT
+export DRY_RUN_ENABLED=false
+export FILTER_GCMUTATIONS=false
+export ADD_REDISTRIBUTE=false
+export BIGTABLE_CHANGE_STREAM_METADATA_INSTANCE_ID=""
+export BIGTABLE_CHANGE_STREAM_METADATA_TABLE_TABLE_ID=""
+export BIGTABLE_CHANGE_STREAM_CHARSET=UTF-8
+export BIGTABLE_CHANGE_STREAM_START_TIMESTAMP=""
+export BIGTABLE_CHANGE_STREAM_IGNORE_COLUMN_FAMILIES=""
+export BIGTABLE_CHANGE_STREAM_IGNORE_COLUMNS=""
+export BIGTABLE_CHANGE_STREAM_NAME=<bigtableChangeStreamName>
+export BIGTABLE_CHANGE_STREAM_RESUME=false
+export BIGTABLE_READ_CHANGE_STREAM_TIMEOUT_MS=<bigtableReadChangeStreamTimeoutMs>
+export BIGTABLE_READ_PROJECT_ID=""
+export BIGTABLE_READ_APP_PROFILE=default
+export BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS=<bigtableRpcAttemptTimeoutMs>
+export BIGTABLE_RPC_TIMEOUT_MS=<bigtableRpcTimeoutMs>
+export BIGTABLE_ADDITIONAL_RETRY_CODES=<bigtableAdditionalRetryCodes>
+export BIGTABLE_WRITE_APP_PROFILE=default
+export BIGTABLE_WRITE_PROJECT_ID=<bigtableWriteProjectId>
+export BIGTABLE_BULK_WRITE_LATENCY_TARGET_MS=<bigtableBulkWriteLatencyTargetMs>
+export BIGTABLE_BULK_WRITE_MAX_ROW_KEY_COUNT=<bigtableBulkWriteMaxRowKeyCount>
+export BIGTABLE_BULK_WRITE_MAX_REQUEST_SIZE_BYTES=<bigtableBulkWriteMaxRequestSizeBytes>
+export BIGTABLE_BULK_WRITE_FLOW_CONTROL=false
+
+mvn clean package -PtemplatesRun \
+-DskipTests \
+-DprojectId="$PROJECT" \
+-DbucketName="$BUCKET_NAME" \
+-Dregion="$REGION" \
+-DjobName="bigtable-change-streams-to-bigtable-job" \
+-DtemplateName="Bigtable_Change_Streams_to_Bigtable" \
+-Dparameters="bidirectionalReplicationEnabled=$BIDIRECTIONAL_REPLICATION_ENABLED,cbtQualifier=$CBT_QUALIFIER,cbtFilterQualifier=$CBT_FILTER_QUALIFIER,dryRunEnabled=$DRY_RUN_ENABLED,filterGCMutations=$FILTER_GCMUTATIONS,addRedistribute=$ADD_REDISTRIBUTE,bigtableChangeStreamMetadataInstanceId=$BIGTABLE_CHANGE_STREAM_METADATA_INSTANCE_ID,bigtableChangeStreamMetadataTableTableId=$BIGTABLE_CHANGE_STREAM_METADATA_TABLE_TABLE_ID,bigtableChangeStreamAppProfile=$BIGTABLE_CHANGE_STREAM_APP_PROFILE,bigtableChangeStreamCharset=$BIGTABLE_CHANGE_STREAM_CHARSET,bigtableChangeStreamStartTimestamp=$BIGTABLE_CHANGE_STREAM_START_TIMESTAMP,bigtableChangeStreamIgnoreColumnFamilies=$BIGTABLE_CHANGE_STREAM_IGNORE_COLUMN_FAMILIES,bigtableChangeStreamIgnoreColumns=$BIGTABLE_CHANGE_STREAM_IGNORE_COLUMNS,bigtableChangeStreamName=$BIGTABLE_CHANGE_STREAM_NAME,bigtableChangeStreamResume=$BIGTABLE_CHANGE_STREAM_RESUME,bigtableReadChangeStreamTimeoutMs=$BIGTABLE_READ_CHANGE_STREAM_TIMEOUT_MS,bigtableReadInstanceId=$BIGTABLE_READ_INSTANCE_ID,bigtableReadTableId=$BIGTABLE_READ_TABLE_ID,bigtableReadProjectId=$BIGTABLE_READ_PROJECT_ID,bigtableReadAppProfile=$BIGTABLE_READ_APP_PROFILE,bigtableRpcAttemptTimeoutMs=$BIGTABLE_RPC_ATTEMPT_TIMEOUT_MS,bigtableRpcTimeoutMs=$BIGTABLE_RPC_TIMEOUT_MS,bigtableAdditionalRetryCodes=$BIGTABLE_ADDITIONAL_RETRY_CODES,bigtableWriteInstanceId=$BIGTABLE_WRITE_INSTANCE_ID,bigtableWriteTableId=$BIGTABLE_WRITE_TABLE_ID,bigtableWriteColumnFamily=$BIGTABLE_WRITE_COLUMN_FAMILY,bigtableWriteAppProfile=$BIGTABLE_WRITE_APP_PROFILE,bigtableWriteProjectId=$BIGTABLE_WRITE_PROJECT_ID,bigtableBulkWriteLatencyTargetMs=$BIGTABLE_BULK_WRITE_LATENCY_TARGET_MS,bigtableBulkWriteMaxRowKeyCount=$BIGTABLE_BULK_WRITE_MAX_ROW_KEY_COUNT,bigtableBulkWriteMaxRequestSizeBytes=$BIGTABLE_BULK_WRITE_MAX_REQUEST_SIZE_BYTES,bigtableBulkWriteFlowControl=$BIGTABLE_BULK_WRITE_FLOW_CONTROL" \
+-f v2/googlecloud-to-googlecloud
+```
+
+## Terraform
+
+Dataflow supports the utilization of Terraform to manage template jobs,
+see [dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job).
+
+Terraform modules have been generated for most templates in this repository. This includes the relevant parameters
+specific to the template. If available, they may be used instead of
+[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
+directly.
+
+To use the autogenerated module, execute the standard
+[terraform workflow](https://developer.hashicorp.com/terraform/intro/core-workflow):
+
+```shell
+cd v2/googlecloud-to-googlecloud/terraform/Bigtable_Change_Streams_to_Bigtable
+terraform init
+terraform apply
+```
+
+To use
+[dataflow_flex_template_job](https://registry.terraform.io/providers/hashicorp/google/latest/docs/resources/dataflow_flex_template_job)
+directly:
+
+```terraform
+provider "google-beta" {
+  project = var.project
+}
+variable "project" {
+  default = "<my-project>"
+}
+variable "region" {
+  default = "us-central1"
+}
+
+resource "google_dataflow_flex_template_job" "bigtable_change_streams_to_bigtable" {
+
+  provider                = google-beta
+  container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/Bigtable_Change_Streams_to_Bigtable"
+  name                    = "bigtable-change-streams-to-bigtable"
+  region                  = var.region
+  parameters              = {
+    bigtableChangeStreamAppProfile = "<bigtableChangeStreamAppProfile>"
+    bigtableReadInstanceId = "<bigtableReadInstanceId>"
+    bigtableReadTableId = "<bigtableReadTableId>"
+    bigtableWriteInstanceId = "<bigtableWriteInstanceId>"
+    bigtableWriteTableId = "<bigtableWriteTableId>"
+    bigtableWriteColumnFamily = "<bigtableWriteColumnFamily>"
+    # bidirectionalReplicationEnabled = "false"
+    # cbtQualifier = "BIDIRECTIONAL_REPL_SOURCE_CBT"
+    # cbtFilterQualifier = "BIDIRECTIONAL_REPL_SOURCE_CBT"
+    # dryRunEnabled = "false"
+    # filterGCMutations = "false"
+    # addRedistribute = "false"
+    # bigtableChangeStreamMetadataInstanceId = ""
+    # bigtableChangeStreamMetadataTableTableId = ""
+    # bigtableChangeStreamCharset = "UTF-8"
+    # bigtableChangeStreamStartTimestamp = ""
+    # bigtableChangeStreamIgnoreColumnFamilies = ""
+    # bigtableChangeStreamIgnoreColumns = ""
+    # bigtableChangeStreamName = ""
+    # bigtableChangeStreamResume = "false"
+    # bigtableReadChangeStreamTimeoutMs = ""
+    # bigtableReadProjectId = ""
+    # bigtableReadAppProfile = "default"
+    # bigtableRpcAttemptTimeoutMs = ""
+    # bigtableRpcTimeoutMs = ""
+    # bigtableAdditionalRetryCodes = ""
+    # bigtableWriteAppProfile = "default"
+    # bigtableWriteProjectId = ""
+    # bigtableBulkWriteLatencyTargetMs = ""
+    # bigtableBulkWriteMaxRowKeyCount = ""
+    # bigtableBulkWriteMaxRequestSizeBytes = ""
+    # bigtableBulkWriteFlowControl = "false"
+  }
+}
+```
diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/BigtableChangeStreamsToBigtableOptions.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/BigtableChangeStreamsToBigtableOptions.java
new file mode 100644
index 0000000000..08a7dd80f0
--- /dev/null
+++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/options/BigtableChangeStreamsToBigtableOptions.java
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.options;
+
+import com.google.cloud.teleport.metadata.TemplateParameter;
+import com.google.cloud.teleport.v2.bigtable.options.BigtableCommonOptions;
+import org.apache.beam.runners.dataflow.options.DataflowPipelineOptions;
+import org.apache.beam.sdk.options.Default;
+
+/**
+ * The {@link BigtableChangeStreamsToBigtableOptions} class provides the custom execution options
+ * passed by the executor at the command-line.
+ */
+public interface BigtableChangeStreamsToBigtableOptions
+    extends DataflowPipelineOptions,
+        BigtableCommonOptions.ReadChangeStreamOptions,
+        BigtableCommonOptions.WriteOptions {
+
+  @TemplateParameter.Boolean(
+      order = 1,
+      optional = true,
+      description = "Bidirectional replication",
+      helpText =
+          "Whether bidirectional replication between Bigtable instances is enabled. When enabled, additional logic filters out mutations that were replicated from the other instance")
+  @Default.Boolean(false)
+  Boolean getBidirectionalReplicationEnabled();
+
+  void setBidirectionalReplicationEnabled(Boolean bidirectionalReplicationEnabled);
+
+  @TemplateParameter.Text(
+      order = 2,
+      optional = true,
+      description = "Source CBT qualifier",
+      helpText = "Bidirectional replication source CBT qualifier to append")
+  @Default.String("BIDIRECTIONAL_REPL_SOURCE_CBT")
+  String getCbtQualifier();
+
+  void setCbtQualifier(String cbtQualifier);
+
+  @TemplateParameter.Text(
+      order = 3,
+      optional = true,
+      description = "Filter CBT qualifier",
+      helpText = "Bidirectional replication filter CBT qualifier to check for and ignore")
+  @Default.String("BIDIRECTIONAL_REPL_SOURCE_CBT")
+  String getCbtFilterQualifier();
+
+  void setCbtFilterQualifier(String cbtFilterQualifier);
+
+  @TemplateParameter.Boolean(
+      order = 4,
+      optional = true,
+      description = "Dry run",
+      helpText = "When dry run is enabled, the pipeline does not write to Bigtable")
+  @Default.Boolean(false)
+  Boolean getDryRunEnabled();
+
+  void setDryRunEnabled(Boolean dryRunEnabled);
+
+  @TemplateParameter.Boolean(
+      order = 5,
+      optional = true,
+      description = "Filter GC mutations",
+      helpText = "Filters out garbage collection Delete mutations from CBT")
+  @Default.Boolean(false)
+  Boolean getFilterGCMutations();
+
+  void setFilterGCMutations(Boolean filterGCMutations);
+
+  @TemplateParameter.Boolean(
+      order = 6,
+      optional = true,
+      description = "Redistribute change stream mutations by row key",
+      helpText =
+          "When set to true, redistributes change stream mutations by their row key to balance load across workers.")
+  @Default.Boolean(false)
+  Boolean getAddRedistribute();
+
+  void setAddRedistribute(Boolean addRedistribute);
+}
diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtable.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtable.java
new file mode 100644
index 0000000000..274146bea4
--- /dev/null
+++
b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtable.java @@ -0,0 +1,368 @@ +/* + * Copyright (C) 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the + * License for the specific language governing permissions and limitations under + * the License. + */ +package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigtable; + +import static com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation.MutationType.GARBAGE_COLLECTION; + +import com.google.bigtable.v2.Mutation; +import com.google.bigtable.v2.TimestampRange; +import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation; +import com.google.cloud.bigtable.data.v2.models.Entry; +import com.google.cloud.teleport.metadata.Template; +import com.google.cloud.teleport.metadata.TemplateCategory; +import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; +import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToBigtableOptions; +import com.google.protobuf.ByteString; +import java.util.ArrayList; +import java.util.List; +import org.apache.beam.sdk.Pipeline; +import org.apache.beam.sdk.PipelineResult; +import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO; +import org.apache.beam.sdk.metrics.Metrics; +import org.apache.beam.sdk.options.PipelineOptionsFactory; +import org.apache.beam.sdk.transforms.DoFn; +import org.apache.beam.sdk.transforms.ParDo; +import org.apache.beam.sdk.transforms.Redistribute; +import org.apache.beam.sdk.values.KV; +import org.apache.beam.sdk.values.PCollection; +import org.joda.time.Duration; +import org.joda.time.Instant; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Bigtable change stream pipeline to replicate changes to another Cloud Bigtable instance. Pipeline + * reads from a Bigtable change stream, converts change stream mutations to native protobuf + * mutations, and writes them directly to Cloud Bigtable. + */ +@Template( + name = "Bigtable_Change_Streams_to_Bigtable", + category = TemplateCategory.STREAMING, + displayName = "Bigtable Change Streams to Bigtable Replicator", + description = + "A streaming pipeline that replicates Bigtable change stream mutations to another Bigtable instance", + optionsClass = BigtableChangeStreamsToBigtableOptions.class, + flexContainerName = "bigtable-changestreams-to-bigtable", + contactInformation = "https://cloud.google.com/support", + streaming = true, + supportsAtLeastOnce = true) +public class BigtableChangeStreamsToBigtable { + + private static final Logger LOG = LoggerFactory.getLogger(BigtableChangeStreamsToBigtable.class); + private static final String USE_RUNNER_V2_EXPERIMENT = "use_runner_v2"; + + private static void setOptions(BigtableChangeStreamsToBigtableOptions options) { + options.setStreaming(true); + options.setEnableStreamingEngine(true); + + // Add use_runner_v2 to the experiments option, since change streams connector is only supported + // on Dataflow runner v2. 
+    List<String> experiments = options.getExperiments();
+    if (experiments == null) {
+      experiments = new ArrayList<>();
+    }
+    boolean hasUseRunnerV2 = false;
+    for (String experiment : experiments) {
+      if (experiment.equalsIgnoreCase(USE_RUNNER_V2_EXPERIMENT)) {
+        hasUseRunnerV2 = true;
+        break;
+      }
+    }
+    if (!hasUseRunnerV2) {
+      experiments.add(USE_RUNNER_V2_EXPERIMENT);
+    }
+    options.setExperiments(experiments);
+  }
+
+  /**
+   * Creates and runs the Bigtable-to-Bigtable pipeline.
+   *
+   * @param pipelineOptions options for reading from and writing to Bigtable
+   * @return PipelineResult
+   */
+  public static PipelineResult run(BigtableChangeStreamsToBigtableOptions pipelineOptions) {
+    setOptions(pipelineOptions);
+
+    Pipeline pipeline = Pipeline.create(pipelineOptions);
+
+    Instant startTime =
+        pipelineOptions.getBigtableChangeStreamStartTimestamp().isEmpty()
+            ? Instant.now()
+            : Instant.parse(pipelineOptions.getBigtableChangeStreamStartTimestamp());
+
+    LOG.info("BigtableChangeStreamsToBigtable pipeline starting from {}", startTime);
+
+    String readProjectId = pipelineOptions.getBigtableReadProjectId();
+    if (readProjectId == null || readProjectId.isEmpty()) {
+      readProjectId =
+          pipelineOptions
+              .as(org.apache.beam.sdk.extensions.gcp.options.GcpOptions.class)
+              .getProject();
+    }
+
+    BigtableIO.ReadChangeStream readChangeStream =
+        BigtableIO.readChangeStream()
+            .withProjectId(readProjectId)
+            .withInstanceId(pipelineOptions.getBigtableReadInstanceId())
+            .withTableId(pipelineOptions.getBigtableReadTableId())
+            .withAppProfileId(pipelineOptions.getBigtableChangeStreamAppProfile())
+            .withStartTime(startTime);
+
+    if (pipelineOptions.getBigtableChangeStreamName() != null
+        && !pipelineOptions.getBigtableChangeStreamName().isEmpty()) {
+      readChangeStream =
+          readChangeStream
+              .withChangeStreamName(pipelineOptions.getBigtableChangeStreamName())
+              .withExistingPipelineOptions(
+                  pipelineOptions.getBigtableChangeStreamResume() != null
+                          && pipelineOptions.getBigtableChangeStreamResume()
+                      ? BigtableIO.ExistingPipelineOptions.RESUME_OR_FAIL
+                      : BigtableIO.ExistingPipelineOptions.FAIL_IF_EXISTS);
+    }
+
+    if (pipelineOptions.getBigtableChangeStreamMetadataInstanceId() != null
+        && !pipelineOptions.getBigtableChangeStreamMetadataInstanceId().isEmpty()) {
+      readChangeStream =
+          readChangeStream.withMetadataTableInstanceId(
+              pipelineOptions.getBigtableChangeStreamMetadataInstanceId());
+    }
+
+    if (pipelineOptions.getBigtableChangeStreamMetadataTableTableId() != null
+        && !pipelineOptions.getBigtableChangeStreamMetadataTableTableId().isEmpty()) {
+      readChangeStream =
+          readChangeStream.withMetadataTableTableId(
+              pipelineOptions.getBigtableChangeStreamMetadataTableTableId());
+    }
+
+    if (pipelineOptions.getBigtableReadChangeStreamTimeoutMs() != null) {
+      readChangeStream =
+          readChangeStream.withReadChangeStreamTimeout(
+              Duration.millis(pipelineOptions.getBigtableReadChangeStreamTimeoutMs()));
+    }
+
+    PCollection<KV<ByteString, ChangeStreamMutation>> changeStream =
+        pipeline.apply("Read Change Stream", readChangeStream);
+
+    if (pipelineOptions.getAddRedistribute()) {
+      changeStream = changeStream.apply("Redistribute Change Stream", Redistribute.byKey());
+    }
+
+    changeStream
+        .apply(
+            "Convert CDC mutation to Bigtable protobuf mutation",
+            ParDo.of(
+                new ConvertChangeStreamToNativeMutationFn(
+                    pipelineOptions.getBidirectionalReplicationEnabled(),
+                    pipelineOptions.getCbtQualifier(),
+                    pipelineOptions.getCbtFilterQualifier(),
+                    pipelineOptions.getFilterGCMutations())))
+        .apply("Write row mutations to Bigtable", createWrite(pipelineOptions));
+
+    return pipeline.run();
+  }
+
+  static BigtableIO.Write createWrite(BigtableChangeStreamsToBigtableOptions options) {
+    BigtableIO.Write write =
+        BigtableIO.write()
+            .withInstanceId(options.getBigtableWriteInstanceId())
+            .withTableId(options.getBigtableWriteTableId());
+
+    String projectId = options.getBigtableWriteProjectId();
+    if (projectId == null || projectId.isEmpty()) {
+      projectId =
+          options.as(org.apache.beam.sdk.extensions.gcp.options.GcpOptions.class).getProject();
+    }
+    if (projectId != null && !projectId.isEmpty()) {
+      write = write.withProjectId(projectId);
+    }
+    if (options.getBigtableWriteAppProfile() != null
+        && !options.getBigtableWriteAppProfile().isEmpty()) {
+      write = write.withAppProfileId(options.getBigtableWriteAppProfile());
+    }
+    if (options.getBigtableBulkWriteMaxRowKeyCount() != null) {
+      write =
+          write.withMaxElementsPerBatch(options.getBigtableBulkWriteMaxRowKeyCount().longValue());
+    }
+    if (options.getBigtableBulkWriteMaxRequestSizeBytes() != null) {
+      write =
+          write.withMaxBytesPerBatch(options.getBigtableBulkWriteMaxRequestSizeBytes().longValue());
+    }
+    if (options.getBigtableBulkWriteFlowControl() != null) {
+      write = write.withFlowControl(options.getBigtableBulkWriteFlowControl());
+    }
+    return write;
+  }
+
+  public static void main(String[] args) {
+    UncaughtExceptionLogger.register();
+
+    LOG.info("Starting to replicate change records from Cloud Bigtable change streams to Bigtable");
+
+    BigtableChangeStreamsToBigtableOptions pipelineOptions =
+        PipelineOptionsFactory.fromArgs(args)
+            .withValidation()
+            .as(BigtableChangeStreamsToBigtableOptions.class);
+
+    run(pipelineOptions);
+  }
+
+  /**
+   * Converts Bigtable change stream mutations to native Bigtable v2 protobuf Mutation iterables.
+   */
+  static class ConvertChangeStreamToNativeMutationFn
+      extends DoFn<KV<ByteString, ChangeStreamMutation>, KV<ByteString, Iterable<Mutation>>> {
+
+    private final boolean bidirectionalReplicationEnabled;
+    private final String cbtQualifier;
+    private final String cbtFilterQualifier;
+    private final boolean filterGCMutations;
+
+    ConvertChangeStreamToNativeMutationFn(
+        boolean bidirectionalReplicationEnabled,
+        String cbtQualifier,
+        String cbtFilterQualifier,
+        boolean filterGCMutations) {
+      this.bidirectionalReplicationEnabled = bidirectionalReplicationEnabled;
+      this.cbtQualifier = cbtQualifier;
+      this.cbtFilterQualifier = cbtFilterQualifier;
+      this.filterGCMutations = filterGCMutations;
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element KV<ByteString, ChangeStreamMutation> element,
+        OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver)
+        throws Exception {
+
+      ChangeStreamMutation mutation = element.getValue();
+
+      // Skip the element if the GC filter flag is on and the mutation is of GC type.
+      if (filterGCMutations && mutation.getType().equals(GARBAGE_COLLECTION)) {
+        Metrics.counter(ConvertChangeStreamToNativeMutationFn.class, "gc_mutations_filtered").inc();
+        return;
+      }
+
+      // Skip the element if it was replicated.
+      if (bidirectionalReplicationEnabled && isReplicated(mutation, cbtFilterQualifier)) {
+        return;
+      }
+
+      List<Mutation> protoMutations = new ArrayList<>();
+      String lastFamily = null;
+
+      for (Entry entry : mutation.getEntries()) {
+        if (entry instanceof com.google.cloud.bigtable.data.v2.models.SetCell) {
+          com.google.cloud.bigtable.data.v2.models.SetCell setCell =
+              (com.google.cloud.bigtable.data.v2.models.SetCell) entry;
+          lastFamily = setCell.getFamilyName();
+
+          protoMutations.add(
+              Mutation.newBuilder()
+                  .setSetCell(
+                      Mutation.SetCell.newBuilder()
+                          .setFamilyName(setCell.getFamilyName())
+                          .setColumnQualifier(setCell.getQualifier())
+                          .setTimestampMicros(setCell.getTimestamp())
+                          .setValue(setCell.getValue())
+                          .build())
+                  .build());
+        } else if (entry instanceof com.google.cloud.bigtable.data.v2.models.DeleteCells) {
+          com.google.cloud.bigtable.data.v2.models.DeleteCells deleteCells =
+              (com.google.cloud.bigtable.data.v2.models.DeleteCells) entry;
+          lastFamily = deleteCells.getFamilyName();
+
+          Mutation.DeleteFromColumn.Builder delCol =
+              Mutation.DeleteFromColumn.newBuilder()
+                  .setFamilyName(deleteCells.getFamilyName())
+                  .setColumnQualifier(deleteCells.getQualifier());
+
+          long start = deleteCells.getTimestampRange().getStart();
+          long end = deleteCells.getTimestampRange().getEnd();
+
+          if (start > 0 || end > 0) {
+            delCol.setTimeRange(
+                TimestampRange.newBuilder()
+                    .setStartTimestampMicros(start)
+                    .setEndTimestampMicros(end > 0 ? end : Long.MAX_VALUE)
+                    .build());
+          }
+
+          protoMutations.add(Mutation.newBuilder().setDeleteFromColumn(delCol).build());
+        } else if (entry instanceof com.google.cloud.bigtable.data.v2.models.DeleteFamily) {
+          com.google.cloud.bigtable.data.v2.models.DeleteFamily deleteFamily =
+              (com.google.cloud.bigtable.data.v2.models.DeleteFamily) entry;
+          lastFamily = deleteFamily.getFamilyName();
+
+          protoMutations.add(
+              Mutation.newBuilder()
+                  .setDeleteFromFamily(
+                      Mutation.DeleteFromFamily.newBuilder()
+                          .setFamilyName(deleteFamily.getFamilyName())
+                          .build())
+                  .build());
+        } else {
+          LOG.warn("Unsupported entry type: {}", entry.getClass().getName());
+        }
+      }
+
+      if (protoMutations.isEmpty()) {
+        return;
+      }
+
+      // Append origin information to mutations.
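+      // The origin tag appended below is a DeleteFromColumn on the cbtQualifier
+      // column with the near-empty timestamp range [0, 1), so it deletes nothing
+      // user-visible but still travels as the last entry of the row's mutation.
+      // The peer pipeline's isReplicated() check looks for that qualifier in the
+      // last entry and drops the whole mutation, which is what breaks the loop
+      // in bidirectional replication.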
+      if (bidirectionalReplicationEnabled && lastFamily != null) {
+        protoMutations.add(
+            Mutation.newBuilder()
+                .setDeleteFromColumn(
+                    Mutation.DeleteFromColumn.newBuilder()
+                        .setFamilyName(lastFamily)
+                        .setColumnQualifier(ByteString.copyFromUtf8(cbtQualifier))
+                        .setTimeRange(
+                            TimestampRange.newBuilder()
+                                .setStartTimestampMicros(0)
+                                .setEndTimestampMicros(1)
+                                .build())
+                        .build())
+                .build());
+      }
+
+      receiver.output(KV.of(element.getKey(), protoMutations));
+    }
+
+    private boolean isReplicated(ChangeStreamMutation mutation, String filterQualifier) {
+      List<Entry> mutationEntries = mutation.getEntries();
+
+      if (mutationEntries.isEmpty()) {
+        return false;
+      }
+      Entry lastEntry = mutationEntries.get(mutationEntries.size() - 1);
+
+      if (lastEntry instanceof com.google.cloud.bigtable.data.v2.models.DeleteCells) {
+        if (((com.google.cloud.bigtable.data.v2.models.DeleteCells) lastEntry)
+            .getQualifier()
+            .equals(ByteString.copyFromUtf8(filterQualifier))) {
+          Metrics.counter(
+                  ConvertChangeStreamToNativeMutationFn.class, "replicated_mutations_filtered")
+              .inc();
+          return true;
+        }
+      }
+      Metrics.counter(ConvertChangeStreamToNativeMutationFn.class, "bigtable_mutations_replicated")
+          .inc();
+      return false;
+    }
+  }
+}
diff --git a/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/package-info.java b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/package-info.java
new file mode 100644
index 0000000000..1f125b28cd
--- /dev/null
+++ b/v2/googlecloud-to-googlecloud/src/main/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/package-info.java
@@ -0,0 +1,18 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+/** Template package for BigtableChangeStreamsToBigtable. */
+package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigtable;
diff --git a/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtableIT.java b/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtableIT.java
new file mode 100644
index 0000000000..67923ab1f1
--- /dev/null
+++ b/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtableIT.java
@@ -0,0 +1,167 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigtable;
+
+import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatPipeline;
+import static org.apache.beam.it.truthmatchers.PipelineAsserts.assertThatResult;
+import static org.apache.commons.lang3.RandomStringUtils.randomAlphanumeric;
+
+import com.google.cloud.bigtable.data.v2.models.Row;
+import com.google.cloud.bigtable.data.v2.models.RowMutation;
+import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
+import com.google.common.collect.Lists;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.List;
+import java.util.UUID;
+import java.util.function.Function;
+import org.apache.beam.it.common.PipelineLauncher.LaunchConfig;
+import org.apache.beam.it.common.PipelineLauncher.LaunchInfo;
+import org.apache.beam.it.common.PipelineOperator;
+import org.apache.beam.it.common.PipelineOperator.Config;
+import org.apache.beam.it.common.PipelineOperator.Result;
+import org.apache.beam.it.common.utils.ResourceManagerUtils;
+import org.apache.beam.it.gcp.TemplateTestBase;
+import org.apache.beam.it.gcp.bigtable.BigtableResourceManager;
+import org.apache.beam.it.gcp.bigtable.BigtableResourceManagerUtils;
+import org.apache.beam.it.gcp.bigtable.BigtableTableSpec;
+import org.jetbrains.annotations.NotNull;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/** Integration test for {@link BigtableChangeStreamsToBigtable}. */
+@Category(TemplateIntegrationTest.class)
+@TemplateIntegrationTest(BigtableChangeStreamsToBigtable.class)
+@RunWith(JUnit4.class)
+public final class BigtableChangeStreamsToBigtableIT extends TemplateTestBase {
+
+  private static final Logger LOG =
+      LoggerFactory.getLogger(BigtableChangeStreamsToBigtableIT.class);
+
+  public static final String SOURCE_COLUMN_FAMILY = "cf";
+  private static final Duration EXPECTED_REPLICATION_MAX_WAIT_TIME = Duration.ofMinutes(10);
+  private BigtableResourceManager bigtableResourceManager;
+  private LaunchInfo launchInfo;
+
+  @Before
+  public void setup() throws IOException {
+    bigtableResourceManager =
+        BigtableResourceManager.builder(testName, PROJECT, credentialsProvider)
+            .maybeUseStaticInstance()
+            .build();
+  }
+
+  @After
+  public void tearDownClass() {
+    ResourceManagerUtils.cleanResources(bigtableResourceManager);
+  }
+
+  @Test
+  public void testBigtableChangeStreamsToBigtableE2E() throws Exception {
+    String appProfileId = generateAppProfileId();
+
+    BigtableTableSpec cdcTableSpec = new BigtableTableSpec();
+    cdcTableSpec.setCdcEnabled(true);
+    cdcTableSpec.setColumnFamilies(Lists.asList(SOURCE_COLUMN_FAMILY, new String[] {}));
+
+    BigtableTableSpec dstTableSpec = new BigtableTableSpec();
+    dstTableSpec.setColumnFamilies(Lists.asList(SOURCE_COLUMN_FAMILY, new String[] {}));
+
+    String srcTable = BigtableResourceManagerUtils.generateTableId("src-mutation");
+    String dstTable = BigtableResourceManagerUtils.generateTableId("dst-mutation");
+
+    bigtableResourceManager.createTable(srcTable, cdcTableSpec);
+    bigtableResourceManager.createTable(dstTable, dstTableSpec);
+
+    bigtableResourceManager.createAppProfile(
+        appProfileId, true, bigtableResourceManager.getClusterNames());
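+
+    // The change stream connector requires an app profile with single-cluster
+    // routing and single-row transactions enabled (see the template's required
+    // parameters), which is why a dedicated profile is provisioned above rather
+    // than reusing the instance's default profile.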
+
+    Function<LaunchConfig.Builder, LaunchConfig.Builder> paramsAdder = Function.identity();
+    launchInfo =
+        launchTemplate(
+            paramsAdder.apply(
+                LaunchConfig.builder(testName, specPath)
+                    .addParameter("bigtableReadProjectId", PROJECT)
+                    .addParameter("bigtableReadTableId", srcTable)
+                    .addParameter("bigtableReadInstanceId", bigtableResourceManager.getInstanceId())
+                    .addParameter("bigtableChangeStreamAppProfile", appProfileId)
+                    .addParameter("bigtableWriteProjectId", PROJECT)
+                    .addParameter("bigtableWriteTableId", dstTable)
+                    .addParameter(
+                        "bigtableWriteInstanceId", bigtableResourceManager.getInstanceId())
+                    .addParameter("bigtableWriteColumnFamily", SOURCE_COLUMN_FAMILY)));
+
+    assertThatPipeline(launchInfo).isRunning();
+
+    String rowkey = UUID.randomUUID().toString();
+    String column = UUID.randomUUID().toString();
+    String value = UUID.randomUUID().toString();
+
+    RowMutation rowMutation =
+        RowMutation.create(srcTable, rowkey).setCell(SOURCE_COLUMN_FAMILY, column, value);
+    bigtableResourceManager.write(rowMutation);
+
+    Config config = createConfig(launchInfo);
+    Result result =
+        pipelineOperator()
+            .waitForConditionAndCancel(
+                config,
+                () -> {
+                  try {
+                    List<Row> rows = bigtableResourceManager.readTable(dstTable);
+                    for (Row row : rows) {
+                      if (row.getKey().toStringUtf8().equals(rowkey)) {
+                        return true;
+                      }
+                    }
+                    return false;
+                  } catch (Exception e) {
+                    LOG.warn("Error reading destination table", e);
+                    return false;
+                  }
+                });
+
+    assertThatResult(result).meetsConditions();
+  }
+
+  @NotNull
+  private static String generateAppProfileId() {
+    return "cdc_app_profile_" + randomAlphanumeric(8).toLowerCase() + "_" + System.nanoTime();
+  }
+
+  @Override
+  protected PipelineOperator.Config createConfig(LaunchInfo info) {
+    Config.Builder configBuilder =
+        Config.builder().setJobId(info.jobId()).setProject(PROJECT).setRegion(REGION);
+
+    if (System.getProperty("directRunnerTest") != null) {
+      configBuilder =
+          configBuilder
+              .setTimeoutAfter(EXPECTED_REPLICATION_MAX_WAIT_TIME.minus(Duration.ofMinutes(3)))
+              .setCheckAfter(Duration.ofSeconds(5));
+    } else {
+      configBuilder.setTimeoutAfter(EXPECTED_REPLICATION_MAX_WAIT_TIME);
+    }
+
+    return configBuilder.build();
+  }
+}
diff --git a/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtableTest.java b/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtableTest.java
new file mode 100644
index 0000000000..11b37cfa82
--- /dev/null
+++ b/v2/googlecloud-to-googlecloud/src/test/java/com/google/cloud/teleport/v2/templates/bigtablechangestreamstobigtable/BigtableChangeStreamsToBigtableTest.java
@@ -0,0 +1,255 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigtable;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoInteractions;
+import static org.mockito.Mockito.when;
+
+import com.google.bigtable.v2.Mutation;
+import com.google.cloud.bigtable.data.v2.models.ChangeStreamMutation;
+import com.google.cloud.bigtable.data.v2.models.DeleteCells;
+import com.google.cloud.bigtable.data.v2.models.DeleteFamily;
+import com.google.cloud.bigtable.data.v2.models.Entry;
+import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
+import com.google.cloud.bigtable.data.v2.models.SetCell;
+import com.google.cloud.teleport.v2.options.BigtableChangeStreamsToBigtableOptions;
+import com.google.cloud.teleport.v2.templates.bigtablechangestreamstobigtable.BigtableChangeStreamsToBigtable.ConvertChangeStreamToNativeMutationFn;
+import com.google.protobuf.ByteString;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@link ConvertChangeStreamToNativeMutationFn}. */
+@RunWith(JUnit4.class)
+public class BigtableChangeStreamsToBigtableTest {
+
+  private static final String ROW_KEY = "row1";
+  private static final String CF = "cf";
+  private static final String QUAL = "col1";
+  private static final String VAL = "val1";
+
+  private static ChangeStreamMutation createMutation(
+      ChangeStreamMutation.MutationType type, List<Entry> entries) {
+    ChangeStreamMutation mutation = mock(ChangeStreamMutation.class);
+    when(mutation.getType()).thenReturn(type);
+    when(mutation.getEntries()).thenReturn(com.google.common.collect.ImmutableList.copyOf(entries));
+    return mutation;
+  }
+
+  private static SetCell createSetCell(String cf, String qual, String val, long ts) {
+    SetCell setCell = mock(SetCell.class);
+    when(setCell.getFamilyName()).thenReturn(cf);
+    when(setCell.getQualifier()).thenReturn(ByteString.copyFromUtf8(qual));
+    when(setCell.getValue()).thenReturn(ByteString.copyFromUtf8(val));
+    when(setCell.getTimestamp()).thenReturn(ts);
+    return setCell;
+  }
+
+  private static DeleteCells createDeleteCells(String cf, String qual, long startTs, long endTs) {
+    DeleteCells deleteCells = mock(DeleteCells.class);
+    when(deleteCells.getFamilyName()).thenReturn(cf);
+    when(deleteCells.getQualifier()).thenReturn(ByteString.copyFromUtf8(qual));
+
+    TimestampRange range = TimestampRange.create(startTs, endTs);
+    when(deleteCells.getTimestampRange()).thenReturn(range);
+    return deleteCells;
+  }
+
+  private static DeleteFamily createDeleteFamily(String cf) {
+    DeleteFamily deleteFamily = mock(DeleteFamily.class);
+    when(deleteFamily.getFamilyName()).thenReturn(cf);
+    return deleteFamily;
+  }
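+
+  // The tests below drive ConvertChangeStreamToNativeMutationFn directly with
+  // mocked change stream entries and a mocked OutputReceiver, then assert on
+  // the captured protobuf mutations instead of running a full pipeline.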
+
+  @Test
+  public void testStandardMutationConversion() throws Exception {
+    ConvertChangeStreamToNativeMutationFn fn =
+        new ConvertChangeStreamToNativeMutationFn(false, "SOURCE_CBT", "SOURCE_CBT", false);
+
+    SetCell setCell = createSetCell(CF, QUAL, VAL, 1000000);
+    ChangeStreamMutation mutation =
+        createMutation(ChangeStreamMutation.MutationType.USER, List.of(setCell));
+
+    @SuppressWarnings("unchecked")
+    OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver = mock(OutputReceiver.class);
+
+    fn.processElement(KV.of(ByteString.copyFromUtf8(ROW_KEY), mutation), receiver);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<KV<ByteString, Iterable<Mutation>>> captor = ArgumentCaptor.forClass(KV.class);
+
+    verify(receiver).output(captor.capture());
+
+    KV<ByteString, Iterable<Mutation>> kv = captor.getValue();
+    assertEquals(ROW_KEY, kv.getKey().toStringUtf8());
+
+    Iterator<Mutation> iter = kv.getValue().iterator();
+    assertTrue(iter.hasNext());
+    Mutation mut = iter.next();
+    assertTrue(mut.hasSetCell());
+
+    Mutation.SetCell protoSetCell = mut.getSetCell();
+    assertEquals(CF, protoSetCell.getFamilyName());
+    assertEquals(QUAL, protoSetCell.getColumnQualifier().toStringUtf8());
+    assertEquals(VAL, protoSetCell.getValue().toStringUtf8());
+    assertEquals(1000000, protoSetCell.getTimestampMicros());
+  }
+
+  @Test
+  public void testBidirectionalReplicationAppendsTag() throws Exception {
+    ConvertChangeStreamToNativeMutationFn fn =
+        new ConvertChangeStreamToNativeMutationFn(true, "SOURCE_CBT", "SOURCE_CBT", false);
+
+    SetCell setCell = createSetCell(CF, QUAL, VAL, 1000000);
+    ChangeStreamMutation mutation =
+        createMutation(ChangeStreamMutation.MutationType.USER, List.of(setCell));
+
+    @SuppressWarnings("unchecked")
+    OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver = mock(OutputReceiver.class);
+
+    fn.processElement(KV.of(ByteString.copyFromUtf8(ROW_KEY), mutation), receiver);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<KV<ByteString, Iterable<Mutation>>> captor = ArgumentCaptor.forClass(KV.class);
+
+    verify(receiver).output(captor.capture());
+
+    KV<ByteString, Iterable<Mutation>> kv = captor.getValue();
+
+    List<Mutation> muts = new ArrayList<>();
+    kv.getValue().forEach(muts::add);
+
+    // Should contain the original SetCell plus the hidden DeleteFromColumn source tag
+    assertEquals(2, muts.size());
+    assertTrue(muts.get(0).hasSetCell());
+    assertTrue(muts.get(1).hasDeleteFromColumn());
+
+    Mutation.DeleteFromColumn del = muts.get(1).getDeleteFromColumn();
+    assertEquals("SOURCE_CBT", del.getColumnQualifier().toStringUtf8());
+  }
+
+  @Test
+  public void testBidirectionalReplicationFiltersIncomingTag() throws Exception {
+    ConvertChangeStreamToNativeMutationFn fn =
+        new ConvertChangeStreamToNativeMutationFn(true, "SOURCE_CBT", "SOURCE_CBT", false);
+
+    SetCell setCell = createSetCell(CF, QUAL, VAL, 1000000);
+    DeleteCells deleteCells = createDeleteCells(CF, "SOURCE_CBT", 0, 1000000);
+    ChangeStreamMutation mutation =
+        createMutation(ChangeStreamMutation.MutationType.USER, List.of(setCell, deleteCells));
+
+    @SuppressWarnings("unchecked")
+    OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver = mock(OutputReceiver.class);
+
+    fn.processElement(KV.of(ByteString.copyFromUtf8(ROW_KEY), mutation), receiver);
+
+    // Should be completely filtered out because the incoming mutation carries the replication tag
+    verifyNoInteractions(receiver);
+  }
+
+  @Test
+  public void testDeleteCellsConversion() throws Exception {
+    ConvertChangeStreamToNativeMutationFn fn =
+        new ConvertChangeStreamToNativeMutationFn(false, "SOURCE_CBT", "SOURCE_CBT", false);
+
+    DeleteCells deleteCells = createDeleteCells(CF, QUAL, 1000, 2000);
+    ChangeStreamMutation mutation =
+        createMutation(ChangeStreamMutation.MutationType.USER, List.of(deleteCells));
+
+    @SuppressWarnings("unchecked")
+    OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver = mock(OutputReceiver.class);
+
+    fn.processElement(KV.of(ByteString.copyFromUtf8(ROW_KEY), mutation), receiver);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<KV<ByteString, Iterable<Mutation>>> captor = ArgumentCaptor.forClass(KV.class);
+
+    verify(receiver).output(captor.capture());
+
+    KV<ByteString, Iterable<Mutation>> kv = captor.getValue();
+    assertEquals(ROW_KEY, kv.getKey().toStringUtf8());
+
+    Iterator<Mutation> iter = kv.getValue().iterator();
+    assertTrue(iter.hasNext());
+    Mutation mut = iter.next();
+    assertTrue(mut.hasDeleteFromColumn());
+
+    Mutation.DeleteFromColumn delCol = mut.getDeleteFromColumn();
+    assertEquals(CF, delCol.getFamilyName());
+    assertEquals(QUAL, delCol.getColumnQualifier().toStringUtf8());
+    assertEquals(1000, delCol.getTimeRange().getStartTimestampMicros());
+    assertEquals(2000, delCol.getTimeRange().getEndTimestampMicros());
+  }
+
+  @Test
+  public void testDeleteFamilyConversion() throws Exception {
+    ConvertChangeStreamToNativeMutationFn fn =
+        new ConvertChangeStreamToNativeMutationFn(false, "SOURCE_CBT", "SOURCE_CBT", false);
+
+    DeleteFamily deleteFamily = createDeleteFamily(CF);
+    ChangeStreamMutation mutation =
+        createMutation(ChangeStreamMutation.MutationType.USER, List.of(deleteFamily));
+
+    @SuppressWarnings("unchecked")
+    OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver = mock(OutputReceiver.class);
+
+    fn.processElement(KV.of(ByteString.copyFromUtf8(ROW_KEY), mutation), receiver);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<KV<ByteString, Iterable<Mutation>>> captor = ArgumentCaptor.forClass(KV.class);
+
+    verify(receiver).output(captor.capture());
+
+    KV<ByteString, Iterable<Mutation>> kv = captor.getValue();
+    assertEquals(ROW_KEY, kv.getKey().toStringUtf8());
+
+    Iterator<Mutation> iter = kv.getValue().iterator();
+    assertTrue(iter.hasNext());
+    Mutation mut = iter.next();
+    assertTrue(mut.hasDeleteFromFamily());
+
+    Mutation.DeleteFromFamily delFam = mut.getDeleteFromFamily();
+    assertEquals(CF, delFam.getFamilyName());
+  }
+
+  @Test
+  public void testCreateWrite() {
+    BigtableChangeStreamsToBigtableOptions options =
+        mock(BigtableChangeStreamsToBigtableOptions.class);
+    when(options.getBigtableWriteInstanceId()).thenReturn("inst1");
+    when(options.getBigtableWriteTableId()).thenReturn("table1");
+    when(options.getBigtableWriteProjectId()).thenReturn("proj1");
+    when(options.getBigtableWriteAppProfile()).thenReturn("appProfile1");
+    when(options.getBigtableBulkWriteMaxRowKeyCount()).thenReturn(100);
+    when(options.getBigtableBulkWriteMaxRequestSizeBytes()).thenReturn(1024);
+    when(options.getBigtableBulkWriteFlowControl()).thenReturn(true);
+
+    BigtableIO.Write write = BigtableChangeStreamsToBigtable.createWrite(options);
+    assertNotNull(write);
+  }
+}
diff --git a/v2/googlecloud-to-googlecloud/terraform/Bigtable_Change_Streams_to_Bigtable/dataflow_job.tf b/v2/googlecloud-to-googlecloud/terraform/Bigtable_Change_Streams_to_Bigtable/dataflow_job.tf
new file mode 100644
index 0000000000..ba10eb7bd3
--- /dev/null
+++ b/v2/googlecloud-to-googlecloud/terraform/Bigtable_Change_Streams_to_Bigtable/dataflow_job.tf
@@ -0,0 +1,51 @@
+provider "google-beta" {
+  project = var.project
+}
+variable "project" {
+  default = "<my-project>"
+}
+variable "region" {
+  default = "us-central1"
+}
+
+resource "google_dataflow_flex_template_job" "bigtable_change_streams_to_bigtable" {
+
+  provider                = google-beta
+  container_spec_gcs_path = "gs://dataflow-templates-${var.region}/latest/flex/Bigtable_Change_Streams_to_Bigtable"
+  name                    = "bigtable-change-streams-to-bigtable"
+  region                  = var.region
+  parameters              = {
+    bigtableChangeStreamAppProfile = "<bigtableChangeStreamAppProfile>"
+    bigtableReadInstanceId = "<bigtableReadInstanceId>"
+    bigtableReadTableId = "<bigtableReadTableId>"
+    bigtableWriteInstanceId = "<bigtableWriteInstanceId>"
+    bigtableWriteTableId = "<bigtableWriteTableId>"
+    bigtableWriteColumnFamily = "<bigtableWriteColumnFamily>"
+    # bidirectionalReplicationEnabled = "false"
+    # cbtQualifier = "BIDIRECTIONAL_REPL_SOURCE_CBT"
+    # cbtFilterQualifier = "BIDIRECTIONAL_REPL_SOURCE_CBT"
+    # dryRunEnabled = "false"
+    # filterGCMutations = "false"
+    # 
addRedistribute = "false" + # bigtableChangeStreamMetadataInstanceId = "" + # bigtableChangeStreamMetadataTableTableId = "" + # bigtableChangeStreamCharset = "UTF-8" + # bigtableChangeStreamStartTimestamp = "" + # bigtableChangeStreamIgnoreColumnFamilies = "" + # bigtableChangeStreamIgnoreColumns = "" + # bigtableChangeStreamName = "" + # bigtableChangeStreamResume = "false" + # bigtableReadChangeStreamTimeoutMs = "" + # bigtableReadProjectId = "" + # bigtableReadAppProfile = "default" + # bigtableRpcAttemptTimeoutMs = "" + # bigtableRpcTimeoutMs = "" + # bigtableAdditionalRetryCodes = "" + # bigtableWriteAppProfile = "default" + # bigtableWriteProjectId = "" + # bigtableBulkWriteLatencyTargetMs = "" + # bigtableBulkWriteMaxRowKeyCount = "" + # bigtableBulkWriteMaxRequestSizeBytes = "" + # bigtableBulkWriteFlowControl = "false" + } +} diff --git a/v2/streaming-data-generator/README_Streaming_Data_Generator.md b/v2/streaming-data-generator/README_Streaming_Data_Generator.md index ca4eeaaf76..26f5b77426 100644 --- a/v2/streaming-data-generator/README_Streaming_Data_Generator.md +++ b/v2/streaming-data-generator/README_Streaming_Data_Generator.md @@ -51,6 +51,12 @@ on [Metadata Annotations](https://github.com/GoogleCloudPlatform/DataflowTemplat * **commitDeadlineSeconds**: Specifies the deadline in seconds for the Commit API call. * **bootstrapServer**: Kafka Bootstrap Server For example, `localhost:9092`. * **kafkaTopic**: Kafka topic to write to. For example, `topic`. +* **bigtableWriteInstanceId**: The ID of the Bigtable instance that contains the table. +* **bigtableWriteTableId**: The ID of the Bigtable table to write to. +* **bigtableWriteColumnFamily**: The name of the column family of the Bigtable table to write data into. +* **bigtableWriteAppProfile**: The ID of the Bigtable application profile to use for the export. Defaults to: default. +* **bigtableWriteProjectId**: The ID of the Google Cloud project that contains the Bigtable instance to write data to. +* **bigtableWriteRowkeyField**: The name of the field in the JSON to use as the Bigtable row key. 
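+
+As a hypothetical illustration (the schema location and the field name `id`
+below are made up, not defaults), the row-key field is expected to name a field
+of the generated JSON records, with the remaining fields presumably landing as
+columns in the configured column family:
+
+```shell
+# Sketch: generate JSON like {"id": "...", "score": ...} and key rows by "id".
+export SCHEMA_LOCATION=gs://my-bucket/game-event-schema.json
+export BIGTABLE_WRITE_ROWKEY_FIELD=id
+export BIGTABLE_WRITE_COLUMN_FAMILY=cf
+```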
@@ -176,6 +182,12 @@ export BATCH_SIZE_BYTES=
 export COMMIT_DEADLINE_SECONDS=
 export BOOTSTRAP_SERVER=
 export KAFKA_TOPIC=
+export BIGTABLE_WRITE_INSTANCE_ID=
+export BIGTABLE_WRITE_TABLE_ID=
+export BIGTABLE_WRITE_COLUMN_FAMILY=
+export BIGTABLE_WRITE_APP_PROFILE=default
+export BIGTABLE_WRITE_PROJECT_ID=
+export BIGTABLE_WRITE_ROWKEY_FIELD=
 
 gcloud dataflow flex-template run "streaming-data-generator-job" \
   --project "$PROJECT" \
@@ -211,7 +223,13 @@ gcloud dataflow flex-template run "streaming-data-generator-job" \
   --parameters "batchSizeBytes=$BATCH_SIZE_BYTES" \
   --parameters "commitDeadlineSeconds=$COMMIT_DEADLINE_SECONDS" \
   --parameters "bootstrapServer=$BOOTSTRAP_SERVER" \
-  --parameters "kafkaTopic=$KAFKA_TOPIC"
+  --parameters "kafkaTopic=$KAFKA_TOPIC" \
+  --parameters "bigtableWriteInstanceId=$BIGTABLE_WRITE_INSTANCE_ID" \
+  --parameters "bigtableWriteTableId=$BIGTABLE_WRITE_TABLE_ID" \
+  --parameters "bigtableWriteColumnFamily=$BIGTABLE_WRITE_COLUMN_FAMILY" \
+  --parameters "bigtableWriteAppProfile=$BIGTABLE_WRITE_APP_PROFILE" \
+  --parameters "bigtableWriteProjectId=$BIGTABLE_WRITE_PROJECT_ID" \
+  --parameters "bigtableWriteRowkeyField=$BIGTABLE_WRITE_ROWKEY_FIELD"
 ```
 
 For more information about the command, please check:
@@ -263,6 +281,12 @@ export BATCH_SIZE_BYTES=
 export COMMIT_DEADLINE_SECONDS=
 export BOOTSTRAP_SERVER=
 export KAFKA_TOPIC=
+export BIGTABLE_WRITE_INSTANCE_ID=
+export BIGTABLE_WRITE_TABLE_ID=
+export BIGTABLE_WRITE_COLUMN_FAMILY=
+export BIGTABLE_WRITE_APP_PROFILE=default
+export BIGTABLE_WRITE_PROJECT_ID=
+export BIGTABLE_WRITE_ROWKEY_FIELD=
 
 mvn clean package -PtemplatesRun \
 -DskipTests \
@@ -271,7 +295,7 @@ mvn clean package -PtemplatesRun \
 -Dregion="$REGION" \
 -DjobName="streaming-data-generator-job" \
 -DtemplateName="Streaming_Data_Generator" \
--Dparameters="qps=$QPS,schemaTemplate=$SCHEMA_TEMPLATE,schemaLocation=$SCHEMA_LOCATION,topic=$TOPIC,messagesLimit=$MESSAGES_LIMIT,outputType=$OUTPUT_TYPE,avroSchemaLocation=$AVRO_SCHEMA_LOCATION,sinkType=$SINK_TYPE,outputTableSpec=$OUTPUT_TABLE_SPEC,writeDisposition=$WRITE_DISPOSITION,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,windowDuration=$WINDOW_DURATION,outputDirectory=$OUTPUT_DIRECTORY,outputFilenamePrefix=$OUTPUT_FILENAME_PREFIX,numShards=$NUM_SHARDS,driverClassName=$DRIVER_CLASS_NAME,connectionUrl=$CONNECTION_URL,username=$USERNAME,password=$PASSWORD,connectionProperties=$CONNECTION_PROPERTIES,statement=$STATEMENT,projectId=$PROJECT_ID,spannerInstanceName=$SPANNER_INSTANCE_NAME,spannerDatabaseName=$SPANNER_DATABASE_NAME,spannerTableName=$SPANNER_TABLE_NAME,maxNumMutations=$MAX_NUM_MUTATIONS,maxNumRows=$MAX_NUM_ROWS,batchSizeBytes=$BATCH_SIZE_BYTES,commitDeadlineSeconds=$COMMIT_DEADLINE_SECONDS,bootstrapServer=$BOOTSTRAP_SERVER,kafkaTopic=$KAFKA_TOPIC" \
+-Dparameters="qps=$QPS,schemaTemplate=$SCHEMA_TEMPLATE,schemaLocation=$SCHEMA_LOCATION,topic=$TOPIC,messagesLimit=$MESSAGES_LIMIT,outputType=$OUTPUT_TYPE,avroSchemaLocation=$AVRO_SCHEMA_LOCATION,sinkType=$SINK_TYPE,outputTableSpec=$OUTPUT_TABLE_SPEC,writeDisposition=$WRITE_DISPOSITION,outputDeadletterTable=$OUTPUT_DEADLETTER_TABLE,windowDuration=$WINDOW_DURATION,outputDirectory=$OUTPUT_DIRECTORY,outputFilenamePrefix=$OUTPUT_FILENAME_PREFIX,numShards=$NUM_SHARDS,driverClassName=$DRIVER_CLASS_NAME,connectionUrl=$CONNECTION_URL,username=$USERNAME,password=$PASSWORD,connectionProperties=$CONNECTION_PROPERTIES,statement=$STATEMENT,projectId=$PROJECT_ID,spannerInstanceName=$SPANNER_INSTANCE_NAME,spannerDatabaseName=$SPANNER_DATABASE_NAME,spannerTableName=$SPANNER_TABLE_NAME,maxNumMutations=$MAX_NUM_MUTATIONS,maxNumRows=$MAX_NUM_ROWS,batchSizeBytes=$BATCH_SIZE_BYTES,commitDeadlineSeconds=$COMMIT_DEADLINE_SECONDS,bootstrapServer=$BOOTSTRAP_SERVER,kafkaTopic=$KAFKA_TOPIC,bigtableWriteInstanceId=$BIGTABLE_WRITE_INSTANCE_ID,bigtableWriteTableId=$BIGTABLE_WRITE_TABLE_ID,bigtableWriteColumnFamily=$BIGTABLE_WRITE_COLUMN_FAMILY,bigtableWriteAppProfile=$BIGTABLE_WRITE_APP_PROFILE,bigtableWriteProjectId=$BIGTABLE_WRITE_PROJECT_ID,bigtableWriteRowkeyField=$BIGTABLE_WRITE_ROWKEY_FIELD" \ -f v2/streaming-data-generator ``` @@ -347,6 +371,12 @@ resource "google_dataflow_flex_template_job" "streaming_data_generator" { # commitDeadlineSeconds = "" # bootstrapServer = "" # kafkaTopic = "" + # bigtableWriteInstanceId = "" + # bigtableWriteTableId = "" + # bigtableWriteColumnFamily = "" + # bigtableWriteAppProfile = "default" + # bigtableWriteProjectId = "" + # bigtableWriteRowkeyField = "" } } ``` diff --git a/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/StreamingDataGenerator.java b/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/StreamingDataGenerator.java index 960718802b..815ac9647c 100644 --- a/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/StreamingDataGenerator.java +++ b/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/templates/StreamingDataGenerator.java @@ -28,6 +28,7 @@ import com.google.cloud.teleport.v2.common.UncaughtExceptionLogger; import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.StreamingDataGeneratorOptions; import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery; +import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigtable; import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs; import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToJdbc; import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToKafka; @@ -182,7 +183,8 @@ public interface StreamingDataGeneratorOptions extends PipelineOptions { @TemplateEnumOption("PUBSUB"), @TemplateEnumOption("JDBC"), @TemplateEnumOption("SPANNER"), - @TemplateEnumOption("KAFKA") + @TemplateEnumOption("KAFKA"), + @TemplateEnumOption("BIGTABLE") }, optional = true, description = "Output Sink Type", @@ -479,6 +481,78 @@ public interface StreamingDataGeneratorOptions extends PipelineOptions { String getKafkaTopic(); void setKafkaTopic(String outputTopic); + + @TemplateParameter.Text( + order = 32, + optional = true, + parentName = "sinkType", + parentTriggerValues = {"BIGTABLE"}, + regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"}, + description = "Bigtable Instance ID", + helpText = "The ID of the Bigtable 
+        helpText = "The ID of the Bigtable instance that contains the table.")
+    String getBigtableWriteInstanceId();
+
+    void setBigtableWriteInstanceId(String value);
+
+    @TemplateParameter.Text(
+        order = 33,
+        optional = true,
+        parentName = "sinkType",
+        parentTriggerValues = {"BIGTABLE"},
+        regexes = {"[_a-zA-Z0-9][-_.a-zA-Z0-9]*"},
+        description = "Bigtable Table ID",
+        helpText = "The ID of the Bigtable table to write to.")
+    String getBigtableWriteTableId();
+
+    void setBigtableWriteTableId(String value);
+
+    @TemplateParameter.Text(
+        order = 34,
+        optional = true,
+        parentName = "sinkType",
+        parentTriggerValues = {"BIGTABLE"},
+        regexes = {"[-_.a-zA-Z0-9]+"},
+        description = "The Bigtable Column Family",
+        helpText = "The name of the column family of the Bigtable table to write data into.")
+    String getBigtableWriteColumnFamily();
+
+    void setBigtableWriteColumnFamily(String value);
+
+    @TemplateParameter.Text(
+        order = 35,
+        optional = true,
+        parentName = "sinkType",
+        parentTriggerValues = {"BIGTABLE"},
+        regexes = {"[a-z][a-z0-9\\-]+[a-z0-9]"},
+        description = "Bigtable App Profile",
+        helpText = "The ID of the Bigtable application profile to use for the export.")
+    @Default.String("default")
+    String getBigtableWriteAppProfile();
+
+    void setBigtableWriteAppProfile(String value);
+
+    @TemplateParameter.ProjectId(
+        order = 36,
+        optional = true,
+        parentName = "sinkType",
+        parentTriggerValues = {"BIGTABLE"},
+        description = "Bigtable Project ID",
+        helpText =
+            "The ID of the Google Cloud project that contains the Bigtable instance to write data to.")
+    String getBigtableWriteProjectId();
+
+    void setBigtableWriteProjectId(String value);
+
+    @TemplateParameter.Text(
+        order = 37,
+        optional = true,
+        parentName = "sinkType",
+        parentTriggerValues = {"BIGTABLE"},
+        description = "Bigtable Rowkey Field",
+        helpText = "The name of the field in the JSON to use as the Bigtable row key.")
+    String getBigtableWriteRowkeyField();
+
+    void setBigtableWriteRowkeyField(String value);
   }
 
   /** Allowed list of existing schema templates. */
@@ -569,7 +643,8 @@ public enum SinkType {
     GCS,
     JDBC,
     SPANNER,
-    KAFKA
+    KAFKA,
+    BIGTABLE
   }
 
   /**
@@ -790,6 +865,28 @@ static PTransform<PCollection<byte[]>, PDone> createSink(
                 "Missing required value --kafkaTopic for %s sink type",
                 options.getSinkType().name()));
         return StreamingDataGeneratorWriteToKafka.Writer.builder(options).build();
+      case BIGTABLE:
+        checkArgument(
+            options.getBigtableWriteInstanceId() != null,
+            String.format(
+                "Missing required value --bigtableWriteInstanceId for %s sink type",
+                options.getSinkType().name()));
+        checkArgument(
+            options.getBigtableWriteTableId() != null,
+            String.format(
+                "Missing required value --bigtableWriteTableId for %s sink type",
+                options.getSinkType().name()));
+        checkArgument(
+            options.getBigtableWriteColumnFamily() != null,
+            String.format(
+                "Missing required value --bigtableWriteColumnFamily for %s sink type",
+                options.getSinkType().name()));
+        checkArgument(
+            options.getBigtableWriteRowkeyField() != null,
+            String.format(
+                "Missing required value --bigtableWriteRowkeyField for %s sink type",
+                options.getSinkType().name()));
+        return StreamingDataGeneratorWriteToBigtable.builder(options, schema).build();
       default:
         throw new IllegalArgumentException("Unsupported Sink.");
     }
diff --git a/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToBigtable.java b/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToBigtable.java
new file mode 100644
index 0000000000..f0ebdd1ab1
--- /dev/null
+++ b/v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToBigtable.java
@@ -0,0 +1,176 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.transforms;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.google.auto.value.AutoValue;
+import com.google.bigtable.v2.Mutation;
+import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.StreamingDataGeneratorOptions;
+import com.google.common.base.Preconditions;
+import com.google.protobuf.ByteString;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PDone;
+import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
+
+/**
+ * A {@link PTransform} that converts generated fake messages to native Bigtable protobuf mutations
+ * and writes them directly to Cloud Bigtable.
+ */
+@AutoValue
+public abstract class StreamingDataGeneratorWriteToBigtable
+    extends PTransform<PCollection<byte[]>, PDone> {
+
+  abstract StreamingDataGeneratorOptions getPipelineOptions();
+
+  abstract String getSchema();
+
+  public static Builder builder(StreamingDataGeneratorOptions options, String schema) {
+    return new AutoValue_StreamingDataGeneratorWriteToBigtable.Builder()
+        .setPipelineOptions(options)
+        .setSchema(schema);
+  }
+
+  /** Builder for {@link StreamingDataGeneratorWriteToBigtable}. */
+  @AutoValue.Builder
+  public abstract static class Builder {
+    public abstract Builder setPipelineOptions(StreamingDataGeneratorOptions value);
+
+    public abstract Builder setSchema(String schema);
+
+    public abstract StreamingDataGeneratorWriteToBigtable build();
+  }
+
+  @Override
+  public PDone expand(PCollection<byte[]> generatedMessages) {
+    StreamingDataGeneratorOptions options = getPipelineOptions();
+
+    String columnFamily = options.getBigtableWriteColumnFamily();
+    String rowkeyField = options.getBigtableWriteRowkeyField();
+
+    BigtableIO.Write write =
+        BigtableIO.write()
+            .withInstanceId(options.getBigtableWriteInstanceId())
+            .withTableId(options.getBigtableWriteTableId());
+
+    String projectId = options.getBigtableWriteProjectId();
+    if (projectId == null || projectId.isEmpty()) {
+      projectId =
+          options.as(org.apache.beam.sdk.extensions.gcp.options.GcpOptions.class).getProject();
+    }
+    if (projectId != null && !projectId.isEmpty()) {
+      write = write.withProjectId(projectId);
+    }
+    if (options.getBigtableWriteAppProfile() != null
+        && !options.getBigtableWriteAppProfile().isEmpty()) {
+      write = write.withAppProfileId(options.getBigtableWriteAppProfile());
+    }
+
+    return generatedMessages
+        .apply(
+            "Convert to Native Protobuf Mutation",
+            ParDo.of(new BytesToNativeMutationFn(columnFamily, rowkeyField)))
+        .apply("Write to Bigtable", write);
+  }
+
+  static class BytesToNativeMutationFn extends DoFn<byte[], KV<ByteString, Iterable<Mutation>>> {
+
+    private final String columnFamily;
+    private final String rowkeyField;
+    private transient @MonotonicNonNull ObjectMapper mapper;
+
+    BytesToNativeMutationFn(String columnFamily, String rowkeyField) {
+      this.columnFamily = columnFamily;
+      this.rowkeyField = rowkeyField;
+    }
+
+    @Setup
+    public void setup() {
+      mapper = new ObjectMapper();
+    }
+
+    @ProcessElement
+    public void processElement(
+        @Element byte[] message, OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver) {
+
+      JsonNode row;
+      try {
+        row = Preconditions.checkNotNull(mapper).readTree(message);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Failed to parse JSON for Bigtable mutation", e);
+      }
+
+      if (!row.isObject()) {
+        throw new IllegalArgumentException("Expected JSON object for Bigtable mutation");
+      }
+
+      JsonNode rowkeyNode = row.get(rowkeyField);
+      if (rowkeyNode == null || rowkeyNode.isNull()) {
+        throw new IllegalArgumentException(
+            String.format("Row key column '%s' not found or is null in JSON", rowkeyField));
+      }
+
+      String rowkeyStr = rowkeyNode.asText();
+      ByteString rowkeyByteString = ByteString.copyFromUtf8(rowkeyStr);
+
+      long timestampMicros = System.currentTimeMillis() * 1000;
+
+      List<Mutation> protoMutations = new ArrayList<>();
+
+      Iterator<Map.Entry<String, JsonNode>> fields = row.fields();
+      while (fields.hasNext()) {
+        Map.Entry<String, JsonNode> entry = fields.next();
+        String columnName = entry.getKey();
+
+        if (columnName.equals(rowkeyField)) {
+          continue;
+        }
+
+        JsonNode valueNode = entry.getValue();
+        if (valueNode == null || valueNode.isNull()) {
+          continue;
+        }
+
+        String valStr = valueNode.asText();
+
+        protoMutations.add(
+            Mutation.newBuilder()
+                .setSetCell(
+                    Mutation.SetCell.newBuilder()
+                        .setFamilyName(columnFamily)
+                        .setColumnQualifier(ByteString.copyFromUtf8(columnName))
+                        .setTimestampMicros(timestampMicros)
+                        .setValue(ByteString.copyFromUtf8(valStr))
+                        .build())
+                .build());
+      }
+
+      if (!protoMutations.isEmpty()) {
+        receiver.output(KV.of(rowkeyByteString, protoMutations));
+      }
+    }
+  }
+}
diff --git a/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorIT.java b/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorIT.java
index 7b0ab0b30a..a2cbdf2165 100644
--- a/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorIT.java
+++ b/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorIT.java
@@ -22,6 +22,7 @@
 import com.google.cloud.bigquery.Schema;
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigtable.data.v2.models.Row;
 import com.google.cloud.teleport.metadata.TemplateIntegrationTest;
 import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.SchemaTemplate;
 import com.google.cloud.teleport.v2.templates.StreamingDataGenerator.SinkType;
@@ -47,6 +48,7 @@
 import org.apache.beam.it.gcp.artifacts.Artifact;
 import org.apache.beam.it.gcp.bigquery.BigQueryResourceManager;
 import org.apache.beam.it.gcp.bigquery.conditions.BigQueryRowsCheck;
+import org.apache.beam.it.gcp.bigtable.BigtableResourceManager;
 import org.apache.beam.it.gcp.pubsub.PubsubResourceManager;
 import org.apache.beam.it.gcp.pubsub.conditions.PubsubMessagesCheck;
 import org.apache.beam.it.gcp.spanner.SpannerResourceManager;
@@ -94,6 +96,7 @@ public final class StreamingDataGeneratorIT extends TemplateTestBase {
   private SpannerResourceManager spannerResourceManager;
   private JDBCResourceManager jdbcResourceManager;
   private KafkaResourceManager kafkaResourceManager;
+  private BigtableResourceManager bigtableResourceManager;
 
   @After
   public void tearDown() {
@@ -103,7 +106,8 @@ public void tearDown() {
         bigQueryResourceManager,
         spannerResourceManager,
         jdbcResourceManager,
-        kafkaResourceManager);
+        kafkaResourceManager,
+        bigtableResourceManager);
   }
 
   @Test
@@ -434,4 +438,43 @@ public void testFakeMessagesToKafka() throws IOException {
     // Assert
     assertThatResult(result).meetsConditions();
   }
+
+  @Test
+  public void testFakeMessagesToBigtable() throws IOException {
+    // Set up resource manager
+    bigtableResourceManager =
+        BigtableResourceManager.builder(testName, PROJECT, credentialsProvider)
+            .maybeUseStaticInstance()
+            .build();
+    String colFamily = "cf";
+    bigtableResourceManager.createTable(testName, ImmutableList.of(colFamily));
+
+    // Arrange
+    LaunchConfig.Builder options =
+        LaunchConfig.builder(testName, specPath)
+            .addParameter(SCHEMA_TEMPLATE_KEY, SchemaTemplate.GAME_EVENT.name())
+            .addParameter(QPS_KEY, DEFAULT_QPS)
+            .addParameter(SINK_TYPE_KEY, SinkType.BIGTABLE.name())
+            .addParameter("bigtableWriteProjectId", PROJECT)
+            .addParameter("bigtableWriteInstanceId", bigtableResourceManager.getInstanceId())
+            .addParameter("bigtableWriteTableId", testName)
+            .addParameter("bigtableWriteColumnFamily", colFamily)
+            .addParameter("bigtableWriteRowkeyField", "eventId");
+
+    // Act
+    LaunchInfo info = launchTemplate(options);
+    assertThatPipeline(info).isRunning();
+
+    Result result =
+        pipelineOperator()
+            .waitForConditionAndFinish(
+                createConfig(info),
+                () -> {
+                  List<Row> rows = bigtableResourceManager.readTable(testName);
+                  return !rows.isEmpty();
+                });
+
+    // Assert
+    assertThatResult(result).meetsConditions();
+  }
 }
diff --git a/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorTest.java b/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorTest.java
index 4ac6b1dd59..cccf81c40b 100644
--- a/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorTest.java
+++ b/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/templates/StreamingDataGeneratorTest.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.fail;
 
 import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigQuery;
+import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigtable;
 import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToGcs;
 import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToPubSub;
 import java.io.ByteArrayInputStream;
@@ -136,6 +137,23 @@ public void testCreatingGCSSink_returnsValidSinkType() {
             instanceof StreamingDataGeneratorWriteToGcs);
   }
 
+  /** Tests Creation of Bigtable Sink based on Pipeline options. */
+  @Test
+  public void testCreatingBigtableSink_returnsValidSinkType() {
+    StreamingDataGenerator.StreamingDataGeneratorOptions options =
+        getPipelineOptions(
+            new String[] {
+              "--sinkType=" + StreamingDataGenerator.SinkType.BIGTABLE.name(),
+              "--bigtableWriteInstanceId=inst1",
+              "--bigtableWriteTableId=table1",
+              "--bigtableWriteColumnFamily=cf",
+              "--bigtableWriteRowkeyField=id"
+            });
+    assertTrue(
+        StreamingDataGenerator.createSink(options, getSimpleSchema())
+            instanceof StreamingDataGeneratorWriteToBigtable);
+  }
+
   /** Tests generation of fake Json data message without attributes. */
   @Test
   public void testMessageGenerator_returnsFakeMessage() throws IOException {
diff --git a/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToBigtableTest.java b/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToBigtableTest.java
new file mode 100644
index 0000000000..359748c01d
--- /dev/null
+++ b/v2/streaming-data-generator/src/test/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToBigtableTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright (C) 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package com.google.cloud.teleport.v2.transforms;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.bigtable.v2.Mutation;
+import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigtable.BytesToNativeMutationFn;
+import com.google.protobuf.ByteString;
+import java.nio.charset.StandardCharsets;
+import java.util.Iterator;
+import org.apache.beam.sdk.transforms.DoFn.OutputReceiver;
+import org.apache.beam.sdk.values.KV;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+
+/** Unit tests for {@link StreamingDataGeneratorWriteToBigtable}. */
+@RunWith(JUnit4.class)
+public class StreamingDataGeneratorWriteToBigtableTest {
+
+  @Test
+  public void testBytesToNativeMutationFn() throws Exception {
+    BytesToNativeMutationFn fn = new BytesToNativeMutationFn("cf", "id");
+    fn.setup();
+
+    String json = "{\"id\":\"row1\",\"name\":\"Alice\",\"age\":30}";
+    byte[] message = json.getBytes(StandardCharsets.UTF_8);
+
+    @SuppressWarnings("unchecked")
+    OutputReceiver<KV<ByteString, Iterable<Mutation>>> receiver = mock(OutputReceiver.class);
+
+    fn.processElement(message, receiver);
+
+    @SuppressWarnings("unchecked")
+    ArgumentCaptor<KV<ByteString, Iterable<Mutation>>> captor = ArgumentCaptor.forClass(KV.class);
+
+    verify(receiver).output(captor.capture());
+
+    KV<ByteString, Iterable<Mutation>> kv = captor.getValue();
+    assertEquals("row1", kv.getKey().toStringUtf8());
+
+    Iterator<Mutation> iter = kv.getValue().iterator();
+
+    assertTrue(iter.hasNext());
+    Mutation mut1 = iter.next();
+    assertTrue(mut1.hasSetCell());
+    assertEquals("cf", mut1.getSetCell().getFamilyName());
+    assertEquals("name", mut1.getSetCell().getColumnQualifier().toStringUtf8());
+    assertEquals("Alice", mut1.getSetCell().getValue().toStringUtf8());
+
+    assertTrue(iter.hasNext());
+    Mutation mut2 = iter.next();
+    assertTrue(mut2.hasSetCell());
+    assertEquals("cf", mut2.getSetCell().getFamilyName());
+    assertEquals("age", mut2.getSetCell().getColumnQualifier().toStringUtf8());
+    assertEquals("30", mut2.getSetCell().getValue().toStringUtf8());
+  }
+}
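As a possible follow-up, the same conversion could be exercised through a real `ParDo` rather than a mocked `OutputReceiver`. A minimal sketch, assuming the Beam direct runner and the `beam-sdks-java-extensions-protobuf` module are on the test classpath (class and method names hypothetical, not part of this change):

```java
package com.google.cloud.teleport.v2.transforms;

import static org.junit.Assert.assertEquals;

import com.google.bigtable.v2.Mutation;
import com.google.cloud.teleport.v2.transforms.StreamingDataGeneratorWriteToBigtable.BytesToNativeMutationFn;
import com.google.protobuf.ByteString;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.coders.ByteArrayCoder;
import org.apache.beam.sdk.coders.IterableCoder;
import org.apache.beam.sdk.coders.KvCoder;
import org.apache.beam.sdk.extensions.protobuf.ByteStringCoder;
import org.apache.beam.sdk.extensions.protobuf.ProtoCoder;
import org.apache.beam.sdk.testing.PAssert;
import org.apache.beam.sdk.testing.TestPipeline;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.junit.Rule;
import org.junit.Test;

/** Hypothetical pipeline-level check for {@link BytesToNativeMutationFn}. */
public class BytesToNativeMutationFnPipelineTest {

  @Rule public final transient TestPipeline pipeline = TestPipeline.create();

  @Test
  public void testFnInsideRealParDo() {
    byte[] message = "{\"id\":\"row1\",\"name\":\"Alice\"}".getBytes(StandardCharsets.UTF_8);

    // Run the DoFn through an actual ParDo; the coder is set explicitly because
    // ByteString and the Bigtable Mutation proto are not inferable by default.
    PCollection<KV<ByteString, Iterable<Mutation>>> output =
        pipeline
            .apply(Create.of(message).withCoder(ByteArrayCoder.of()))
            .apply(ParDo.of(new BytesToNativeMutationFn("cf", "id")))
            .setCoder(
                KvCoder.of(
                    ByteStringCoder.of(), IterableCoder.of(ProtoCoder.of(Mutation.class))));

    // Assert on the single emitted element: row key from "id", one SetCell for "name".
    PAssert.that(output)
        .satisfies(
            kvs -> {
              KV<ByteString, Iterable<Mutation>> kv = kvs.iterator().next();
              assertEquals("row1", kv.getKey().toStringUtf8());
              Mutation mut = kv.getValue().iterator().next();
              assertEquals("name", mut.getSetCell().getColumnQualifier().toStringUtf8());
              assertEquals("Alice", mut.getSetCell().getValue().toStringUtf8());
              return null;
            });

    pipeline.run().waitUntilFinish();
  }
}
```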