Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package com.google.cloud.datastore;

import com.google.api.core.InternalApi;
import com.google.api.gax.grpc.GrpcStatusCode;
import com.google.api.gax.rpc.ApiException;
import com.google.api.gax.rpc.StatusCode;
Expand Down Expand Up @@ -176,7 +177,8 @@ static DatastoreException propagateUserException(Exception ex) {
* @param throwable the throwable to extract the status code from
* @return the status code name, or "UNKNOWN" if not determinable
*/
static String extractStatusCode(Throwable throwable) {
@InternalApi
public static String extractStatusCode(Throwable throwable) {
Throwable current = throwable;
while (current != null) {
if (current instanceof DatastoreException) {
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,20 @@
import static com.google.cloud.BaseService.EXCEPTION_HANDLER;

import com.google.api.core.InternalApi;
import com.google.api.core.ObsoleteApi;
import com.google.api.gax.retrying.RetrySettings;
import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.telemetry.MetricsRecorder;
import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder;
import com.google.cloud.datastore.telemetry.TelemetryConstants;
import com.google.cloud.datastore.telemetry.TelemetryUtils;
import com.google.cloud.datastore.telemetry.TraceUtil;
import com.google.cloud.http.HttpTransportOptions;
import com.google.common.base.Preconditions;
import com.google.common.base.Stopwatch;
import com.google.datastore.v1.AllocateIdsRequest;
import com.google.datastore.v1.AllocateIdsResponse;
import com.google.datastore.v1.BeginTransactionRequest;
Expand Down Expand Up @@ -53,7 +62,10 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc {
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil;
private final RetrySettings retrySettings;
private final DatastoreOptions datastoreOptions;
private final MetricsRecorder metricsRecorder;
private final boolean isHttpTransport;

@ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder")
public RetryAndTraceDatastoreRpcDecorator(
DatastoreRpc datastoreRpc,
TraceUtil otelTraceUtil,
Expand All @@ -63,6 +75,69 @@ public RetryAndTraceDatastoreRpcDecorator(
this.retrySettings = retrySettings;
this.datastoreOptions = datastoreOptions;
this.otelTraceUtil = otelTraceUtil;
this.metricsRecorder = new NoOpMetricsRecorder();
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
}

private RetryAndTraceDatastoreRpcDecorator(Builder builder) {
this.datastoreRpc = builder.datastoreRpc;
this.otelTraceUtil = builder.otelTraceUtil;
this.retrySettings = builder.retrySettings;
this.datastoreOptions = builder.datastoreOptions;
this.metricsRecorder = builder.metricsRecorder;
this.isHttpTransport = builder.isHttpTransport;
}

public static Builder newBuilder() {
return new Builder();
}

public static class Builder {
private DatastoreRpc datastoreRpc;
private TraceUtil otelTraceUtil;
private RetrySettings retrySettings;
private DatastoreOptions datastoreOptions;

// Defaults configured for this class
private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder();
private boolean isHttpTransport = false;

private Builder() {}

public Builder setDatastoreRpc(DatastoreRpc datastoreRpc) {
this.datastoreRpc = datastoreRpc;
return this;
}

public Builder setTraceUtil(TraceUtil otelTraceUtil) {
this.otelTraceUtil = otelTraceUtil;
return this;
}

public Builder setRetrySettings(RetrySettings retrySettings) {
this.retrySettings = retrySettings;
return this;
}

public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) {
this.datastoreOptions = datastoreOptions;
return this;
}

public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) {
Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null");
this.metricsRecorder = metricsRecorder;
return this;
}

public RetryAndTraceDatastoreRpcDecorator build() {
Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required");
Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required");
Preconditions.checkNotNull(retrySettings, "retrySettings is required");
Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required");
this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions;
return new RetryAndTraceDatastoreRpcDecorator(this);
}
}

@Override
Expand Down Expand Up @@ -110,7 +185,10 @@ public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryReques
? com.google.cloud.datastore.telemetry.TraceUtil
.SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY
: com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY);
return invokeRpc(() -> datastoreRpc.runAggregationQuery(request), spanName);
return invokeRpc(
() -> datastoreRpc.runAggregationQuery(request),
spanName,
TelemetryConstants.METHOD_RUN_AGGREGATION_QUERY);
}

@Override
Expand All @@ -124,14 +202,34 @@ public boolean isClosed() {
}

public <O> O invokeRpc(Callable<O> block, String startSpan) {
com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
return invokeRpc(block, startSpan, null);
}

<O> O invokeRpc(Callable<O> block, String startSpan, String methodName) {
TraceUtil.Span span = otelTraceUtil.startSpan(startSpan);
Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null;
String operationStatus = StatusCode.Code.UNKNOWN.toString();
try (TraceUtil.Scope ignored = span.makeCurrent()) {
Callable<O> callable =
TelemetryUtils.attemptMetricsCallable(
block, metricsRecorder, datastoreOptions, isHttpTransport, methodName);
O result =
RetryHelper.runWithRetries(
callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock());
operationStatus = StatusCode.Code.OK.toString();
return result;
} catch (RetryHelperException e) {
operationStatus = DatastoreException.extractStatusCode(e);
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
TelemetryUtils.recordOperationMetrics(
metricsRecorder,
datastoreOptions,
isHttpTransport,
stopwatch,
methodName,
operationStatus);
span.end();
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -208,10 +208,13 @@ public TraceUtil.Span setAttribute(String key, boolean value) {
}

@Override
@SuppressWarnings("MustBeClosedChecker")
public Scope makeCurrent() {
try (io.opentelemetry.context.Scope scope = span.makeCurrent()) {
return new Scope(scope);
}
// span.makeCurrent() opens a ThreadLocal scope that binds this span to the current thread.
// We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned
// TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources
// to control when the ThreadLocal context is restored.
return new Scope(span.makeCurrent());
}
}

Expand All @@ -238,10 +241,14 @@ static class Context implements TraceUtil.Context {
}

@Override
@SuppressWarnings("MustBeClosedChecker")
public Scope makeCurrent() {
try (io.opentelemetry.context.Scope scope = context.makeCurrent()) {
return new Scope(scope);
}
// context.makeCurrent() opens a ThreadLocal scope that binds this context to the current
// thread.
// We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned
// TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources
// to control when the ThreadLocal context is restored.
return new Scope(context.makeCurrent());
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public interface MetricsRecorder {
/** Records the number of attempts a transaction took. */
void recordTransactionAttemptCount(long count, Map<String, String> attributes);

/** Records the latency of a single RPC attempt in milliseconds. */
void recordAttemptLatency(double latencyMs, Map<String, String> attributes);

/** Records the count of a single RPC attempt. */
void recordAttemptCount(long count, Map<String, String> attributes);

/** Records the total latency of an operation (including retries) in milliseconds. */
void recordOperationLatency(double latencyMs, Map<String, String> attributes);

/** Records the count of an operation. */
void recordOperationCount(long count, Map<String, String> attributes);

/**
* Returns a {@link MetricsRecorder} instance based on the provided OpenTelemetry options.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,19 @@

package com.google.cloud.datastore.telemetry;

import com.google.api.core.InternalApi;
import java.util.Map;

/**
* Metrics recorder implementation, used to stub out metrics instrumentation when metrics are
* disabled.
*
* <p>WARNING: This class is intended for internal use only. It was made public to be used across
* packages as a default. It should not be used by external customers and its API may change without
* notice.
*/
class NoOpMetricsRecorder implements MetricsRecorder {
@InternalApi
public class NoOpMetricsRecorder implements MetricsRecorder {

@Override
public void recordTransactionLatency(double latencyMs, Map<String, String> attributes) {
Expand All @@ -33,4 +39,24 @@ public void recordTransactionLatency(double latencyMs, Map<String, String> attri
public void recordTransactionAttemptCount(long count, Map<String, String> attributes) {
/* No-Op OTel Operation */
}

@Override
public void recordAttemptLatency(double latencyMs, Map<String, String> attributes) {
/* No-Op OTel Operation */
}

@Override
public void recordAttemptCount(long count, Map<String, String> attributes) {
/* No-Op OTel Operation */
}

@Override
public void recordOperationLatency(double latencyMs, Map<String, String> attributes) {
/* No-Op OTel Operation */
}

@Override
public void recordOperationCount(long count, Map<String, String> attributes) {
/* No-Op OTel Operation */
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {

private final DoubleHistogram transactionLatency;
private final LongCounter transactionAttemptCount;
private final DoubleHistogram attemptLatency;
private final LongCounter attemptCount;
private final DoubleHistogram operationLatency;
private final LongCounter operationCount;

OpenTelemetryMetricsRecorder(@Nonnull OpenTelemetry openTelemetry) {
this.openTelemetry = openTelemetry;
Expand All @@ -51,6 +55,34 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
.counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT)
.setDescription("Number of attempts to commit a transaction")
.build();

this.attemptLatency =
meter
.histogramBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY)
.setDescription("Latency of a single RPC attempt")
.setUnit("ms")
.build();

this.attemptCount =
meter
.counterBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT)
.setDescription("Number of RPC attempts")
.setUnit("1")
.build();

this.operationLatency =
meter
.histogramBuilder(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY)
.setDescription("Total latency of an operation including retries")
.setUnit("ms")
.build();

this.operationCount =
meter
.counterBuilder(TelemetryConstants.METRIC_NAME_OPERATION_COUNT)
.setDescription("Number of operations")
.setUnit("1")
.build();
}

OpenTelemetry getOpenTelemetry() {
Expand All @@ -67,6 +99,26 @@ public void recordTransactionAttemptCount(long count, Map<String, String> attrib
transactionAttemptCount.add(count, toOtelAttributes(attributes));
}

@Override
public void recordAttemptLatency(double latencyMs, Map<String, String> attributes) {
attemptLatency.record(latencyMs, toOtelAttributes(attributes));
}

@Override
public void recordAttemptCount(long count, Map<String, String> attributes) {
attemptCount.add(count, toOtelAttributes(attributes));
}

@Override
public void recordOperationLatency(double latencyMs, Map<String, String> attributes) {
operationLatency.record(latencyMs, toOtelAttributes(attributes));
}

@Override
public void recordOperationCount(long count, Map<String, String> attributes) {
operationCount.add(count, toOtelAttributes(attributes));
}

private static Attributes toOtelAttributes(Map<String, String> attributes) {
AttributesBuilder builder = Attributes.builder();
if (attributes != null) {
Expand Down
Loading
Loading