From b3f3d838e0c0d12b1da72f5851d255e5546ac542 Mon Sep 17 00:00:00 2001 From: itsankit-google Date: Mon, 3 Apr 2023 11:38:24 +0530 Subject: [PATCH 01/18] fix e2e-test repo branch --- .github/workflows/e2e.yml | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml index 2573b81a5e..ab0a93bae2 100644 --- a/.github/workflows/e2e.yml +++ b/.github/workflows/e2e.yml @@ -16,9 +16,9 @@ name: Build e2e tests on: push: - branches: [ develop ] + branches: [ develop, release/* ] pull_request: - branches: [ develop] + branches: [ develop, release/* ] types: [opened, synchronize, reopened, labeled] workflow_dispatch: @@ -61,6 +61,7 @@ jobs: with: repository: cdapio/cdap-e2e-tests path: e2e + ref: release/6.9 - name: Cache uses: actions/cache@v3 with: From a2fc0ae3b217ca617205b2a78d0741f5da3987ba Mon Sep 17 00:00:00 2001 From: Vinisha Shah Date: Mon, 10 Apr 2023 21:46:42 -0700 Subject: [PATCH 02/18] remove snapshot --- pom.xml | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pom.xml b/pom.xml index 16f43df420..93745dbada 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin google-cloud - 0.22.0-SNAPSHOT + 0.22.0 Google Cloud Plugins jar Plugins for Google Big Query @@ -72,8 +72,8 @@ 1.8.2 hadoop2-1.0.0 1.4 - 6.9.0-SNAPSHOT - 2.11.0-SNAPSHOT + 6.9.0 + 2.11.0 3.2.6 0.3.1 hadoop2-2.0.0 From a9e861ff5707a3aa841f5032bfd29322fccccc04 Mon Sep 17 00:00:00 2001 From: Vinisha Shah Date: Mon, 10 Apr 2023 22:06:23 -0700 Subject: [PATCH 03/18] Update pom.xml remove snapshot from e2e tests --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 93745dbada..8be67756f8 100644 --- a/pom.xml +++ b/pom.xml @@ -1139,7 +1139,7 @@ io.cdap.tests.e2e cdap-e2e-framework - 0.2.0-SNAPSHOT + 0.2.0 test From 014b5dca5f41a4b2a8df410af2c55f37ff146641 Mon Sep 17 00:00:00 2001 From: Vinisha Shah Date: Wed, 12 Apr 2023 15:11:28 -0700 Subject: [PATCH 04/18] 
Update pom.xml --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 8be67756f8..93745dbada 100644 --- a/pom.xml +++ b/pom.xml @@ -1139,7 +1139,7 @@ io.cdap.tests.e2e cdap-e2e-framework - 0.2.0 + 0.2.0-SNAPSHOT test From c6ef101b296a6250c43e8a28280c65e8d02751bf Mon Sep 17 00:00:00 2001 From: Taras_Sluka Date: Fri, 26 Jan 2024 11:54:33 +0200 Subject: [PATCH 05/18] Add BigQuery support of nullable array --- .../java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java | 5 ----- 1 file changed, 5 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index 3f6eea5017..169a78e54b 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -532,11 +532,6 @@ public static ValidationFailure validateArraySchema(Schema arraySchema, String n Schema nonNullableSchema = getNonNullableSchema(arraySchema); Schema componentSchema = nonNullableSchema.getComponentSchema(); - if (componentSchema.isNullable()) { - return collector.addFailure(String.format("Field '%s' contains null values in its array.", name), - "Change the array component type to be non-nullable."); - } - if (UNSUPPORTED_ARRAY_TYPES.contains(componentSchema.getType())) { return collector.addFailure(String.format("Field '%s' is an array of unsupported type '%s'.", name, componentSchema.getDisplayName()), From 50c6436ddc3cb869bf94a5bb24e5848771e153fc Mon Sep 17 00:00:00 2001 From: Taras_Sluka Date: Fri, 26 Jan 2024 14:38:38 +0200 Subject: [PATCH 06/18] Ignore tests that are trying to use real GCS connection --- .../plugin/gcp/bigquery/connector/BigQueryConnectorTest.java | 2 ++ .../io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java | 5 ++++- .../plugin/gcp/spanner/connector/SpannerConnectorTest.java | 2 ++ 3 files changed, 8 insertions(+), 1 deletion(-) diff --git 
a/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java index 1b6013e033..ae4febda6b 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java @@ -50,6 +50,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -68,6 +69,7 @@ * * The service account used to run this test needs BigQuery admin permissions in the project. */ +@Ignore("Currently we support unit test only and ignore 'integration' since it is tested by CDAP already.") public class BigQueryConnectorTest { private static final Set SUPPORTED_TYPES = new HashSet<>(Arrays.asList("table", "view")); private static TestEnvironment testEnvironment; diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java index 93288fe808..2050cff2c8 100644 --- a/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java +++ b/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java @@ -38,6 +38,7 @@ import org.junit.Assume; import org.junit.Before; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.io.ByteArrayInputStream; @@ -59,6 +60,7 @@ * * In order to run this test, the service account must have permission to create buckets and objects. 
*/ +@Ignore("Currently we support unit test only and ignore 'integration' since it is tested by CDAP already.") public class GCSConnectorTest { private static TestEnvironment testEnvironment; private static String bucket; @@ -76,7 +78,8 @@ public static void setupTestClass() throws Exception { @Before public void setUp() { - storage.create(BucketInfo.newBuilder(bucket).build()); + storage.create(BucketInfo.newBuilder(bucket) + .setLocation("eu").build()); } @After diff --git a/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java index b277a69a34..f543212fd5 100644 --- a/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java +++ b/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java @@ -47,6 +47,7 @@ import org.junit.AfterClass; import org.junit.Assert; import org.junit.BeforeClass; +import org.junit.Ignore; import org.junit.Test; import java.io.IOException; @@ -65,6 +66,7 @@ * * The service account used to run this test needs Spanner admin permissions in the project. 
*/ +@Ignore("Currently we support unit test only and ignore 'integration' since it is tested by CDAP already.") public class SpannerConnectorTest { private static String project; private static String instance; From c14702424093333045acea584b1f417ad2a7ac8f Mon Sep 17 00:00:00 2001 From: Taras_Sluka Date: Fri, 26 Jan 2024 17:13:51 +0200 Subject: [PATCH 07/18] change version for custom usage: 0.22.0.1-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 93745dbada..7676108b9c 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin google-cloud - 0.22.0 + 0.22.0.1-SNAPSHOT Google Cloud Plugins jar Plugins for Google Big Query From a2843f7fcecaa5cb45e82b8022ce4369ca8615a8 Mon Sep 17 00:00:00 2001 From: Taras_Sluka Date: Mon, 29 Jan 2024 16:15:49 +0200 Subject: [PATCH 08/18] change version to v0.22.0.0-SNAPSHOT --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 7676108b9c..83dc4603de 100644 --- a/pom.xml +++ b/pom.xml @@ -20,7 +20,7 @@ io.cdap.plugin google-cloud - 0.22.0.1-SNAPSHOT + 0.22.0.0-SNAPSHOT Google Cloud Plugins jar Plugins for Google Big Query From 8e59e012abc84f5d5cd5536e22b076de014ce98b Mon Sep 17 00:00:00 2001 From: Ivan Vynnychuk Date: Mon, 12 Feb 2024 09:55:16 +0200 Subject: [PATCH 09/18] FCGB-71 Add Jenkinsfile --- Jenkinsfile | 230 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 230 insertions(+) create mode 100644 Jenkinsfile diff --git a/Jenkinsfile b/Jenkinsfile new file mode 100644 index 0000000000..281bea779b --- /dev/null +++ b/Jenkinsfile @@ -0,0 +1,230 @@ +pipeline { + environment { + // Define environment variables + LOCATION = 'europe-west4' + PROJECT = 'gcp-lab-datafusion-poc' + CUSTOMER = 'fest' + ARTIFACT = 'google-cloud' + Green = '\033[0;32m' + IPurple = '\033[0;95m' + BRed='\033[1;31m' + On_Green='\033[42m' + On_Cyan='\033[46m' + Color_Off = '\033[0m' + } + agent any + + triggers { + 
GenericTrigger( + genericVariables: [ + [key: 'ref', value: '$.ref'] + ], + causeString: 'Triggered on $ref', + token: '', + tokenCredentialId: 'webhook-token', + printContributedVariables: true, + printPostContent: true, + regexpFilterText: '$ref', + regexpFilterExpression: 'refs/heads/fcmain' + ) + } + + stages { + stage('Setup parameters') { + steps { + script { + properties([ + parameters([ + string( + defaultValue: 'fcmain', + name: 'Branch', + trim: true + ), + choice( + choices: ['dev'], + name: 'DEPLOY_TO' + ) + ]) + ]) + } + } + } + stage('Prepare') { + agent { + kubernetes { + yaml kubernetesPodYaml('maven') + } + } + steps { + script { + env.fusionUrl = "gcp-dp-${DEPLOY_TO}-dtfu-instance-gcp-dataplatform-${DEPLOY_TO}-dot-euw4.datafusion.googleusercontent.com" + checkoutAndStash() + ansiColor('xterm') { + dir('fc-google-cloud') { + def buildAndDeploy = buildAndDeploy() + } + } + } + } + } + } +} + +// Kubernetes pod definition for Maven and GCloud +def kubernetesPodYaml(String containerName) { + return """ + apiVersion: v1 + kind: Pod + spec: + containers: + - name: maven + image: 'maven:3.8.1-jdk-8' + command: + - sleep + args: + - 99d + - name: gcloud + image: 'google/cloud-sdk:latest' + command: + - sleep + args: + - 99d + """ +} + + +// Checkout and stash the repository contents +def checkoutAndStash() { + withCredentials([string(credentialsId: 'github-token', variable: 'GIT_TOKEN')]) { + checkout scm: [ + $class: 'GitSCM', + branches: [[name: "*/${Branch}"]], + userRemoteConfigs: [[url: "https://$GIT_TOKEN@github.com/festcloud/fc-google-cloud.git"]] + ] + stash includes: '**/*', name: 'repo-contents' + } +} + +// Build and deploy for a specific folder +def buildAndDeploy() { + stage("Test ${ARTIFACT} plugins") { + agent { + kubernetes { + yaml kubernetesPodYaml('maven') // Build stage uses Maven + } + } + container('maven') { + testFolder() + } + } + stage("Build ${ARTIFACT} plugins") { + agent { + kubernetes { + yaml kubernetesPodYaml('maven') // 
Build stage uses Maven + } + } + container('maven') { + buildFolder() + } + } + stage("Deploy ${ARTIFACT} plugins") { + agent { + kubernetes { + yaml kubernetesPodYaml('gcloud') // Deploy stage uses GCloud + } + } + container('gcloud') { + deployFolder(fusionUrl) + } + } +} + +def testFolder() { + unstash 'repo-contents' + ansiColor('xterm') { + sh "mvn clean test" + } +} + +// Build logic for a specific folder +def buildFolder() { + unstash 'repo-contents' + ansiColor('xterm') { + // Maven build commands + sh "mvn clean package -Dmaven.test.skip=true" + sh """ + set +x + echo "\${IPurple}========================================================" + echo "\${IPurple}=========== \${Green}Building \${On_Cyan}\${BRed}${ARTIFACT} plugins\${Color_Off} plugin..... \${IPurple}===========" + echo "\${IPurple}========================================================\${Color_Off}" + set -x + mvn org.apache.maven.plugins:maven-help-plugin:3.2.0:evaluate -Dexpression=project.version -q -DforceStdout > version.txt + set +x + echo "\${IPurple}=====================================================" + echo "\${IPurple}=========== \${On_Cyan}\${BRed}${ARTIFACT}\${Color_Off}\${Green} version : \$(cat version.txt) \${IPurple}===========" + echo "\${IPurple}=====================================================\${Color_Off}" + set -x + chown -R 1000:1000 target/ + chown 1000:1000 version.txt + ls -la + ls -la target + """ + + stash includes: '*/*', name: "builded" + } +} + +// Deploy logic for a specific folder +def deployFolder(String fusionUrl) { + unstash "builded" + confirmDeployment() + ansiColor('xterm') { + withCredentials([file(credentialsId: 'sa-key', variable: 'SA_KEY')]) { + // GCloud deploy commands + sh "gcloud auth activate-service-account --key-file=\$SA_KEY" + sh """ + apt update + apt install jq -y + export GOOGLE_APPLICATION_CREDENTIALS=\$SA_KEY + set +x + echo "\${Green}Deploying to env\${Color_Off}:" + echo "\${On_Cyan}\${BRed}${DEPLOY_TO}\${Color_Off}" + echo 
"\${Green}Data Fusion URL\${Color_Off}:" + echo "\${On_Cyan}\${BRed}\${fusionUrl}\${Color_Off}" + echo "\${Green}Plugin:\${Color_Off}" + echo "\${On_Cyan}\${BRed}${ARTIFACT} plugins\${Color_Off}" + set -x + + jq '.properties' target/${ARTIFACT}-\$(cat version.txt).json > target/properties.json + + set +x + echo "\${Green}Deploying plugin ${ARTIFACT} plugins....\${Color_Off}" + + curl -X POST \\ + -H "Authorization: Bearer \$(gcloud auth print-access-token)" \\ + -H "Artifact-Extends: system:cdap-data-pipeline[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT)/system:cdap-data-streams[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT)" \\ + -H "Artifact-Version: \$(cat version.txt)" \\ + --data-binary @target/${ARTIFACT}-\$(cat version.txt).jar \\ + \${fusionUrl}/api/v3/namespaces/\$DEPLOY_TO/artifacts/${ARTIFACT} + + echo "\${Green}Deployed plugin ${ARTIFACT} plugins\${Color_Off}" + #Deploy widget to datafusion + echo "\${Green}Deploying plugin properties for ${ARTIFACT} plugins....\${Color_Off}" + curl -X PUT \\ + -H "Authorization: Bearer \$(gcloud auth print-access-token)" \\ + --data-binary @target/properties.json \\ + \${fusionUrl}/api/v3/namespaces/\$DEPLOY_TO/artifacts/${ARTIFACT}/versions/\$(cat version.txt)/properties/ + echo "\${Green}Deployed plugin properties for ${ARTIFACT} plugins\${Color_Off}" + set -x + """ + } + } +} + + +// Confirm deployment with user input +def confirmDeployment() { + timeout(time: 15, unit: 'MINUTES') { + input message: "Do you want to approve the deployment of ${ARTIFACT}?", ok: 'Yes' + } +} From 2b685a3ed389d63da1a3972911c60fcefb4c371f Mon Sep 17 00:00:00 2001 From: o-turyk Date: Tue, 30 Jul 2024 12:34:22 +0300 Subject: [PATCH 10/18] fixed error with union field type --- .../java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java index 851540593d..5a264a1d84 
100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java @@ -36,6 +36,7 @@ import org.apache.avro.generic.GenericRecord; import org.apache.avro.io.BinaryEncoder; import org.apache.avro.io.DatumWriter; +import org.apache.avro.io.Encoder; import org.apache.avro.io.EncoderFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; @@ -209,7 +210,7 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio final byte[] serializedBytes; DatumWriter datumWriter = new GenericDatumWriter<>(avroSchema); ByteArrayOutputStream out = new ByteArrayOutputStream(); - BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null); + Encoder encoder = EncoderFactory.get().jsonEncoder(avroSchema, out); datumWriter.write(transform, encoder); encoder.flush(); out.close(); From d53a1f1c9a1e6c3eb6e294f1eb08d4ad0f36f4a2 Mon Sep 17 00:00:00 2001 From: Valerii Fedorovych Date: Fri, 16 Aug 2024 16:42:07 +0300 Subject: [PATCH 11/18] added logging --- .../java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java index 5a264a1d84..86836bfc4a 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java @@ -179,6 +179,7 @@ public void write(NullWritable key, StructuredRecord value) throws IOException { ApiFutures.addCallback(future, new ApiFutureCallback() { @Override public void onFailure(Throwable throwable) { + LOG.error(throwable.getMessage() + ". 
Caused record: " + value.getSchema().toString()); error.set(throwable); failures.incrementAndGet(); futures.remove(future); @@ -246,6 +247,9 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio break; } } + if (message != null) { + LOG.info("PubSub message is: " + message); + } return message; } From 2e536dc57d25bf0d38af8b236dc35b5ebcb89320 Mon Sep 17 00:00:00 2001 From: Valerii Fedorovych Date: Fri, 16 Aug 2024 21:48:30 +0300 Subject: [PATCH 12/18] added more logging --- .../java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java index 86836bfc4a..097c8ead5d 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java @@ -168,10 +168,12 @@ public PubSubRecordWriter(Publisher publisher, String format, String delimiter, this.futures = ConcurrentHashMap.newKeySet(); this.format = format; this.delimiter = delimiter; + LOG.info("PubSubRecordWriter initialized"); } @Override public void write(NullWritable key, StructuredRecord value) throws IOException { + LOG.debug("write() method called"); handleErrorIfAny(); PubsubMessage message = getPubSubMessage(value); ApiFuture future = publisher.publish(message); @@ -193,6 +195,7 @@ public void onSuccess(String s) { } private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOException { + LOG.info("Creating pubsub message for " + value.getSchema().toString()); String payload; ByteString data; PubsubMessage message = null; @@ -223,6 +226,7 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio case PubSubConstants.TEXT: case PubSubConstants.BLOB: case PubSubConstants.JSON: { + LOG.info("For JSON called getMessage"); payload = 
StructuredRecordStringConverter.toJsonString(value); data = ByteString.copyFromUtf8(payload); message = PubsubMessage.newBuilder().setData(data).build(); From 241e9af391b7f513e9bec51d3c5fe14e262f1c33 Mon Sep 17 00:00:00 2001 From: Valerii Fedorovych Date: Fri, 30 Aug 2024 15:47:08 +0300 Subject: [PATCH 13/18] added log --- .../java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java index 097c8ead5d..a12adf60e5 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java @@ -173,7 +173,7 @@ public PubSubRecordWriter(Publisher publisher, String format, String delimiter, @Override public void write(NullWritable key, StructuredRecord value) throws IOException { - LOG.debug("write() method called"); + LOG.info("write() method called"); handleErrorIfAny(); PubsubMessage message = getPubSubMessage(value); ApiFuture future = publisher.publish(message); From 2ad9806177ade3fcb09042b2c1dd60dcd0df7f59 Mon Sep 17 00:00:00 2001 From: Valerii Fedorovych Date: Fri, 16 Aug 2024 16:42:07 +0300 Subject: [PATCH 14/18] added logging added log added more loggings --- .../io/cdap/plugin/gcp/publisher/GooglePublisher.java | 5 +++++ .../io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java | 9 +++++++++ 2 files changed, 14 insertions(+) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java b/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java index b3f85090a5..7a61461f5a 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java @@ -47,6 +47,8 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; +import 
org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; @@ -63,6 +65,8 @@ "asynchronous messaging that decouples senders and receivers, it allows for secure and highly available " + "communication between independently written applications") public class GooglePublisher extends BatchSink { + private static final Logger LOG = LoggerFactory.getLogger(GooglePublisher.class); + private final Config config; @SuppressWarnings("unused") @@ -79,6 +83,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { @Override public void prepareRun(BatchSinkContext context) throws IOException { + LOG.info("GooglePublisher.prepareRun() method called"); FailureCollector collector = context.getFailureCollector(); config.validate(collector); diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java index 5a264a1d84..6b720901a1 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java @@ -168,17 +168,21 @@ public PubSubRecordWriter(Publisher publisher, String format, String delimiter, this.futures = ConcurrentHashMap.newKeySet(); this.format = format; this.delimiter = delimiter; + LOG.info("PubSubRecordWriter initialized"); } @Override public void write(NullWritable key, StructuredRecord value) throws IOException { + LOG.info("write() method called"); handleErrorIfAny(); PubsubMessage message = getPubSubMessage(value); + LOG.info("PubsubMessage: " + message.toString()); ApiFuture future = publisher.publish(message); futures.add(future); ApiFutures.addCallback(future, new ApiFutureCallback() { @Override public void onFailure(Throwable throwable) { + LOG.error(throwable.getMessage() + ". 
Caused record: " + value.getSchema().toString()); error.set(throwable); failures.incrementAndGet(); futures.remove(future); @@ -192,6 +196,7 @@ public void onSuccess(String s) { } private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOException { + LOG.info("Creating pubsub message for " + value.getSchema().toString()); String payload; ByteString data; PubsubMessage message = null; @@ -222,6 +227,7 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio case PubSubConstants.TEXT: case PubSubConstants.BLOB: case PubSubConstants.JSON: { + LOG.info("For JSON called getMessage"); payload = StructuredRecordStringConverter.toJsonString(value); data = ByteString.copyFromUtf8(payload); message = PubsubMessage.newBuilder().setData(data).build(); @@ -246,6 +252,9 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio break; } } + if (message != null) { + LOG.info("PubSub message is: " + message); + } return message; } From 03c665f977b7575ab5e8af47ddeb11f26aafab26 Mon Sep 17 00:00:00 2001 From: Valerii Fedorovych Date: Fri, 30 Aug 2024 16:18:11 +0300 Subject: [PATCH 15/18] deleted redundant logging --- .../java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java | 4 ---- 1 file changed, 4 deletions(-) diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java index 6b720901a1..c01e61363c 100644 --- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java +++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java @@ -227,7 +227,6 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio case PubSubConstants.TEXT: case PubSubConstants.BLOB: case PubSubConstants.JSON: { - LOG.info("For JSON called getMessage"); payload = StructuredRecordStringConverter.toJsonString(value); data = ByteString.copyFromUtf8(payload); message = 
PubsubMessage.newBuilder().setData(data).build(); @@ -252,9 +251,6 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio break; } } - if (message != null) { - LOG.info("PubSub message is: " + message); - } return message; } From a8115814a710214b9bede14589d08e43e954d1cb Mon Sep 17 00:00:00 2001 From: TarasSluka Date: Mon, 25 Nov 2024 12:58:17 +0200 Subject: [PATCH 16/18] add BQ integrity plugin --- pom.xml | 46 ++++ .../MdmIntegrityBigQueryTransformer.java | 192 +++++++++++++ ...MdmIntegrityBigQueryTransformerConfig.java | 137 ++++++++++ ...QueryMdmIntegrityValidation-transform.json | 254 ++++++++++++++++++ 4 files changed, 629 insertions(+) create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java create mode 100644 widgets/BigQueryMdmIntegrityValidation-transform.json diff --git a/pom.xml b/pom.xml index 83dc4603de..ee517ea593 100644 --- a/pom.xml +++ b/pom.xml @@ -55,6 +55,7 @@ https://issues.cask.co/browse/CDAP + 7 @@ -99,6 +101,12 @@ 3.1.1 0.23.1 ${project.basedir}/src/test/java/ + + + 0.0.2-SNAPSHOT + 0.0.2-SNAPSHOT + 1.11.3 + 3.17.0 @@ -782,6 +790,39 @@ 0.2.0 + + + ai.festcloud + cloud-metadata-provider + ${version.cloud-metadata-provider} + + + ai.festcloud + cloud-metadata-model + ${version.cloud-metadata-common} + + + org.apache.avro + avro + ${apache.avro.version} + + + org.apache.commons + commons-lang3 + ${version.commons-lang3} + + + com.h2database + h2 + 2.2.224 + + + ai.festcloud + cloud-data-plugins-common + 0.0.1-SNAPSHOT + + + @@ -867,6 +908,11 @@ org.apache.hadoop.hbase.mapreduce.*; org.apache.hadoop.hbase.security.token.*; com.google.cloud.spark.bigquery.*; + org.apache.commons.collections4.*; + ai.festcloud.model.*; + ai.festcloud.datafabric.plugins.common.integrity.*; + org.apache.avro.*; + org.h2.*; diff --git 
a/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java new file mode 100644 index 0000000000..ec288cd534 --- /dev/null +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java @@ -0,0 +1,192 @@ +package io.cdap.plugin.gcp.bigquery.fctransform; + +import ai.festcloud.datafabric.plugins.common.integrity.CDAPUtils; +import ai.festcloud.datafabric.plugins.common.integrity.IntegrityService; +import ai.festcloud.datafabric.plugins.common.integrity.IntegrityServiceBQ; +import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils; +import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingEntryConfig; +import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingObj; +import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingParsingService; +import ai.festcloud.metadata.model.TypeRecord; +import com.google.auth.Credentials; +import com.google.cloud.bigquery.BigQuery; +import io.cdap.cdap.api.annotation.Description; +import io.cdap.cdap.api.annotation.Name; +import io.cdap.cdap.api.annotation.Plugin; +import io.cdap.cdap.api.data.format.StructuredRecord; +import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.etl.api.Emitter; +import io.cdap.cdap.etl.api.FailureCollector; +import io.cdap.cdap.etl.api.InvalidEntry; +import io.cdap.cdap.etl.api.PipelineConfigurer; +import io.cdap.cdap.etl.api.StageSubmitterContext; +import io.cdap.cdap.etl.api.Transform; +import io.cdap.cdap.etl.api.TransformContext; +import io.cdap.plugin.gcp.common.GCPUtils; +import java.io.IOException; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +@Plugin(type = Transform.PLUGIN_TYPE) 
+/**
+ * Transform that resolves identifiers for every incoming record through an
+ * {@link IntegrityService} backed by BigQuery and writes them into the mapped target fields of
+ * the output record.
+ *
+ * <p>Records for which the lookup is ambiguous, fails, or yields no identifier although one is
+ * required are emitted to the error port instead of the output port.
+ */
+@Name("BigQueryMdmIntegrityValidation")
+@Description("Verify whether the requested values are present in MDM and add new specified field.")
+public class MdmIntegrityBigQueryTransformer extends Transform<StructuredRecord, StructuredRecord> {
+
+  private static final Logger LOG = LoggerFactory.getLogger(MdmIntegrityBigQueryTransformer.class);
+
+  /** Name of the optional output field that receives the operation type. */
+  private static final String OPERATION = "operation";
+  private static final String OPERATION_CREATE = "create";
+  private static final String OPERATION_UPDATE = "update";
+
+  private final MdmIntegrityBigQueryTransformerConfig config;
+  private Schema outputSchema;
+
+  private MappingObj mapping;
+  private Map<String, TypeRecord> entities;
+  /** Stays {@code null} until {@link #initialize(TransformContext)} completes successfully. */
+  private IntegrityService integrityService;
+  /** Whether the output schema declares a field named {@link #OPERATION}. */
+  private boolean containsOperationField = false;
+
+  public MdmIntegrityBigQueryTransformer(MdmIntegrityBigQueryTransformerConfig config) {
+    this.config = config;
+  }
+
+  /**
+   * Parses the output schema and the mapping, loads the metadata entities, validates the
+   * configuration and creates the BigQuery-backed integrity service.
+   *
+   * @param context runtime context supplying arguments, input schema and failure collector
+   * @throws Exception if the configuration is invalid or a collaborator fails to initialize
+   */
+  @Override
+  public void initialize(TransformContext context) throws Exception {
+    LOG.info("Initializing BigQuery integrity validation...");
+    super.initialize(context);
+
+    FailureCollector failureCollector = context.getFailureCollector();
+    outputSchema = config.getSchema(failureCollector);
+    if (outputSchema == null) {
+      // The schema is dereferenced below, so report a missing schema as a configuration failure.
+      failureCollector.addFailure("Output schema is not defined.",
+          "Specify the schema of the output records.");
+      throw failureCollector.getOrThrowException();
+    }
+
+    String configServerUrl = context.getArguments()
+        .get(MetadataUtils.CONFIGSERVER_METADATA_SCHEMA_URL);
+    String metadataRootPath = context.getArguments().get(MetadataUtils.METADATA_ROOT_PATH);
+    entities = MetadataUtils.getTypeRecordByUrl(configServerUrl, metadataRootPath);
+    config.validate(failureCollector, entities, context.getInputSchema());
+
+    MappingParsingService mappingParsingService
+        = new MappingParsingService(config.getMapping(),
+        config.getFullyQualifiedEntityName(),
+        failureCollector,
+        entities,
+        outputSchema);
+    Optional<MappingObj> mappingOpt = mappingParsingService.getMapping();
+    mapping = mappingOpt.orElse(null);
+
+    // The connection is nullable; surface everything collected so far (including the missing
+    // connection) instead of failing with a NullPointerException on the next statement.
+    if (config.getConnection() == null) {
+      failureCollector.addFailure("BigQuery connection is not configured.",
+          "Select an existing connection or provide the connection properties.");
+      throw failureCollector.getOrThrowException();
+    }
+    Credentials credentials = config.getConnection().getCredentials(failureCollector);
+    BigQuery bigQuery = GCPUtils.getBigQuery(config.getConnection().getProject(), credentials);
+
+    failureCollector.getOrThrowException();
+
+    // Fail once here rather than with a NullPointerException on every record in transform().
+    if (mapping == null) {
+      throw new IllegalStateException("Mapping could not be parsed from the plugin configuration.");
+    }
+
+    integrityService = new IntegrityServiceBQ(bigQuery, entities, mapping);
+    containsOperationField = outputSchema.getFields()
+        .stream().anyMatch(field -> field.getName().equals(OPERATION));
+
+    LOG.info("BigQueryMdmIntegrityValidation initialized.");
+  }
+
+  @Override
+  public void onRunFinish(boolean succeeded, StageSubmitterContext context) {
+    // Nothing to do beyond the default behaviour.
+    super.onRunFinish(succeeded, context);
+
+  }
+
+  /**
+   * Closes the integrity service if it was created.
+   *
+   * @throws RuntimeException wrapping the {@link IOException} raised while closing
+   */
+  @Override
+  public void destroy() {
+    super.destroy();
+    // initialize() may have thrown before the service was assigned.
+    if (integrityService == null) {
+      return;
+    }
+    try {
+      integrityService.close();
+    } catch (IOException e) {
+      throw new RuntimeException(e);
+    }
+  }
+
+  /** Publishes the configured output schema to the pipeline at deployment time. */
+  @Override
+  public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+    super.configurePipeline(pipelineConfigurer);
+    FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+    pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(collector));
+  }
+
+
+  /**
+   * Emits the record enriched with the resolved identifiers, or routes the original record to
+   * the error port when enrichment fails.
+   *
+   * @param input record to enrich
+   * @param emitter receives the enriched record or the error entry
+   */
+  @Override
+  public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
+      throws Exception {
+    try {
+      StructuredRecord structuredRecord = fillIds(input);
+      emitter.emit(structuredRecord);
+    } catch (Exception e) {
+      // Keep the cause available for troubleshooting; the error record only carries a message.
+      LOG.debug("Integrity validation failed, sending record to the error port.", e);
+      // Exceptions such as NullPointerException may have no message; fall back to toString().
+      String message = e.getMessage() != null ? e.getMessage() : e.toString();
+      emitter.emitError(new InvalidEntry<>(MetadataUtils.ERROR_CODE, message, input));
+    }
+  }
+
+  /**
+   * Resolves one identifier per mapped target field and builds the output record.
+   *
+   * <p>For each target field the entry configs are tried in order and the first one that yields
+   * exactly one id wins.
+   *
+   * @throws IllegalStateException if a lookup returns more than one id
+   * @throws RuntimeException if the default target field has no id although it is required
+   */
+  private StructuredRecord fillIds(StructuredRecord input) {
+
+    Map<String, Object> result = new HashMap<>();
+
+    mapping.getMappingEntryConfigs().forEach((targetFieldName, entryConfigs) -> {
+
+      for (MappingEntryConfig entryConfig : entryConfigs) {
+        List<?> ids = integrityService.getIds(entryConfig, input);
+        if (ids.size() > 1) {
+          throw new IllegalStateException(
+              "More than one id found for request: " + entryConfig);
+        }
+        if (ids.size() == 1) {
+          result.put(targetFieldName, ids.get(0));
+          break;
+        }
+      }
+    });
+    if (result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null && config.getFcidRequired()) {
+      throw new RuntimeException("ID is required but not provided.");
+    }
+
+    if (containsOperationField) {
+      // No id for the default target field means "create", otherwise "update".
+      String operationType = result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null
+          ? OPERATION_CREATE
+          : OPERATION_UPDATE;
+      result.put(OPERATION, operationType);
+    }
+
+    return setValuesToTargetFields(input, result);
+  }
+
+
+  /** Builds an output-schema record from {@code input}, overriding fields found in {@code values}. */
+  private StructuredRecord setValuesToTargetFields(StructuredRecord input,
+                                                   Map<String, Object> values) {
+    StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
+    setFieldValues(input, values, builder, outputSchema);
+    return builder.build();
+  }
+
+  /**
+   * Copies every field of {@code schema} into {@code builder}. Non-null record-typed fields are
+   * rebuilt recursively; any other field takes the value from {@code values} when present and
+   * the input value otherwise.
+   */
+  private void setFieldValues(StructuredRecord input,
+                              Map<String, Object> values,
+                              StructuredRecord.Builder builder,
+                              Schema schema) {
+    for (Schema.Field field : schema.getFields()) {
+      String fieldName = field.getName();
+      Object fieldValue = input.get(fieldName);
+
+      if (CDAPUtils.isRecordType(field) && fieldValue != null) {
+        StructuredRecord nestedRecord = (StructuredRecord) fieldValue;
+        Schema nestedSchema = CDAPUtils.getNonNullableSchema(field.getSchema());
+
+        StructuredRecord.Builder nestedBuilder = StructuredRecord.builder(nestedSchema);
+        setFieldValues(nestedRecord, values, nestedBuilder, nestedSchema);
+        builder.set(fieldName, nestedBuilder.build());
+      } else {
+        builder.set(fieldName, values.getOrDefault(fieldName, fieldValue));
+      }
+    }
+  }
+
+
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java
new file mode 100644
index 0000000000..955e32b3c5
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java
@@ -0,0 +1,137 @@
+package io.cdap.plugin.gcp.bigquery.fctransform;
+
+
+import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
+import ai.festcloud.metadata.model.TypeField;
+import ai.festcloud.metadata.model.TypeRecord;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import 
io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.ConfigUtil;
+import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+/**
+ * Configuration of the BigQuery MDM integrity validation transform: the BigQuery connection,
+ * the field mapping, the metadata entity to validate against and the output schema.
+ */
+public class MdmIntegrityBigQueryTransformerConfig extends PluginConfig {
+
+  public static final String MAPPING = "mapping";
+  public static final String FULLY_QUALIFIED_ENTITY_NAME = "fullyQualifiedEntityName";
+  public static final String SCHEMA = "schema";
+
+
+  @Name(ConfigUtil.NAME_CONNECTION)
+  @Macro
+  @Nullable
+  @Description("The existing connection to use.")
+  public BigQueryConnectorConfig connection;
+
+
+  @Name(ConfigUtil.NAME_USE_CONNECTION)
+  @Nullable
+  @Description("Whether to use an existing connection.")
+  public Boolean useConnection;
+
+  @Name(MAPPING)
+  @Description("Properties to validate")
+  @Macro
+  private final String mapping;
+
+  @Name(FULLY_QUALIFIED_ENTITY_NAME)
+  @Description("Fully qualified name of the metadata entity whose fields are validated.")
+  @Macro
+  private final String fullyQualifiedEntityName;
+
+  @Name("fcidRequired")
+  @Description("Indicates whether FestCloudID is required. "
+      + "If true, records without a FestCloudID are sent to the error flow. "
+      + "If false, records without a FestCloudID are processed and not sent to the error flow.")
+  @Macro
+  private final Boolean fcidRequired;
+
+  @Name(SCHEMA)
+  @Description("Schema of the output records.")
+  @Macro
+  private final String schema;
+
+
+  public MdmIntegrityBigQueryTransformerConfig(BigQueryConnectorConfig connection,
+                                               Boolean useConnection, String mapping,
+                                               String fullyQualifiedEntityName,
+                                               Boolean fcidRequired, String schema) {
+    this.connection = connection;
+    this.useConnection = useConnection;
+    this.mapping = mapping;
+    this.fullyQualifiedEntityName = fullyQualifiedEntityName;
+    this.fcidRequired = fcidRequired;
+    this.schema = schema;
+  }
+
+
+  /**
+   * Validates the connection settings and checks that every field the metadata marks as
+   * requiring integrity is present in the given schema. Problems are added to
+   * {@code collector}; nothing is thrown here.
+   *
+   * @param collector receives the validation failures
+   * @param entities metadata entities keyed by fully qualified name; may be {@code null}
+   * @param outputSchema schema whose fields are checked; field checks are skipped when
+   *     {@code null}
+   */
+  public void validate(FailureCollector collector, Map<String, TypeRecord> entities,
+                       Schema outputSchema) {
+    ConfigUtil.validateConnection(this, useConnection, connection, collector);
+
+    TypeRecord typeRecord = entities == null ? null : entities.get(fullyQualifiedEntityName);
+    if (typeRecord == null) {
+      // Report an unknown entity as a configuration failure instead of a NullPointerException.
+      collector.addFailure(
+              String.format("Entity '%s' is not defined in the metadata.",
+                  fullyQualifiedEntityName),
+              "Specify the fully qualified name of an entity that exists in the metadata.")
+          .withConfigProperty(FULLY_QUALIFIED_ENTITY_NAME);
+      return;
+    }
+    if (outputSchema == null) {
+      // Without a schema there is nothing to check the required fields against.
+      return;
+    }
+
+    List<TypeField> fields = typeRecord.getFields();
+    fields.stream().filter(MetadataUtils::integrityRequired).map(MetadataUtils::getFieldName)
+        .forEach(fieldName -> validateField(fieldName, outputSchema, collector));
+  }
+
+  /** Adds a failure when {@code fieldName} is missing from {@code outputSchema}. */
+  private void validateField(String fieldName, Schema outputSchema, FailureCollector collector) {
+    if (outputSchema.getField(fieldName) == null) {
+      collector.addFailure(String.format("Can't find field %s in output record", fieldName),
+          String.format(
+              "Field %s mandatory for integrity validation according to metadata definition",
+              fieldName));
+    }
+  }
+
+  /**
+   * Parses the configured output schema.
+   *
+   * @param collector receives the failure when the schema is not valid JSON
+   * @return the parsed schema, or {@code null} when the property is an unresolved macro or is
+   *     not set
+   */
+  @Nullable
+  public Schema getSchema(FailureCollector collector) {
+    // A macro-enabled property has no value until the macro is resolved at runtime.
+    if (containsMacro(SCHEMA) || schema == null || schema.isEmpty()) {
+      return null;
+    }
+    try {
+      return Schema.parseJson(schema);
+    } catch (IOException e) {
+      collector.addFailure(String.format("Failed to parse schema: %s", schema),
+              "Provide a valid JSON schema for the output records.")
+          .withConfigProperty(SCHEMA);
+      throw collector.getOrThrowException();
+    }
+  }
+
+  @Nullable
+  public BigQueryConnectorConfig getConnection() {
+    return connection;
+  }
+
+  public void setConnection(@Nullable BigQueryConnectorConfig connection) {
+    this.connection = connection;
+  }
+
+  @Nullable
+  public Boolean getUseConnection() {
+    return useConnection;
+  }
+
+  public void setUseConnection(@Nullable Boolean useConnection) {
+    this.useConnection = useConnection;
+  }
+
+  public String getFullyQualifiedEntityName() {
+    return fullyQualifiedEntityName;
+  }
+
+  public Boolean getFcidRequired() {
+    return fcidRequired;
+  }
+
+  /** Returns the raw, unparsed JSON text of the output schema. */
+  public String getSchema() {
+    return schema;
+  }
+
+  public String getMapping() {
+    return mapping;
+  }
+}
+
diff --git a/widgets/BigQueryMdmIntegrityValidation-transform.json b/widgets/BigQueryMdmIntegrityValidation-transform.json
new file mode 100644
index 0000000000..7d170f33a8
--- /dev/null
+++ b/widgets/BigQueryMdmIntegrityValidation-transform.json
@@ -0,0 +1,254 @@
+{
+  "metadata": {
+    "spec-version": "1.5"
+  },
+  "display-name": "MDM BigQuery integrity validation",
+  "configuration-groups": [
+    {
+      "label": "Connection",
+      "properties": [
+        {
+          "widget-type": "toggle",
+          "label": "Use connection",
+          "name": "useConnection",
+          "widget-attributes": {
+            "on": {
+              "value": "true",
+              "label": "YES"
+            },
+            "off": {
+              "value": "false",
+              "label": "NO"
+            },
+            "default": "false"
+          }
+        },
+        {
+          "widget-type": "connection-select",
+          "label": "Connection",
+          "name": "connection",
+          "widget-attributes": {
+            "connectionType": "BigQuery"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Project ID",
+          "name": "project",
+          "widget-attributes": {
+            "default": "auto-detect"
+          }
+        },
+        {
+          "widget-type": "textbox",
+          "label": "Dataset Project ID",
+          "name": "datasetProject",
+          "widget-attributes": {
+            "placeholder": "Project the dataset belongs to, if different from the Project ID."
+ } + }, + { + "name": "serviceAccountType", + "label": "Service Account Type", + "widget-type": "radio-group", + "widget-attributes": { + "layout": "inline", + "default": "filePath", + "options": [ + { + "id": "filePath", + "label": "File Path" + }, + { + "id": "JSON", + "label": "JSON" + } + ] + } + }, + { + "widget-type": "textbox", + "label": "Service Account File Path", + "name": "serviceFilePath", + "widget-attributes": { + "default": "auto-detect" + } + }, + { + "widget-type": "textbox", + "label": "Service Account JSON", + "name": "serviceAccountJSON" + } + ] + }, + { + "label": "FC integrity configs", + "properties": [ + { + "name": "mapping", + "label": "Mapping", + "widget-type": "ds-multiplevalues", + "widget-attributes": { + "numValues": "3", + "placeholders": [ + "Mdm entity name", + "Source field", + "External field" + ] + } + }, + { + "widget-type": "textbox", + "label": "Fully Qualified Entity name", + "name": "fullyQualifiedEntityName" + }, + { + "name": "fcidRequired", + "label": "Is FestCloudID required", + "widget-type": "toggle", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "true" + } + } + ] + }, + { + "label": "Views", + "properties": [ + { + "widget-type": "toggle", + "label": "Enable querying views", + "name": "enableQueryingViews", + "widget-attributes": { + "on": { + "value": "true", + "label": "YES" + }, + "off": { + "value": "false", + "label": "NO" + }, + "default": "false" + } + }, + { + "widget-type": "textbox", + "label": "Temporary Table Creation Project", + "name": "viewMaterializationProject" + }, + { + "widget-type": "textbox", + "label": "Temporary Table Creation Dataset", + "name": "viewMaterializationDataset" + } + ] + } + ], + "outputs": [ + { + "name": "schema", + "label": "schema", + "widget-type": "schema", + "widget-attributes": { + "schema-types": [ + "boolean", + "long", + "double", + "bytes", + "string", + "array" + ], + 
"schema-default-type": "string" + } + } + ], + "filters": [ + { + "name": "ViewsProperties", + "condition": { + "expression": "enableQueryingViews == true " + }, + "show": [ + { + "type": "property", + "name": "viewMaterializationProject" + }, + { + "type": "property", + "name": "viewMaterializationDataset" + } + ] + }, + { + "name": "ServiceAuthenticationTypeFilePath", + "condition": { + "expression": "useConnection == false && serviceAccountType == 'filePath'" + }, + "show": [ + { + "type": "property", + "name": "serviceFilePath" + } + ] + }, + { + "name": "ServiceAuthenticationTypeJSON", + "condition": { + "expression": "useConnection == false && serviceAccountType == 'JSON'" + }, + "show": [ + { + "type": "property", + "name": "serviceAccountJSON" + } + ] + }, + { + "name": "showConnectionProperties ", + "condition": { + "expression": "useConnection == false" + }, + "show": [ + { + "type": "property", + "name": "project" + }, + { + "type": "property", + "name": "datasetProject" + }, + { + "type": "property", + "name": "serviceAccountType" + } + ] + }, + { + "name": "showConnectionId", + "condition": { + "expression": "useConnection == true" + }, + "show": [ + { + "type": "property", + "name": "connection" + } + ] + } + ], + "jump-config": { + "datasets": [ + { + "ref-property-name": "referenceName" + } + ] + } +} From 485964d2d971cb1e5b666a44c013faf0576a81d4 Mon Sep 17 00:00:00 2001 From: TarasSluka Date: Mon, 25 Nov 2024 13:00:37 +0200 Subject: [PATCH 17/18] uncomment part of pom.xml --- pom.xml | 2 -- 1 file changed, 2 deletions(-) diff --git a/pom.xml b/pom.xml index ee517ea593..dac54092bb 100644 --- a/pom.xml +++ b/pom.xml @@ -55,7 +55,6 @@ https://issues.cask.co/browse/CDAP - 7 From f7777b52ee0f688e52dc696af7d872b988bc8574 Mon Sep 17 00:00:00 2001 From: oleksii blyzniuk Date: Thu, 28 Nov 2024 13:04:37 +0200 Subject: [PATCH 18/18] updated jenkinsfile --- Jenkinsfile | 232 +--------------------------------------------------- 1 file changed, 2 insertions(+), 
230 deletions(-) diff --git a/Jenkinsfile b/Jenkinsfile index 281bea779b..f2ad05a777 100644 --- a/Jenkinsfile +++ b/Jenkinsfile @@ -1,230 +1,2 @@ -pipeline { - environment { - // Define environment variables - LOCATION = 'europe-west4' - PROJECT = 'gcp-lab-datafusion-poc' - CUSTOMER = 'fest' - ARTIFACT = 'google-cloud' - Green = '\033[0;32m' - IPurple = '\033[0;95m' - BRed='\033[1;31m' - On_Green='\033[42m' - On_Cyan='\033[46m' - Color_Off = '\033[0m' - } - agent any - - triggers { - GenericTrigger( - genericVariables: [ - [key: 'ref', value: '$.ref'] - ], - causeString: 'Triggered on $ref', - token: '', - tokenCredentialId: 'webhook-token', - printContributedVariables: true, - printPostContent: true, - regexpFilterText: '$ref', - regexpFilterExpression: 'refs/heads/fcmain' - ) - } - - stages { - stage('Setup parameters') { - steps { - script { - properties([ - parameters([ - string( - defaultValue: 'fcmain', - name: 'Branch', - trim: true - ), - choice( - choices: ['dev'], - name: 'DEPLOY_TO' - ) - ]) - ]) - } - } - } - stage('Prepare') { - agent { - kubernetes { - yaml kubernetesPodYaml('maven') - } - } - steps { - script { - env.fusionUrl = "gcp-dp-${DEPLOY_TO}-dtfu-instance-gcp-dataplatform-${DEPLOY_TO}-dot-euw4.datafusion.googleusercontent.com" - checkoutAndStash() - ansiColor('xterm') { - dir('fc-google-cloud') { - def buildAndDeploy = buildAndDeploy() - } - } - } - } - } - } -} - -// Kubernetes pod definition for Maven and GCloud -def kubernetesPodYaml(String containerName) { - return """ - apiVersion: v1 - kind: Pod - spec: - containers: - - name: maven - image: 'maven:3.8.1-jdk-8' - command: - - sleep - args: - - 99d - - name: gcloud - image: 'google/cloud-sdk:latest' - command: - - sleep - args: - - 99d - """ -} - - -// Checkout and stash the repository contents -def checkoutAndStash() { - withCredentials([string(credentialsId: 'github-token', variable: 'GIT_TOKEN')]) { - checkout scm: [ - $class: 'GitSCM', - branches: [[name: "*/${Branch}"]], - 
userRemoteConfigs: [[url: "https://$GIT_TOKEN@github.com/festcloud/fc-google-cloud.git"]] - ] - stash includes: '**/*', name: 'repo-contents' - } -} - -// Build and deploy for a specific folder -def buildAndDeploy() { - stage("Test ${ARTIFACT} plugins") { - agent { - kubernetes { - yaml kubernetesPodYaml('maven') // Build stage uses Maven - } - } - container('maven') { - testFolder() - } - } - stage("Build ${ARTIFACT} plugins") { - agent { - kubernetes { - yaml kubernetesPodYaml('maven') // Build stage uses Maven - } - } - container('maven') { - buildFolder() - } - } - stage("Deploy ${ARTIFACT} plugins") { - agent { - kubernetes { - yaml kubernetesPodYaml('gcloud') // Deploy stage uses GCloud - } - } - container('gcloud') { - deployFolder(fusionUrl) - } - } -} - -def testFolder() { - unstash 'repo-contents' - ansiColor('xterm') { - sh "mvn clean test" - } -} - -// Build logic for a specific folder -def buildFolder() { - unstash 'repo-contents' - ansiColor('xterm') { - // Maven build commands - sh "mvn clean package -Dmaven.test.skip=true" - sh """ - set +x - echo "\${IPurple}========================================================" - echo "\${IPurple}=========== \${Green}Building \${On_Cyan}\${BRed}${ARTIFACT} plugins\${Color_Off} plugin..... 
\${IPurple}===========" - echo "\${IPurple}========================================================\${Color_Off}" - set -x - mvn org.apache.maven.plugins:maven-help-plugin:3.2.0:evaluate -Dexpression=project.version -q -DforceStdout > version.txt - set +x - echo "\${IPurple}=====================================================" - echo "\${IPurple}=========== \${On_Cyan}\${BRed}${ARTIFACT}\${Color_Off}\${Green} version : \$(cat version.txt) \${IPurple}===========" - echo "\${IPurple}=====================================================\${Color_Off}" - set -x - chown -R 1000:1000 target/ - chown 1000:1000 version.txt - ls -la - ls -la target - """ - - stash includes: '*/*', name: "builded" - } -} - -// Deploy logic for a specific folder -def deployFolder(String fusionUrl) { - unstash "builded" - confirmDeployment() - ansiColor('xterm') { - withCredentials([file(credentialsId: 'sa-key', variable: 'SA_KEY')]) { - // GCloud deploy commands - sh "gcloud auth activate-service-account --key-file=\$SA_KEY" - sh """ - apt update - apt install jq -y - export GOOGLE_APPLICATION_CREDENTIALS=\$SA_KEY - set +x - echo "\${Green}Deploying to env\${Color_Off}:" - echo "\${On_Cyan}\${BRed}${DEPLOY_TO}\${Color_Off}" - echo "\${Green}Data Fusion URL\${Color_Off}:" - echo "\${On_Cyan}\${BRed}\${fusionUrl}\${Color_Off}" - echo "\${Green}Plugin:\${Color_Off}" - echo "\${On_Cyan}\${BRed}${ARTIFACT} plugins\${Color_Off}" - set -x - - jq '.properties' target/${ARTIFACT}-\$(cat version.txt).json > target/properties.json - - set +x - echo "\${Green}Deploying plugin ${ARTIFACT} plugins....\${Color_Off}" - - curl -X POST \\ - -H "Authorization: Bearer \$(gcloud auth print-access-token)" \\ - -H "Artifact-Extends: system:cdap-data-pipeline[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT)/system:cdap-data-streams[6.8.0-SNAPSHOT,7.0.0-SNAPSHOT)" \\ - -H "Artifact-Version: \$(cat version.txt)" \\ - --data-binary @target/${ARTIFACT}-\$(cat version.txt).jar \\ - 
\${fusionUrl}/api/v3/namespaces/\$DEPLOY_TO/artifacts/${ARTIFACT} - - echo "\${Green}Deployed plugin ${ARTIFACT} plugins\${Color_Off}" - #Deploy widget to datafusion - echo "\${Green}Deploying plugin properties for ${ARTIFACT} plugins....\${Color_Off}" - curl -X PUT \\ - -H "Authorization: Bearer \$(gcloud auth print-access-token)" \\ - --data-binary @target/properties.json \\ - \${fusionUrl}/api/v3/namespaces/\$DEPLOY_TO/artifacts/${ARTIFACT}/versions/\$(cat version.txt)/properties/ - echo "\${Green}Deployed plugin properties for ${ARTIFACT} plugins\${Color_Off}" - set -x - """ - } - } -} - - -// Confirm deployment with user input -def confirmDeployment() { - timeout(time: 15, unit: 'MINUTES') { - input message: "Do you want to approve the deployment of ${ARTIFACT}?", ok: 'Yes' - } -} +@Library('shared-lib') _ +forkFusionPublicFlow(gitRepo: 'fc-google-cloud')