-
Notifications
You must be signed in to change notification settings - Fork 1.1k
feat(bigquery-jdbc): add EnableProjectDiscovery connection property for metadata methods
#13344
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,13 +16,20 @@ | |
|
|
||
| package com.google.cloud.bigquery.jdbc; | ||
|
|
||
| import com.google.api.client.googleapis.json.GoogleJsonResponseException; | ||
| import com.google.api.client.http.HttpRequestInitializer; | ||
| import com.google.api.client.http.HttpTransport; | ||
| import com.google.api.client.json.gson.GsonFactory; | ||
| import com.google.api.gax.core.CredentialsProvider; | ||
| import com.google.api.gax.core.FixedCredentialsProvider; | ||
| import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; | ||
| import com.google.api.gax.retrying.RetrySettings; | ||
| import com.google.api.gax.rpc.FixedHeaderProvider; | ||
| import com.google.api.gax.rpc.HeaderProvider; | ||
| import com.google.api.gax.rpc.TransportChannelProvider; | ||
| import com.google.api.services.bigquery.Bigquery; | ||
| import com.google.api.services.bigquery.model.ProjectList; | ||
| import com.google.api.services.bigquery.model.ProjectList.Projects; | ||
| import com.google.auth.Credentials; | ||
| import com.google.cloud.bigquery.BigQuery; | ||
| import com.google.cloud.bigquery.BigQueryException; | ||
|
|
@@ -41,6 +48,7 @@ | |
| import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient; | ||
| import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings; | ||
| import com.google.cloud.http.HttpTransportOptions; | ||
| import com.google.common.collect.ImmutableList; | ||
| import com.google.common.collect.ImmutableSortedSet; | ||
| import java.io.IOException; | ||
| import java.io.InputStream; | ||
|
|
@@ -77,6 +85,9 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { | |
| private final String connectionId; | ||
| private static final String DEFAULT_JDBC_TOKEN_VALUE = "Google-BigQuery-JDBC-Driver"; | ||
| private static final String DEFAULT_VERSION = "0.0.0"; | ||
| private static final String BIGQUERY_SERVICE_NAME = "bigquery"; | ||
| private static final String PROJECT_LIST_FIELDS = | ||
| "projects/projectReference/projectId,nextPageToken"; | ||
| private static final Set<String> SAFE_TO_LOG_PROPERTIES = | ||
| ImmutableSortedSet.orderedBy(String.CASE_INSENSITIVE_ORDER) | ||
| .add( | ||
|
|
@@ -120,6 +131,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { | |
| BigQueryJdbcUrlUtility.SWA_APPEND_ROW_COUNT_PROPERTY_NAME, | ||
| BigQueryJdbcUrlUtility.SWA_ACTIVATION_ROW_COUNT_PROPERTY_NAME, | ||
| BigQueryJdbcUrlUtility.FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME, | ||
| BigQueryJdbcUrlUtility.ENABLE_PROJECT_DISCOVERY_PROPERTY_NAME, | ||
| BigQueryJdbcUrlUtility.REQUEST_GOOGLE_DRIVE_SCOPE_PROPERTY_NAME, | ||
| BigQueryJdbcUrlUtility.SSL_TRUST_STORE_PROPERTY_NAME, | ||
| BigQueryJdbcUrlUtility.MAX_BYTES_BILLED_PROPERTY_NAME, | ||
|
|
@@ -169,6 +181,8 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { | |
| int highThroughputMinTableSize; | ||
| int highThroughputActivationRatio; | ||
| boolean enableSession; | ||
| boolean enableProjectDiscovery; | ||
| private List<String> discoveredProjectsCache; | ||
| boolean unsupportedHTAPIFallback; | ||
| boolean useQueryCache; | ||
| String queryDialect; | ||
|
|
@@ -335,6 +349,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection { | |
| this.additionalProjects = ds.getAdditionalProjects(); | ||
|
|
||
| this.filterTablesOnDefaultDataset = ds.getFilterTablesOnDefaultDataset(); | ||
| this.enableProjectDiscovery = ds.getEnableProjectDiscovery(); | ||
| this.requestGoogleDriveScope = ds.getRequestGoogleDriveScope(); | ||
| this.metadataFetchThreadCount = ds.getMetadataFetchThreadCount(); | ||
| this.requestReason = ds.getRequestReason(); | ||
|
|
@@ -1221,6 +1236,64 @@ private boolean checkIsReadOnlyTokenUsed(Map<String, String> authProps) { | |
| return false; | ||
| } | ||
|
|
||
| public boolean isEnableProjectDiscovery() { | ||
| return this.enableProjectDiscovery; | ||
| } | ||
|
|
||
| public synchronized List<String> getDiscoveredProjects() { | ||
| if (this.discoveredProjectsCache != null) { | ||
| return this.discoveredProjectsCache; | ||
| } | ||
|
|
||
| try { | ||
| BigQueryOptions options = (BigQueryOptions) getBigQuery().getOptions(); | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. We have already code that provisions BigQuery client, we should not be creating new one (e.g. feels like HttpTransport is missing proxy/private endpoint and other properties that are configured). Are there issues with SDK client?
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, so the SDK does not expose a public I used reflection to extract the underlying client from Hence, the current implementation. And it should not bypass proxy, endpoint, or auth configurations.
|
||
| HttpTransportOptions transportOptions = (HttpTransportOptions) options.getTransportOptions(); | ||
| HttpTransport transport = transportOptions.getHttpTransportFactory().create(); | ||
| HttpRequestInitializer initializer = transportOptions.getHttpRequestInitializer(options); | ||
| Bigquery lowLevelBq = | ||
| new Bigquery.Builder(transport, GsonFactory.getDefaultInstance(), initializer) | ||
| .setRootUrl(options.getResolvedApiaryHost(BIGQUERY_SERVICE_NAME)) | ||
| .setApplicationName( | ||
| options.getApplicationName() != null | ||
| ? options.getApplicationName() | ||
| : DEFAULT_JDBC_TOKEN_VALUE) | ||
| .build(); | ||
|
|
||
| List<String> projects = new ArrayList<>(); | ||
| String pageToken = null; | ||
| do { | ||
| ProjectList projectList = | ||
| lowLevelBq | ||
| .projects() | ||
| .list() | ||
| .setPageToken(pageToken) | ||
| .setMaxResults(getMaxResults()) | ||
| .setFields(PROJECT_LIST_FIELDS) | ||
| .execute(); | ||
| if (projectList.getProjects() != null) { | ||
| for (Projects p : projectList.getProjects()) { | ||
| projects.add(p.getProjectReference().getProjectId()); | ||
| } | ||
| } | ||
| pageToken = projectList.getNextPageToken(); | ||
| } while (pageToken != null); | ||
|
|
||
| this.discoveredProjectsCache = ImmutableList.copyOf(projects); | ||
| } catch (GoogleJsonResponseException e) { | ||
| LOG.warning(e, "Failed to list all accessible projects due to Google API error."); | ||
| int statusCode = e.getStatusCode(); | ||
| // Only cache empty list for non-transient auth/permission errors (400, 401, 403) | ||
| if (statusCode == 400 || statusCode == 401 || statusCode == 403) { | ||
| this.discoveredProjectsCache = ImmutableList.of(); | ||
| } | ||
| return ImmutableList.of(); | ||
| } catch (Exception e) { | ||
| LOG.warning(e, "Failed to list all accessible projects, falling back to connection default."); | ||
| return ImmutableList.of(); | ||
| } | ||
|
keshavdandeva marked this conversation as resolved.
|
||
| return this.discoveredProjectsCache; | ||
| } | ||
|
keshavdandeva marked this conversation as resolved.
|
||
|
|
||
| @Override | ||
| public <T> T unwrap(Class<T> iface) throws SQLException { | ||
| if (iface.isInstance(this)) { | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -3652,60 +3652,61 @@ public ResultSet getSchemas(String catalog, String schemaPattern) { | |
| return; | ||
| } | ||
|
|
||
| ExecutorService apiExecutor = null; | ||
| final List<Future<List<Dataset>>> apiFutures = new ArrayList<>(); | ||
| try { | ||
| apiExecutor = Executors.newFixedThreadPool(API_EXECUTOR_POOL_SIZE); | ||
|
keshavdandeva marked this conversation as resolved.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. instead of create threadPool, we should have one available. We have
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yes, completely agree. I noticed this as well and this is happening with all major metadata methods ( |
||
| for (String currentProjectToScan : projectsToScanList) { | ||
| if (Thread.currentThread().isInterrupted()) { | ||
| LOG.warning( | ||
| "Schema fetcher interrupted during project iteration for project: " | ||
| + currentProjectToScan); | ||
| break; | ||
| } | ||
| LOG.info("Fetching schemas for project: " + currentProjectToScan); | ||
| List<Dataset> datasetsInProject = | ||
| findMatchingBigQueryObjects( | ||
| "Dataset", | ||
| () -> | ||
| bigquery.listDatasets( | ||
| currentProjectToScan, | ||
| BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), | ||
| (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), | ||
| (ds) -> ds.getDatasetId().getDataset(), | ||
| schemaPattern, | ||
| schemaRegex, | ||
| LOG); | ||
|
|
||
| if (datasetsInProject.isEmpty() || Thread.currentThread().isInterrupted()) { | ||
| LOG.info( | ||
| "Fetcher thread found no matching datasets in project: " | ||
| + currentProjectToScan); | ||
| continue; | ||
| } | ||
| checkInterrupted(apiFutures); | ||
| Callable<List<Dataset>> apiCallable = | ||
| () -> | ||
| findMatchingBigQueryObjects( | ||
| "Dataset", | ||
| () -> | ||
| bigquery.listDatasets( | ||
| currentProjectToScan, | ||
| BigQuery.DatasetListOption.pageSize(DEFAULT_PAGE_SIZE)), | ||
| (name) -> bigquery.getDataset(DatasetId.of(currentProjectToScan, name)), | ||
| (ds) -> ds.getDatasetId().getDataset(), | ||
| schemaPattern, | ||
| schemaRegex, | ||
| LOG); | ||
| apiFutures.add(apiExecutor.submit(apiCallable)); | ||
| } | ||
| apiExecutor.shutdown(); | ||
|
|
||
| LOG.fine("Processing found datasets for project: " + currentProjectToScan); | ||
| for (Dataset dataset : datasetsInProject) { | ||
| if (Thread.currentThread().isInterrupted()) { | ||
| LOG.warning( | ||
| "Schema fetcher interrupted during dataset iteration for project: " | ||
| + currentProjectToScan); | ||
| break; | ||
| for (Future<List<Dataset>> apiFuture : apiFutures) { | ||
| checkInterrupted(apiFutures); | ||
| try { | ||
| List<Dataset> datasetsInProject = apiFuture.get(); | ||
| if (datasetsInProject != null) { | ||
| for (Dataset dataset : datasetsInProject) { | ||
| processSchemaInfo(dataset, collectedResults, localResultSchemaFields); | ||
| } | ||
| } | ||
| processSchemaInfo(dataset, collectedResults, localResultSchemaFields); | ||
| } catch (InterruptedException e) { | ||
| Thread.currentThread().interrupt(); | ||
| checkInterrupted(apiFutures); | ||
| } catch (ExecutionException e) { | ||
| LOG.warning(e, "Error executing findMatchingDatasets task."); | ||
| } catch (CancellationException e) { | ||
| LOG.warning("A findMatchingDatasets task was cancelled."); | ||
| } | ||
| } | ||
|
keshavdandeva marked this conversation as resolved.
|
||
|
|
||
| if (!Thread.currentThread().isInterrupted()) { | ||
| Comparator<FieldValueList> comparator = | ||
| defineGetSchemasComparator(localResultSchemaFields); | ||
| sortResults(collectedResults, comparator, "getSchemas", LOG); | ||
| } | ||
|
|
||
| if (!Thread.currentThread().isInterrupted()) { | ||
| populateQueue(collectedResults, queue, localResultSchemaFields); | ||
| } | ||
| checkInterrupted(apiFutures); | ||
| Comparator<FieldValueList> comparator = | ||
| defineGetSchemasComparator(localResultSchemaFields); | ||
| sortResults(collectedResults, comparator, "getSchemas", LOG); | ||
| populateQueue(collectedResults, queue, localResultSchemaFields); | ||
|
|
||
| } catch (CancellationException e) { | ||
| LOG.warning("Schema fetcher task was cancelled/interrupted."); | ||
| } catch (Throwable t) { | ||
| LOG.severe("Unexpected error in schema fetcher runnable: " + t.getMessage()); | ||
| apiFutures.forEach(f -> f.cancel(true)); | ||
| } finally { | ||
| shutdownExecutor(apiExecutor); | ||
| signalEndOfData(queue, localResultSchemaFields); | ||
| LOG.info("Schema fetcher thread finished."); | ||
| } | ||
|
|
@@ -5147,6 +5148,13 @@ private void signalEndOfData( | |
| } | ||
| } | ||
|
|
||
| private void checkInterrupted(List<? extends Future<?>> futures) { | ||
| if (Thread.currentThread().isInterrupted()) { | ||
| futures.forEach(f -> f.cancel(true)); | ||
| throw new CancellationException("Fetcher thread was interrupted."); | ||
| } | ||
| } | ||
|
|
||
| private void shutdownExecutor(ExecutorService executor) { | ||
| if (executor == null || executor.isShutdown()) { | ||
| return; | ||
|
|
@@ -5197,6 +5205,10 @@ private List<String> getAccessibleCatalogNames() { | |
| } | ||
| } | ||
|
|
||
| if (this.connection.isEnableProjectDiscovery()) { | ||
| accessibleCatalogs.addAll(this.connection.getDiscoveredProjects()); | ||
| } | ||
|
|
||
| List<String> sortedCatalogs = new ArrayList<>(accessibleCatalogs); | ||
| Collections.sort(sortedCatalogs); | ||
| return sortedCatalogs; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Better to query projectName rather than projectId imo. This is used in some UI tools
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In the BigQuery REST API, there is no
projectNamefield (onlyprojectIdandfriendlyName).UsingprojectIdis the standard way to reference GCP projects. Also, the catalog name returned bygetCatalogs()must be the alphanumericprojectId.