diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java index 0e0e64050d..ae838f1771 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryJoinDataset.java @@ -150,12 +150,14 @@ public void executeJoin() { if (queryJob == null) { throw new SQLEngineException("BigQuery job not found: " + jobId); } else if (queryJob.getStatus().getError() != null) { + BigQuerySQLEngineUtils.logJobMetrics(queryJob); throw new SQLEngineException(String.format( "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s", jobId, project, bqDataset, location, queryJob.getStatus().getError().toString())); } LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId); + BigQuerySQLEngineUtils.logJobMetrics(queryJob); } @Override diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java index d1ea72ce7e..a6a9f57fdf 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySelectDataset.java @@ -124,12 +124,14 @@ public BigQuerySelectDataset execute() { if (queryJob == null) { throw new SQLEngineException("BigQuery job not found: " + jobId); } else if (queryJob.getStatus().getError() != null) { + BigQuerySQLEngineUtils.logJobMetrics(queryJob); throw new SQLEngineException(String.format( "Error executing BigQuery Job: '%s' in Project '%s', Dataset '%s', Location'%s' : %s", jobId, project, bqDataset, location, queryJob.getStatus().getError().toString())); } LOG.info("Created BigQuery table `{}` using Job: {}", bqTable, jobId); + BigQuerySQLEngineUtils.logJobMetrics(queryJob); return this; } diff --git 
a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java index 550d873a0e..bd559be3f3 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryWrite.java @@ -228,6 +228,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest, // Check for errors if (queryJob.getStatus().getError() != null) { + BigQuerySQLEngineUtils.logJobMetrics(queryJob); LOG.error("Error executing BigQuery Job: '{}' in Project '{}', Dataset '{}': {}", jobId, sqlEngineConfig.getProject(), sqlEngineConfig.getDatasetProject(), queryJob.getStatus().getError().toString()); @@ -241,6 +242,7 @@ private SQLWriteResult writeInternal(SQLWriteRequest writeRequest, LOG.info("Executed copy operation for {} records from {}.{}.{} to {}.{}.{}", numRows, sourceTableId.getProject(), sourceTableId.getDataset(), sourceTableId.getTable(), destinationTableId.getProject(), destinationTableId.getDataset(), destinationTableId.getTable()); + BigQuerySQLEngineUtils.logJobMetrics(queryJob); return SQLWriteResult.success(datasetName, numRows); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java index e97d005f12..43259e940b 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/util/BigQuerySQLEngineUtils.java @@ -18,11 +18,15 @@ import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.DatasetId; +import com.google.cloud.bigquery.Job; +import com.google.cloud.bigquery.JobStatistics; +import com.google.cloud.bigquery.QueryStage; import com.google.cloud.bigquery.StandardTableDefinition; import com.google.cloud.bigquery.Table; import com.google.cloud.bigquery.TableDefinition; import 
com.google.cloud.bigquery.TableId; import com.google.cloud.bigquery.TableInfo; +import com.google.gson.Gson; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.engine.sql.SQLEngineException; import io.cdap.cdap.etl.api.join.JoinCondition; @@ -41,6 +45,7 @@ import java.util.Map; import java.util.UUID; import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; import javax.annotation.Nullable; /** @@ -49,6 +54,7 @@ public class BigQuerySQLEngineUtils { private static final Logger LOG = LoggerFactory.getLogger(BigQuerySQLEngineUtils.class); + private static final Gson GSON = new Gson(); public static final String GCS_PATH_FORMAT = BigQuerySinkUtils.GS_PATH_FORMAT + "/%s"; public static final String BQ_TABLE_NAME_FORMAT = "%s_%s"; @@ -218,7 +224,7 @@ public static void validateOnExpressionJoinCondition(JoinCondition.OnExpression /** * Validates stages for a Join on Key operation - * + *

* TODO: Update logic once BQ SQL engine joins support multiple outer join tables
    *
    * @param joinDefinition Join Definition to validate
@@ -292,4 +298,118 @@ public static Map getJobTags(String operation) {
     labels.put("pushdown_operation", operation);
     return Collections.unmodifiableMap(labels);
   }
+
+  /**
+   * Logs execution metrics for a BigQuery job using this class's logger.
+   *
+   * <p>Start time, end time and execution time are always logged at INFO level. When the job's
+   * statistics are {@link JobStatistics.QueryStatistics}, processed bytes, billed bytes, total slot
+   * milliseconds and per-stage record counts are logged too, and the query plan and timeline are
+   * logged as JSON at TRACE level. If the job has no statistics, only a warning is logged.
+   *
+   * @param job BigQuery Job
+   */
+  public static void logJobMetrics(Job job) {
+    JobStatistics statistics = job.getStatistics();
+
+    // Without statistics there is nothing to report. Return here: dereferencing the missing
+    // statistics below would throw a NullPointerException out of a logging helper.
+    if (statistics == null) {
+      LOG.warn("No statistics were found for BigQuery job {}", job.getJobId());
+      return;
+    }
+
+    String startTimeStr = getISODateTimeString(statistics.getStartTime());
+    String endTimeStr = getISODateTimeString(statistics.getEndTime());
+    String executionTimeStr = getExecutionTimeString(statistics.getStartTime(), statistics.getEndTime());
+
+    // Print detailed query statistics if available
+    if (statistics instanceof JobStatistics.QueryStatistics) {
+      JobStatistics.QueryStatistics queryStatistics = (JobStatistics.QueryStatistics) statistics;
+      LOG.info("Metrics for job {}:\n" +
+                 " Start: {} ,\n" +
+                 " End: {} ,\n" +
+                 " Execution time: {} ,\n" +
+                 " Processed Bytes: {} ,\n" +
+                 " Billed Bytes: {} ,\n" +
+                 " Total Slot ms: {} ,\n" +
+                 " Records per stage (read/write): {}",
+               job.getJobId().getJob(),
+               startTimeStr,
+               endTimeStr,
+               executionTimeStr,
+               queryStatistics.getTotalBytesProcessed(),
+               queryStatistics.getTotalBytesBilled(),
+               queryStatistics.getTotalSlotMs(),
+               getQueryStageRecordCounts(queryStatistics.getQueryPlan()));
+
+      // Serializing the plan and timeline is only worth doing when TRACE output is wanted
+      if (LOG.isTraceEnabled()) {
+        LOG.trace("Additional Metrics for job {}:\n" +
+                    " Query Plan: {} ,\n" +
+                    " Query Timeline: {} \n",
+                  job.getJobId().getJob(),
+                  GSON.toJson(queryStatistics.getQueryPlan()),
+                  GSON.toJson(queryStatistics.getTimeline()));
+      }
+
+      return;
+    }
+
+    // Print basic metrics
+    LOG.info("Metrics for job: {}\n" +
+               " Start: {} ,\n" +
+               " End: {} ,\n" +
+               " Execution time: {}",
+             job.getJobId().getJob(),
+             startTimeStr,
+             endTimeStr,
+             executionTimeStr);
+  }
+
+  /**
+   * Formats epoch milliseconds as an ISO-8601 instant string, or "N/A" when the value is null.
+   */
+  private static String getISODateTimeString(Long epoch) {
+    if (epoch == null) {
+      return "N/A";
+    }
+
+    return Instant.ofEpochMilli(epoch).toString();
+  }
+
+  /**
+   * Returns the difference between the two epoch values suffixed with " ms", or "N/A" if either is null.
+   */
+  private static String getExecutionTimeString(Long startEpoch, Long endEpoch) {
+    if (startEpoch == null || endEpoch == null) {
+      return "N/A";
+    }
+
+    return (endEpoch - startEpoch) + " ms";
+  }
+
+  /**
+   * Renders each stage's records read/written as "[ r/w , r/w ]", or "N/A" for a null or empty plan.
+   */
+  private static String getQueryStageRecordCounts(List<QueryStage> queryPlan) {
+    if (queryPlan == null || queryPlan.isEmpty()) {
+      return "N/A";
+    }
+
+    return queryPlan.stream()
+      .map(qs -> formatRecordCount(qs.getRecordsRead()) + "/" + formatRecordCount(qs.getRecordsWritten()))
+      .collect(Collectors.joining(" , ", "[ ", " ]"));
+  }
+
+  /**
+   * Returns the count as a string, or "N/A" when it is null.
+   */
+  private static String formatRecordCount(Long val) {
+    if (val == null) {
+      return "N/A";
+    }
+
+    return val.toString();
+  }
 }