Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions java-bigquery-jdbc/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -424,6 +424,12 @@
<artifactId>opentelemetry-sdk-testing</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.google.cloud</groupId>
<artifactId>google-cloud-trace</artifactId>
<version>2.92.0</version><!-- {x-version-update:google-cloud-trace:current} -->
<scope>test</scope>
</dependency>
</dependencies>

<profiles>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
import com.google.cloud.bigquery.storage.v1.BigQueryWriteSettings;
import com.google.cloud.http.HttpTransportOptions;
import com.google.cloud.logging.Logging;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSortedSet;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
Expand Down Expand Up @@ -215,6 +216,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
boolean enableGcpTraceExporter;
boolean enableGcpLogExporter;
OpenTelemetry customOpenTelemetry;
boolean useGlobalOpenTelemetry;
private OpenTelemetry openTelemetry;
private Context otelContext;
Tracer tracer =
Expand Down Expand Up @@ -366,6 +368,7 @@ public class BigQueryConnection extends BigQueryNoOpsConnection {
this.enableGcpTraceExporter = ds.getEnableGcpTraceExporter();
this.enableGcpLogExporter = ds.getEnableGcpLogExporter();
this.customOpenTelemetry = ds.getCustomOpenTelemetry();
this.useGlobalOpenTelemetry = ds.getUseGlobalOpenTelemetry();
this.openTelemetry = getOpenTelemetryInstance();
this.bigQuery = getBigQueryConnection();
}
Expand Down Expand Up @@ -438,7 +441,8 @@ String getConnectionUrl() {
return connectionUrl;
}

String getConnectionId() {
@VisibleForTesting
public String getConnectionId() {
Comment thread
keshavdandeva marked this conversation as resolved.
return this.connectionId;
}

Expand Down Expand Up @@ -1036,7 +1040,6 @@ void removeStatement(Statement statement) {
}

private OpenTelemetry getOpenTelemetryInstance() {
boolean hasCustomOtel = this.customOpenTelemetry != null;

String effectiveProjectId =
(this.gcpTelemetryProjectId != null) ? this.gcpTelemetryProjectId : this.catalog;
Expand All @@ -1046,22 +1049,27 @@ private OpenTelemetry getOpenTelemetryInstance() {

OpenTelemetry openTelemetry =
BigQueryJdbcOpenTelemetry.getOpenTelemetry(
this.useGlobalOpenTelemetry,
this.enableGcpTraceExporter,
this.enableGcpLogExporter,
this.customOpenTelemetry,
effectiveCredentials,
effectiveProjectId);

boolean hasExternalOtel = this.customOpenTelemetry != null || this.useGlobalOpenTelemetry;
Logging localLoggingClient = null;
if (this.enableGcpLogExporter && !hasCustomOtel) {
if (this.enableGcpLogExporter && !hasExternalOtel) {
localLoggingClient =
BigQueryJdbcOpenTelemetry.createLoggingClient(
true, null, effectiveCredentials, effectiveProjectId, this.credentials);
}

if (this.enableGcpLogExporter || hasCustomOtel) {
if (this.enableGcpLogExporter || hasExternalOtel) {
BigQueryJdbcOpenTelemetry.registerConnection(
this.connectionId, openTelemetry, localLoggingClient, this.enableGcpLogExporter);
this.connectionId,
openTelemetry,
localLoggingClient,
this.enableGcpLogExporter && !hasExternalOtel);
}

return openTelemetry;
Expand Down Expand Up @@ -1126,7 +1134,9 @@ private BigQuery getBigQueryConnection() {
if (this.httpTransportOptions != null) {
bigQueryOptions.setTransportOptions(this.httpTransportOptions);
}
if (this.enableGcpTraceExporter || this.customOpenTelemetry != null) {
if (this.enableGcpTraceExporter
|| this.customOpenTelemetry != null
|| this.useGlobalOpenTelemetry) {
Tracer sdkTracer = this.openTelemetry.getTracer(BigQueryJdbcOpenTelemetry.BIGQUERY_NAMESPACE);
bigQueryOptions.setOpenTelemetryTracer(sdkTracer);
this.tracer =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.google.cloud.logging.Logging;
import com.google.cloud.logging.LoggingOptions;
import com.google.common.hash.Hashing;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.baggage.Baggage;
import io.opentelemetry.api.trace.Span;
Expand Down Expand Up @@ -65,6 +66,11 @@ public class BigQueryJdbcOpenTelemetry {
private static final String OTLP_ENDPOINT_VALUE = "https://telemetry.googleapis.com:443";
private static final String EXPORTER_NONE = "none";
private static final String EXPORTER_OTLP = "otlp";
private static final String OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT =
"otel.span.attribute.value.length.limit";
private static final String OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT =
"otel.attribute.value.length.limit";
private static final String DEFAULT_ATTRIBUTE_LENGTH_LIMIT = "32768";
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger("BigQueryJdbcOpenTelemetry");

Expand Down Expand Up @@ -240,6 +246,7 @@ private static String getCredentialsIdentifier(String credentials) {
* customOpenTelemetry if provided; fallback to an auto-configured GCP exporter if requested.
*/
public static OpenTelemetry getOpenTelemetry(
boolean useGlobalOpenTelemetry,
boolean enableGcpTraceExporter,
boolean enableGcpLogExporter,
OpenTelemetry customOpenTelemetry,
Expand All @@ -250,6 +257,10 @@ public static OpenTelemetry getOpenTelemetry(
return customOpenTelemetry;
}

if (useGlobalOpenTelemetry) {
return GlobalOpenTelemetry.get();
}

// NOTE: Currently, tracing only fully supports Application Default Credentials (ADC).
// Once b/503721589 is completed, Service Account (SA) will work as well.
if (!enableGcpTraceExporter && !enableGcpLogExporter) {
Expand Down Expand Up @@ -290,6 +301,17 @@ public static OpenTelemetry getOpenTelemetry(
props.put(GOOGLE_CLOUD_PROJECT, gcpTelemetryProjectId);
}

// Set safe, generous default limits on attribute value lengths (32KB) to protect
// customers from GCP Cloud Trace 64KB span ingestion failures when logging massive
// exception stack traces or database schema metadata.
// Respect any existing user configuration overrides.
if (!props.containsKey(OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT)) {
props.put(OTEL_SPAN_ATTRIBUTE_VALUE_LENGTH_LIMIT, DEFAULT_ATTRIBUTE_LENGTH_LIMIT);
}
if (!props.containsKey(OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT)) {
props.put(OTEL_ATTRIBUTE_VALUE_LENGTH_LIMIT, DEFAULT_ATTRIBUTE_LENGTH_LIMIT);
}

AutoConfiguredOpenTelemetrySdk autoConfigured =
AutoConfiguredOpenTelemetrySdk.builder().addPropertiesSupplier(() -> props).build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
static final boolean DEFAULT_ENABLE_GCP_TRACE_EXPORTER_VALUE = false;
static final String ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME = "enableGcpLogExporter";
static final boolean DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE = false;
static final String USE_GLOBAL_OTEL_PROPERTY_NAME = "useGlobalOpenTelemetry";
static final boolean DEFAULT_USE_GLOBAL_OTEL_VALUE = false;
private static final BigQueryJdbcCustomLogger LOG =
new BigQueryJdbcCustomLogger(BigQueryJdbcUrlUtility.class.getName());
static final String FILTER_TABLES_ON_DEFAULT_DATASET_PROPERTY_NAME =
Expand Down Expand Up @@ -638,6 +640,12 @@ protected boolean removeEldestEntry(Map.Entry<String, Map<String, String>> eldes
BigQueryConnectionProperty.newBuilder()
.setName(GCP_TELEMETRY_PROJECT_ID_PROPERTY_NAME)
.setDescription("GCP Project ID for OTel exporter.")
.build(),
BigQueryConnectionProperty.newBuilder()
.setName(USE_GLOBAL_OTEL_PROPERTY_NAME)
.setDescription(
"Enables usage of the Global OpenTelemetry instance when true. Default is false.")
.setDefaultValue(String.valueOf(DEFAULT_USE_GLOBAL_OTEL_VALUE))
.build())));

private static final List<String> NETWORK_PROPERTIES =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -912,20 +912,11 @@ private void processArrowStream(
enqueueError(arrowBatchWrapperBlockingQueue, e);
Thread.currentThread().interrupt();
} catch (Exception e) {
if (e.getCause() instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
LOG.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ arrowStreamProcessor",
e);
enqueueError(arrowBatchWrapperBlockingQueue, e);
Thread.currentThread().interrupt();
} else {
LOG.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor",
e);
enqueueError(arrowBatchWrapperBlockingQueue, e);
}
LOG.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Error @ arrowStreamProcessor",
e);
enqueueError(arrowBatchWrapperBlockingQueue, e);
} finally { // logic needed for graceful shutdown
enqueueEndOfStream(arrowBatchWrapperBlockingQueue);
}
Expand Down Expand Up @@ -1683,20 +1674,11 @@ private void parseAndPopulateRpcData(
}

} catch (Exception ex) {
if (ex.getCause() instanceof InterruptedException || Thread.currentThread().isInterrupted()) {
LOG.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Interrupted @ populateBufferAsync",
ex);
enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
Thread.currentThread().interrupt();
} else {
LOG.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
ex);
enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
}
LOG.log(
Level.WARNING,
"\n" + Thread.currentThread().getName() + " Error @ populateBufferAsync",
ex);
enqueueBufferError(bigQueryFieldValueListWrapperBlockingQueue, ex);
} finally {
enqueueBufferEndOfStream(bigQueryFieldValueListWrapperBlockingQueue);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ public class DataSource implements javax.sql.DataSource {
private boolean enableGcpLogExporter =
BigQueryJdbcUrlUtility.DEFAULT_ENABLE_GCP_LOG_EXPORTER_VALUE;
private OpenTelemetry customOpenTelemetry;
private boolean useGlobalOpenTelemetry = BigQueryJdbcUrlUtility.DEFAULT_USE_GLOBAL_OTEL_VALUE;

// Make sure the JDBC driver class is loaded.
static {
Expand Down Expand Up @@ -358,6 +359,12 @@ public class DataSource implements javax.sql.DataSource {
ds.setEnableGcpLogExporter(
BigQueryJdbcUrlUtility.convertIntToBoolean(
val, BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME)))
.put(
BigQueryJdbcUrlUtility.USE_GLOBAL_OTEL_PROPERTY_NAME,
(ds, val) ->
ds.setUseGlobalOpenTelemetry(
BigQueryJdbcUrlUtility.convertIntToBoolean(
val, BigQueryJdbcUrlUtility.USE_GLOBAL_OTEL_PROPERTY_NAME)))
.build();

public static DataSource fromUrl(String url) {
Expand Down Expand Up @@ -675,6 +682,11 @@ Properties createProperties() {
BigQueryJdbcUrlUtility.ENABLE_GCP_LOG_EXPORTER_PROPERTY_NAME,
String.valueOf(this.enableGcpLogExporter));
}
if (this.useGlobalOpenTelemetry) {
connectionProperties.setProperty(
BigQueryJdbcUrlUtility.USE_GLOBAL_OTEL_PROPERTY_NAME,
String.valueOf(this.useGlobalOpenTelemetry));
}
return connectionProperties;
}

Expand Down Expand Up @@ -832,6 +844,14 @@ public void setCustomOpenTelemetry(OpenTelemetry customOpenTelemetry) {
this.customOpenTelemetry = customOpenTelemetry;
}

public boolean getUseGlobalOpenTelemetry() {
return useGlobalOpenTelemetry;
}

public void setUseGlobalOpenTelemetry(boolean useGlobalOpenTelemetry) {
this.useGlobalOpenTelemetry = useGlobalOpenTelemetry;
}

public void setHighThroughputMinTableSize(Integer highThroughputMinTableSize) {
if (highThroughputMinTableSize != null) {
validateNonNegative(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@
public class OpenTelemetryJulHandler extends Handler {
private static final Pattern UNSAFE_LOG_CHARACTERS = Pattern.compile("[^a-zA-Z0-9./_-]");

public OpenTelemetryJulHandler() {}
public OpenTelemetryJulHandler() {
setLevel(Level.ALL);
Comment thread
logachev marked this conversation as resolved.
}

@Override
public void publish(LogRecord record) {
Expand Down
Loading
Loading