diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java index bec4068271..62a9c66055 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java @@ -97,7 +97,9 @@ public final void prepareRun(BatchSinkContext context) throws Exception { // Get the required bucket name and bucket instance (if it exists) Storage storage = GCPUtils.getStorage(project, credentials); - String bucketName = getStagingBucketName(context, config, dataset); + String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(), + dataset, config.getBucket()); + bucketName = BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString()); Bucket bucket = storage.get(bucketName); if (!context.isPreviewEnabled()) { @@ -306,34 +308,4 @@ protected Configuration getOutputConfiguration() throws IOException { return configuration; } - /** - * Identify a stating bucket name from the pipeline context and sink configuration - * @param context Sink Context - * @param config Sink Configuration - * @return Bucket name to use for this sink. - */ - protected String getStagingBucketName(BatchSinkContext context, - AbstractBigQuerySinkConfig config, - @Nullable Dataset dataset) { - // Get temporary bucket name from sink configuration - String bucketName = config.getBucket(); - - // Get Bucket Prefix from configuration - String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments()); - - // If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified, - // we must set this prefix along with the destination location in order to create/reuse the bucket. - // Otherwise, if temp bucket name is defined, or a prefix is not set, the configureBucket method will prepare - // for a new bucket creation. 
- if (bucketName == null && bucketPrefix != null) { - // If the destination dataset exists, use the dataset location. Otherwise, use location from configuration. - String datasetLocation = dataset != null ? dataset.getLocation() : config.getLocation(); - - // Get the bucket name for the specified location. - bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, datasetLocation); - } - - return BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString()); - } - } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java index 60f3d0d2e4..a1e667f867 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java @@ -147,7 +147,8 @@ public void prepareRun(BatchSourceContext context) throws Exception { configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName, config.getServiceAccountType()); - String bucketName = getBucketName(context, dataset); + String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), null, + dataset, config.getBucket()); // Configure GCS Bucket to use String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration, @@ -392,20 +393,4 @@ private void emitLineage(BatchSourceContext context, Schema schema, Type sourceT schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList())); } } - - @Nullable - private String getBucketName(BatchSourceContext context, Dataset dataset) { - // Get the bucket name from configuration, and the bucket prefix if defined. 
- String bucketName = config.getBucket(); - String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments()); - - // If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified, - // we must set this prefix along with the source dataset location. - // Otherwise, return the original bucket name (may be null). - if (bucketName == null && bucketPrefix != null) { - bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, dataset.getLocation()); - } - - return bucketName; - } } diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java index e510c3b9d3..017a660b90 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sqlengine/BigQuerySQLEngine.java @@ -19,6 +19,7 @@ import com.google.auth.Credentials; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.Job; import com.google.cloud.bigquery.TableId; @@ -26,7 +27,6 @@ import com.google.cloud.storage.Storage; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; -import io.cdap.cdap.api.RuntimeContext; import io.cdap.cdap.api.SQLEngineContext; import io.cdap.cdap.api.annotation.Description; import io.cdap.cdap.api.annotation.Metadata; @@ -116,7 +116,7 @@ public class BigQuerySQLEngine private Configuration configuration; private String project; private String datasetProject; - private String dataset; + private String datasetName; private String bucket; private String runId; private Map datasets; @@ -153,12 +153,14 @@ public void prepareRun(SQLEngineContext context) throws Exception { null : GCPUtils.loadServiceAccountCredentials(serviceAccount, 
sqlEngineConfig.isServiceAccountFilePath()); project = sqlEngineConfig.getProject(); datasetProject = sqlEngineConfig.getDatasetProject(); - dataset = sqlEngineConfig.getDataset(); - bucket = sqlEngineConfig.getBucket() != null ? sqlEngineConfig.getBucket() : "bqpushdown-" + runId; + datasetName = sqlEngineConfig.getDataset(); // Initialize BQ and GCS clients. bigQuery = GCPUtils.getBigQuery(project, credentials); storage = GCPUtils.getStorage(project, credentials); + Dataset dataset = bigQuery.getDataset(DatasetId.of(datasetProject, datasetName)); + bucket = BigQueryUtil.getStagingBucketName(context.getRuntimeArguments(), sqlEngineConfig.getLocation(), + dataset, sqlEngineConfig.getBucket()); String cmekKey = !Strings.isNullOrEmpty(sqlEngineConfig.cmekKey) ? sqlEngineConfig.cmekKey : ctx.getRuntimeArguments().get(CmekUtils.CMEK_KEY); @@ -169,11 +171,12 @@ public void prepareRun(SQLEngineContext context) throws Exception { configuration = BigQueryUtil.getBigQueryConfig(sqlEngineConfig.getServiceAccount(), sqlEngineConfig.getProject(), cmekKeyName, sqlEngineConfig.getServiceAccountType()); // Create resources needed for this execution - BigQuerySinkUtils.createResources(bigQuery, storage, DatasetId.of(datasetProject, dataset), bucket, + BigQuerySinkUtils.createResources(bigQuery, storage, DatasetId.of(datasetProject, datasetName), bucket, sqlEngineConfig.getLocation(), cmekKeyName); // Configure GCS bucket that is used to stage temporary files. 
- // If the bucket is created for this run, mar it for deletion after executon is completed - BigQuerySinkUtils.configureBucket(configuration, bucket, runId, sqlEngineConfig.getBucket() == null); + // If the bucket is created for this run, mark it for deletion after execution is completed + String fallbackBucketName = "bqpushdown-" + runId; + BigQuerySinkUtils.configureBucket(configuration, bucket, fallbackBucketName); // Configure credentials for the source BigQuerySourceUtils.configureServiceAccount(configuration, sqlEngineConfig.connection); @@ -210,7 +213,7 @@ public SQLPushDataset getPushP sqlEngineConfig, configuration, bigQuery, - DatasetId.of(datasetProject, dataset), + DatasetId.of(datasetProject, datasetName), bucket, runId); @@ -241,7 +244,7 @@ public SQLPullDataset getPul return BigQueryPullDataset.getInstance(sqlPullRequest, configuration, bigQuery, - DatasetId.of(datasetProject, dataset), + DatasetId.of(datasetProject, datasetName), table, bucket, runId); @@ -306,7 +309,7 @@ public SQLDataset join(SQLJoinRequest sqlJoinRequest) throws SQLEngineException // Get SQL builder for this Join operation BigQueryJoinSQLBuilder builder = new BigQueryJoinSQLBuilder( sqlJoinRequest.getJoinDefinition(), - DatasetId.of(datasetProject, dataset), + DatasetId.of(datasetProject, datasetName), getStageNameToBQTableNameMap()); // Execute Select job with the supplied query. @@ -328,7 +331,7 @@ public SQLDatasetProducer getProducer(SQLPullRequest pullRequest, PullCapability return new BigQuerySparkDatasetProducer(sqlEngineConfig, datasetProject, - dataset, + datasetName, table, pullRequest.getDatasetSchema()); } @@ -363,7 +366,7 @@ public SQLReadResult read(SQLReadRequest readRequest) throws SQLEngineException String destinationTable = BigQuerySQLEngineUtils.getNewTableName(runId); // Create empty table to store query results. 
- TableId destinationTableId = TableId.of(datasetProject, dataset, destinationTable); + TableId destinationTableId = TableId.of(datasetProject, this.datasetName, destinationTable); // Build Big Query Write instance and execute write operation. BigQueryReadDataset readDataset = BigQueryReadDataset.getInstance(datasetName, @@ -393,7 +396,7 @@ public SQLWriteResult write(SQLWriteRequest writeRequest) { // Get source table information (from the stage we are attempting to write into the sink) String sourceTable = datasets.get(writeRequest.getDatasetName()).getBigQueryTable(); - TableId sourceTableId = TableId.of(datasetProject, dataset, sourceTable); + TableId sourceTableId = TableId.of(datasetProject, this.datasetName, sourceTable); // Build Big Query Write instance and execute write operation. BigQueryWrite bigQueryWrite = BigQueryWrite.getInstance(datasetName, @@ -560,7 +563,7 @@ private BigQuerySelectDataset executeSelect(String datasetName, String table = BigQuerySQLEngineUtils.getNewTableName(runId); // Create empty table to store query results. 
- BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, project, dataset, table); + BigQuerySQLEngineUtils.createEmptyTable(sqlEngineConfig, bigQuery, project, this.datasetName, table); BigQuerySelectDataset selectDataset = BigQuerySelectDataset.getInstance( datasetName, @@ -568,7 +571,7 @@ private BigQuerySelectDataset executeSelect(String datasetName, sqlEngineConfig, bigQuery, project, - DatasetId.of(datasetProject, dataset), + DatasetId.of(datasetProject, this.datasetName), table, jobId, jobType, @@ -640,7 +643,7 @@ protected void deleteTable(String stageName, BigQuerySQLDataset bqDataset) throw } String tableName = bqDataset.getBigQueryTable(); - TableId tableId = TableId.of(datasetProject, dataset, tableName); + TableId tableId = TableId.of(datasetProject, datasetName, tableName); // Delete this table if found if (bigQuery.getTable(tableId) != null) { diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java index 001cc44202..3f6eea5017 100644 --- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java +++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java @@ -19,6 +19,7 @@ import com.google.api.client.googleapis.media.MediaHttpUploader; import com.google.cloud.bigquery.BigQuery; import com.google.cloud.bigquery.BigQueryException; +import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.FieldList; import com.google.cloud.bigquery.LegacySQLTypeName; @@ -792,9 +793,9 @@ public static String getFQN(String datasetProject, String datasetName, String ta * @return the bucket prefix to use for this pipeline */ @Nullable - public static String getBucketPrefix(SettableArguments arguments) { + public static String getBucketPrefix(Map arguments) { // If the bucket prefix property is set, use it. 
- if (arguments.has(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) { + if (arguments.containsKey(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) { String bucketPrefix = arguments.get(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME); validateBucketPrefix(bucketPrefix); LOG.debug("Using bucket prefix for temporary buckets: {}", bucketPrefix); @@ -858,4 +859,31 @@ public static Map getJobTags(String jobType) { labels.put("type", jobType); return labels; } + + /** + * Identify a staging bucket name from the pipeline context and plugin configuration + * @param arguments runtime arguments + * @param configLocation location from plugin configuration + * @param dataset BigQuery dataset + * @param bucket bucket from plugin configuration + * @return Bucket name to use for this sink. + */ + @Nullable + public static String getStagingBucketName(Map arguments, @Nullable String configLocation, + @Nullable Dataset dataset, @Nullable String bucket) { + // Get Bucket Prefix from configuration + String bucketPrefix = BigQueryUtil.getBucketPrefix(arguments); + + // If temp bucket name is not defined in configuration, and a common bucket name prefix has been specified, + // we must set this prefix along with the destination location in order to create/reuse the bucket. + // Otherwise, if temp bucket name is defined, or a prefix is not set, the configureBucket method will prepare + // for a new bucket creation. + if (Strings.isNullOrEmpty(bucket) && bucketPrefix != null) { + // If the destination dataset exists, use the dataset location. Otherwise, use location from configuration. + String datasetLocation = dataset != null ? dataset.getLocation() : configLocation; + // Get the bucket name for the specified location. 
+ bucket = BigQueryUtil.getBucketNameForLocation(bucketPrefix, datasetLocation); + } + return bucket; + } } diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java index 987e85a2a8..18e518877c 100644 --- a/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java +++ b/src/main/java/io/cdap/plugin/gcp/dataplex/sink/DataplexBatchSink.java @@ -20,6 +20,7 @@ import com.google.auth.Credentials; import com.google.auth.oauth2.GoogleCredentials; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.Dataset; import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Job; @@ -284,23 +285,29 @@ private void prepareRunBigQueryDataset(BatchSinkContext context) throws Exceptio baseConfiguration = getBaseConfiguration(cmekKeyName); // asset.getResourceSpec().getName() will be of format 'projects/datasetProjectName/datasets/datasetName' String[] assetValues = asset.getResourceSpec().getName().split("/"); - String dataset = assetValues[assetValues.length - 1]; + String datasetName = assetValues[assetValues.length - 1]; String datasetProject = assetValues[assetValues.length - 3]; bigQuery = GCPUtils.getBigQuery(datasetProject, credentials); - String bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, null, runUUID.toString()); + // Get required dataset ID and dataset instance (if it exists) + DatasetId datasetId = DatasetId.of(datasetProject, datasetName); + Dataset dataset = bigQuery.getDataset(datasetId); + String bucket = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(), + dataset, null); + String fallbackBucketName = "dataplex-" + runUUID; + bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, bucket, fallbackBucketName); if (!context.isPreviewEnabled()) { BigQuerySinkUtils.createResources(bigQuery, GCPUtils.getStorage(project, 
credentials), - DatasetId.of(datasetProject, dataset), + DatasetId.of(datasetProject, datasetName), bucket, config.getLocation(), cmekKeyName); } Schema configSchema = config.getSchema(collector); Schema outputSchema = configSchema == null ? context.getInputSchema() : configSchema; - configureTable(outputSchema, dataset, datasetProject, collector); + configureTable(outputSchema, datasetName, datasetProject, collector); configureBigQuerySink(); initOutput(context, bigQuery, - config.getReferenceName(BigQueryUtil.getFQN(datasetProject, dataset, config.getTable())), - config.getTable(), outputSchema, bucket, collector, dataset, datasetProject); + config.getReferenceName(BigQueryUtil.getFQN(datasetProject, datasetName, config.getTable())), + config.getTable(), outputSchema, bucket, collector, datasetName, datasetProject); } diff --git a/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java b/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java index 84f7cbfa34..043889cfbe 100644 --- a/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java +++ b/src/main/java/io/cdap/plugin/gcp/dataplex/source/DataplexBatchSource.java @@ -45,6 +45,7 @@ import io.cdap.cdap.etl.api.PipelineConfigurer; import io.cdap.cdap.etl.api.StageConfigurer; import io.cdap.cdap.etl.api.batch.BatchRuntimeContext; +import io.cdap.cdap.etl.api.batch.BatchSinkContext; import io.cdap.cdap.etl.api.batch.BatchSource; import io.cdap.cdap.etl.api.batch.BatchSourceContext; import io.cdap.cdap.etl.api.validation.ValidatingInputFormat; @@ -52,6 +53,7 @@ import io.cdap.plugin.common.LineageRecorder; import io.cdap.plugin.common.ReferenceNames; import io.cdap.plugin.common.batch.JobUtils; +import io.cdap.plugin.gcp.bigquery.sink.BigQuerySinkUtils; import io.cdap.plugin.gcp.bigquery.source.BigQueryAvroToStructuredTransformer; import io.cdap.plugin.gcp.bigquery.source.BigQuerySourceUtils; import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants; @@ -60,6 
+62,7 @@ import io.cdap.plugin.gcp.common.GCPUtils; import io.cdap.plugin.gcp.dataplex.common.util.DataplexConstants; import io.cdap.plugin.gcp.dataplex.common.util.DataplexUtil; +import io.cdap.plugin.gcp.dataplex.sink.config.DataplexBatchSinkConfig; import io.cdap.plugin.gcp.dataplex.source.config.DataplexBatchSourceConfig; import io.cdap.plugin.gcp.gcs.GCSPath; @@ -223,8 +226,10 @@ private void prepareRunBigQueryDataset(BatchSourceContext context) throws Except config.getServiceAccountType()); // Configure temporay GCS Bucket to use - String bucket = createBucket(configuration, - config.getProject(), bigQuery, credentials, bucketPath); + String bucketName = BigQueryUtil.getStagingBucketName(context.getArguments().asMap(), config.getLocation(), + bigQuery.getDataset(DatasetId.of(datasetProject, dataset)), + null); + String bucket = createBucket(configuration, config.getProject(), bigQuery, credentials, bucketName, bucketPath); // Configure Service account credentials configureServiceAccount(configuration, config.getConnection()); @@ -464,18 +469,21 @@ public void transform(KeyValue input, Emitter * @param project GCP projectId * @param bigQuery bigquery client * @param credentials GCP credentials - * @param bucketName bucket Name + * @param bucket bucket name + * @param bucketPath bucket path to use. Will be used as a bucket name if needed.. * @return Bucket name. */ private String createBucket(Configuration configuration, String project, BigQuery bigQuery, Credentials credentials, - String bucketName) throws IOException { - String bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, bucketName); - // By default, this option is false, meaning the job can not delete the bucket. 
- configuration.setBoolean("fs.gs.bucket.delete.enable", true); - + @Nullable String bucket, + String bucketPath) throws IOException { + if (bucket == null) { + bucket = String.format(BQ_TEMP_BUCKET_NAME_TEMPLATE, bucketPath); + // By default, this option is false, meaning the job can not delete the bucket. + configuration.setBoolean("fs.gs.bucket.delete.enable", true); + } // the dataset existence is validated before, so this cannot be null Dataset bigQueryDataset = bigQuery.getDataset(DatasetId.of(datasetProject, dataset)); createBucket(configuration, GCPUtils.getStorage(project, credentials), diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java index 8dcc40b71d..586d4b9b55 100644 --- a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java +++ b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java @@ -20,7 +20,6 @@ import com.google.cloud.bigquery.StandardSQLTypeName; import io.cdap.cdap.api.data.schema.Schema; import io.cdap.cdap.etl.api.FailureCollector; -import io.cdap.cdap.etl.api.action.SettableArguments; import io.cdap.cdap.etl.api.validation.ValidationFailure; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric; import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric; @@ -31,8 +30,10 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.util.ArrayList; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import static org.junit.Assert.assertEquals; @@ -138,39 +139,22 @@ public void testValidateArraySchema() { @Test public void testGetBucketPrefix() { - SettableArguments args = Mockito.mock(SettableArguments.class); - Mockito.when(args.has(BUCKET_PREFIX_ARG)) - .thenReturn(true); - Mockito.when(args.get(BUCKET_PREFIX_ARG)) - .thenReturn("this-is-valid-as-a-prefix-to-use-123456789_.abcdef"); - 
BigQueryUtil.getBucketPrefix(args); - - // Ensure method was called. - Mockito.verify(args, Mockito.times(1)).has(BUCKET_PREFIX_ARG); - Mockito.verify(args, Mockito.times(1)).get(BUCKET_PREFIX_ARG); + Map args = new HashMap<>(); + String prefix = "this-is-valid-as-a-prefix-to-use-123456789_.abcdef"; + args.put(BUCKET_PREFIX_ARG, prefix); + Assert.assertEquals(BigQueryUtil.getBucketPrefix(args), prefix); } @Test public void testGetBucketPrefixNotSet() { - SettableArguments args = Mockito.mock(SettableArguments.class); - Mockito.when(args.has(BUCKET_PREFIX_ARG)) - .thenReturn(false); - Mockito.when(args.get(BUCKET_PREFIX_ARG)) - .thenReturn("this-is-valid-as-a-prefix-to-use-123456789_.abcdef"); - BigQueryUtil.getBucketPrefix(args); - - // Ensure method was called. - Mockito.verify(args, Mockito.times(1)).has(BUCKET_PREFIX_ARG); - Mockito.verify(args, Mockito.times(0)).get(BUCKET_PREFIX_ARG); + Map args = new HashMap<>(); + Assert.assertNull(BigQueryUtil.getBucketPrefix(args)); } @Test public void testGetBucketPrefixInvalidBucketName() { - SettableArguments args = Mockito.mock(SettableArguments.class); - Mockito.when(args.has(BUCKET_PREFIX_ARG)) - .thenReturn(true); - Mockito.when(args.get(BUCKET_PREFIX_ARG)) - .thenReturn("This is an invalid bucket name!@"); + Map args = new HashMap<>(); + args.put(BUCKET_PREFIX_ARG, "This is an invalid bucket name!@"); IllegalArgumentException e = null; @@ -188,11 +172,8 @@ public void testGetBucketPrefixInvalidBucketName() { @Test public void testGetBucketPrefixTooLong() { - SettableArguments args = Mockito.mock(SettableArguments.class); - Mockito.when(args.has(BUCKET_PREFIX_ARG)) - .thenReturn(true); - Mockito.when(args.get(BUCKET_PREFIX_ARG)) - .thenReturn("this-prefix-is-too-long-to-be-used-as-a-prefix-oops"); + Map args = new HashMap<>(); + args.put(BUCKET_PREFIX_ARG, "this-prefix-is-too-long-to-be-used-as-a-prefix-oops"); IllegalArgumentException e = null;