From c96fd3cad6484f607a7ec33d8000a1239dd56850 Mon Sep 17 00:00:00 2001 From: Fernando Velasquez Date: Tue, 26 Apr 2022 18:03:39 -0400 Subject: [PATCH] Added support for context metrics in the BigQuery SQL Engine --- .../sqlengine/BigQueryJoinDataset.java | 221 ------------------ .../bigquery/sqlengine/BigQuerySQLEngine.java | 18 +- .../sqlengine/BigQuerySelectDataset.java | 18 +- .../gcp/bigquery/sqlengine/BigQueryWrite.java | 18 +- .../util/BigQuerySQLEngineUtils.java | 19 +- 5 files changed, 54 insertions(+), 240 deletions(-) delete mode 100644 src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java deleted file mode 100644 index ae838f1771..0000000000 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java +++ /dev/null @@ -1,221 +0,0 @@ -/* - * Copyright © 2021 Cask Data, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); you may not - * use this file except in compliance with the License. You may obtain a copy of - * the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT - * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the - * License for the specific language governing permissions and limitations under - * the License. 
- */ - -package io.cdap.plugin.gcp.bigquery.sqlengine; - -import com.google.cloud.bigquery.BigQuery; -import com.google.cloud.bigquery.Dataset; -import com.google.cloud.bigquery.DatasetId; -import com.google.cloud.bigquery.Job; -import com.google.cloud.bigquery.JobId; -import com.google.cloud.bigquery.JobInfo; -import com.google.cloud.bigquery.QueryJobConfiguration; -import com.google.cloud.bigquery.Table; -import com.google.cloud.bigquery.TableDefinition; -import com.google.cloud.bigquery.TableId; -import io.cdap.cdap.api.data.schema.Schema; -import io.cdap.cdap.etl.api.engine.sql.SQLEngineException; -import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset; -import io.cdap.cdap.etl.api.engine.sql.request.SQLJoinRequest; -import io.cdap.cdap.etl.api.join.JoinDefinition; -import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; -import io.cdap.plugin.gcp.bigquery.sqlengine.builder.BigQueryJoinSQLBuilder; -import io.cdap.plugin.gcp.bigquery.sqlengine.util.BigQuerySQLEngineUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Collections; -import java.util.Map; -import javax.annotation.Nullable; - -/** - * SQL Dataset that represents the result of a Join operation that is executed in BigQuery. 
- */ -public class BigQueryJoinDataset implements SQLDataset, BigQuerySQLDataset { - - private static final Logger LOG = LoggerFactory.getLogger(BigQueryJoinDataset.class); - - private final String datasetName; - private final JoinDefinition joinDefinition; - private final BigQuerySQLEngineConfig sqlEngineConfig; - private final BigQuery bigQuery; - private final String project; - private final DatasetId bqDataset; - private final String bqTable; - private final String jobId; - private final BigQueryJoinSQLBuilder queryBuilder; - private Long numRows; - - private BigQueryJoinDataset(String datasetName, - JoinDefinition joinDefinition, - BigQuerySQLEngineConfig sqlEngineConfig, - Map stageToTableNameMap, - BigQuery bigQuery, - String project, - DatasetId bqDataset, - String bqTable, - String jobId) { - this.datasetName = datasetName; - this.joinDefinition = joinDefinition; - this.sqlEngineConfig = sqlEngineConfig; - this.bigQuery = bigQuery; - this.project = project; - this.bqDataset = bqDataset; - this.bqTable = bqTable; - this.jobId = jobId; - this.queryBuilder = new BigQueryJoinSQLBuilder(this.joinDefinition, this.bqDataset, stageToTableNameMap); - } - - public static BigQueryJoinDataset getInstance(SQLJoinRequest joinRequest, - Map bqTableNamesMap, - BigQuerySQLEngineConfig sqlEngineConfig, - BigQuery bigQuery, - String project, - DatasetId dataset, - String runId) { - - // Get new Job ID for this push operation - String jobId = BigQuerySQLEngineUtils.newIdentifier(); - - // Build new table name for this dataset - String table = BigQuerySQLEngineUtils.getNewTableName(runId); - - // Create empty table to store join results. 
- BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, dataset.getProject(), dataset.getDataset(), - table); - - BigQueryJoinDataset instance = new BigQueryJoinDataset(joinRequest.getDatasetName(), - joinRequest.getJoinDefinition(), - sqlEngineConfig, - bqTableNamesMap, - bigQuery, - project, - dataset, - table, - jobId); - instance.executeJoin(); - return instance; - } - - public void executeJoin() { - TableId destinationTable = TableId.of(bqDataset.getProject(), bqDataset.getDataset(), bqTable); - - // Get location for target dataset. This way, the job will run in the same location as the dataset - Dataset dataset = bigQuery.getDataset(bqDataset); - String location = dataset.getLocation(); - - String query = queryBuilder.getQuery(); - LOG.info("Creating table `{}` using job: {} with SQL statement: {}", bqTable, jobId, query); - - // Update destination table schema to match configured schema in the pipeline. - updateTableSchema(destinationTable, joinDefinition.getOutputSchema()); - - // Run BigQuery job with generated SQL statement, store results in a new table, and set priority to BATCH - // TODO: Make priority configurable - QueryJobConfiguration queryConfig = - QueryJobConfiguration.newBuilder(query) - .setDestinationTable(destinationTable) - .setCreateDisposition(JobInfo.CreateDisposition.CREATE_NEVER) - .setWriteDisposition(JobInfo.WriteDisposition.WRITE_APPEND) - .setSchemaUpdateOptions(Collections.singletonList(JobInfo.SchemaUpdateOption.ALLOW_FIELD_ADDITION)) - .setPriority(sqlEngineConfig.getJobPriority()) - .setLabels(BigQuerySQLEngineUtils.getJobTags("join")) - .build(); - - // Create a job ID so that we can safely retry. - JobId bqJobId = JobId.newBuilder().setJob(jobId).setLocation(location).setProject(project).build(); - Job queryJob = bigQuery.create(JobInfo.newBuilder(queryConfig).setJobId(bqJobId).build()); - - // Wait for the query to complete. 
- try { - queryJob = queryJob.waitFor(); - } catch (InterruptedException ie) { - throw new SQLEngineException("Interrupted exception when executing Join operation", ie); - } - - // Check for errors - if (queryJob == null) { - throw new SQLEngineException("BigQuery job not found: " + jobId); - } else if (queryJob.getStatus().getError() != null) { - BigQuerySQLEngineUtils.logJobMetrics(queryJob); - throw new SQLEngineException(String.format( - "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s", - jobId, project, bqDataset, location, queryJob.getStatus().getError().toString())); - } - - LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId); - BigQuerySQLEngineUtils.logJobMetrics(queryJob); - } - - @Override - public String getDatasetName() { - return datasetName; - } - - @Override - public Schema getSchema() { - return joinDefinition.getOutputSchema(); - } - - @Override - public long getNumRows() { - // Get the number of rows from BQ if not known at this time. 
- if (numRows == null) { - numRows = BigQuerySQLEngineUtils.getNumRows(bigQuery, bqDataset, bqTable); - } - - return numRows; - } - - @Override - public String getBigQueryProject() { - return bqDataset.getProject(); - } - - @Override - public String getBigQueryDataset() { - return bqDataset.getDataset(); - } - - @Override - public String getBigQueryTable() { - return bqTable; - } - - @Override - @Nullable - public String getGCSPath() { - return null; - } - - @Override - public String getJobId() { - return jobId; - } - - protected void updateTableSchema(TableId tableId, Schema schema) { - // Get BigQuery schema for this table - com.google.cloud.bigquery.Schema bqSchema = BigQuerySinkUtils.convertCdapSchemaToBigQuerySchema(schema); - - // Get table and update definition to match the new schema - Table table = bigQuery.getTable(tableId); - TableDefinition updatedDefinition = table.getDefinition().toBuilder().setSchema(bqSchema).build(); - Table updatedTable = table.toBuilder().setDefinition(updatedDefinition).build(); - - // Update table. 
- bigQuery.update(updatedTable); - } -} diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java index 46f9d16eb9..6a3c2d7f0f 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java @@ -27,6 +27,7 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import io.cdap.cdap.api.RuntimeContext; +import io.cdap.cdap.api.SQLEngineContext; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Metadata; import io.cdap.cdap.api.annotation.MetadataProperty; @@ -34,6 +35,7 @@ import io.cdap.cdap.api.annotation.Plugin; import io.cdap.cdap.api.data.format.StructuredRecord; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.metrics.Metrics; import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.connector.Connector; import io.cdap.cdap.etl.api.engine.sql.BatchSQLEngine; @@ -115,8 +117,8 @@ public class BigQuerySQLEngine private String dataset; private String bucket; private String runId; - private Map tableNames; private Map datasets; + private Metrics metrics; @SuppressWarnings("unused") public BigQuerySQLEngine(BigQuerySQLEngineConfig sqlEngineConfig) { @@ -132,14 +134,13 @@ public void configurePipeline(PipelineConfigurer pipelineConfigurer) { } @Override - public void prepareRun(RuntimeContext context) throws Exception { + public void prepareRun(SQLEngineContext context) throws Exception { super.prepareRun(context); // Validate configuration and throw exception if the supplied configuration is invalid. 
sqlEngineConfig.validate(); runId = BigQuerySQLEngineUtils.newIdentifier(); - tableNames = new HashMap<>(); datasets = new HashMap<>(); String serviceAccount = sqlEngineConfig.getServiceAccount(); @@ -171,10 +172,13 @@ public void prepareRun(RuntimeContext context) throws Exception { // Configure credentials for the source BigQuerySourceUtils.configureServiceAccount(configuration, sqlEngineConfig.connection); + + // Get metrics instance + metrics = context.getMetrics(); } @Override - public void onRunFinish(boolean succeeded, RuntimeContext context) { + public void onRunFinish(boolean succeeded, SQLEngineContext context) { super.onRunFinish(succeeded, context); String gcsPath; @@ -358,7 +362,8 @@ public SQLWriteResult write(SQLWriteRequest writeRequest) { sqlEngineConfig, bigQuery, writeRequest, - sourceTableId); + sourceTableId, + metrics); return bigQueryWrite.write(); } @@ -529,7 +534,8 @@ private BigQuerySelectDataset executeSelect(String datasetName, table, jobId, jobType, - query + query, + metrics ).execute(); datasets.put(datasetName, selectDataset); diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java index a6a9f57fdf..0f65366672 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java @@ -11,6 +11,7 @@ import com.google.cloud.bigquery.TableDefinition; import com.google.cloud.bigquery.TableId; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.metrics.Metrics; import io.cdap.cdap.etl.api.engine.sql.SQLEngineException; import io.cdap.cdap.etl.api.engine.sql.dataset.SQLDataset; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; @@ -19,6 +20,8 @@ import org.slf4j.LoggerFactory; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; import javax.annotation.Nullable; /** @@ 
-38,6 +41,7 @@ public class BigQuerySelectDataset implements SQLDataset, BigQuerySQLDataset { private final String jobId; private final BigQueryJobType operation; private final String selectQuery; + private final Metrics metrics; private Long numRows; public static BigQuerySelectDataset getInstance(String datasetName, @@ -49,7 +53,8 @@ public static BigQuerySelectDataset getInstance(String datasetName, String bqTable, String jobId, BigQueryJobType jobType, - String selectQuery) { + String selectQuery, + Metrics metrics) { return new BigQuerySelectDataset(datasetName, outputSchema, @@ -60,7 +65,8 @@ public static BigQuerySelectDataset getInstance(String datasetName, bqTable, jobId, jobType, - selectQuery); + selectQuery, + metrics); } private BigQuerySelectDataset(String datasetName, @@ -72,7 +78,8 @@ private BigQuerySelectDataset(String datasetName, String bqTable, String jobId, BigQueryJobType operation, - String selectQuery) { + String selectQuery, + Metrics metrics) { this.datasetName = datasetName; this.outputSchema = outputSchema; this.sqlEngineConfig = sqlEngineConfig; @@ -83,6 +90,7 @@ private BigQuerySelectDataset(String datasetName, this.jobId = jobId; this.operation = operation; this.selectQuery = selectQuery; + this.metrics = metrics; } public BigQuerySelectDataset execute() { @@ -124,14 +132,14 @@ public BigQuerySelectDataset execute() { if (queryJob == null) { throw new SQLEngineException("BigQuery job not found: " + jobId); } else if (queryJob.getStatus().getError() != null) { - BigQuerySQLEngineUtils.logJobMetrics(queryJob); + BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics); throw new SQLEngineException(String.format( "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s", jobId, project, bqDataset, location, queryJob.getStatus().getError().toString())); } LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId); - BigQuerySQLEngineUtils.logJobMetrics(queryJob); + 
BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics); return this; } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java index b94a43b191..cb164df3f0 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java @@ -38,6 +38,7 @@ import com.google.common.reflect.TypeToken; import com.google.gson.Gson; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.metrics.Metrics; import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteRequest; import io.cdap.cdap.etl.api.engine.sql.request.SQLWriteResult; import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkConfig; @@ -51,6 +52,7 @@ import java.lang.reflect.Type; import java.util.ArrayList; import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -76,30 +78,34 @@ public class BigQueryWrite { private final String datasetName; private final SQLWriteRequest writeRequest; private final TableId sourceTableId; + private final Metrics metrics; private BigQueryWrite(String datasetName, BigQuerySQLEngineConfig sqlEngineConfig, BigQuery bigQuery, SQLWriteRequest writeRequest, - TableId sourceTableId) { + TableId sourceTableId, + Metrics metrics) { this.datasetName = datasetName; this.sqlEngineConfig = sqlEngineConfig; this.bigQuery = bigQuery; this.writeRequest = writeRequest; this.sourceTableId = sourceTableId; + this.metrics = metrics; } public static BigQueryWrite getInstance(String datasetName, BigQuerySQLEngineConfig sqlEngineConfig, BigQuery bigQuery, SQLWriteRequest writeRequest, - TableId sourceTableId) { + TableId sourceTableId, + Metrics metrics) { return new BigQueryWrite(datasetName, sqlEngineConfig, bigQuery, writeRequest, - sourceTableId - ); + sourceTableId, + metrics); } public SQLWriteResult write() { @@ -228,7 +234,7 @@ private 
SQLWriteResult writeInternal(SQLWriteRequest writeRequest, // Check for errors if (queryJob.getStatus().getError() != null) { - BigQuerySQLEngineUtils.logJobMetrics(queryJob); + BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics); LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}", jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(), queryJob.getStatus().getError().toString()); @@ -242,7 +248,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest, LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows, sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(), destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable()); - BigQuerySQLEngineUtils.logJobMetrics(queryJob); + BigQuerySQLEngineUtils.logJobMetrics(queryJob, metrics); return SQLWriteResult.success(datasetName, numRows); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java index 43259e940b..1dd6581bed 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java @@ -28,6 +28,7 @@ import com.google.cloud.bigquery.TableInfo; import com.google.gson.Gson; import io.cdap.cdap.api.data.schema.Schema; +import io.cdap.cdap.api.metrics.Metrics; import io.cdap.cdap.etl.api.engine.sql.SQLEngineException; import io.cdap.cdap.etl.api.join.JoinCondition; import io.cdap.cdap.etl.api.join.JoinDefinition; @@ -58,6 +59,9 @@ public class BigQuerySQLEngineUtils { public static final String GCS_PATH_FORMAT = BigQuerySinkUtils.GS_PATH_FORMAT + "/%s"; public static final String BQ_TABLE_NAME_FORMAT = "%s_%s"; + public static final String METRIC_BYTES_PROCESSED = "bytes.processed"; + public static final String 
METRIC_BYTES_BILLED = "bytes.billed"; + public static final String METRIC_SLOT_MS = "slot.ms"; private BigQuerySQLEngineUtils() { // no-op @@ -303,8 +307,9 @@ public static Map getJobTags(String operation) { * Logs information about a BigQUery Job execution using a specified Logger instance * * @param job BigQuery Job + * @param metrics Metrics instance used to collect additional metrics for this job. */ - public static void logJobMetrics(Job job) { + public static void logJobMetrics(Job job, Metrics metrics) { // Ensure job has statistics information if (job.getStatistics() == null) { LOG.warn("No statistics were found for BigQuery job {}", job.getJobId()); @@ -344,11 +349,21 @@ public static void logJobMetrics(Job job) { GSON.toJson(queryStatistics.getTimeline())); } + // Collect job metrics + if (queryStatistics.getTotalBytesProcessed() != null) { + metrics.countLong(METRIC_BYTES_PROCESSED, queryStatistics.getTotalBytesProcessed()); + } + if (queryStatistics.getTotalBytesBilled() != null) { + metrics.countLong(METRIC_BYTES_BILLED, queryStatistics.getTotalBytesBilled()); + } + if (queryStatistics.getTotalSlotMs() != null) { + metrics.countLong(METRIC_SLOT_MS, queryStatistics.getTotalSlotMs()); + } + return; } // Print basic metrics - JobStatistics statistics = job.getStatistics(); LOG.info("Metrics for job: {}\n" + " Start: {} ,\n" + " End: {} ,\n" +