diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java index ce0ec24260..990b3b2afa 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java @@ -64,6 +64,12 @@ public interface ElasticsearchWriteOptions extends PipelineOptions { void setBatchSizeBytes(Long batchSizeBytes); + @Description("Elastic search socket timeout. Default: 30 minutes") + @Default.Integer(1800000) + Integer getMaxSocketTimeout(); + + void setMaxSocketTimeout(Integer maxSocketTimeout); + @Description("Optional: Max retry attempts, must be > 0, ex: 3. Default: no retries") Integer getMaxRetryAttempts(); diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java index e3f35055e7..7b471926ab 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java @@ -114,6 +114,7 @@ public PDone expand(PCollection jsonStrings) { options().getIndex(), DOCUMENT_TYPE); + config = config.withSocketTimeout(options().getMaxSocketTimeout()); // If username and password are not blank, use them instead of ApiKey if (StringUtils.isNotBlank(options().getElasticsearchUsername()) && StringUtils.isNotBlank(options().getElasticsearchPassword())) { diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java index b2fbd0dbf4..ada7dfe225 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java @@ -96,6 +96,12 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.cloud.ServiceOptions; +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; + + /** * Transforms for reading and writing data from/to Elasticsearch. * @@ -469,8 +475,20 @@ RestClient createClient() throws IOException { httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } if (getApiKey() != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + String projectId = ServiceOptions.getDefaultProjectId(); + String secretId = getApiKey(); + String versionId = "latest"; + SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, versionId); + + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + String plainTextSecret = response.getPayload().getData().toStringUtf8(); + URL url = new URL(plainTextSecret); + String userInfo = url.getUserInfo(); + + String[] userInfoArray = userInfo.split(":"); restClientBuilder.setDefaultHeaders( - new Header[] {new BasicHeader("Authorization", "ApiKey " + getApiKey())}); + new Header[] {new BasicHeader("Authorization", "ApiKey " + userInfoArray[1])}); } if (getBearerToken() != null) { restClientBuilder.setDefaultHeaders( diff --git a/v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json b/v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json new file mode 100644 index 0000000000..f1bd9dc5dc --- /dev/null +++ b/v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json @@ -0,0 +1,7 @@ +{ + "mainClass": "com.google.cloud.teleport.v2.elasticsearch.templates.BigQueryToElasticsearch", + "classPath": "/template/googlecloud-to-elasticsearch/*:/template/googlecloud-to-elasticsearch/libs/*:/template/googlecloud-to-elasticsearch/classes", + "defaultParameterValues": { + "labels": "{\"goog-dataflow-provided-template-type\":\"flex\", \"goog-dataflow-provided-template-name\":\"bigquery_to_elasticsearch\"}" + } +} diff --git a/v2/pom.xml b/v2/pom.xml index 133a7bf854..96b4a8de41 100644 --- a/v2/pom.xml +++ b/v2/pom.xml @@ -173,12 +173,12 @@ ${mockito-core.version} test - +