diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java
index ab49903e4b..550d873a0e 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java
@@ -51,7 +51,6 @@
 import java.lang.reflect.Type;
 import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -167,7 +166,10 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
 
     // Ensore both datasets are in the same location.
     if (!Objects.equals(srcDataset.getLocation(), destDataset.getLocation())) {
-      LOG.warn("Direct table copy is only supported if both datasets are in the same location.");
+      LOG.warn("Direct table copy is only supported if both datasets are in the same location. "
+                 + "'{}' is in '{}', '{}' is in '{}'.",
+               sourceDatasetId.getDataset(), srcDataset.getLocation(),
+               destinationDatasetId.getDataset(), destDataset.getLocation());
       return SQLWriteResult.unsupported(datasetName);
     }
 
@@ -177,6 +179,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
       return SQLWriteResult.unsupported(datasetName);
     }
 
+    // Get source table instance
+    Table srcTable = bigQuery.getTable(sourceTableId);
+
     // Get destination table instance
     Table destTable = bigQuery.getTable(destinationTableId);
 
@@ -219,7 +224,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
     // Wait for the query to complete.
    queryJob = queryJob.waitFor();
 
-    JobStatistics.QueryStatistics statistics = queryJob.getStatistics();
+    JobStatistics.QueryStatistics queryJobStats = queryJob.getStatistics();
 
     // Check for errors
     if (queryJob.getStatus().getError() != null) {
@@ -229,8 +234,11 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
       return SQLWriteResult.faiure(datasetName);
     }
 
-    long numRows = statistics.getNumDmlAffectedRows();
-    LOG.info("Copied {} records from {}.{}.{} to {}.{}.{}", numRows,
+    // Number of rows is taken from the job statistics if available.
+    // If not, we use the number of source table records.
+    long numRows = queryJobStats != null && queryJobStats.getNumDmlAffectedRows() != null ?
+      queryJobStats.getNumDmlAffectedRows() : srcTable.getNumRows().longValue();
+    LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows,
              sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(),
             destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable());
 
@@ -240,8 +248,9 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest,
 
   /**
    * Relax table fields based on the supplied schema
+   *
    * @param schema schema to use when relaxing
-   * @param table the destionation table to relax
+   * @param table  the destination table to relax
    */
   protected void relaxTableSchema(Schema schema, Table table) {
     com.google.cloud.bigquery.Schema bqSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema);
@@ -252,9 +261,10 @@ protected void relaxTableSchema(Schema schema, Table table) {
 
   /**
    * Create a new BigQuery table based on the supplied schema and table identifier
-   * @param schema schema to use for this table
-   * @param tableId itendifier for the new table
-   * @param sinkConfig Sink configuration used to define this table
+   *
+   * @param schema              schema to use for this table
+   * @param tableId             identifier for the new table
+   * @param sinkConfig          Sink configuration used to define this table
    * @param newDestinationTable Atomic reference to this new table. Used to delete this table if the execution fails.
    */
   protected void createTable(Schema schema,
@@ -268,7 +278,7 @@ protected void createTable(Schema schema,
     tableDefinitionBuilder.setSchema(bqSchema);
 
     // Configure partitioning options
-    switch(sinkConfig.getPartitioningType()) {
+    switch (sinkConfig.getPartitioningType()) {
       case TIME:
         tableDefinitionBuilder.setTimePartitioning(getTimePartitioning(sinkConfig));
         break;
@@ -306,6 +316,7 @@ protected void createTable(Schema schema,
 
   /**
    * Try to delete this table while handling exception
+   *
    * @param table the table identified for the table we want to delete.
    */
   protected void tryDeleteTable(TableId table) {
@@ -381,6 +392,7 @@ protected QueryJobConfiguration.Builder getUpdateUpsertQueryJobBuilder(TableId s
 
   /**
    * Build time partitioning configuration based on the BigQuery Sink configuration.
+   *
    * @param config sink configuration to use
    * @return Time Partitioning configuration
    */
@@ -398,6 +410,7 @@ protected TimePartitioning getTimePartitioning(BigQuerySinkConfig config) {
 
   /**
    * Build range partitioning configuration based on the BigQuery Sink configuration.
+   *
    * @param config sink configuration to use
    * @return Range Partitioning configuration
    */
@@ -416,6 +429,7 @@ protected RangePartitioning getRangePartitioning(BigQuerySinkConfig config) {
 
   /**
    * Build range used for partitioning configuration
+   *
    * @param config sink configuration to use
    * @return Range configuration
    */
@@ -433,6 +447,7 @@ protected RangePartitioning.Range getRangePartitioningRange(BigQuerySinkConfig c
 
   /**
    * Get the list of fields to use for clustering based on the supplied sink configuration
+   *
    * @param config sink configuration to use
    * @return List containing all clustering order fields.
    */
@@ -446,6 +461,7 @@ List<String> getClusteringOrderFields(BigQuerySinkConfig config) {
 
   /**
    * Get the clustering information for a list of clustering fields
+   *
    * @param clusteringFields list of clustering fields to use
    * @return Clustering configuration
    */
@@ -457,6 +473,7 @@ protected Clustering getClustering(List<String> clusteringFields) {
 
   /**
    * Get encryption configuration for the supplied sink configuration
+   *
    * @param config sink configuration to use
    * @return Encryption configuration
    */