From cf46a5e4a9baab0c733d4d6d761d73a7f9472b52 Mon Sep 17 00:00:00 2001 From: Fernando Velasquez Date: Wed, 23 Feb 2022 22:44:46 -0500 Subject: [PATCH] Added logic to control which stages are forced and skipped from SQL Engine execution --- .../bigquery/sqlengine/BigQuerySQLEngine.java | 24 +++++++-- .../sqlengine/BigQuerySQLEngineConfig.java | 40 +++++++++++++++ .../util/BigQuerySQLEngineUtils.java | 10 ++++ .../BigQuerySQLEngineConfigTest.java | 51 +++++++++++++++++++ widgets/BigQueryPushdownEngine-sqlengine.json | 18 +++++++ 5 files changed, 140 insertions(+), 3 deletions(-) create mode 100644 src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfigTest.java diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java index e1ed4c0f90..5db2550d27 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java @@ -416,7 +416,6 @@ public void cleanup(String datasetName) throws SQLEngineException { } /** - * * @return capabilities provided by {@link BigQuerySQLEngine}, which include {@link StringExpressionFactoryType}.SQL. */ @Override @@ -425,7 +424,6 @@ public Set getCapabilities() { } /** - * * @return the single expression factory provided by {@link BigQuerySQLEngine}, which is {@link SQLExpressionFactory}. 
*/ @Override @@ -439,7 +437,7 @@ public Relation getRelation(SQLRelationDefinition relationDefinition) { Set<String> columnSet = new LinkedHashSet<>(); List<Schema.Field> fields = relationDefinition.getSchema().getFields(); if (fields != null) { - for (Schema.Field field: fields) { + for (Schema.Field field : fields) { columnSet.add(field.getName()); } } @@ -458,6 +456,26 @@ public boolean supportsRelationalTranform() { return true; } + @Override + public boolean supportsInputSchema(Schema schema) { + return BigQuerySQLEngineUtils.isSupportedSchema(schema); + } + + @Override + public boolean supportsOutputSchema(Schema schema) { + return BigQuerySQLEngineUtils.isSupportedSchema(schema); + } + + @Override + public Set<String> getIncludedStageNames() { + return sqlEngineConfig.getIncludedStages(); + } + + @Override + public Set<String> getExcludedStageNames() { + return sqlEngineConfig.getExcludedStages(); + } + @Override public boolean canTransform(SQLTransformDefinition transformDefinition) { Relation relation = transformDefinition.getOutputRelation(); diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfig.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfig.java index b52cee05cc..b793732183 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfig.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfig.java @@ -18,6 +18,8 @@ import com.google.cloud.bigquery.QueryJobConfiguration; import com.google.cloud.kms.v1.CryptoKeyName; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Strings; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Macro; import io.cdap.cdap.api.annotation.Name; @@ -31,6 +33,9 @@ import java.util.Collections; import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; +import java.util.stream.Stream; import javax.annotation.Nullable; /** @@ -42,6 +47,8 @@ public class 
BigQuerySQLEngineConfig extends BigQueryBaseConfig { public static final String NAME_RETAIN_TABLES = "retainTables"; public static final String NAME_TEMP_TABLE_TTL_HOURS = "tempTableTTLHours"; public static final String NAME_JOB_PRIORITY = "jobPriority"; + public static final String NAME_INCLUDED_STAGES = "includedStages"; + public static final String NAME_EXCLUDED_STAGES = "excludedStages"; public static final String NAME_USE_STORAGE_READ_API = "useStorageReadAPI"; public static final String NAME_DIRECT_SINK_WRITE = "useDirectSinkWrite"; @@ -49,6 +56,7 @@ public class BigQuerySQLEngineConfig extends BigQueryBaseConfig { public static final String PRIORITY_BATCH = "batch"; public static final String PRIORITY_INTERACTIVE = "interactive"; private static final String SCHEME = "gs://"; + private static final String STAGE_SPLIT = "\u0001"; @Name(NAME_LOCATION) @Macro @@ -98,6 +106,20 @@ public class BigQuerySQLEngineConfig extends BigQueryBaseConfig { "succeed, the standard sink workflow will continue to execute.") private Boolean useDirectSinkWrite; + @Name(NAME_INCLUDED_STAGES) + @Macro + @Nullable + @Description("Stages that should always be pushed down to the BigQuery ELT Transformation Pushdown engine, " + + "if supported by the engine. Each stage name should be in a separate line.") + protected String includedStages; + + @Name(NAME_EXCLUDED_STAGES) + @Macro + @Nullable + @Description("Stages that should never be pushed down to the BigQuery ELT Transformation Pushdown engine, " + + "even when supported. Each stage name should be in a separate line.") + protected String excludedStages; + private BigQuerySQLEngineConfig(@Nullable BigQueryConnectorConfig connection, @Nullable String dataset, @Nullable String location, @@ -121,6 +143,24 @@ public Integer getTempTableTTLHours() { return tempTableTTLHours != null && tempTableTTLHours > 0 ? 
tempTableTTLHours : 72; } + public Set<String> getIncludedStages() { + return splitStages(includedStages); + } + + public Set<String> getExcludedStages() { + return splitStages(excludedStages); + } + + @VisibleForTesting + protected static Set<String> splitStages(String stages) { + if (Strings.isNullOrEmpty(stages)) { + return Collections.emptySet(); + } + return Stream.of(stages.split(STAGE_SPLIT)) + .filter(s -> !Strings.isNullOrEmpty(s)) + .collect(Collectors.toSet()); + } + public Boolean shouldUseStorageReadAPI() { return useStorageReadAPI != null ? useStorageReadAPI : false; } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java index a05441b410..e97d005f12 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java @@ -245,6 +245,16 @@ public static void validateJoinOnKeyStages(JoinDefinition joinDefinition, List diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfigTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfigTest.java new file mode 100644 index 0000000000..3e14cbf190 --- /dev/null +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngineConfigTest.java @@ -0,0 +1,51 @@ +/* + * Copyright © 2020 Cask Data, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not + * use this file except in compliance with the License. You may obtain a copy of + * the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT + * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the + * License for the specific language governing permissions and limitations under + * the License. + */ + +package io.cdap.plugin.gcp.bigquery.sqlengine; + +import org.junit.Assert; +import org.junit.Test; + +import java.util.HashSet; +import java.util.Set; + +/** + * Test for {@link BigQuerySQLEngineConfig} class + */ +public class BigQuerySQLEngineConfigTest { + + @Test + public void testSplitStages() { + Set<String> stages = new HashSet<>(); + + Assert.assertEquals(stages, BigQuerySQLEngineConfig.splitStages(null)); + Assert.assertEquals(stages, BigQuerySQLEngineConfig.splitStages("")); + + stages.add(" "); + Assert.assertEquals(stages, BigQuerySQLEngineConfig.splitStages(" ")); + + stages.add("a"); + Assert.assertEquals(stages, BigQuerySQLEngineConfig.splitStages(" \u0001a")); + + stages.add("this is some stage"); + Assert.assertEquals(stages, BigQuerySQLEngineConfig.splitStages(" \u0001a\u0001this is some stage")); + + stages.add(" this is another "); + Assert.assertEquals(stages, + BigQuerySQLEngineConfig.splitStages( + " \u0001a\u0001this is some stage\u0001 this is another ")); + } +} diff --git a/widgets/BigQueryPushdownEngine-sqlengine.json b/widgets/BigQueryPushdownEngine-sqlengine.json index f3c3645c28..a744d22c91 100644 --- a/widgets/BigQueryPushdownEngine-sqlengine.json +++ b/widgets/BigQueryPushdownEngine-sqlengine.json @@ -174,6 +174,24 @@ ] } }, + { + "widget-type": "csv", + "label": "Stages to force execution in the SQL Engine", + "name": "includedStages", + "widget-attributes": { + "delimiter": "\u0001", + "placeholder": "Names of all stages to force push to execute in the BigQuery ELT engine. Each stage name must be on a separate line" + } + }, + { + "widget-type": "csv", + "label": "Stages to skip from executing in the SQL engine", + "name": "excludedStages", + "widget-attributes": { + "delimiter": "\u0001", + "placeholder": "Names of all stages to skip from executing in the BigQuery ELT engine. 
Each stage name must be on a separate line" + } + }, { "widget-type": "toggle", "label": "Use BigQuery Storage Read API",