diff --git a/pom.xml b/pom.xml
index 82f35ba774..a5633d15de 100644
--- a/pom.xml
+++ b/pom.xml
@@ -72,7 +72,7 @@
1.8.2
hadoop2-1.0.0
1.4
- 6.7.0
+ 6.8.0-SNAPSHOT
2.10.0-SNAPSHOT
3.2.6
0.3.1
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index 42e0edac81..2f01a4cc00 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -29,6 +29,8 @@
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.cloud.storage.Storage;
import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableMap;
+import com.google.gson.Gson;
import io.cdap.cdap.api.annotation.Description;
import io.cdap.cdap.api.annotation.Metadata;
import io.cdap.cdap.api.annotation.MetadataProperty;
@@ -46,9 +48,13 @@
import io.cdap.cdap.etl.api.batch.BatchSource;
import io.cdap.cdap.etl.api.batch.BatchSourceContext;
import io.cdap.cdap.etl.api.connector.Connector;
+import io.cdap.cdap.etl.api.engine.sql.SQLEngineInput;
import io.cdap.cdap.etl.api.validation.ValidationFailure;
import io.cdap.plugin.common.LineageRecorder;
import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
+import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
import io.cdap.plugin.gcp.common.CmekUtils;
@@ -61,6 +67,7 @@
import java.time.DateTimeException;
import java.time.LocalDate;
+import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import javax.annotation.Nullable;
@@ -76,6 +83,7 @@
@Metadata(properties = {@MetadataProperty(key = Connector.PLUGIN_TYPE, value = BigQueryConnector.NAME)})
public final class BigQuerySource extends BatchSource {
private static final Logger LOG = LoggerFactory.getLogger(BigQuerySource.class);
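+ // Serializes the source configuration, schema and field names into the SQL engine input arguments.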
+ private static final Gson GSON = new Gson();
public static final String NAME = "BigQueryTable";
private BigQuerySourceConfig config;
private Schema outputSchema;
@@ -165,7 +173,7 @@ public void prepareRun(BatchSourceContext context) throws Exception {
// We call emitLineage before since it creates the dataset with schema.
Type sourceTableType = config.getSourceTableType();
emitLineage(context, configuredSchema, sourceTableType, config.getTable());
- setInputFormat(context);
+ setInputFormat(context, configuredSchema);
}
@Override
@@ -335,8 +343,31 @@ private void validatePartitionProperties(FailureCollector collector) {
}
}
- private void setInputFormat(BatchSourceContext context) {
+ private void setInputFormat(BatchSourceContext context,
+ Schema configuredSchema) {
+ // Set the regular input, used when this stage executes in Spark.
context.setInput(Input.of(config.referenceName, new BigQueryInputFormatProvider(configuration)));
+
+ // Add an additional input for the SQL engine direct read.
+ ImmutableMap.Builder<String, String> arguments = new ImmutableMap.Builder<>();
+
+ if (configuredSchema == null) {
+ LOG.debug("BigQuery SQL Engine input was not initialized. Schema was empty.");
+ return;
+ }
+
+ List<String> fieldNames = configuredSchema.getFields().stream().map(f -> f.getName()).collect(Collectors.toList());
+
+ arguments
+ .put(BigQueryReadDataset.SQL_INPUT_CONFIG, GSON.toJson(config))
+ .put(BigQueryReadDataset.SQL_INPUT_SCHEMA, GSON.toJson(configuredSchema))
+ .put(BigQueryReadDataset.SQL_INPUT_FIELDS, GSON.toJson(fieldNames));
+
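+ // Register the SQL engine input so BigQuerySQLEngine can claim this read and execute it directly
+ // within BigQuery when SQL engine pushdown is enabled for the pipeline.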
+ Input sqlEngineInput = new SQLEngineInput(config.referenceName,
+ context.getStageName(),
+ BigQuerySQLEngine.class.getName(),
+ arguments.build());
+ context.setInput(sqlEngineInput);
}
private void emitLineage(BatchSourceContext context, Schema schema, Type sourceTableType,
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
index cfdf8ae440..e47f411dbb 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/PartitionedBigQueryInputFormat.java
@@ -48,7 +48,6 @@
* in order to create input splits.
*/
public class PartitionedBigQueryInputFormat extends AbstractBigQueryInputFormat {
- private static final String DEFAULT_COLUMN_NAME = "_PARTITIONTIME";
private InputFormat delegateInputFormat =
new AvroBigQueryInputFormat();
@@ -160,8 +159,8 @@ String generateQuery(String partitionFromDate, String partitionToDate, String fi
StringBuilder condition = new StringBuilder();
if (timePartitioning != null) {
- String timePartitionCondition = generateTimePartitionCondition(tableDefinition, timePartitioning,
- partitionFromDate, partitionToDate);
+ String timePartitionCondition = BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate,
+ partitionToDate);
condition.append(timePartitionCondition);
}
@@ -289,33 +288,4 @@ private static JobReference getJobReference(Configuration conf, BigQueryHelper b
}
return new JobReference().setProjectId(projectId).setJobId(savedJobId).setLocation(location);
}
-
- private String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
- TimePartitioning timePartitioning, String partitionFromDate,
- String partitionToDate) {
- StringBuilder timePartitionCondition = new StringBuilder();
- String columnName = timePartitioning.getField() != null ? timePartitioning.getField() : DEFAULT_COLUMN_NAME;
-
- LegacySQLTypeName columnType = null;
- if (!DEFAULT_COLUMN_NAME.equals(columnName)) {
- columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
- }
-
- String columnNameTS = columnName;
- if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
- columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
- }
- if (partitionFromDate != null) {
- timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
- .append(partitionFromDate).append("\")");
- }
- if (partitionFromDate != null && partitionToDate != null) {
- timePartitionCondition.append(" and ");
- }
- if (partitionToDate != null) {
- timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
- .append(partitionToDate).append("\")");
- }
- return timePartitionCondition.toString();
- }
}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
new file mode 100644
index 0000000000..8aba2f61ae
--- /dev/null
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDataset.java
@@ -0,0 +1,365 @@
+/*
+ * Copyright © 2022 Cask Data, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy of
+ * the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package io.cdap.plugin.gcp.bigquery.sqlengine;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.BigQueryException;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.DatasetId;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.JobStatistics;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition.Type;
+import com.google.cloud.bigquery.TableId;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Strings;
+import com.google.common.reflect.TypeToken;
+import com.google.gson.Gson;
+import io.cdap.cdap.api.data.schema.Schema;
+import io.cdap.cdap.api.metrics.Metrics;
+import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
+import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceConfig;
+import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils;
+import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import javax.annotation.Nullable;
+
+/**
+ * SQL read dataset implementation for BigQuery backed datasets, used for direct reads from a BigQuery source.
+ */
+public class BigQueryReadDataset implements SQLDataset, BigQuerySQLDataset {
+
+ private static final Logger LOG = LoggerFactory.getLogger(BigQueryReadDataset.class);
+ private static final Gson GSON = new Gson();
+
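+ // Keys for the arguments that BigQuerySource sets on the SQLEngineInput when registering a direct read.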
+ public static final String SQL_INPUT_CONFIG = "config";
+ public static final String SQL_INPUT_FIELDS = "fields";
+ public static final String SQL_INPUT_SCHEMA = "schema";
+ private static final java.lang.reflect.Type LIST_OF_STRINGS_TYPE = new TypeToken<List<String>>() {
+ }.getType();
+
+ private final BigQuerySQLEngineConfig sqlEngineConfig;
+ private final BigQuery bigQuery;
+ private final String datasetName;
+ private final SQLReadRequest readRequest;
+ private final TableId destinationTableId;
+ private final String jobId;
+ private Schema schema;
+ private Long numRows;
+ private Metrics metrics;
+
+ private BigQueryReadDataset(String datasetName,
+ BigQuerySQLEngineConfig sqlEngineConfig,
+ BigQuery bigQuery,
+ SQLReadRequest readRequest,
+ TableId destinationTableId,
+ String jobId,
+ Metrics metrics) {
+ this.datasetName = datasetName;
+ this.sqlEngineConfig = sqlEngineConfig;
+ this.bigQuery = bigQuery;
+ this.readRequest = readRequest;
+ this.destinationTableId = destinationTableId;
+ this.jobId = jobId;
+ this.metrics = metrics;
+ }
+
+ public static BigQueryReadDataset getInstance(String datasetName,
+ BigQuerySQLEngineConfig sqlEngineConfig,
+ BigQuery bigQuery,
+ SQLReadRequest readRequest,
+ TableId destinationTableId,
+ Metrics metrics) {
+ // Get a new job ID for this read operation
+ String jobId = BigQuerySQLEngineUtils.newIdentifier();
+
+ return new BigQueryReadDataset(datasetName,
+ sqlEngineConfig,
+ bigQuery,
+ readRequest,
+ destinationTableId,
+ jobId,
+ metrics);
+ }
+
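+ /**
+ * Executes the read operation, copying the requested fields from the source table into the destination
+ * table managed by the SQL engine. If the operation fails, the destination table is deleted.
+ *
+ * @return the result of the read operation.
+ */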
+ public SQLReadResult read() {
+ SQLReadResult result = null;
+ try {
+ result = readInternal(readRequest);
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted exception during BigQuery read operation.", e);
+ } catch (BigQueryException bqe) {
+ LOG.error("BigQuery exception during BigQuery read operation", bqe);
+ } catch (Exception e) {
+ LOG.error("Exception during BigQuery read operation", e);
+ }
+
+ // The destination table was created for this execution. If the read failed for any reason,
+ // delete it so the standard source workflow can succeed.
+ if (result == null || !result.isSuccessful()) {
+ tryDeleteTable(destinationTableId);
+ }
+
+ // Return the result, or a failure if the operation threw an exception.
+ return result != null ? result : SQLReadResult.failure(readRequest.getDatasetName());
+ }
+
+ private SQLReadResult readInternal(SQLReadRequest readRequest) throws BigQueryException, InterruptedException {
+ // Check if this input matches the expected engine.
+ String datasetName = readRequest.getDatasetName();
+ if (!BigQuerySQLEngine.class.getName().equals(readRequest.getInput().getSqlEngineClassName())) {
+ LOG.debug("Got output for another SQL engine {}, skipping", readRequest.getInput().getSqlEngineClassName());
+ return SQLReadResult.unsupported(datasetName);
+ }
+
+ // Get configuration properties from read request arguments
+ Map<String, String> arguments = readRequest.getInput().getArguments();
+ BigQuerySourceConfig sourceConfig = GSON.fromJson(arguments.get(SQL_INPUT_CONFIG), BigQuerySourceConfig.class);
+ schema = GSON.fromJson(arguments.get(SQL_INPUT_SCHEMA), Schema.class);
+ List<String> fields = GSON.fromJson(arguments.get(SQL_INPUT_FIELDS), LIST_OF_STRINGS_TYPE);
+
+ // Get source table information
+ String sourceProject = sourceConfig.getDatasetProject();
+ String sourceDataset = sourceConfig.getDataset();
+ String sourceTableName = sourceConfig.getTable();
+ TableId sourceTableId = TableId.of(sourceProject, sourceDataset, sourceTableName);
+
+ // Check if both datasets are in the same Location. If not, the direct read operation cannot be performed.
+ DatasetId sourceDatasetId = DatasetId.of(sourceTableId.getProject(), sourceTableId.getDataset());
+ DatasetId destinationDatasetId = DatasetId.of(destinationTableId.getProject(), destinationTableId.getDataset());
+ Dataset srcDataset = bigQuery.getDataset(sourceDatasetId);
+ Dataset destDataset = bigQuery.getDataset(destinationDatasetId);
+
+ // Ensure datasets exist before proceeding
+ if (srcDataset == null || destDataset == null) {
+ LOG.warn("Direct table read is not supported when the datasets are not created.");
+ return SQLReadResult.unsupported(datasetName);
+ }
+
+ // Ensure both datasets are in the same location.
+ if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
+ LOG.error("Direct table read is only supported if both datasets are in the same location. "
+ + "'{}' is '{}' , '{}' is '{}' .",
+ sourceDatasetId.getDataset(), srcDataset.getLocation(),
+ sourceDatasetId.getDataset(), destDataset.getLocation());
+ return SQLReadResult.unsupported(datasetName);
+ }
+
+ // Get the source table instance. It is used for metadata such as the table type, time partitioning
+ // and the number of rows.
+ Table sourceTable;
+ try {
+ sourceTable = bigQuery.getTable(sourceTableId);
+ } catch (BigQueryException e) {
+ throw new IllegalArgumentException("Unable to get details about the BigQuery table: " + e.getMessage(), e);
+ }
+
+ // Build the query job configuration used to read the requested fields from the source table
+ // into the destination table.
+ QueryJobConfiguration.Builder queryConfigBuilder = getQueryBuilder(sourceTable, sourceTableId,
+ destinationTableId,
+ fields,
+ sourceConfig.getFilter(),
+ sourceConfig.getPartitionFrom(),
+ sourceConfig.getPartitionTo());
+
+ QueryJobConfiguration queryConfig = queryConfigBuilder.build();
+ // Create a job ID so that we can safely retry.
+ JobId bqJobId = JobId.newBuilder()
+ .setJob(jobId)
+ .setLocation(sqlEngineConfig.getLocation())
+ .setProject(sqlEngineConfig.getProject())
+ .build();
+ Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(bqJobId).build());
+
+ // Wait for the query to complete.
+ queryJob = queryJob.waitFor();
+ JobStatistics.QueryStatistics queryJobStats = queryJob.getStatistics();
+
+ // Check for errors
+ if (queryJob.getStatus().getError() != null) {
+ BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
+ LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}",
+ jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(),
+ queryJob.getStatus().getError().toString());
+ return SQLReadResult.failure(datasetName);
+ }
+
+ // Number of rows is taken from the job statistics if available.
+ // If not, we use the number of source table records.
+ long numRows = queryJobStats != null && queryJobStats.getNumDmlAffectedRows() != null ?
+ queryJobStats.getNumDmlAffectedRows() : sourceTable.getNumRows().longValue();
+ LOG.info("Executed read operation for {} records from {}.{}.{} into {}.{}.{}", numRows,
+ sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
+ destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
+ BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics);
+
+ return SQLReadResult.success(datasetName, this);
+ }
+
+ @VisibleForTesting
+ QueryJobConfiguration.Builder getQueryBuilder(Table sourceTable, TableId sourceTableId,
+ TableId destinationTableId,
+ List<String> fields,
+ String filter,
+ String partitionFromDate,
+ String partitionToDate) {
+ String query = String.format("SELECT %s FROM `%s.%s.%s`",
+ String.join(",", fields),
+ sourceTableId.getProject(),
+ sourceTableId.getDataset(),
+ sourceTableId.getTable());
+
+ StringBuilder condition = new StringBuilder();
+
+ // Add a time partition condition only for regular tables. Views, materialized views and external
+ // tables are skipped.
+ StandardTableDefinition tableDefinition = Objects.requireNonNull(sourceTable).getDefinition();
+ Type type = tableDefinition.getType();
+ if (!(type == Type.VIEW || type == Type.MATERIALIZED_VIEW || type == Type.EXTERNAL)) {
+ condition.append(
+ BigQueryUtil.generateTimePartitionCondition(tableDefinition, partitionFromDate, partitionToDate));
+ }
+
+ // If a filter is present, add it to the condition.
+ if (!Strings.isNullOrEmpty(filter)) {
+ if (condition.length() == 0) {
+ condition.append(filter);
+ } else {
+ condition.append(" and (").append(filter).append(")");
+ }
+ }
+
+ if (condition.length() > 0) {
+ query = String.format("%s WHERE %s", query, condition);
+ }
+
+ LOG.info("Reading data from `{}.{}.{}` to `{}.{}.{}` using SQL statement: {} ",
+ sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
+ destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable(),
+ query);
+
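+ // The destination table is created by the SQL engine before the read runs, so table creation is
+ // disabled here and the query results are appended to that empty table.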
+ return QueryJobConfiguration.newBuilder(query)
+ .setDestinationTable(destinationTableId)
+ .setCreateDisposition(JobInfo.CreateDisposition.CREATE_NEVER)
+ .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND)
+ .setPriority(sqlEngineConfig.getJobPriority())
+ .setLabels(BigQuerySQLEngineUtils.getJobTags("read"));
+ }
+
+ /**
+ * Try to delete the supplied table, handling any exception.
+ *
+ * @param table the table identifier for the table we want to delete.
+ */
+ protected void tryDeleteTable(TableId table) {
+ try {
+ bigQuery.delete(table);
+ } catch (BigQueryException bqe) {
+ LOG.error("Unable to delete table {}.{}.{}. This may cause the pipeline to fail",
+ table.getProject(), table.getDataset(), table.getTable(), bqe);
+ }
+ }
+
+ @Override
+ public String getBigQueryProject() {
+ return destinationTableId.getProject();
+ }
+
+ @Override
+ public String getBigQueryDataset() {
+ return destinationTableId.getDataset();
+ }
+
+ @Override
+ public String getBigQueryTable() {
+ return destinationTableId.getTable();
+ }
+
+ @Nullable
+ @Override
+ public String getJobId() {
+ return jobId;
+ }
+
+ @Nullable
+ @Override
+ public String getGCSPath() {
+ return null;
+ }
+
+ @Override
+ public long getNumRows() {
+ // Get the number of rows from BQ if not known at this time.
+ if (numRows == null) {
+ numRows = BigQuerySQLEngineUtils.getNumRows(bigQuery,
+ DatasetId.of(destinationTableId.getProject(),
+ destinationTableId.getDataset()),
+ destinationTableId.getTable());
+ }
+
+ return numRows;
+ }
+
+ @Override
+ public String getDatasetName() {
+ return datasetName;
+ }
+
+ @Override
+ public Schema getSchema() {
+ return schema;
+ }
+}
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
index 6c32c4b587..563816b5e3 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java
@@ -50,6 +50,8 @@
import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLPullRequest;
import io.cdap.cdap.etl.api.engine.sql.request.SQLPushRequest;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadRequest;
+import io.cdap.cdap.etl.api.engine.sql.request.SQLReadResult;
import io.cdap.cdap.etl.api.engine.sql.request.SQLRelationDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformDefinition;
import io.cdap.cdap.etl.api.engine.sql.request.SQLTransformRequest;
@@ -342,6 +344,45 @@ public Set getPullCapabilities() {
return Collections.singleton(DefaultPullCapability.SPARK_RDD_PULL);
}
+ @Override
+ public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException {
+ String datasetName = readRequest.getDatasetName();
+
+ // TODO: implement direct source read toggle
+ // Check if direct source read is enabled. If not, skip.
+// if (!sqlEngineConfig.shouldUseDirectSinkWrite()) {
+// return SQLReadResult.unsupported(datasetName);
+// }
+
+ // Check if this input matches the expected engine. If it doesn't, skip execution for this read operation.
+ if (!BigQuerySQLEngine.class.getName().equals(readRequest.getInput().getSqlEngineClassName())) {
+ LOG.debug("Got input for another SQL engine {}, skipping", readRequest.getInput().getSqlEngineClassName());
+ return SQLReadResult.unsupported(datasetName);
+ }
+
+ // Generate the name of the destination table that will store the results of the read query.
+ String destinationTable = BigQuerySQLEngineUtils.getNewTableName(runId);
+
+ // Create empty table to store query results.
+ BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, project, dataset, destinationTable);
+ TableId destinationTableId = TableId.of(datasetProject, dataset, destinationTable);
+
+ // Build BigQuery read dataset instance and execute the read operation.
+ BigQueryReadDataset readDataset = BigQueryReadDataset.getInstance(datasetName,
+ sqlEngineConfig,
+ bigQuery,
+ readRequest,
+ destinationTableId,
+ metrics);
+ SQLReadResult result = readDataset.read();
+
+ if (result.isSuccessful()) {
+ datasets.put(datasetName, readDataset);
+ }
+
+ return result;
+ }
+
@Override
public SQLWriteResult write(SQLWriteRequest writeRequest) {
String datasetName = writeRequest.getDatasetName();
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index 78d68d2de7..dee1c9e1c1 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -23,8 +23,10 @@
import com.google.cloud.bigquery.FieldList;
import com.google.cloud.bigquery.LegacySQLTypeName;
import com.google.cloud.bigquery.StandardSQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.Table;
import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TimePartitioning;
import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
import com.google.cloud.kms.v1.CryptoKeyName;
import com.google.common.base.Strings;
@@ -43,6 +45,7 @@
import io.cdap.plugin.gcp.common.GCPConfig;
import io.cdap.plugin.gcp.common.GCPUtils;
import io.cdap.plugin.gcp.gcs.GCSPath;
+import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -69,6 +72,8 @@ public final class BigQueryUtil {
private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);
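+ // Pseudo-column exposed by BigQuery for ingestion-time partitioned tables when no partition field is configured.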
+ private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
+
public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
public static final String DATASET_PATTERN = "[A-Za-z0-9_]+";
public static final String TABLE_PATTERN = "[A-Za-z0-9_]+";
@@ -717,4 +722,41 @@ public static void deleteTemporaryDirectory(Configuration configuration, String
LOG.debug("Deleted temporary directory '{}'", path);
}
}
+
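+ /**
+ * Generates a SQL condition that restricts a query to the supplied partition date range, based on the
+ * time partitioning of the given table definition.
+ *
+ * @param tableDefinition the table definition used to resolve the partition column and its type
+ * @param partitionFromDate inclusive lower bound for the partition date, or null for no lower bound
+ * @param partitionToDate exclusive upper bound for the partition date, or null for no upper bound
+ * @return the partition condition, or an empty string if the table is not time partitioned
+ */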
+ public static String generateTimePartitionCondition(StandardTableDefinition tableDefinition,
+ String partitionFromDate,
+ String partitionToDate) {
+
+ TimePartitioning timePartitioning = tableDefinition.getTimePartitioning();
+
+ if (timePartitioning == null) {
+ return StringUtils.EMPTY;
+ }
+
+ StringBuilder timePartitionCondition = new StringBuilder();
+ String columnName = timePartitioning.getField() != null ?
+ timePartitioning.getField() : DEFAULT_PARTITION_COLUMN_NAME;
+
+ LegacySQLTypeName columnType = null;
+ if (!DEFAULT_PARTITION_COLUMN_NAME.equals(columnName)) {
+ columnType = tableDefinition.getSchema().getFields().get(columnName).getType();
+ }
+
+ String columnNameTS = columnName;
+ if (!LegacySQLTypeName.TIMESTAMP.equals(columnType)) {
+ columnNameTS = "TIMESTAMP(`" + columnNameTS + "`)";
+ }
+ if (partitionFromDate != null) {
+ timePartitionCondition.append(columnNameTS).append(" >= ").append("TIMESTAMP(\"")
+ .append(partitionFromDate).append("\")");
+ }
+ if (partitionFromDate != null && partitionToDate != null) {
+ timePartitionCondition.append(" and ");
+ }
+ if (partitionToDate != null) {
+ timePartitionCondition.append(columnNameTS).append(" < ").append("TIMESTAMP(\"")
+ .append(partitionToDate).append("\")");
+ }
+ return timePartitionCondition.toString();
+ }
}
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java
new file mode 100644
index 0000000000..2ca9daf8b6
--- /dev/null
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryReadDatasetTest.java
@@ -0,0 +1,130 @@
+package io.cdap.plugin.gcp.bigquery.sqlengine;
+
+import com.google.cloud.bigquery.QueryJobConfiguration;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.Table;
+import com.google.cloud.bigquery.TableDefinition;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.TimePartitioning;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.Mock;
+import org.mockito.Mockito;
+import org.mockito.junit.MockitoJUnitRunner;
+
+import java.util.Arrays;
+import java.util.List;
+
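+/**
+ * Tests for the query generation logic in {@link BigQueryReadDataset}.
+ */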
+@RunWith(MockitoJUnitRunner.class)
+public class BigQueryReadDatasetTest {
+
+ @Mock
+ Table sourceTable;
+
+ @Mock
+ StandardTableDefinition tableDefinition;
+
+ @Mock
+ BigQuerySQLEngineConfig sqlEngineConfig;
+
+ @Mock
+ TimePartitioning timePartitioning;
+
+ String datasetProject = "test_bq_dataset_project";
+ String dataset = "test_bq_dataset";
+ String table = "test_bq_table";
+ String filter = "tableColumn = 'abc'";
+ String destTable = "test_bq_dest_table";
+ List<String> fieldList = Arrays.asList("id_test", "name_test", "place_test");
+
+ BigQueryReadDataset bigQueryReadDataset;
+
+ @Before
+ public void init() {
+ TableId destTableId = TableId.of(datasetProject, dataset, destTable);
+ bigQueryReadDataset = BigQueryReadDataset.getInstance(
+ dataset,
+ sqlEngineConfig,
+ null,
+ null,
+ destTableId,
+ null);
+ }
+
+ @Test
+ public void testGenerateQueryForView() {
+ TableId sourceTableId = TableId.of(datasetProject, dataset, table);
+ TableId destTableId = TableId.of(datasetProject, dataset, destTable);
+
+ Mockito.when(tableDefinition.getType()).thenReturn(TableDefinition.Type.VIEW);
+ Mockito.when(sourceTable.getDefinition()).thenReturn(tableDefinition);
+ Mockito.when(sqlEngineConfig.getJobPriority()).thenReturn(QueryJobConfiguration.Priority.BATCH);
+
+ String generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+ sourceTableId,
+ destTableId,
+ fieldList,
+ filter,
+ "2000-01-01", "2000-01-01")
+ .build().getQuery();
+
+ String expectedQuery = String.format("SELECT %s FROM `%s.%s.%s` WHERE %s", String.join(",", fieldList),
+ datasetProject, dataset, table, filter);
+ Assert.assertEquals(expectedQuery, generatedQuery);
+
+ //Without Filter
+ generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+ sourceTableId,
+ destTableId,
+ fieldList,
+ null, null, null)
+ .build().getQuery();
+
+ expectedQuery = String.format("SELECT %s FROM `%s.%s.%s`", String.join(",", fieldList),
+ datasetProject, dataset, table);
+ Assert.assertEquals(expectedQuery, generatedQuery);
+ }
+
+ @Test
+ public void testGenerateQuery() {
+ TableId sourceTableId = TableId.of(datasetProject, dataset, table);
+ TableId destTableId = TableId.of(datasetProject, dataset, destTable);
+
+ Mockito.when(tableDefinition.getType()).thenReturn(TableDefinition.Type.TABLE);
+ Mockito.when(sourceTable.getDefinition()).thenReturn(tableDefinition);
+ Mockito.when(sqlEngineConfig.getJobPriority()).thenReturn(QueryJobConfiguration.Priority.BATCH);
+
+ //When Table is NOT PARTITIONED
+ String generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+ sourceTableId,
+ destTableId,
+ fieldList,
+ filter,
+ "2000-01-01", "2000-01-01")
+ .build().getQuery();
+
+ String expectedQuery = String.format("SELECT %s FROM `%s.%s.%s` WHERE %s", String.join(",", fieldList),
+ datasetProject, dataset, table, filter);
+ Assert.assertEquals(expectedQuery, generatedQuery);
+
+ //When Table is PARTITIONED
+ Mockito.when(tableDefinition.getTimePartitioning()).thenReturn(timePartitioning);
+
+ generatedQuery = bigQueryReadDataset.getQueryBuilder(sourceTable,
+ sourceTableId,
+ destTableId,
+ fieldList,
+ filter,
+ "2000-01-01", "2000-01-01")
+ .build().getQuery();
+
+ String partitionFilter = "TIMESTAMP(`_PARTITIONTIME`) >= TIMESTAMP(\"2000-01-01\") and " +
+ "TIMESTAMP(`_PARTITIONTIME`) < TIMESTAMP(\"2000-01-01\")";
+ expectedQuery = String.format("SELECT %s FROM `%s.%s.%s` WHERE %s and (%s)", String.join(",", fieldList),
+ datasetProject, dataset, table, partitionFilter, filter);
+ Assert.assertEquals(expectedQuery, generatedQuery);
+ }
+
+}