From 5b8c1d8906cfae67a9b3d88d1e48eb30bf9536af Mon Sep 17 00:00:00 2001
From: Fernando Velasquez
Date: Tue, 19 Jul 2022 09:28:37 -0400
Subject: [PATCH 1/7] Added simple BigQuery Pushdown source

---
 .../gcp/bigquery/source/BigQuerySource.java   |  32 +-
 .../sqlengine/BigQueryReadDataset.java        | 327 ++++++++++++++++++
 .../bigquery/sqlengine/BigQuerySQLEngine.java |  40 +++
 3 files changed, 396 insertions(+), 3 deletions(-)
 create mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index 42e0edac81..188f5399a3 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -29,6 +29,8 @@
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.cloud.storage.Storage;
 import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
 import io.cdap.cdap.api.annotation.Description;
 import io.cdap.cdap.api.annotation.Metadata;
 import io.cdap.cdap.api.annotation.MetadataProperty;
@@ -46,9 +48,13 @@
 import io.cdap.cdap.etl.api.batch.BatchSource;
 import io.cdap.cdap.etl.api.batch.BatchSourceContext;
 import io.cdap.cdap.etl.api.connector.Connector;
+import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.common.LineageRecorder;
 import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.CmekUtils;
@@ -61,6 +67,7 @@
 
 import java.time.DateTimeException;
 import java.time.LocalDate;
+import java.util.List;
 import java.util.UUID;
 import java.util.stream.Collectors;
 import javax.annotation.Nullable;
@@ -76,6 +83,7 @@
 @Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = BigQueryConnector.NAME)})
 public final class BigQuerySource extends BatchSource<LongWritable, GenericData.Record, StructuredRecord> {
   private static final Logger LOG = LoggerFactory.getLogger(BigQuerySource.class);
+  private static final Gson GSON = new Gson();
   public static final String NAME = "BigQueryTable";
   private BigQuerySourceConfig config;
   private Schema outputSchema;
@@ -165,7 +173,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     // We call emitLineage before since it creates the dataset with schema.
     Type sourceTableType = config.getSourceTableType();
     emitLineage(context, configuredSchema, sourceTableType, config.getTable());
-    setInputFormat(context);
+    setInputFormat(context, configuredSchema);
   }
 
   @Override
@@ -335,8 +343,26 @@ private void validatePartitionProperties(FailureCollector collector) {
     }
   }
 
-  private void setInputFormat(BatchSourceContext context) {
-    context.setInput(Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration)));
+  private void setInputFormat(BatchSourceContext context,
+                              Schema configuredSchema) {
+    Input inputFormatInput = Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration));
+
+    // Add input for SQL Engine Direct read
+    ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
+
+    List<String> fieldNames = configuredSchema.getFields().stream().map(f -> f.getName()).collect(Collectors.toList());
+
+    arguments
+      .put(BigQueryReadDataset.SQL_INPUT_CONFIG, GSON.toJson(config))
+      .put(BigQueryReadDataset.SQL_INPUT_SCHEMA, GSON.toJson(configuredSchema))
+      .put(BigQueryReadDataset.SQL_INPUT_FIELDS, GSON.toJson(fieldNames));
+
+    Input sqlEngineInput = new SQLEngineInput(config.referenceName,
+                                              context.getStageName(),
+                                              BigQuerySQLEngine.class.getName(),
+                                              arguments.build(),
+                                              inputFormatInput);
+    context.setInput(sqlEngineInput);
   }
 
   private void emitLineage(BatchSourceContext context, Schema schema, Type sourceTableType,
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
new file mode 100644
index 0000000000..67584160ab
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
@@ -0,0 +1,327 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.bigquery.sqlengine;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TableResult;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
+import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
+import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Type;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.atomic.AtomicReference;
+import javax.annotation.Nullable;
+
+/**
+ * SQL read dataset implementation for BigQuery backed datasets.
+ */
+public class BigQueryReadDataset implements SQLDataset, BigQuerySQLDataset {
+
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryReadDataset.class);
+  private static final Gson GSON = new Gson();
+
+  public static final String SQL_INPUT_CONFIG = "config";
+  public static final String SQL_INPUT_FIELDS = "fields";
+  public static final String SQL_INPUT_SCHEMA = "schema";
+  private static final Type LIST_OF_STRINGS_TYPE = new TypeToken<List<String>>() {
+  }.getType();
+
+  private final BigQuerySQLEngineConfig sqlEngineConfig;
+  private final BigQuery bigQuery;
+  private final String datasetName;
+  private final SQLReadRequest readRequest;
+  private final TableId destinationTableId;
+  private final String jobId;
+  private Schema schema;
+  private Long numRows;
+
+  private BigQueryReadDataset(String datasetName,
+                              BigQuerySQLEngineConfig sqlEngineConfig,
+                              BigQuery bigQuery,
+                              SQLReadRequest readRequest,
+                              TableId destinationTableId,
+                              String jobId) {
+    this.datasetName = datasetName;
+    this.sqlEngineConfig = sqlEngineConfig;
+    this.bigQuery = bigQuery;
+    this.readRequest = readRequest;
+    this.destinationTableId = destinationTableId;
+    this.jobId = jobId;
+  }
+
+  public static BigQueryReadDataset getInstance(String datasetName,
+                                                BigQuerySQLEngineConfig sqlEngineConfig,
+                                                BigQuery bigQuery,
+                                                SQLReadRequest readRequest,
+                                                TableId destinationTableId) {
+    // Get new Job ID for this read operation
+    String jobId = BigQuerySQLEngineUtils.newIdentifier();
+
+    return new BigQueryReadDataset(datasetName,
+                                   sqlEngineConfig,
+                                   bigQuery,
+                                   readRequest,
+                                   destinationTableId,
+                                   jobId);
+  }
+
+  public SQLReadResult read() {
+    SQLReadResult result = null;
+    // We use this atomic reference to delete a new table if it was created for this execution.
+    AtomicReference<TableId> newSourceTable = new AtomicReference<>(null);
+    try {
+      return readInternal(readRequest, newSourceTable);
+    } catch (InterruptedException e) {
+      LOG.error("Interrupted exception during BigQuery read operation.", e);
+    } catch (BigQueryException bqe) {
+      LOG.error("BigQuery exception during BigQuery read operation", bqe);
+    } catch (Exception e) {
+      LOG.error("Exception during BigQuery read operation", e);
+    }
+
+    // If a new table was created for this execution, but the execution failed for any reason,
+    // delete the created table so the standard read workflow can succeed.
+    if (result == null || !result.isSuccessful()) {
+      tryDeleteTable(destinationTableId);
+    }
+
+    // Return as a failure if the operation threw an exception.
+    return SQLReadResult.faiure(readRequest.getDatasetName());
+  }
+
+  private SQLReadResult readInternal(SQLReadRequest readRequest,
+                                     AtomicReference<TableId> newSourceTable)
+    throws BigQueryException, InterruptedException {
+    // Check if this input matches the expected engine.
+    String datasetName = readRequest.getDatasetName();
+    if (!BigQuerySQLEngine.class.getName().equals(readRequest.getInput().getSqlEngineClassName())) {
+      LOG.debug("Got output for another SQL engine {}, skipping", readRequest.getInput().getSqlEngineClassName());
+      return SQLReadResult.unsupported(datasetName);
+    }
+
+    // Get configuration properties from read request arguments
+    Map<String, String> arguments = readRequest.getInput().getArguments();
+    BigQuerySourceConfig sourceConfig = GSON.fromJson(arguments.get(SQL_INPUT_CONFIG), BigQuerySourceConfig.class);
+    schema = GSON.fromJson(arguments.get(SQL_INPUT_SCHEMA), Schema.class);
+    List<String> fields = GSON.fromJson(arguments.get(SQL_INPUT_FIELDS), LIST_OF_STRINGS_TYPE);
+
+    // Get source table information
+    String sourceProject = sourceConfig.getDatasetProject();
+    String sourceDataset = sourceConfig.getDataset();
+    String sourceTableName = sourceConfig.getTable();
+    TableId sourceTableId = TableId.of(sourceProject, sourceDataset, sourceTableName);
+
+    // Check if both datasets are in the same location. If not, the direct read operation cannot be performed.
+    DatasetId sourceDatasetId = DatasetId.of(sourceTableId.getProject(), sourceTableId.getDataset());
+    DatasetId destinationDatasetId = DatasetId.of(destinationTableId.getProject(), destinationTableId.getDataset());
+    Dataset srcDataset = bigQuery.getDataset(sourceDatasetId);
+    Dataset destDataset = bigQuery.getDataset(destinationDatasetId);
+
+    // Ensure datasets exist before proceeding
+    if (srcDataset == null || destDataset == null) {
+      LOG.warn("Direct table read is not supported when the datasets are not created.");
+      return SQLReadResult.unsupported(datasetName);
+    }
+
+    // Ensore both datasets are in the same location.
+    if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
+      LOG.warn("Direct table read is only supported if both datasets are in the same location. "
+                 + "'{}' is '{}' , '{}' is '{}' .",
+               sourceDatasetId.getDataset(), srcDataset.getLocation(),
+               destinationDatasetId.getDataset(), destDataset.getLocation());
+      return SQLReadResult.unsupported(datasetName);
+    }
+
+    // Filters and partition ranges are not supported by the direct read operation
+    if (sourceConfig.getFilter() != null ||
+        sourceConfig.getPartitionFrom() != null ||
+        sourceConfig.getPartitionTo() != null) {
+      LOG.warn("Direct table read is not supported for filtered or partitioned tables.");
+      return SQLReadResult.unsupported(datasetName);
+    }
+
+    // Get source table instance
+    Table srcTable = bigQuery.getTable(sourceTableId);
+
+    // Get destination table instance
+    Table destTable = bigQuery.getTable(destinationTableId);
+
+    // Get query job configuration for the read operation
+    QueryJobConfiguration.Builder queryConfigBuilder = getQueryBuilder(sourceTableId, destinationTableId, fields);
+
+    QueryJobConfiguration queryConfig = queryConfigBuilder.build();
+    // Create a job ID so that we can safely retry.
+    JobId bqJobId = JobId.newBuilder()
+      .setJob(jobId)
+      .setLocation(sqlEngineConfig.getLocation())
+      .setProject(sqlEngineConfig.getProject())
+      .build();
+    Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(bqJobId).build());
+    TableResult result = null;
+
+    // Wait for the query to complete.
+    queryJob = queryJob.waitFor();
+    JobStatistics.QueryStatistics queryJobStats = queryJob.getStatistics();
+
+    // Check for errors
+    if (queryJob.getStatus().getError() != null) {
+      BigQuerySQLEngineUtils.logJobMetrics(queryJob);
+      LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
+                jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
+                queryJob.getStatus().getError().toString());
+      return SQLReadResult.faiure(datasetName);
+    }
+
+    // Number of rows is taken from the job statistics if available.
+    // If not, we use the number of source table records.
+    long numRows = queryJobStats != null && queryJobStats.getNumDmlAffectedRows() != null ?
+      queryJobStats.getNumDmlAffectedRows() : srcTable.getNumRows().longValue();
+    LOG.info("Executed read operation for {} records from {}.{}.{} into {}.{}.{}", numRows,
+             sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
+             destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
+    BigQuerySQLEngineUtils.logJobMetrics(queryJob);
+
+    return SQLReadResult.success(datasetName, this);
+  }
+
+  protected QueryJobConfiguration.Builder getQueryBuilder(TableId sourceTableId,
+                                                          TableId destinationTableId,
+                                                          List<String> fields) {
+    String query = String.format("SELECT %s FROM `%s.%s.%s`",
+                                 String.join(",", fields),
+                                 sourceTableId.getProject(),
+                                 sourceTableId.getDataset(),
+                                 sourceTableId.getTable());
+    LOG.info("Reading data from `{}.{}.{}` to `{}.{}.{}` using SQL statement: {} ",
+             sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
+             sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
+             query);
+
+    return QueryJobConfiguration.newBuilder(query)
+      .setDestinationTable(destinationTableId)
+      .setCreateDisposition(JobInfo.CreateDisposition.CREATE_NEVER)
+      .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+      .setPriority(sqlEngineConfig.getJobPriority())
+      .setLabels(BigQuerySQLEngineUtils.getJobTags("read"));
+  }
+
+  /**
+   * Try to delete this table while handling exceptions.
+   *
+   * @param table the table identifier for the table we want to delete.
+   */
+  protected void tryDeleteTable(TableId table) {
+    try {
+      bigQuery.delete(table);
+    } catch (BigQueryException bqe) {
+      LOG.error("Unable to delete table {}.{}.{}. This may cause the pipeline to fail",
+                table.getProject(), table.getDataset(), table.getTable(), bqe);
+    }
+  }
+
+  @Override
+  public String getBigQueryProject() {
+    return destinationTableId.getProject();
+  }
+
+  @Override
+  public String getBigQueryDataset() {
+    return destinationTableId.getDataset();
+  }
+
+  @Override
+  public String getBigQueryTable() {
+    return destinationTableId.getTable();
+  }
+
+  @Nullable
+  @Override
+  public String getJobId() {
+    return jobId;
+  }
+
+  @Nullable
+  @Override
+  public String getGCSPath() {
+    return null;
+  }
+
+  @Override
+  public long getNumRows() {
+    // Get the number of rows from BQ if not known at this time.
+    if (numRows == null) {
+      numRows = BigQuerySQLEngineUtils.getNumRows(bigQuery,
+                                                  DatasetId.of(destinationTableId.getProject(),
+                                                               destinationTableId.getDataset()),
+                                                  destinationTableId.getTable());
+    }
+
+    return numRows;
+  }
+
+  @Override
+  public String getDatasetName() {
+    return datasetName;
+  }
+
+  @Override
+  public Schema getSchema() {
+    return schema;
+  }
+
+  @Override
+  public boolean isValid() {
+    return true;
+  }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
index 6c32c4b587..f9000e5c9f 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
@@ -50,6 +50,8 @@
 import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinRequest;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLPullRequest;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLPushRequest;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLRelationDefinition;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformDefinition;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformRequest;
@@ -342,6 +344,44 @@ public Set<PullCapability> getPullCapabilities() {
     return Collections.singleton(DefaultPullCapability.SPARK_RDD_PULL);
   }
 
+  @Override
+  public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException {
+    String datasetName = readRequest.getDatasetName();
+
+    // TODO: implement direct source read toggle
+    // Check if direct sink write is enabled. If not, skip.
+    if (!sqlEngineConfig.shouldUseDirectSinkWrite()) {
+      return SQLReadResult.unsupported(datasetName);
+    }
+
+    // Check if this input matches the expected engine. If it doesn't, skip execution for this read operation.
+    if (!BigQuerySQLEngine.class.getName().equals(readRequest.getInput().getSqlEngineClassName())) {
+      LOG.debug("Got output for another SQL engine {}, skipping", readRequest.getInput().getSqlEngineClassName());
+      return SQLReadResult.unsupported(datasetName);
+    }
+
+    // Generate a new destination table name for this read operation.
+    String destinationTable = BigQuerySQLEngineUtils.getNewTableName(runId);
+
+    // Create empty table to store query results.
+    BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, project, dataset, destinationTable);
+    TableId destinationTableId = TableId.of(datasetProject, dataset, destinationTable);
+
+    // Build BigQuery read dataset instance and execute the read operation.
+    BigQueryReadDataset readDataset = BigQueryReadDataset.getInstance(datasetName,
+                                                                      sqlEngineConfig,
+                                                                      bigQuery,
+                                                                      readRequest,
+                                                                      destinationTableId);
+    SQLReadResult result = readDataset.read();
+
+    if (result.isSuccessful()) {
+      datasets.put(datasetName, readDataset);
+    }
+
+    return result;
+  }
+
   @Override
   public SQLWriteResult write(SQLWriteRequest writeRequest) {
     String datasetName = writeRequest.getDatasetName();
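Note on patch 1's hand-off mechanism: the source stage serializes its config, output schema, and projected field names into the `SQLEngineInput` argument map under the three `SQL_INPUT_*` keys, and `BigQueryReadDataset.readInternal` deserializes the same keys on the engine side. A minimal, self-contained sketch of that round trip (the key literal matches the constant defined above; the sample field values are invented for illustration):

```java
import com.google.common.collect.ImmutableMap;
import com.google.common.reflect.TypeToken;
import com.google.gson.Gson;

import java.util.Arrays;
import java.util.List;
import java.util.Map;

public class SqlInputArgumentsRoundTrip {
  private static final Gson GSON = new Gson();

  public static void main(String[] args) {
    // Producer side (BigQuerySource.setInputFormat): projected field names to JSON.
    List<String> fieldNames = Arrays.asList("id", "name", "price");
    Map<String, String> arguments = ImmutableMap.of("fields", GSON.toJson(fieldNames));

    // Consumer side (BigQueryReadDataset.readInternal): a TypeToken is required
    // because List<String> is a parameterized type and Gson needs it at runtime.
    List<String> decoded = GSON.fromJson(arguments.get("fields"),
                                         new TypeToken<List<String>>() { }.getType());
    System.out.println(decoded); // prints [id, name, price]
  }
}
```

Because only strings cross the stage/engine boundary, the engine can run in a separate process from the source plugin; the cost is that both sides must agree on the Gson representation of `BigQuerySourceConfig` and `Schema`.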
From 64121a876e71e27c237775fd1cb9038440e5ba78 Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Tue, 2 Aug 2022 17:24:38 +0530
Subject: [PATCH 2/7] adding metrics support

---
 pom.xml                                             |  2 +-
 .../gcp/bigquery/sqlengine/BigQueryReadDataset.java | 16 +++++++++++-----
 .../gcp/bigquery/sqlengine/BigQuerySQLEngine.java   |  9 +++++----
 3 files changed, 17 insertions(+), 10 deletions(-)

diff --git a/pom.xml b/pom.xml
index 82f35ba774..a5633d15de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@
     1.8.2
     hadoop2-1.0.0
     1.4
-    6.7.0
+    6.8.0-SNAPSHOT
     2.10.0-SNAPSHOT
     3.2.6
     0.3.1
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
index 67584160ab..6fae5563fb 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
@@ -47,6 +47,7 @@
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
 import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.metrics.Metrics;
 import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
@@ -85,26 +86,30 @@ public class BigQueryReadDataset implements SQLDataset, BigQuerySQLDataset {
   private final String jobId;
   private Schema schema;
   private Long numRows;
+  private Metrics metrics;
 
   private BigQueryReadDataset(String datasetName,
                               BigQuerySQLEngineConfig sqlEngineConfig,
                               BigQuery bigQuery,
                               SQLReadRequest readRequest,
                               TableId destinationTableId,
-                              String jobId) {
+                              String jobId,
+                              Metrics metrics) {
     this.datasetName = datasetName;
     this.sqlEngineConfig = sqlEngineConfig;
     this.bigQuery = bigQuery;
     this.readRequest = readRequest;
     this.destinationTableId = destinationTableId;
     this.jobId = jobId;
+    this.metrics = metrics;
   }
 
   public static BigQueryReadDataset getInstance(String datasetName,
                                                 BigQuerySQLEngineConfig sqlEngineConfig,
                                                 BigQuery bigQuery,
                                                 SQLReadRequest readRequest,
-                                                TableId destinationTableId) {
+                                                TableId destinationTableId,
+                                                Metrics metrics) {
     // Get new Job ID for this read operation
     String jobId = BigQuerySQLEngineUtils.newIdentifier();
 
@@ -113,7 +118,8 @@ public static BigQueryReadDataset getInstance(String datasetName,
                                    bigQuery,
                                    readRequest,
                                    destinationTableId,
-                                   jobId);
+                                   jobId,
+                                   metrics);
   }
 
   public SQLReadResult read() {
@@ -216,7 +222,7 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
     // Check for errors
     if (queryJob.getStatus().getError() != null) {
-      BigQuerySQLEngineUtils.logJobMetrics(queryJob);
+      BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
       LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
                 jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
                 queryJob.getStatus().getError().toString());
@@ -230,7 +236,7 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
     LOG.info("Executed read operation for {} records from {}.{}.{} into {}.{}.{}", numRows,
              sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
              destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
-    BigQuerySQLEngineUtils.logJobMetrics(queryJob);
+    BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
 
     return SQLReadResult.success(datasetName, this);
   }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
index f9000e5c9f..563816b5e3 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
@@ -350,9 +350,9 @@ public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException
 
     // TODO: implement direct source read toggle
     // Check if direct sink write is enabled. If not, skip.
-    if (!sqlEngineConfig.shouldUseDirectSinkWrite()) {
-      return SQLReadResult.unsupported(datasetName);
-    }
+//    if (!sqlEngineConfig.shouldUseDirectSinkWrite()) {
+//      return SQLReadResult.unsupported(datasetName);
+//    }
 
     // Check if this input matches the expected engine. If it doesn't, skip execution for this read operation.
     if (!BigQuerySQLEngine.class.getName().equals(readRequest.getInput().getSqlEngineClassName())) {
       LOG.debug("Got output for another SQL engine {}, skipping", readRequest.getInput().getSqlEngineClassName());
       return SQLReadResult.unsupported(datasetName);
     }
@@ -372,7 +372,8 @@ public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException
                                                                       sqlEngineConfig,
                                                                       bigQuery,
                                                                       readRequest,
-                                                                      destinationTableId);
+                                                                      destinationTableId,
+                                                                      metrics);
     SQLReadResult result = readDataset.read();
 
     if (result.isSuccessful()) {
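Patch 2 threads a CDAP `Metrics` instance into `logJobMetrics` so that job statistics can be surfaced as pipeline metrics rather than only appearing in the logs. The body of `BigQuerySQLEngineUtils.logJobMetrics` is not part of this series; the following is only a hypothetical sketch of the kind of counters such a helper could emit (the metric names here are invented, and the helper itself is an assumption rather than the project's actual implementation):

```java
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobStatistics;
import io.cdap.cdap.api.metrics.Metrics;

public class JobMetricsSketch {

  // Hypothetical helper: surface a completed BigQuery job's statistics as CDAP metrics.
  static void logJobMetrics(Job queryJob, Metrics metrics) {
    JobStatistics.QueryStatistics stats = queryJob.getStatistics();
    if (stats.getTotalBytesProcessed() != null) {
      // Metrics.count takes an int delta, so report megabytes rather than raw bytes.
      metrics.count("bq.pushdown.bytes.processed.mb",
                    (int) (stats.getTotalBytesProcessed() / (1024 * 1024)));
    }
    if (stats.getNumDmlAffectedRows() != null) {
      metrics.gauge("bq.pushdown.rows.affected", stats.getNumDmlAffectedRows());
    }
  }
}
```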
From 45f9d25c743e703faeafd411f0de5db93a823025 Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Wed, 17 Aug 2022 23:18:03 +0530
Subject: [PATCH 3/7] removing fallback input

---
 .../io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java | 5 +----
 1 file changed, 1 insertion(+), 4 deletions(-)

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index 188f5399a3..a895c5adf3 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -345,8 +345,6 @@ private void validatePartitionProperties(FailureCollector collector) {
 
   private void setInputFormat(BatchSourceContext context,
                               Schema configuredSchema) {
-    Input inputFormatInput = Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration));
-
     // Add input for SQL Engine Direct read
     ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
@@ -360,8 +358,7 @@ private void setInputFormat(BatchSourceContext context,
     Input sqlEngineInput = new SQLEngineInput(config.referenceName,
                                               context.getStageName(),
                                               BigQuerySQLEngine.class.getName(),
-                                              arguments.build(),
-                                              inputFormatInput);
+                                              arguments.build());
     context.setInput(sqlEngineInput);
   }

From 6961b3a19df8d7f5e740ec16c95bd922be2afa5a Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Tue, 23 Aug 2022 15:10:38 +0530
Subject: [PATCH 4/7] add input for spark path

---
 .../io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java | 3 +++
 1 file changed, 3 insertions(+)

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index a895c5adf3..7dde1c43a5 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -345,6 +345,9 @@ private void validatePartitionProperties(FailureCollector collector) {
 
   private void setInputFormat(BatchSourceContext context,
                               Schema configuredSchema) {
+    // Set input for Spark
+    context.setInput(Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration)));
+
     // Add input for SQL Engine Direct read
     ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
From 56dda3bdab46e93228014ffedb70302376ce2fb9 Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Mon, 29 Aug 2022 15:27:43 +0530
Subject: [PATCH 5/7] changing log level to Error

---
 .../gcp/bigquery/sqlengine/BigQueryReadDataset.java | 13 ++++---------
 1 file changed, 4 insertions(+), 9 deletions(-)

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
index 6fae5563fb..101fd811bf 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
@@ -143,7 +143,7 @@ public SQLReadResult read() {
     }
 
     // Return as a failure if the operation threw an exception.
-    return SQLReadResult.faiure(readRequest.getDatasetName());
+    return SQLReadResult.failure(readRequest.getDatasetName());
   }
 
   private SQLReadResult readInternal(SQLReadRequest readRequest,
@@ -180,9 +180,9 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
       return SQLReadResult.unsupported(datasetName);
     }
 
-    // Ensore both datasets are in the same location.
+    // Ensure both datasets are in the same location.
     if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
-      LOG.warn("Direct table read is only supported if both datasets are in the same location. "
+      LOG.error("Direct table read is only supported if both datasets are in the same location. "
                  + "'{}' is '{}' , '{}' is '{}' .",
               sourceDatasetId.getDataset(), srcDataset.getLocation(),
              destinationDatasetId.getDataset(), destDataset.getLocation());
      return SQLReadResult.unsupported(datasetName);
@@ -226,7 +226,7 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
       LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
                 jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
                 queryJob.getStatus().getError().toString());
-      return SQLReadResult.faiure(datasetName);
+      return SQLReadResult.failure(datasetName);
     }
 
     // Number of rows is taken from the job statistics if available.
@@ -325,9 +325,4 @@ public String getDatasetName() {
   public Schema getSchema() {
     return schema;
   }
-
-  @Override
-  public boolean isValid() {
-    return true;
-  }
 }
From 118b38c1ba0c569879431371b18c3a1b060ff3ef Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Fri, 2 Sep 2022 17:18:07 +0530
Subject: [PATCH 6/7] Adding support for FILTER and PARTITIONING in bq source sql

---
 .../PartitionedBigQueryInputFormat.java       |  34 +----
 .../sqlengine/BigQueryReadDataset.java        |  65 +++++++--
 .../gcp/bigquery/util/BigQueryUtil.java       |  42 ++++++
 .../sqlengine/BigQueryReadDatasetTest.java    | 130 ++++++++++++++++++
 4 files changed, 225 insertions(+), 46 deletions(-)
 create mode 100644 src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
index cfdf8ae440..e47f411dbb 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
@@ -48,7 +48,6 @@
  * in order to create input splits.
  */
 public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat<GenericData.Record> {
-  private static final String DEFAULT_COLUMN_NAME = "_PARTITIONTIME";
 
   private InputFormat<LongWritable, GenericData.Record> delegateInputFormat = new AvroBigQueryInputFormat();
 
@@ -160,8 +159,8 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
     StringBuilder condition = new StringBuilder();
 
     if (timePartitioning != null) {
-      String timePartitionCondition = generateTimePartitionCondition(tableDefinition, timePartitioning,
-                                                                     partitionFromDate, partitionToDate);
+      String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
+                                                                                  partitionToDate);
       condition.append(timePartitionCondition);
     }
 
@@ -289,33 +288,4 @@ private static JobReference getJobReference(Configuration conf, BigQueryHelper b
     }
     return new JobReference().setProjectId(projectId).setJobId(savedJobId).setLocation(location);
   }
-
-  private String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
-                                                TimePartitioning timePartitioning, String partitionFromDate,
-                                                String partitionToDate) {
-    StringBuilder timePartitionCondition = new StringBuilder();
-    String columnName = timePartitioning.getField() != null ?
-                        timePartitioning.getField() : DEFAULT_COLUMN_NAME;
-
-    LegacySQLTypeName columnType = null;
-    if (!DEFAULT_COLUMN_NAME.equals(columnName)) {
-      columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
-    }
-
-    String columnNameTS = columnName;
-    if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
-      columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
-    }
-    if (partitionFromDate != null) {
-      timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
-        .append(partitionFromDate).append("\")");
-    }
-    if (partitionFromDate != null && partitionToDate != null) {
-      timePartitionCondition.append(" and ");
-    }
-    if (partitionToDate != null) {
-      timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
-        .append(partitionToDate).append("\")");
-    }
-    return timePartitionCondition.toString();
-  }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
index 101fd811bf..8aba2f61ae 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
@@ -41,9 +41,13 @@
 import com.google.cloud.bigquery.JobInfo;
 import com.google.cloud.bigquery.JobStatistics;
 import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition.Type;
 import com.google.cloud.bigquery.TableId;
 import com.google.cloud.bigquery.TableResult;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
 import com.google.common.reflect.TypeToken;
 import com.google.gson.Gson;
@@ -53,10 +57,10 @@
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.api.metrics.Metrics;
 import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
 import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
 import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
 import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
+import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
@@ -75,7 +79,7 @@ public class BigQueryReadDataset implements SQLDataset, BigQuerySQLDataset {
   public static final String SQL_INPUT_CONFIG = "config";
   public static final String SQL_INPUT_FIELDS = "fields";
   public static final String SQL_INPUT_SCHEMA = "schema";
-  private static final Type LIST_OF_STRINGS_TYPE = new TypeToken<List<String>>() {
+  private static final java.lang.reflect.Type LIST_OF_STRINGS_TYPE = new TypeToken<List<String>>() {
   }.getType();
 
   private final BigQuerySQLEngineConfig sqlEngineConfig;
@@ -189,22 +193,27 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
       return SQLReadResult.unsupported(datasetName);
     }
 
-    // Filters and partition ranges are not supported by the direct read operation
-    if (sourceConfig.getFilter() != null ||
-        sourceConfig.getPartitionFrom() != null ||
-        sourceConfig.getPartitionTo() != null) {
-      LOG.warn("Direct table read is not supported for filtered or partitioned tables.");
-      return SQLReadResult.unsupported(datasetName);
-    }
-
     // Get source table instance
     Table srcTable = bigQuery.getTable(sourceTableId);
 
     // Get destination table instance
    Table destTable = bigQuery.getTable(destinationTableId);

+    // Get source Table object: will be used for metadata like TABLE.TYPE and Time Partitioning
+    Table sourceTable;
+    try {
+      sourceTable = bigQuery.getTable(sourceTableId);
+    } catch (BigQueryException e) {
+      throw new IllegalArgumentException("Unable to get details about the BigQuery table: " + e.getMessage(), e);
+    }
+
     // Get query job configuration for the read operation
-    QueryJobConfiguration.Builder queryConfigBuilder = getQueryBuilder(sourceTableId, destinationTableId, fields);
+    QueryJobConfiguration.Builder queryConfigBuilder = getQueryBuilder(sourceTable, sourceTableId,
+                                                                       destinationTableId,
+                                                                       fields,
+                                                                       sourceConfig.getFilter(),
+                                                                       sourceConfig.getPartitionFrom(),
+                                                                       sourceConfig.getPartitionTo());
 
     QueryJobConfiguration queryConfig = queryConfigBuilder.build();
     // Create a job ID so that we can safely retry.
@@ -241,17 +250,45 @@ private SQLReadResult readInternal(SQLReadRequest readRequest,
     return SQLReadResult.success(datasetName, this);
   }
 
-  protected QueryJobConfiguration.Builder getQueryBuilder(TableId sourceTableId,
+  @VisibleForTesting
+  QueryJobConfiguration.Builder getQueryBuilder(Table sourceTable, TableId sourceTableId,
                                                           TableId destinationTableId,
-                                                          List<String> fields) {
+                                                          List<String> fields,
+                                                          String filter,
+                                                          String partitionFromDate,
+                                                          String partitionToDate) {
     String query = String.format("SELECT %s FROM `%s.%s.%s`",
                                  String.join(",", fields),
                                  sourceTableId.getProject(),
                                  sourceTableId.getDataset(),
                                  sourceTableId.getTable());
+
+    StringBuilder condition = new StringBuilder();
+
+    // Depending on the type of table, add partitioning
+    StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
+    Type type = tableDefinition.getType();
+    if (!(type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL)) {
+      condition.append(
+        BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate, partitionToDate));
+    }
+
+    // If a filter is present, add it.
+    if (!Strings.isNullOrEmpty(filter)) {
+      if (condition.length() == 0) {
+        condition.append(filter);
+      } else {
+        condition.append(" and (").append(filter).append(")");
+      }
+    }
+
+    if (condition.length() > 0) {
+      query = String.format("%s WHERE %s", query, condition);
+    }
+
     LOG.info("Reading data from `{}.{}.{}` to `{}.{}.{}` using SQL statement: {} ",
              sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
-             sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
+             destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable(),
              query);
 
     return QueryJobConfiguration.newBuilder(query)
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index 78d68d2de7..dee1c9e1c1 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -23,8 +23,10 @@
 import com.google.cloud.bigquery.FieldList;
 import com.google.cloud.bigquery.LegacySQLTypeName;
 import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
 import com.google.cloud.bigquery.Table;
 import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.common.base.Strings;
@@ -43,6 +45,7 @@
 import io.cdap.plugin.gcp.common.GCPConfig;
 import io.cdap.plugin.gcp.common.GCPUtils;
 import io.cdap.plugin.gcp.gcs.GCSPath;
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -69,6 +72,8 @@ public final class BigQueryUtil {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);
 
+  private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
+
   public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
   public static final String DATASET_PATTERN = "[A-Za-z0-9_]+";
   public static final String TABLE_PATTERN = "[A-Za-z0-9_]+";
@@ -717,4 +722,41 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
       LOG.debug("Deleted temporary directory '{}'", path);
     }
   }
+
+  public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
+                                                      String partitionFromDate,
+                                                      String partitionToDate) {
+
+    TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
+
+    if (timePartitioning == null) {
+      return StringUtils.EMPTY;
+    }
+
+    StringBuilder timePartitionCondition = new StringBuilder();
+    String columnName = timePartitioning.getField() != null ?
+      timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME;
+
+    LegacySQLTypeName columnType = null;
+    if (!DEFAULT_PARTITION_COLUMN_NAME.equals(columnName)) {
+      columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
+    }
+
+    String columnNameTS = columnName;
+    if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
+      columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
+    }
+    if (partitionFromDate != null) {
+      timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
+        .append(partitionFromDate).append("\")");
+    }
+    if (partitionFromDate != null && partitionToDate != null) {
+      timePartitionCondition.append(" and ");
+    }
+    if (partitionToDate != null) {
+      timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
+        .append(partitionToDate).append("\")");
+    }
+    return timePartitionCondition.toString();
+  }
+}
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java
new file mode 100644
index 0000000000..2ca9daf8b6
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java
@@ -0,0 +1,130 @@
+package io.cdap.plugin.gcp.bigquery.sqlengine;
+
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TimePartitioning;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
+@RunWith(MockitoJUnitRunner.class)
+public class BigQueryReadDatasetTest {
+
+  @Mock
+  Table sourceTable;
+
+  @Mock
+  StandardTableDefinition tableDefinition;
+
+  @Mock
+  BigQuerySQLEngineConfig sqlEngineConfig;
+
+  @Mock
+  TimePartitioning timePartitioning;
+
+  String datasetProject = "test_bq_dataset_project";
+  String dataset = "test_bq_dataset";
+  String table = "test_bq_table";
+  String filter = "tableColumn = 'abc'";
+  String destTable = "test_bq_dest_table";
+  List<String> fieldList = Arrays.asList("id_test", "name_test", "place_test");
+
+  BigQueryReadDataset bigQueryReadDataset;
+
+  @Before
+  public void init() {
+    TableId destTableId = TableId.of(datasetProject, dataset, destTable);
+    bigQueryReadDataset = BigQueryReadDataset.getInstance(
+      dataset,
+      sqlEngineConfig,
+      null,
+      null,
+      destTableId,
+      null);
+  }
+
+  @Test
+  public void testGenerateQueryForMaterializingView() {
+    TableId sourceTableId = TableId.of(datasetProject, dataset, table);
+    TableId destTableId = TableId.of(datasetProject, dataset, destTable);
+
+    Mockito.when(tableDefinition.getType()).thenReturn(TableDefinition.Type.VIEW);
+    Mockito.when(sourceTable.getDefinition()).thenReturn(tableDefinition);
+    Mockito.when(sqlEngineConfig.getJobPriority()).thenReturn(QueryJobConfiguration.Priority.BATCH);
+
+    String generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+                                                                sourceTableId,
+                                                                destTableId,
+                                                                fieldList,
+                                                                filter,
+                                                                "2000-01-01", "2000-01-01")
+      .build().getQuery();
+
+    String expectedQuery = String.format("SELECT %s FROM `%s.%s.%s` WHERE %s", String.join(",", fieldList),
+                                         datasetProject, dataset, table, filter);
+    Assert.assertEquals(expectedQuery, generatedQuery);
+
+    // Without filter
+    generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+                                                         sourceTableId,
+                                                         destTableId,
+                                                         fieldList,
+                                                         null, null, null)
+      .build().getQuery();
+
+    expectedQuery = String.format("SELECT %s FROM `%s.%s.%s`", String.join(",", fieldList),
+                                  datasetProject, dataset, table);
+    Assert.assertEquals(expectedQuery, generatedQuery);
+  }
+
+  @Test
+  public void testGenerateQuery() {
+    TableId sourceTableId = TableId.of(datasetProject, dataset, table);
+    TableId destTableId = TableId.of(datasetProject, dataset, destTable);
+
+    Mockito.when(tableDefinition.getType()).thenReturn(TableDefinition.Type.TABLE);
+    Mockito.when(sourceTable.getDefinition()).thenReturn(tableDefinition);
+    Mockito.when(sqlEngineConfig.getJobPriority()).thenReturn(QueryJobConfiguration.Priority.BATCH);
+
+    // When the table is NOT partitioned
+    String generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+                                                                sourceTableId,
+                                                                destTableId,
+                                                                fieldList,
+                                                                filter,
+                                                                "2000-01-01", "2000-01-01")
+      .build().getQuery();
+
+    String expectedQuery = String.format("SELECT %s FROM `%s.%s.%s` WHERE %s", String.join(",", fieldList),
+                                         datasetProject, dataset, table, filter);
+    Assert.assertEquals(expectedQuery, generatedQuery);
+
+    // When the table IS partitioned
+    Mockito.when(tableDefinition.getTimePartitioning()).thenReturn(timePartitioning);
+
+    generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+                                                         sourceTableId,
+                                                         destTableId,
+                                                         fieldList,
+                                                         filter,
+                                                         "2000-01-01", "2000-01-01")
+      .build().getQuery();
+
+    String partitionFilter = "TIMESTAMP(`_PARTITIONTIME`) >= TIMESTAMP(\"2000-01-01\") and "
+      + "TIMESTAMP(`_PARTITIONTIME`) < TIMESTAMP(\"2000-01-01\")";
+    expectedQuery = String.format("SELECT %s FROM `%s.%s.%s` WHERE %s and (%s)", String.join(",", fieldList),
+                                  datasetProject, dataset, table, partitionFilter, filter);
+    Assert.assertEquals(expectedQuery, generatedQuery);
+  }
+
+}
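To make the effect of patch 6 concrete: for a time-partitioned table with both a partition range and a filter configured, `getQueryBuilder` composes the partition predicate from `BigQueryUtil.generateTimePartitionCondition` and appends the user filter in parentheses. A worked example with hypothetical values (project `my-project`, dataset `my_dataset`, table `my_table`, fields `id,name`, partition range 2022-01-01 to 2022-02-01, filter `name = 'x'`; none of these identifiers appear in the patches):

```java
// The read query assembled under these assumptions would be:
String query = "SELECT id,name FROM `my-project.my_dataset.my_table` "
  + "WHERE TIMESTAMP(`_PARTITIONTIME`) >= TIMESTAMP(\"2022-01-01\") "
  + "and TIMESTAMP(`_PARTITIONTIME`) < TIMESTAMP(\"2022-02-01\") "
  + "and (name = 'x')";
```

Views, materialized views, and external tables skip the partition predicate entirely, which is what the first test above exercises.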
From 31a1ac02e6e42d701ebc4a8f1f720646ac29f888 Mon Sep 17 00:00:00 2001
From: sahusanket
Date: Mon, 12 Sep 2022 19:51:04 +0530
Subject: [PATCH 7/7] handling null input schema

---
 .../io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index 7dde1c43a5..2f01a4cc00 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -351,6 +351,11 @@ private void setInputFormat(BatchSourceContext context,
     // Add input for SQL Engine Direct read
     ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
 
+    if (configuredSchema == null) {
+      LOG.debug("BigQuery SQL Engine Input was not initialized. Schema was empty.");
+      return;
+    }
+
     List<String> fieldNames = configuredSchema.getFields().stream().map(f -> f.getName()).collect(Collectors.toList());
 
     arguments
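For reference, applying patches 1, 3, 4 and 7 in order leaves `BigQuerySource.setInputFormat` in roughly the following shape (reconstructed from the hunks above, not copied from the final file): the Spark `InputFormat` input is always registered, and the SQL engine input is only added when a schema is configured.

```java
private void setInputFormat(BatchSourceContext context,
                            Schema configuredSchema) {
  // Set input for Spark
  context.setInput(Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration)));

  // Add input for SQL Engine Direct read
  ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();

  if (configuredSchema == null) {
    LOG.debug("BigQuery SQL Engine Input was not initialized. Schema was empty.");
    return;
  }

  List<String> fieldNames = configuredSchema.getFields().stream()
    .map(f -> f.getName())
    .collect(Collectors.toList());

  arguments
    .put(BigQueryReadDataset.SQL_INPUT_CONFIG, GSON.toJson(config))
    .put(BigQueryReadDataset.SQL_INPUT_SCHEMA, GSON.toJson(configuredSchema))
    .put(BigQueryReadDataset.SQL_INPUT_FIELDS, GSON.toJson(fieldNames));

  Input sqlEngineInput = new SQLEngineInput(config.referenceName,
                                            context.getStageName(),
                                            BigQuerySQLEngine.class.getName(),
                                            arguments.build());
  context.setInput(sqlEngineInput);
}
```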