diff --git a/.github/workflows/e2e.yml b/.github/workflows/e2e.yml
index 2573b81a5e..ab0a93bae2 100644
--- a/.github/workflows/e2e.yml
+++ b/.github/workflows/e2e.yml
@@ -16,9 +16,9 @@ name: Build e2e tests
on:
push:
- branches: [ develop ]
+ branches: [ develop, release/* ]
pull_request:
- branches: [ develop]
+ branches: [ develop, release/* ]
types: [opened, synchronize, reopened, labeled]
workflow_dispatch:
@@ -61,6 +61,7 @@ jobs:
with:
repository: cdapio/cdap-e2e-tests
path: e2e
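+ # pin the shared cdap-e2e-tests framework to its release/6.9 branch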
+ ref: release/6.9
- name: Cache
uses: actions/cache@v3
with:
diff --git a/Jenkinsfile b/Jenkinsfile
new file mode 100644
index 0000000000..f2ad05a777
--- /dev/null
+++ b/Jenkinsfile
@@ -0,0 +1,2 @@
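+// Load the shared Jenkins library and run its forkFusionPublicFlow build for this repository.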
+@Library('shared-lib') _
+forkFusionPublicFlow(gitRepo: 'fc-google-cloud')
diff --git a/pom.xml b/pom.xml
index 16f43df420..dac54092bb 100644
--- a/pom.xml
+++ b/pom.xml
@@ -20,7 +20,7 @@
<groupId>io.cdap.plugin</groupId>
<artifactId>google-cloud</artifactId>
- <version>0.22.0-SNAPSHOT</version>
+ <version>0.22.0.0-SNAPSHOT</version>
<name>Google Cloud Plugins</name>
<packaging>jar</packaging>
<description>Plugins for Google Big Query</description>
@@ -72,8 +72,8 @@
1.8.2
hadoop2-1.0.0
1.4
- <cdap.version>6.9.0-SNAPSHOT</cdap.version>
- <hydrator.version>2.11.0-SNAPSHOT</hydrator.version>
+ <cdap.version>6.9.0</cdap.version>
+ <hydrator.version>2.11.0</hydrator.version>
3.2.6
0.3.1
hadoop2-2.0.0
@@ -99,6 +99,12 @@
3.1.1
0.23.1
${project.basedir}/src/test/java/
+
+
+ <version.cloud-metadata-provider>0.0.2-SNAPSHOT</version.cloud-metadata-provider>
+ <version.cloud-metadata-common>0.0.2-SNAPSHOT</version.cloud-metadata-common>
+ <apache.avro.version>1.11.3</apache.avro.version>
+ <version.commons-lang3>3.17.0</version.commons-lang3>
@@ -782,6 +788,39 @@
0.2.0
+
+      <dependency>
+        <groupId>ai.festcloud</groupId>
+        <artifactId>cloud-metadata-provider</artifactId>
+        <version>${version.cloud-metadata-provider}</version>
+      </dependency>
+      <dependency>
+        <groupId>ai.festcloud</groupId>
+        <artifactId>cloud-metadata-model</artifactId>
+        <version>${version.cloud-metadata-common}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.avro</groupId>
+        <artifactId>avro</artifactId>
+        <version>${apache.avro.version}</version>
+      </dependency>
+      <dependency>
+        <groupId>org.apache.commons</groupId>
+        <artifactId>commons-lang3</artifactId>
+        <version>${version.commons-lang3}</version>
+      </dependency>
+      <dependency>
+        <groupId>com.h2database</groupId>
+        <artifactId>h2</artifactId>
+        <version>2.2.224</version>
+      </dependency>
+      <dependency>
+        <groupId>ai.festcloud</groupId>
+        <artifactId>cloud-data-plugins-common</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+      </dependency>
+
+
@@ -867,6 +906,11 @@
org.apache.hadoop.hbase.mapreduce.*;
org.apache.hadoop.hbase.security.token.*;
com.google.cloud.spark.bigquery.*;
+ org.apache.commons.collections4.*;
+ ai.festcloud.model.*;
+ ai.festcloud.datafabric.plugins.common.integrity.*;
+ org.apache.avro.*;
+ org.h2.*;
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java
new file mode 100644
index 0000000000..ec288cd534
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformer.java
@@ -0,0 +1,192 @@
+package io.cdap.plugin.gcp.bigquery.fctransform;
+
+import ai.festcloud.datafabric.plugins.common.integrity.CDAPUtils;
+import ai.festcloud.datafabric.plugins.common.integrity.IntegrityService;
+import ai.festcloud.datafabric.plugins.common.integrity.IntegrityServiceBQ;
+import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
+import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingEntryConfig;
+import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingObj;
+import ai.festcloud.datafabric.plugins.common.integrity.mapping.MappingParsingService;
+import ai.festcloud.metadata.model.TypeRecord;
+import com.google.auth.Credentials;
+import com.google.cloud.bigquery.BigQuery;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.annotation.Plugin;
+import io.cdap.cdap.api.data.format.StructuredRecord;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.Emitter;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.InvalidEntry;
+import io.cdap.cdap.etl.api.PipelineConfigurer;
+import io.cdap.cdap.etl.api.StageSubmitterContext;
+import io.cdap.cdap.etl.api.Transform;
+import io.cdap.cdap.etl.api.TransformContext;
+import io.cdap.plugin.gcp.common.GCPUtils;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
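+/**
+ * Transform that resolves MDM identifiers for incoming records. For each configured mapping it
+ * queries BigQuery through the integrity service, writes the resolved ids into the corresponding
+ * target fields of the output schema and, when the output schema contains an "operation" field,
+ * marks the record as "create" or "update" depending on whether an existing id was found.
+ */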
+@Plugin(type = Transform.PLUGIN_TYPE)
+@Name("BigQueryMdmIntegrityValidation")
+@Description("Verify whether the requested values are present in MDM and add new specified field.")
+public class MdmIntegrityBigQueryTransformer extends Transform {
+
+ private static final Logger LOG = LoggerFactory.getLogger(MdmIntegrityBigQueryTransformer.class);
+
+ private static final String OPERATION = "operation";
+ private static final String OPERATION_CREATE = "create";
+ private static final String OPERATION_UPDATE = "update";
+
+ private final MdmIntegrityBigQueryTransformerConfig config;
+ private Schema outputSchema;
+
+ private MappingObj mapping;
+ private Map<String, TypeRecord> entities;
+ private IntegrityService integrityService;
+ private boolean containsOperationField = false;
+
+ public MdmIntegrityBigQueryTransformer(MdmIntegrityBigQueryTransformerConfig config) {
+ this.config = config;
+ }
+
+ @Override
+ public void initialize(TransformContext context) throws Exception {
+ LOG.info("Initializing BigQuery integrity validation...");
+ super.initialize(context);
+
+ FailureCollector failureCollector = context.getFailureCollector();
+ outputSchema = config.getSchema(failureCollector);
+
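+ // Load the MDM type metadata from the configuration server and validate the config against it.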
+ String configServerUrl = context.getArguments()
+ .get(MetadataUtils.CONFIGSERVER_METADATA_SCHEMA_URL);
+ String metadataRootPath = context.getArguments().get(MetadataUtils.METADATA_ROOT_PATH);
+ entities = MetadataUtils.getTypeRecordByUrl(configServerUrl,
+ metadataRootPath);
+ config.validate(failureCollector, entities, context.getInputSchema());
+
+ MappingParsingService mappingParsingService
+ = new MappingParsingService(config.getMapping(),
+ config.getFullyQualifiedEntityName(),
+ failureCollector,
+ entities,
+ outputSchema);
+ Optional<MappingObj> mappingOpt = mappingParsingService.getMapping();
+ mapping = mappingOpt.orElse(null);
+
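+ // Build the BigQuery client that the integrity service uses for its lookups.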
+ Credentials credentials = config.getConnection().getCredentials(failureCollector);
+ BigQuery bigQuery = GCPUtils.getBigQuery(config.getConnection().getProject(), credentials);
+
+ failureCollector.getOrThrowException();
+
+ integrityService = new IntegrityServiceBQ(bigQuery, entities, mapping);
+ containsOperationField = outputSchema.getFields()
+ .stream().anyMatch(field -> field.getName().equals(OPERATION));
+
+ LOG.info("BigQueryMdmIntegrityValidation initialized.");
+ }
+
+ @Override
+ public void onRunFinish(boolean succeeded, StageSubmitterContext context) {
+ super.onRunFinish(succeeded, context);
+
+ }
+
+ @Override
+ public void destroy() {
+ super.destroy();
+ try {
+ integrityService.close();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
+ super.configurePipeline(pipelineConfigurer);
+ FailureCollector collector = pipelineConfigurer.getStageConfigurer().getFailureCollector();
+ pipelineConfigurer.getStageConfigurer().setOutputSchema(config.getSchema(collector));
+ }
+
+
+ @Override
+ public void transform(StructuredRecord input, Emitter<StructuredRecord> emitter)
+ throws Exception {
+ try {
+ StructuredRecord structuredRecord = fillIds(input);
+ emitter.emit(structuredRecord);
+ } catch (Exception e) {
+ emitter.emitError(new InvalidEntry<>(MetadataUtils.ERROR_CODE, e.getMessage(), input));
+ }
+ }
+
+ private StructuredRecord fillIds(StructuredRecord input) {
+
+ Map<String, Object> result = new HashMap<>();
+ Map<String, List<MappingEntryConfig>> mappingEntryConfigs = mapping.getMappingEntryConfigs();
+
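+ // For each target field, try its mapping entries in order and keep the first single matching id.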
+ mappingEntryConfigs.forEach((targetFieldName, mappingEntryConfig) -> {
+
+ for (MappingEntryConfig entryConfig : mappingEntryConfig) {
+ List ids = integrityService.getIds(entryConfig, input);
+ if (ids.size() > 1) {
+ throw new RuntimeException(
+ "More than one id found for request: " + entryConfig.toString());
+ }
+ if (ids.size() == 1) {
+ result.put(targetFieldName, ids.get(0));
+ break;
+ }
+ }
+ });
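+ // Fail the record if a FestCloudID is required but none of the mappings resolved one.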
+ if (result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null && config.getFcidRequired()) {
+ throw new RuntimeException("ID is required but not provided.");
+ }
+
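+ // Mark the record as an update when an existing id was found, otherwise as a create.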
+ if (containsOperationField) {
+ String operationType = result.get(MetadataUtils.DEFAULT_TARGET_FIELD) == null
+ ? OPERATION_CREATE
+ : OPERATION_UPDATE;
+ result.put(OPERATION, operationType);
+ }
+
+ return setValuesToTargetFields(input, result);
+ }
+
+
+ private StructuredRecord setValuesToTargetFields(StructuredRecord input,
+ Map<String, Object> values) {
+ StructuredRecord.Builder builder = StructuredRecord.builder(outputSchema);
+ setFieldValues(input, values, builder, outputSchema);
+ return builder.build();
+ }
+
+ private void setFieldValues(StructuredRecord input,
+ Map<String, Object> values,
+ StructuredRecord.Builder builder,
+ Schema schema) {
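+ // Copy each input field into the builder, substituting resolved values by name and recursing into nested records.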
+ for (Schema.Field field : schema.getFields()) {
+ String fieldName = field.getName();
+ Object fieldValue = input.get(fieldName);
+
+ if (CDAPUtils.isRecordType(field) && fieldValue != null) {
+ StructuredRecord nestedRecord = (StructuredRecord) fieldValue;
+ Schema nestedSchema = CDAPUtils.getNonNullableSchema(field.getSchema());
+
+ StructuredRecord.Builder nestedBuilder = StructuredRecord.builder(nestedSchema);
+ setFieldValues(nestedRecord, values, nestedBuilder, nestedSchema);
+ builder.set(fieldName, nestedBuilder.build());
+ } else {
+ builder.set(fieldName, values.getOrDefault(fieldName, fieldValue));
+ }
+ }
+ }
+
+
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java
new file mode 100644
index 0000000000..955e32b3c5
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/fctransform/MdmIntegrityBigQueryTransformerConfig.java
@@ -0,0 +1,137 @@
+package io.cdap.plugin.gcp.bigquery.fctransform;
+
+
+import ai.festcloud.datafabric.plugins.common.integrity.MetadataUtils;
+import ai.festcloud.metadata.model.TypeField;
+import ai.festcloud.metadata.model.TypeRecord;
+import io.cdap.cdap.api.annotation.Description;
+import io.cdap.cdap.api.annotation.Macro;
+import io.cdap.cdap.api.annotation.Name;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.plugin.PluginConfig;
+import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.plugin.common.ConfigUtil;
+import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnectorConfig;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import javax.annotation.Nullable;
+
+public class MdmIntegrityBigQueryTransformerConfig extends PluginConfig {
+
+ public static final String MAPPING = "mapping";
+ public static final String FULLY_QUALIFIED_ENTITY_NAME = "fullyQualifiedEntityName";
+ public static final String SCHEMA = "schema";
+
+
+ @Name(ConfigUtil.NAME_CONNECTION)
+ @Macro
+ @Nullable
+ @Description("The existing connection to use.")
+ public BigQueryConnectorConfig connection;
+
+
+ @Name(ConfigUtil.NAME_USE_CONNECTION)
+ @Nullable
+ @Description("Whether to use an existing connection.")
+ public Boolean useConnection;
+
+ @Name(MAPPING)
+ @Description("Properties to validate")
+ @Macro
+ private final String mapping;
+
+ @Name(FULLY_QUALIFIED_ENTITY_NAME)
+ @Description("Metadata server url")
+ @Macro
+ private final String fullyQualifiedEntityName;
+
+ @Name("fcidRequired")
+ @Description("Indicates whether FestCloudID is required. "
+ + "If true, records without a FestCloudID are sent to the error flow. "
+ + "If false, records without a FestCloudID are processed and not sent to the error flow.")
+ @Macro
+ private final Boolean fcidRequired;
+
+ @Name(SCHEMA)
+ @Description("Schema of the output records.")
+ @Macro
+ private final String schema;
+
+
+ public MdmIntegrityBigQueryTransformerConfig(BigQueryConnectorConfig connection,
+ Boolean useConnection, String mapping,
+ String fullyQualifiedEntityName,
+ Boolean fcidRequired, String schema) {
+ this.connection = connection;
+ this.useConnection = useConnection;
+ this.mapping = mapping;
+ this.fullyQualifiedEntityName = fullyQualifiedEntityName;
+ this.fcidRequired = fcidRequired;
+ this.schema = schema;
+ }
+
+
+ public void validate(FailureCollector collector, Map<String, TypeRecord> entities,
+ Schema outputSchema) {
+ ConfigUtil.validateConnection(this, useConnection, connection, collector);
+ TypeRecord typeRecord = entities.get(fullyQualifiedEntityName);
+ List<TypeField> fields = typeRecord.getFields();
+
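+ // Every metadata field that requires integrity validation must exist in the output schema.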
+ fields.stream().filter(MetadataUtils::integrityRequired).map(MetadataUtils::getFieldName)
+ .forEach(fieldName -> validateField(fieldName, outputSchema, collector));
+ }
+
+ private void validateField(String fieldName, Schema outputSchema, FailureCollector collector) {
+ if (outputSchema.getField(fieldName) == null) {
+ collector.addFailure(String.format("Can't find field %s in output record", fieldName),
+ String.format(
+ "Field %s mandatory for integrity validation according to metadata definition",
+ fieldName));
+ }
+ }
+
+ public Schema getSchema(FailureCollector collector) {
+ try {
+ return Schema.parseJson(schema);
+ } catch (IOException e) {
+ collector.addFailure(String.format("Failed to parse schema: %s", schema), null);
+ throw collector.getOrThrowException();
+ }
+ }
+
+ @Nullable
+ public BigQueryConnectorConfig getConnection() {
+ return connection;
+ }
+
+ public void setConnection(@Nullable BigQueryConnectorConfig connection) {
+ this.connection = connection;
+ }
+
+ @Nullable
+ public Boolean getUseConnection() {
+ return useConnection;
+ }
+
+ public void setUseConnection(@Nullable Boolean useConnection) {
+ this.useConnection = useConnection;
+ }
+
+ public String getFullyQualifiedEntityName() {
+ return fullyQualifiedEntityName;
+ }
+
+ public Boolean getFcidRequired() {
+ return fcidRequired;
+ }
+
+ public String getSchema() {
+ return schema;
+ }
+
+ public String getMapping() {
+ return mapping;
+ }
+}
+
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index 3f6eea5017..169a78e54b 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -532,11 +532,6 @@ public static ValidationFailure validateArraySchema(Schema arraySchema, String n
Schema nonNullableSchema = getNonNullableSchema(arraySchema);
Schema componentSchema = nonNullableSchema.getComponentSchema();
- if (componentSchema.isNullable()) {
- return collector.addFailure(String.format("Field '%s' contains null values in its array.", name),
- "Change the array component type to be non-nullable.");
- }
-
if (UNSUPPORTED_ARRAY_TYPES.contains(componentSchema.getType())) {
return collector.addFailure(String.format("Field '%s' is an array of unsupported type '%s'.",
name, componentSchema.getDisplayName()),
diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java b/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java
index b3f85090a5..7a61461f5a 100644
--- a/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java
+++ b/src/main/java/io/cdap/plugin/gcp/publisher/GooglePublisher.java
@@ -47,6 +47,8 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Map;
@@ -63,6 +65,8 @@
"asynchronous messaging that decouples senders and receivers, it allows for secure and highly available " +
"communication between independently written applications")
public class GooglePublisher extends BatchSink<StructuredRecord, NullWritable, Text> {
+ private static final Logger LOG = LoggerFactory.getLogger(GooglePublisher.class);
+
private final Config config;
@SuppressWarnings("unused")
@@ -79,6 +83,7 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) {
@Override
public void prepareRun(BatchSinkContext context) throws IOException {
+ LOG.info("GooglePublisher.prepareRun() method called");
FailureCollector collector = context.getFailureCollector();
config.validate(collector);
diff --git a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java
index 851540593d..c01e61363c 100644
--- a/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/publisher/PubSubOutputFormat.java
@@ -36,6 +36,7 @@
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.NullWritable;
@@ -167,17 +168,21 @@ public PubSubRecordWriter(Publisher publisher, String format, String delimiter,
this.futures = ConcurrentHashMap.newKeySet();
this.format = format;
this.delimiter = delimiter;
+ LOG.info("PubSubRecordWriter initialized");
}
@Override
public void write(NullWritable key, StructuredRecord value) throws IOException {
+ LOG.info("write() method called");
handleErrorIfAny();
PubsubMessage message = getPubSubMessage(value);
+ LOG.info("PubsubMessage: " + message.toString());
ApiFuture<String> future = publisher.publish(message);
futures.add(future);
ApiFutures.addCallback(future, new ApiFutureCallback<String>() {
@Override
public void onFailure(Throwable throwable) {
+ LOG.error(throwable.getMessage() + ". Failing record schema: " + value.getSchema().toString());
error.set(throwable);
failures.incrementAndGet();
futures.remove(future);
@@ -191,6 +196,7 @@ public void onSuccess(String s) {
}
private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOException {
+ LOG.info("Creating pubsub message for " + value.getSchema().toString());
String payload;
ByteString data;
PubsubMessage message = null;
@@ -209,7 +215,7 @@ private PubsubMessage getPubSubMessage(StructuredRecord value) throws IOExceptio
final byte[] serializedBytes;
DatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<>(avroSchema);
ByteArrayOutputStream out = new ByteArrayOutputStream();
- BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
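+ // Encode the record as Avro JSON using the derived Avro schema (replaces the previous binary encoding).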
+ Encoder encoder = EncoderFactory.get().jsonEncoder(avroSchema, out);
datumWriter.write(transform, encoder);
encoder.flush();
out.close();
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java
index 1b6013e033..ae4febda6b 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/connector/BigQueryConnectorTest.java
@@ -50,6 +50,7 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -68,6 +69,7 @@
*
* The service account used to run this test needs BigQuery admin permissions in the project.
*/
+@Ignore("Currently we support unit test only and ignore 'integration' since it is tested by CDAP already.")
public class BigQueryConnectorTest {
private static final Set<String> SUPPORTED_TYPES = new HashSet<>(Arrays.asList("table", "view"));
private static TestEnvironment testEnvironment;
diff --git a/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java
index 93288fe808..2050cff2c8 100644
--- a/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/gcs/connector/GCSConnectorTest.java
@@ -38,6 +38,7 @@
import org.junit.Assume;
import org.junit.Before;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.ByteArrayInputStream;
@@ -59,6 +60,7 @@
*
* In order to run this test, the service account must have permission to create buckets and objects.
*/
+@Ignore("Currently we support unit test only and ignore 'integration' since it is tested by CDAP already.")
public class GCSConnectorTest {
private static TestEnvironment testEnvironment;
private static String bucket;
@@ -76,7 +78,8 @@ public static void setupTestClass() throws Exception {
@Before
public void setUp() {
- storage.create(BucketInfo.newBuilder(bucket).build());
+ storage.create(BucketInfo.newBuilder(bucket)
+ .setLocation("eu").build());
}
@After
diff --git a/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java b/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java
index b277a69a34..f543212fd5 100644
--- a/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/spanner/connector/SpannerConnectorTest.java
@@ -47,6 +47,7 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
+import org.junit.Ignore;
import org.junit.Test;
import java.io.IOException;
@@ -65,6 +66,7 @@
*
* The service account used to run this test needs Spanner admin permissions in the project.
*/
+@Ignore("Currently we support unit test only and ignore 'integration' since it is tested by CDAP already.")
public class SpannerConnectorTest {
private static String project;
private static String instance;
diff --git a/widgets/BigQueryMdmIntegrityValidation-transform.json b/widgets/BigQueryMdmIntegrityValidation-transform.json
new file mode 100644
index 0000000000..7d170f33a8
--- /dev/null
+++ b/widgets/BigQueryMdmIntegrityValidation-transform.json
@@ -0,0 +1,254 @@
+{
+ "metadata": {
+ "spec-version": "1.5"
+ },
+ "display-name": "MDM BigQuery integrity validation",
+ "configuration-groups": [
+ {
+ "label": "Connection",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "Use connection",
+ "name": "useConnection",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "connection-select",
+ "label": "Connection",
+ "name": "connection",
+ "widget-attributes": {
+ "connectionType": "BigQuery"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Project ID",
+ "name": "project",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Dataset Project ID",
+ "name": "datasetProject",
+ "widget-attributes": {
+ "placeholder": "Project the dataset belongs to, if different from the Project ID."
+ }
+ },
+ {
+ "name": "serviceAccountType",
+ "label": "Service Account Type",
+ "widget-type": "radio-group",
+ "widget-attributes": {
+ "layout": "inline",
+ "default": "filePath",
+ "options": [
+ {
+ "id": "filePath",
+ "label": "File Path"
+ },
+ {
+ "id": "JSON",
+ "label": "JSON"
+ }
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account File Path",
+ "name": "serviceFilePath",
+ "widget-attributes": {
+ "default": "auto-detect"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Service Account JSON",
+ "name": "serviceAccountJSON"
+ }
+ ]
+ },
+ {
+ "label": "FC integrity configs",
+ "properties": [
+ {
+ "name": "mapping",
+ "label": "Mapping",
+ "widget-type": "ds-multiplevalues",
+ "widget-attributes": {
+ "numValues": "3",
+ "placeholders": [
+ "Mdm entity name",
+ "Source field",
+ "External field"
+ ]
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Fully Qualified Entity name",
+ "name": "fullyQualifiedEntityName"
+ },
+ {
+ "name": "fcidRequired",
+ "label": "Is FestCloudID required",
+ "widget-type": "toggle",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "true"
+ }
+ }
+ ]
+ },
+ {
+ "label": "Views",
+ "properties": [
+ {
+ "widget-type": "toggle",
+ "label": "Enable querying views",
+ "name": "enableQueryingViews",
+ "widget-attributes": {
+ "on": {
+ "value": "true",
+ "label": "YES"
+ },
+ "off": {
+ "value": "false",
+ "label": "NO"
+ },
+ "default": "false"
+ }
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Temporary Table Creation Project",
+ "name": "viewMaterializationProject"
+ },
+ {
+ "widget-type": "textbox",
+ "label": "Temporary Table Creation Dataset",
+ "name": "viewMaterializationDataset"
+ }
+ ]
+ }
+ ],
+ "outputs": [
+ {
+ "name": "schema",
+ "label": "schema",
+ "widget-type": "schema",
+ "widget-attributes": {
+ "schema-types": [
+ "boolean",
+ "long",
+ "double",
+ "bytes",
+ "string",
+ "array"
+ ],
+ "schema-default-type": "string"
+ }
+ }
+ ],
+ "filters": [
+ {
+ "name": "ViewsProperties",
+ "condition": {
+ "expression": "enableQueryingViews == true "
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "viewMaterializationProject"
+ },
+ {
+ "type": "property",
+ "name": "viewMaterializationDataset"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeFilePath",
+ "condition": {
+ "expression": "useConnection == false && serviceAccountType == 'filePath'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceFilePath"
+ }
+ ]
+ },
+ {
+ "name": "ServiceAuthenticationTypeJSON",
+ "condition": {
+ "expression": "useConnection == false && serviceAccountType == 'JSON'"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "serviceAccountJSON"
+ }
+ ]
+ },
+ {
+ "name": "showConnectionProperties ",
+ "condition": {
+ "expression": "useConnection == false"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "project"
+ },
+ {
+ "type": "property",
+ "name": "datasetProject"
+ },
+ {
+ "type": "property",
+ "name": "serviceAccountType"
+ }
+ ]
+ },
+ {
+ "name": "showConnectionId",
+ "condition": {
+ "expression": "useConnection == true"
+ },
+ "show": [
+ {
+ "type": "property",
+ "name": "connection"
+ }
+ ]
+ }
+ ],
+ "jump-config": {
+ "datasets": [
+ {
+ "ref-property-name": "referenceName"
+ }
+ ]
+ }
+}