From 3adb4b6263f3ec5d8d6f55a04ee7aa115eeafa6e Mon Sep 17 00:00:00 2001
From: Fernando Velasquez
Date: Wed, 28 Sep 2022 21:27:38 -0400
Subject: [PATCH] PLUGIN-1413 Added argument for BQ temporary staging bucket names

---
 .../bigquery/sink/AbstractBigQuerySink.java   |  53 +++++++-
 .../gcp/bigquery/sink/BigQuerySinkUtils.java  |  33 ++++-
 .../gcp/bigquery/source/BigQuerySource.java   |  23 +++-
 .../gcp/bigquery/util/BigQueryUtil.java       |  67 +++++++++
 .../gcp/bigquery/util/BigQueryUtilTest.java   | 128 ++++++++++++++++++
 5 files changed, 290 insertions(+), 14 deletions(-)

diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
index 6adf70938a..6258958738 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/AbstractBigQuerySink.java
@@ -17,10 +17,13 @@
 import com.google.auth.Credentials;
 import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Dataset;
 import com.google.cloud.bigquery.DatasetId;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.hadoop.io.bigquery.output.BigQueryTableFieldSchema;
 import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.cloud.storage.Bucket;
+import com.google.cloud.storage.Storage;
 import com.google.common.base.Strings;
 import io.cdap.cdap.api.data.batch.Output;
 import io.cdap.cdap.api.data.batch.OutputFormatProvider;
@@ -32,7 +35,6 @@
 import io.cdap.cdap.etl.api.batch.BatchSink;
 import io.cdap.cdap.etl.api.batch.BatchSinkContext;
 import io.cdap.plugin.common.Asset;
-import io.cdap.plugin.common.ReferenceNames;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
@@ -88,14 +90,23 @@ public final void prepareRun(BatchSinkContext context) throws Exception {
     CryptoKeyName cmekKeyName = CmekUtils.getCmekKey(config.cmekKey, context.getArguments().asMap(), collector);
     collector.getOrThrowException();
     baseConfiguration = getBaseConfiguration(cmekKeyName);
-    String bucket = BigQuerySinkUtils.configureBucket(baseConfiguration, config.getBucket(), runUUID.toString());
+
+    // Get required dataset ID and dataset instance (if it exists)
+    DatasetId datasetId = DatasetId.of(config.getDatasetProject(), config.getDataset());
+    Dataset dataset = bigQuery.getDataset(datasetId);
+
+    // Get the required bucket name and bucket instance (if it exists)
+    Storage storage = GCPUtils.getStorage(project, credentials);
+    String bucketName = getStagingBucketName(context, config, dataset);
+    Bucket bucket = storage.get(bucketName);
+
     if (!context.isPreviewEnabled()) {
-      BigQuerySinkUtils.createResources(bigQuery, GCPUtils.getStorage(project, credentials),
-                                        DatasetId.of(config.getDatasetProject(), config.getDataset()),
-                                        bucket, config.getLocation(), cmekKeyName);
+      BigQuerySinkUtils.createResources(bigQuery, dataset, datasetId,
+                                        storage, bucket, bucketName,
+                                        config.getLocation(), cmekKeyName);
     }
-    prepareRunInternal(context, bigQuery, bucket);
+    prepareRunInternal(context, bigQuery, bucketName);
   }
 
   @Override
@@ -291,4 +302,34 @@ protected Configuration getOutputConfiguration() throws IOException {
     return configuration;
   }
 
+  /**
+   * Identify a staging bucket name from the pipeline context and sink configuration.
+   *
+   * @param context Sink Context
+   * @param config Sink Configuration
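+   * @param dataset BigQuery dataset instance for the configured dataset, or null if it does not exist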
+   * @return Bucket name to use for this sink.
+   */
+  protected String getStagingBucketName(BatchSinkContext context,
+                                        AbstractBigQuerySinkConfig config,
+                                        @Nullable Dataset dataset) {
+    // Get temporary bucket name from sink configuration
+    String bucketName = config.getBucket();
+
+    // Get the bucket prefix from the runtime arguments
+    String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments());
+
+    // If a temporary bucket name is not defined in the configuration and a common bucket name prefix has been
+    // specified, combine the prefix with the destination location so the per-location bucket can be created or
+    // reused. Otherwise (a bucket name is defined, or no prefix is set), the configureBucket method prepares the
+    // configuration for the creation of a new bucket.
+    if (bucketName == null && bucketPrefix != null) {
+      // If the destination dataset exists, use the dataset location. Otherwise, use location from configuration.
+      String datasetLocation = dataset != null ? dataset.getLocation() : config.getLocation();
+
+      // Get the bucket name for the specified location.
+      bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, datasetLocation);
+    }
+
+    return BigQuerySinkUtils.configureBucket(baseConfiguration, bucketName, runUUID.toString());
+  }
+
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
index 157225f5d9..e9ff785765 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/sink/BigQuerySinkUtils.java
@@ -111,25 +111,50 @@ public final class BigQuerySinkUtils {
   public static void createResources(BigQuery bigQuery, Storage storage, DatasetId datasetId, String bucketName,
                                      @Nullable String location,
                                      @Nullable CryptoKeyName cmekKeyName) throws IOException {
+    // Get the required Dataset and Bucket instances using the supplied clients
     Dataset dataset = bigQuery.getDataset(datasetId);
     Bucket bucket = storage.get(bucketName);
+    createResources(bigQuery, dataset, datasetId, storage, bucket, bucketName, location, cmekKeyName);
+  }
+
+  /**
+   * Creates the given dataset and bucket if the supplied ones are null.
+   *
+   * If the dataset already exists but the bucket does not, the bucket will be created in the same location as the
+   * dataset. If the bucket already exists but the dataset does not, the dataset will be created in the same
+   * location as the bucket; this may fail if the bucket is in a location that BigQuery does not yet support.
+   *
+   * @param bigQuery the BigQuery client for the project
+   * @param dataset the BigQuery dataset instance (may be null)
+   * @param datasetId the ID of the dataset
+   * @param storage the Cloud Storage client for the project
+   * @param bucket the Cloud Storage bucket instance (may be null)
+   * @param bucketName the name of the bucket
+   * @param location the location for the resources; only applied if neither the bucket nor the dataset exists
+   * @param cmekKey the name of the CMEK key
+   * @throws IOException if the bucket or the dataset could not be created
+   */
+  public static void createResources(BigQuery bigQuery, @Nullable Dataset dataset, DatasetId datasetId,
+                                     Storage storage, @Nullable Bucket bucket, String bucketName,
+                                     @Nullable String location, @Nullable CryptoKeyName cmekKey) throws IOException {
     if (dataset == null && bucket == null) {
-      createBucket(storage, bucketName, location, cmekKeyName,
+      createBucket(storage, bucketName, location, cmekKey,
                    () -> String.format("Unable to create Cloud Storage bucket '%s'", bucketName));
-      createDataset(bigQuery, datasetId, location, cmekKeyName,
+      createDataset(bigQuery, datasetId, location, cmekKey,
                     () -> String.format("Unable to create BigQuery dataset '%s.%s'", datasetId.getProject(),
                                         datasetId.getDataset()));
     } else if (bucket == null) {
       createBucket(
-        storage, bucketName, dataset.getLocation(), cmekKeyName,
+        storage, bucketName, dataset.getLocation(), cmekKey,
         () -> String.format(
           "Unable to create Cloud Storage bucket '%s' in the same location ('%s') as BigQuery dataset '%s'. " +
             "Please use a bucket that is in the same location as the dataset.",
           bucketName, dataset.getLocation(), datasetId.getProject() + "." + datasetId.getDataset()));
     } else if (dataset == null) {
       createDataset(
-        bigQuery, datasetId, bucket.getLocation(), cmekKeyName,
+        bigQuery, datasetId, bucket.getLocation(), cmekKey,
         () -> String.format(
           "Unable to create BigQuery dataset '%s' in the same location ('%s') as Cloud Storage bucket '%s'. " +
             "Please use a bucket that is in a supported location.",
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
index eda27bfe26..60f3d0d2e4 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/source/BigQuerySource.java
@@ -28,7 +28,6 @@
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.kms.v1.CryptoKeyName;
 import com.google.cloud.storage.Storage;
-import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.gson.Gson;
 import io.cdap.cdap.api.annotation.Description;
@@ -52,11 +51,9 @@
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.common.Asset;
 import io.cdap.plugin.common.LineageRecorder;
-import io.cdap.plugin.common.ReferenceNames;
 import io.cdap.plugin.gcp.bigquery.connector.BigQueryConnector;
 import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryReadDataset;
 import io.cdap.plugin.gcp.bigquery.sqlengine.BigQuerySQLEngine;
-import io.cdap.plugin.gcp.bigquery.sqlengine.BigQueryWrite;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryConstants;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryUtil;
 import io.cdap.plugin.gcp.common.CmekUtils;
@@ -150,10 +147,12 @@ public void prepareRun(BatchSourceContext context) throws Exception {
     configuration = BigQueryUtil.getBigQueryConfig(serviceAccount, config.getProject(), cmekKeyName,
                                                    config.getServiceAccountType());
 
+    String bucketName = getBucketName(context, dataset);
+
     // Configure GCS Bucket to use
     String bucket = BigQuerySourceUtils.getOrCreateBucket(configuration,
                                                           storage,
-                                                          config.getBucket(),
+                                                          bucketName,
                                                           dataset,
                                                           bucketPath,
                                                           cmekKeyName);
@@ -393,4 +392,20 @@ private void emitLineage(BatchSourceContext context, Schema schema, Type sourceT
       schema.getFields().stream().map(Schema.Field::getName).collect(Collectors.toList()));
     }
   }
+
+  @Nullable
+  private String getBucketName(BatchSourceContext context, Dataset dataset) {
+    // Get the bucket name from the configuration, and the bucket prefix if one is defined.
+    String bucketName = config.getBucket();
+    String bucketPrefix = BigQueryUtil.getBucketPrefix(context.getArguments());
+
+    // If a temporary bucket name is not defined in the configuration and a common bucket name prefix has been
+    // specified, derive the bucket name from the prefix and the source dataset location.
+    // Otherwise, return the configured bucket name (which may be null).
+    if (bucketName == null && bucketPrefix != null) {
+      bucketName = BigQueryUtil.getBucketNameForLocation(bucketPrefix, dataset.getLocation());
+    }
+
+    return bucketName;
+  }
 }
diff --git a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
index c100d5ab57..fa7125d358 100644
--- a/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
+++ b/src/main/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtil.java
@@ -29,11 +29,13 @@
 import com.google.cloud.bigquery.TimePartitioning;
 import com.google.cloud.hadoop.io.bigquery.BigQueryConfiguration;
 import com.google.cloud.kms.v1.CryptoKeyName;
+import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.action.SettableArguments;
 import io.cdap.cdap.etl.api.validation.InvalidConfigPropertyException;
 import io.cdap.cdap.etl.api.validation.InvalidStageException;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
@@ -63,6 +65,8 @@
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
+import java.util.zip.CRC32;
+import java.util.zip.Checksum;
 import javax.annotation.Nullable;
 
 /**
@@ -73,6 +77,7 @@ public final class BigQueryUtil {
   private static final Logger LOG = LoggerFactory.getLogger(BigQueryUtil.class);
 
   private static final String DEFAULT_PARTITION_COLUMN_NAME = "_PARTITIONTIME";
+  private static final String BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME = "gcp.bigquery.bucket.prefix";
 
   public static final String BUCKET_PATTERN = "[a-z0-9._-]+";
   public static final String DATASET_PATTERN = "[A-Za-z0-9_]+";
@@ -772,4 +777,66 @@ public static String getFQN(String datasetProject, String datasetName, String ta
     return String.join(":", BigQueryConstants.BQ_FQN_PREFIX, datasetProject, datasetName, tableName);
   }
 
+  /**
+   * Get the bucket prefix from the runtime arguments, if one has been set.
+   *
+   * @param arguments runtime arguments of the pipeline
+   * @return the bucket prefix to use for this pipeline, or null if the argument is not set
+   */
+  @Nullable
+  public static String getBucketPrefix(SettableArguments arguments) {
+    // If the bucket prefix property is set, validate and use it.
+    if (arguments.has(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME)) {
+      String bucketPrefix = arguments.get(BIGQUERY_BUCKET_PREFIX_PROPERTY_NAME);
+      validateBucketPrefix(bucketPrefix);
+      LOG.debug("Using bucket prefix for temporary buckets: {}", bucketPrefix);
+      return bucketPrefix;
+    }
+    return null;
+  }
+
+  /**
+   * Ensures the configured bucket prefix is valid per the GCS bucket naming conventions.
+   *
+   * @param bucketPrefix bucket prefix to validate
+   * @throws IllegalArgumentException if the prefix cannot be used as part of a bucket name
+   */
+  private static void validateBucketPrefix(String bucketPrefix) {
+    if (!bucketPrefix.matches("^[a-z0-9-_.]+$")) {
+      throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' is not a valid bucket " +
+                                           "name. Bucket names can only contain lowercase letters, numeric " +
+                                           "characters, dashes (-), underscores (_), and dots (.).");
+    }
+
+    // Bucket names without dots are limited to 63 characters; capping the prefix at 50 leaves room for the
+    // "-<crc32>" location suffix within that limit.
+    if (!bucketPrefix.contains(".") && bucketPrefix.length() > 50) {
+      throw new IllegalArgumentException("The configured bucket prefix '" + bucketPrefix + "' should be 50 " +
                                           "characters or shorter.");
+    }
+  }
+
+  /**
+   * Generates the CRC32 checksum of a location name.
+   * This keeps the location part of derived bucket names short and bounded (a hex string of at most 8 characters),
+   * regardless of the length of the location name.
+   *
+   * @param location location to checksum
+   * @return CRC32 checksum of the lowercased location, as a hex string of up to 8 characters
+   */
+  @VisibleForTesting
+  public static String crc32location(String location) {
+    byte[] bytes = location.toLowerCase().getBytes();
+    Checksum checksum = new CRC32();
+    checksum.update(bytes, 0, bytes.length);
+    return Long.toHexString(checksum.getValue());
+  }
+
+  /**
+   * Builds a bucket name by joining the bucket prefix and the CRC32 hash of the location with a hyphen (-).
+   *
+   * @param bucketPrefix bucket prefix
+   * @param location location to use
+   * @return bucket name for the given prefix and location
+   */
+  public static String getBucketNameForLocation(String bucketPrefix, String location) {
+    return String.format("%s-%s", bucketPrefix, crc32location(location));
+  }
 }
diff --git a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java
index f730103a69..8dcc40b71d 100644
--- a/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java
+++ b/src/test/java/io/cdap/plugin/gcp/bigquery/util/BigQueryUtilTest.java
@@ -20,22 +20,29 @@
 import com.google.cloud.bigquery.StandardSQLTypeName;
 import io.cdap.cdap.api.data.schema.Schema;
 import io.cdap.cdap.etl.api.FailureCollector;
+import io.cdap.cdap.etl.api.action.SettableArguments;
 import io.cdap.cdap.etl.api.validation.ValidationFailure;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.BigNumeric;
 import io.cdap.plugin.gcp.bigquery.util.BigQueryTypeSize.Numeric;
+import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
 import org.mockito.Mockito;
 import org.powermock.modules.junit4.PowerMockRunner;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import static org.junit.Assert.assertEquals;
 import static org.mockito.ArgumentMatchers.anyString;
 
 @RunWith(PowerMockRunner.class)
 public class BigQueryUtilTest {
+
+  private static final String BUCKET_PREFIX_ARG = "gcp.bigquery.bucket.prefix";
+
   @Test
   public void testGetTableSchema() {
     List fieldList = new ArrayList<>();
@@ -127,6 +134,127 @@ public void testValidateArraySchema() {
     BigQueryUtil.validateFieldModeMatches(bigQueryField, recordField, false, collector);
     Mockito.verify(collector, Mockito.times(0)).addFailure(anyString(), anyString());
+  }
+
+  @Test
+  public void testGetBucketPrefix() {
+    SettableArguments args = Mockito.mock(SettableArguments.class);
+    Mockito.when(args.has(BUCKET_PREFIX_ARG))
+      .thenReturn(true);
+    Mockito.when(args.get(BUCKET_PREFIX_ARG))
+      .thenReturn("this-is-valid-as-a-prefix-to-use-123456789_.abcdef");
+    BigQueryUtil.getBucketPrefix(args);
+
+    // Ensure the argument was checked and read.
+    Mockito.verify(args, Mockito.times(1)).has(BUCKET_PREFIX_ARG);
+    Mockito.verify(args, Mockito.times(1)).get(BUCKET_PREFIX_ARG);
+  }
+
+  @Test
+  public void testGetBucketPrefixNotSet() {
+    SettableArguments args = Mockito.mock(SettableArguments.class);
+    Mockito.when(args.has(BUCKET_PREFIX_ARG))
+      .thenReturn(false);
+    Mockito.when(args.get(BUCKET_PREFIX_ARG))
+      .thenReturn("this-is-valid-as-a-prefix-to-use-123456789_.abcdef");
+    BigQueryUtil.getBucketPrefix(args);
+
+    // Ensure the argument was checked but not read.
+    Mockito.verify(args, Mockito.times(1)).has(BUCKET_PREFIX_ARG);
+    Mockito.verify(args, Mockito.times(0)).get(BUCKET_PREFIX_ARG);
+  }
+
+  @Test
+  public void testGetBucketPrefixInvalidBucketName() {
+    SettableArguments args = Mockito.mock(SettableArguments.class);
+    Mockito.when(args.has(BUCKET_PREFIX_ARG))
+      .thenReturn(true);
+    Mockito.when(args.get(BUCKET_PREFIX_ARG))
+      .thenReturn("This is an invalid bucket name!@");
+
+    IllegalArgumentException e = null;
+
+    try {
+      BigQueryUtil.getBucketPrefix(args);
+    } catch (IllegalArgumentException ie) {
+      e = ie;
+    }
+
+    Assert.assertNotNull(e);
+    Assert.assertEquals("The configured bucket prefix 'This is an invalid bucket name!@' is not a valid " +
+                          "bucket name. Bucket names can only contain lowercase letters, numeric " +
+                          "characters, dashes (-), underscores (_), and dots (.).", e.getMessage());
+  }
+
+  @Test
+  public void testGetBucketPrefixTooLong() {
+    SettableArguments args = Mockito.mock(SettableArguments.class);
+    Mockito.when(args.has(BUCKET_PREFIX_ARG))
+      .thenReturn(true);
+    Mockito.when(args.get(BUCKET_PREFIX_ARG))
+      .thenReturn("this-prefix-is-too-long-to-be-used-as-a-prefix-oops");
+
+    IllegalArgumentException e = null;
+
+    try {
+      BigQueryUtil.getBucketPrefix(args);
+    } catch (IllegalArgumentException ie) {
+      e = ie;
+    }
+
+    Assert.assertNotNull(e);
+    Assert.assertEquals("The configured bucket prefix 'this-prefix-is-too-long-to-be-used-as-a-prefix-oops'" +
+                          " should be 50 characters or shorter.", e.getMessage());
+  }
+
+  @Test
+  public void testCRC32LocationDoesNotCollide() {
+    // Set containing all current GCP region names.
+    Set<String> locations = new HashSet<>();
+    locations.add("us");
+    locations.add("eu");
+    locations.add("asia-east1");
+    locations.add("asia-east2");
+    locations.add("asia-northeast1");
+    locations.add("asia-northeast2");
+    locations.add("asia-northeast3");
+    locations.add("asia-south1");
+    locations.add("asia-south2");
+    locations.add("asia-southeast1");
+    locations.add("asia-southeast2");
+    locations.add("australia-southeast1");
+    locations.add("australia-southeast2");
+    locations.add("europe-central2");
+    locations.add("europe-north1");
+    locations.add("europe-southwest1");
+    locations.add("europe-west1");
+    locations.add("europe-west2");
+    locations.add("europe-west3");
+    locations.add("europe-west4");
+    locations.add("europe-west6");
+    locations.add("europe-west8");
+    locations.add("europe-west9");
+    locations.add("northamerica-northeast1");
+    locations.add("northamerica-northeast2");
+    locations.add("southamerica-east1");
+    locations.add("southamerica-west1");
+    locations.add("us-central1");
+    locations.add("us-east1");
+    locations.add("us-east4");
+    locations.add("us-east5");
+    locations.add("us-south1");
+    locations.add("us-west1");
+    locations.add("us-west2");
+    locations.add("us-west3");
+    locations.add("us-west4");
+
+    // Check there are no collisions
+    Set<String> hashValues = new HashSet<>();
+    for (String location : locations) {
+      String hash = BigQueryUtil.crc32location(location);
+      Assert.assertFalse(hashValues.contains(hash));
+      hashValues.add(hash);
+    }
+  }
+
 }
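
Usage sketch (not part of the patch): the example below mirrors the bucket-naming logic introduced above, assuming a pipeline sets the gcp.bigquery.bucket.prefix runtime argument and leaves the plugin's bucket property empty. The class name and the prefix/location values are illustrative only. The derived name has the shape "<prefix>-<crc32(location)>", so pipelines that share a prefix and a location resolve to the same staging bucket and can reuse it across runs.

import java.util.zip.CRC32;
import java.util.zip.Checksum;

public class BucketPrefixExample {

  // Mirrors BigQueryUtil.crc32location: CRC32 of the lowercased location, rendered as a hex string of up to 8 chars.
  static String crc32location(String location) {
    byte[] bytes = location.toLowerCase().getBytes();
    Checksum checksum = new CRC32();
    checksum.update(bytes, 0, bytes.length);
    return Long.toHexString(checksum.getValue());
  }

  public static void main(String[] args) {
    String bucketPrefix = "my-pipeline-temp"; // value of the gcp.bigquery.bucket.prefix runtime argument
    String datasetLocation = "us-east1";      // location of the BigQuery dataset being read or written

    // Same shape as BigQueryUtil.getBucketNameForLocation: "<prefix>-<crc32(location)>".
    String bucketName = String.format("%s-%s", bucketPrefix, crc32location(datasetLocation));
    System.out.println(bucketName);           // prints something like my-pipeline-temp-1a2b3c4d
  }
}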