diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java index 644c36739f..72fa9a2ea3 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java @@ -101,19 +101,15 @@ public final void prepareRun(BatchSinkContext context) throws Exception { @Override public void onRunFinish(boolean succeeded, BatchSinkContext context) { - Path gcsPath; + String gcsPath; String bucket = getConfig().getBucket(); if (bucket == null) { - gcsPath = new Path(String.format("gs://%s", runUUID.toString())); + gcsPath = String.format("gs://%s", runUUID.toString()); } else { - gcsPath = new Path(String.format(gcsPathFormat, bucket, runUUID.toString())); + gcsPath = String.format(gcsPathFormat, bucket, runUUID.toString()); } try { - FileSystem fs = gcsPath.getFileSystem(baseConfiguration); - if (fs.exists(gcsPath)) { - fs.delete(gcsPath, true); - LOG.debug("Deleted temporary directory '{}'", gcsPath); - } + BigQueryUtil.deleteTemporaryDirectory(baseConfiguration, gcsPath); } catch (IOException e) { LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage()); } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java index 12ce451c30..25fecfa69d 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java @@ -155,8 +155,26 @@ private static void createBucket(Storage storage, String bucket, @Nullable Strin * @return bucket name */ public static String configureBucket(Configuration baseConfiguration, @Nullable String bucket, String runId) { + boolean deleteBucket = false; + // If the bucket is null, assign the run ID as the bucket name and mark the bucket for deletion. 
if (bucket == null) { bucket = runId; + deleteBucket = true; + } + return configureBucket(baseConfiguration, bucket, runId, deleteBucket); + } + + /** + * Updates {@link Configuration} with bucket details. + * Enables bucket deletion in the configuration when {@code deleteBucket} is true. + * + * @return bucket name + */ + public static String configureBucket(Configuration baseConfiguration, + String bucket, + String runId, + boolean deleteBucket) { + if (deleteBucket) { // By default, this option is false, meaning the job can not delete the bucket. // So enable it only when bucket name is not provided. baseConfiguration.setBoolean("fs.gs.bucket.delete.enable", true); diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java index 6666fc6292..0a4223de55 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQueryPushDataset.java @@ -98,7 +98,6 @@ protected static BigQueryPushDataset getInstance(SQLPushRequest pushRequest, String gcsPath = BigQuerySQLEngineUtils.getGCSPath(bucket, runId, table); List fields = BigQuerySinkUtils.getBigQueryTableFieldsFromSchema(pushRequest.getDatasetSchema()); - BigQuerySinkUtils.configureBucket(configuration, bucket, runId); BigQuerySinkUtils.configureOutput(configuration, project, dataset, table, gcsPath, fields); // Create empty table to store uploaded records. 
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java index 09fc65906a..574b6d1db1 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java @@ -125,7 +125,30 @@ public void prepareRun(RuntimeContext context) throws Exception { String cmekKey = context.getRuntimeArguments().get(GCPUtils.CMEK_KEY); configuration = BigQueryUtil.getBigQueryConfig(sqlEngineConfig.getServiceAccount(), sqlEngineConfig.getProject(), cmekKey, sqlEngineConfig.getServiceAccountType()); + // Create resources needed for this execution BigQuerySinkUtils.createResources(bigQuery, storage, dataset, bucket, sqlEngineConfig.getLocation(), cmekKey); + // Configure GCS bucket that is used to stage temporary files. + // If the bucket is created for this run, mark it for deletion after execution is completed + BigQuerySinkUtils.configureBucket(configuration, bucket, runId, sqlEngineConfig.getBucket() == null); + } + + @Override + public void onRunFinish(boolean succeeded, RuntimeContext context) { + super.onRunFinish(succeeded, context); + + String gcsPath; + // If the bucket was created for this run, we should delete it. + // Otherwise, just clean the directory within the provided bucket. + if (sqlEngineConfig.getBucket() == null) { + gcsPath = String.format("gs://%s", bucket); + } else { + gcsPath = String.format(BigQuerySinkUtils.GS_PATH_FORMAT, bucket, runId); + } + try { + BigQueryUtil.deleteTemporaryDirectory(configuration, gcsPath); + } catch (IOException e) { + LOG.warn("Failed to delete temporary directory '{}': {}", gcsPath, e.getMessage()); + } } @Override