From 74712aca03bbfd713451310eee77142d4ba587e4 Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Tue, 19 Apr 2022 15:33:56 +0000 Subject: [PATCH 1/7] ES increase timeout --- .../transforms/WriteToElasticsearch.java | 1 + .../v2/elasticsearch/utils/ElasticsearchIO.java | 11 +++++++++++ .../googlecloud-to-elasticsearch-command-spec.json | 7 +++++++ v2/pom.xml | 4 ++-- 4 files changed, 21 insertions(+), 2 deletions(-) create mode 100644 v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java index c88964be49..7b2a538661 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java @@ -79,6 +79,7 @@ public PDone expand(PCollection jsonStrings) { options().getIndex(), DOCUMENT_TYPE); + config = config.withSocketTimeout(1800000); // If username and password are not blank, use them instead of ApiKey if (StringUtils.isNotBlank(options().getElasticsearchUsername()) && StringUtils.isNotBlank(options().getElasticsearchPassword())) { diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java index eea718c32d..52c0e98bb6 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java @@ -69,6 +69,7 @@ import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; +import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; @@ -457,6 +458,12 @@ RestClient createClient() throws IOException { httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } + // restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder + // .setDefaultIOReactorConfig(IOReactorConfig.custom() + // .setSoKeepAlive(true) + // .build() + // ) + // ); if (getApiKey() != null) { restClientBuilder.setDefaultHeaders( new Header[] {new BasicHeader("Authorization", "ApiKey " + getApiKey())}); @@ -498,6 +505,10 @@ public RequestConfig.Builder customizeRequestConfig( return requestConfigBuilder; } }); + + restClientBuilder.setHttpClientConfigCallback( + httpAsyncClientBuilder -> + httpAsyncClientBuilder.setKeepAliveStrategy((response, context) -> 3600000)); return restClientBuilder.build(); } } diff --git a/v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json b/v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json new file mode 100644 index 0000000000..f1bd9dc5dc --- /dev/null +++ b/v2/googlecloud-to-elasticsearch/src/main/resources/googlecloud-to-elasticsearch-command-spec.json @@ -0,0 +1,7 @@ +{ + "mainClass": "com.google.cloud.teleport.v2.elasticsearch.templates.BigQueryToElasticsearch", + "classPath": "/template/googlecloud-to-elasticsearch/*:/template/googlecloud-to-elasticsearch/libs/*:/template/googlecloud-to-elasticsearch/classes", + "defaultParameterValues": { + "labels": "{\"goog-dataflow-provided-template-type\":\"flex\", \"goog-dataflow-provided-template-name\":\"bigquery_to_elasticsearch\"}" + } +} diff --git a/v2/pom.xml b/v2/pom.xml index e964f02adf..f6aaaad1eb 100644 --- a/v2/pom.xml +++ b/v2/pom.xml @@ -149,12 +149,12 @@ ${mockito-core.version} test - + From 332be5afefba08cd540ac3b473da1b31ce133a64 Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Wed, 20 Apr 2022 14:50:16 +0530 Subject: [PATCH 2/7] Default value and override option --- .../options/ElasticsearchWriteOptions.java | 7 +++++++ .../transforms/WriteToElasticsearch.java | 2 +- .../v2/elasticsearch/utils/ElasticsearchIO.java | 11 ----------- 3 files changed, 8 insertions(+), 12 deletions(-) diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java index de85ac9bc4..bd08923328 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java @@ -63,6 +63,12 @@ public interface ElasticsearchWriteOptions extends PipelineOptions { void setBatchSizeBytes(Long batchSizeBytes); + @Description("Elastic search socket timeout. Default: 30 minutes") + @Default.Long(1800000) + Long getMaxSocketTimeout(); + + void setMaxSocketTimeout(Long maxSocketTimeout); + @Description("Optional: Max retry attempts, must be > 0, ex: 3. Default: no retries") Integer getMaxRetryAttempts(); @@ -73,4 +79,5 @@ public interface ElasticsearchWriteOptions extends PipelineOptions { Long getMaxRetryDuration(); void setMaxRetryDuration(Long maxRetryDuration); + } diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java index 7b2a538661..03fdd14f7a 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/transforms/WriteToElasticsearch.java @@ -79,7 +79,7 @@ public PDone expand(PCollection jsonStrings) { options().getIndex(), DOCUMENT_TYPE); - config = config.withSocketTimeout(1800000); + config = config.withSocketTimeout(options().getMaxSocketTimeout()); // If username and password are not blank, use them instead of ApiKey if (StringUtils.isNotBlank(options().getElasticsearchUsername()) && StringUtils.isNotBlank(options().getElasticsearchPassword())) { diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java index 52c0e98bb6..eea718c32d 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java @@ -69,7 +69,6 @@ import org.apache.http.Header; import org.apache.http.HttpEntity; import org.apache.http.HttpHost; -import org.apache.http.impl.nio.reactor.IOReactorConfig; import org.apache.http.auth.AuthScope; import org.apache.http.auth.UsernamePasswordCredentials; import org.apache.http.client.CredentialsProvider; @@ -458,12 +457,6 @@ RestClient createClient() throws IOException { httpAsyncClientBuilder -> httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } - // restClientBuilder.setHttpClientConfigCallback(httpClientBuilder -> httpClientBuilder - // .setDefaultIOReactorConfig(IOReactorConfig.custom() - // .setSoKeepAlive(true) - // .build() - // ) - // ); if (getApiKey() != null) { restClientBuilder.setDefaultHeaders( new Header[] {new BasicHeader("Authorization", "ApiKey " + getApiKey())}); @@ -505,10 +498,6 @@ public RequestConfig.Builder customizeRequestConfig( return requestConfigBuilder; } }); - - restClientBuilder.setHttpClientConfigCallback( - httpAsyncClientBuilder -> - httpAsyncClientBuilder.setKeepAliveStrategy((response, context) -> 3600000)); return restClientBuilder.build(); } } From 44b0dedcbb749f77f7ba7c5c8c73500042d86171 Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Wed, 20 Apr 2022 15:03:37 +0530 Subject: [PATCH 3/7] Use Integer --- .../v2/elasticsearch/options/ElasticsearchWriteOptions.java | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java index bd08923328..5646fa7242 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java @@ -64,10 +64,10 @@ public interface ElasticsearchWriteOptions extends PipelineOptions { void setBatchSizeBytes(Long batchSizeBytes); @Description("Elastic search socket timeout. Default: 30 minutes") - @Default.Long(1800000) + @Default.Integer(1800000) Long getMaxSocketTimeout(); - void setMaxSocketTimeout(Long maxSocketTimeout); + void setMaxSocketTimeout(Integer maxSocketTimeout); @Description("Optional: Max retry attempts, must be > 0, ex: 3. Default: no retries") Integer getMaxRetryAttempts(); @@ -79,5 +79,4 @@ public interface ElasticsearchWriteOptions extends PipelineOptions { Long getMaxRetryDuration(); void setMaxRetryDuration(Long maxRetryDuration); - } From 765b83bc05f40c12cbe84e6508dbf87e159774a7 Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Wed, 20 Apr 2022 15:12:47 +0530 Subject: [PATCH 4/7] Change type --- .../v2/elasticsearch/options/ElasticsearchWriteOptions.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java index 5646fa7242..056f994723 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/options/ElasticsearchWriteOptions.java @@ -65,7 +65,7 @@ public interface ElasticsearchWriteOptions extends PipelineOptions { @Description("Elastic search socket timeout. Default: 30 minutes") @Default.Integer(1800000) - Long getMaxSocketTimeout(); + Integer getMaxSocketTimeout(); void setMaxSocketTimeout(Integer maxSocketTimeout); From 01b5e8118e9f946a6cded69e3b6ce8c20931a9fe Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Thu, 8 Sep 2022 17:46:31 +0530 Subject: [PATCH 5/7] Try decrypting secret --- .../v2/elasticsearch/utils/ElasticsearchIO.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java index b2fbd0dbf4..5250efafef 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java @@ -96,6 +96,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; +import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; +import com.google.cloud.secretmanager.v1.SecretVersionName; + + /** * Transforms for reading and writing data from/to Elasticsearch. * @@ -469,8 +474,16 @@ RestClient createClient() throws IOException { httpAsyncClientBuilder.setDefaultCredentialsProvider(credentialsProvider)); } if (getApiKey() != null) { + SecretManagerServiceClient client = SecretManagerServiceClient.create(); + String projectId = "tinyclues-data"; + String secretId = getApiKey(); + String versionId = "latest"; + SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, versionId); + + AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); + String plainTextSecret = response.getPayload().getData().toStringUtf8(); restClientBuilder.setDefaultHeaders( - new Header[] {new BasicHeader("Authorization", "ApiKey " + getApiKey())}); + new Header[] {new BasicHeader("Authorization", "ApiKey " + plainTextSecret)}); } if (getBearerToken() != null) { restClientBuilder.setDefaultHeaders( From ff819cf0b4fc968d740803287061ed6885f9a5a0 Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Mon, 12 Sep 2022 18:23:51 +0530 Subject: [PATCH 6/7] Read value --- .../teleport/v2/elasticsearch/utils/ElasticsearchIO.java | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java index 5250efafef..e748c404bb 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java @@ -482,8 +482,12 @@ RestClient createClient() throws IOException { AccessSecretVersionResponse response = client.accessSecretVersion(secretVersionName); String plainTextSecret = response.getPayload().getData().toStringUtf8(); + URL url = new URL(plainTextSecret); + String userInfo = url.getUserInfo(); + + String[] userInfoArray = userInfo.split(":"); restClientBuilder.setDefaultHeaders( - new Header[] {new BasicHeader("Authorization", "ApiKey " + plainTextSecret)}); + new Header[] {new BasicHeader("Authorization", "ApiKey " + userInfoArray[1])}); } if (getBearerToken() != null) { restClientBuilder.setDefaultHeaders( From bb718ee32f3752a6912eb42e1f1e1c65f6c37326 Mon Sep 17 00:00:00 2001 From: Amol Dabade Date: Tue, 13 Sep 2022 18:34:16 +0530 Subject: [PATCH 7/7] Try retrieve project --- .../cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java index e748c404bb..ada7dfe225 100644 --- a/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java +++ b/v2/elasticsearch-common/src/main/java/com/google/cloud/teleport/v2/elasticsearch/utils/ElasticsearchIO.java @@ -96,6 +96,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import com.google.cloud.ServiceOptions; import com.google.cloud.secretmanager.v1.AccessSecretVersionResponse; import com.google.cloud.secretmanager.v1.SecretManagerServiceClient; import com.google.cloud.secretmanager.v1.SecretVersionName; @@ -475,7 +476,7 @@ RestClient createClient() throws IOException { } if (getApiKey() != null) { SecretManagerServiceClient client = SecretManagerServiceClient.create(); - String projectId = "tinyclues-data"; + String projectId = ServiceOptions.getDefaultProjectId(); String secretId = getApiKey(); String versionId = "latest"; SecretVersionName secretVersionName = SecretVersionName.of(projectId, secretId, versionId);