diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java index 6312bc76bb6e..5591cbb2f46f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java @@ -16,6 +16,7 @@ package com.google.cloud.datastore; +import com.google.api.core.InternalApi; import com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode; @@ -176,7 +177,8 @@ static DatastoreException propagateUserException(Exception ex) { * @param throwable the throwable to extract the status code from * @return the status code name, or "UNKNOWN" if not determinable */ - static String extractStatusCode(Throwable throwable) { + @InternalApi + public static String extractStatusCode(Throwable throwable) { Throwable current = throwable; while (current != null) { if (current instanceof DatastoreException) { diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 39447c7eeb41..d4e64fd0e39b 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -37,6 +37,7 @@ import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY; import com.google.api.core.BetaApi; +import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.BaseService; @@ -49,8 +50,10 @@ import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.telemetry.MetricsRecorder; import com.google.cloud.datastore.telemetry.TelemetryConstants; +import com.google.cloud.datastore.telemetry.TelemetryUtils; import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.cloud.datastore.telemetry.TraceUtil.Scope; +import com.google.cloud.http.HttpTransportOptions; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -63,6 +66,7 @@ import com.google.datastore.v1.ExplainOptions; import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.ReserveIdsRequest; +import com.google.datastore.v1.RollbackRequest; import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.TransactionOptions; import com.google.protobuf.ByteString; @@ -98,6 +102,7 @@ final class DatastoreImpl extends BaseService implements Datas private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil = getOptions().getTraceUtil(); private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder(); + private final boolean isHttpTransport; private final ReadOptionProtoPreparer readOptionProtoPreparer; private final AggregationQueryExecutor aggregationQueryExecutor; @@ -105,14 +110,20 @@ final class DatastoreImpl extends BaseService implements Datas DatastoreImpl(DatastoreOptions options) { super(options); this.datastoreRpc = options.getDatastoreRpcV1(); + this.isHttpTransport = options.getTransportOptions() instanceof HttpTransportOptions; retrySettings = MoreObjects.firstNonNull(options.getRetrySettings(), ServiceOptions.getNoRetrySettings()); readOptionProtoPreparer = new ReadOptionProtoPreparer(); aggregationQueryExecutor = new AggregationQueryExecutor( - new RetryAndTraceDatastoreRpcDecorator( - datastoreRpc, otelTraceUtil, retrySettings, options), + RetryAndTraceDatastoreRpcDecorator.newBuilder() + .setDatastoreRpc(datastoreRpc) + .setTraceUtil(otelTraceUtil) + .setRetrySettings(retrySettings) + .setDatastoreOptions(options) + .setMetricsRecorder(metricsRecorder) + .build(), options); } @@ -353,37 +364,37 @@ com.google.datastore.v1.RunQueryResponse runQuery( ReadOptions readOptions = requestPb.getReadOptions(); boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_RUN_QUERY : SPAN_NAME_RUN_QUERY); - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - RunQueryResponse response = - RetryHelper.runWithRetries( - () -> datastoreRpc.runQuery(requestPb), - retrySettings, - requestPb.getReadOptions().getTransaction().isEmpty() - ? EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); - span.addEvent( - spanName + " complete.", - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getBatch().getEntityResultsCount()) - .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) - .put(ATTRIBUTES_KEY_READ_CONSISTENCY, readOptions.getReadConsistency().toString()) - .put( - ATTRIBUTES_KEY_TRANSACTION_ID, - (isTransactional - ? requestPb.getReadOptions().getTransaction().toStringUtf8() - : "")) - .put(ATTRIBUTES_KEY_MORE_RESULTS, response.getBatch().getMoreResults().toString()) - .build()); - return response; - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + return runWithObservability( + () -> { + RunQueryResponse response = datastoreRpc.runQuery(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getBatch().getEntityResultsCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_READ_CONSISTENCY, + readOptions.getReadConsistency().toString()) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + (isTransactional + ? requestPb.getReadOptions().getTransaction().toStringUtf8() + : "")) + .put( + ATTRIBUTES_KEY_MORE_RESULTS, + response.getBatch().getMoreResults().toString()) + .build()); + } + return response; + }, + TelemetryConstants.METHOD_RUN_QUERY, + spanName, + requestPb.getReadOptions().getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER); } @Override @@ -424,25 +435,11 @@ public List allocateId(IncompleteKey... keys) { private com.google.datastore.v1.AllocateIdsResponse allocateIds( final com.google.datastore.v1.AllocateIdsRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_ALLOCATE_IDS); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - new Callable() { - @Override - public com.google.datastore.v1.AllocateIdsResponse call() throws DatastoreException { - return datastoreRpc.allocateIds(requestPb); - } - }, - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + return runWithObservability( + () -> datastoreRpc.allocateIds(requestPb), + TelemetryConstants.METHOD_ALLOCATE_IDS, + SPAN_NAME_ALLOCATE_IDS, + EXCEPTION_HANDLER); } private IncompleteKey trimNameOrId(IncompleteKey key) { @@ -587,12 +584,12 @@ com.google.datastore.v1.LookupResponse lookup( ReadOptions readOptions = requestPb.getReadOptions(); boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_LOOKUP : SPAN_NAME_LOOKUP); - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - () -> { - com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); + return runWithObservability( + () -> { + com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { span.addEvent( spanName + " complete.", new ImmutableMap.Builder() @@ -604,19 +601,14 @@ com.google.datastore.v1.LookupResponse lookup( ATTRIBUTES_KEY_TRANSACTION_ID, isTransactional ? readOptions.getTransaction().toStringUtf8() : "") .build()); - return response; - }, - retrySettings, - requestPb.getReadOptions().getTransaction().isEmpty() - ? EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + } + return response; + }, + TelemetryConstants.METHOD_LOOKUP, + spanName, + requestPb.getReadOptions().getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER); } @Override @@ -639,25 +631,11 @@ public List reserveIds(Key... keys) { com.google.datastore.v1.ReserveIdsResponse reserveIds( final com.google.datastore.v1.ReserveIdsRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_RESERVE_IDS); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - new Callable() { - @Override - public com.google.datastore.v1.ReserveIdsResponse call() throws DatastoreException { - return datastoreRpc.reserveIds(requestPb); - } - }, - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + return runWithObservability( + () -> datastoreRpc.reserveIds(requestPb), + TelemetryConstants.METHOD_RESERVE_IDS, + SPAN_NAME_RESERVE_IDS, + EXCEPTION_HANDLER); } @Override @@ -753,32 +731,29 @@ com.google.datastore.v1.CommitResponse commit( final boolean isTransactional = requestPb.hasTransaction() || requestPb.hasSingleUseTransaction(); final String spanName = isTransactional ? SPAN_NAME_TRANSACTION_COMMIT : SPAN_NAME_COMMIT; - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - CommitResponse response = - RetryHelper.runWithRetries( - () -> datastoreRpc.commit(requestPb), - retrySettings, - requestPb.getTransaction().isEmpty() - ? EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); - span.addEvent( - spanName + " complete.", - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getMutationResultsCount()) - .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) - .put( - ATTRIBUTES_KEY_TRANSACTION_ID, - isTransactional ? requestPb.getTransaction().toStringUtf8() : "") - .build()); - return response; - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + + return runWithObservability( + () -> { + CommitResponse response = datastoreRpc.commit(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getMutationResultsCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + isTransactional ? requestPb.getTransaction().toStringUtf8() : "") + .build()); + } + return response; + }, + TelemetryConstants.METHOD_COMMIT, + spanName, + requestPb.getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER); } ByteString requestTransactionId( @@ -788,20 +763,11 @@ ByteString requestTransactionId( com.google.datastore.v1.BeginTransactionResponse beginTransaction( final com.google.datastore.v1.BeginTransactionRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_BEGIN_TRANSACTION); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) { - return RetryHelper.runWithRetries( - () -> datastoreRpc.beginTransaction(requestPb), - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + return runWithObservability( + () -> datastoreRpc.beginTransaction(requestPb), + TelemetryConstants.METHOD_BEGIN_TRANSACTION, + SPAN_NAME_BEGIN_TRANSACTION, + EXCEPTION_HANDLER); } void rollbackTransaction(ByteString transaction) { @@ -813,30 +779,56 @@ void rollbackTransaction(ByteString transaction) { rollback(requestPb.build()); } - void rollback(final com.google.datastore.v1.RollbackRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_ROLLBACK); - try (Scope scope = span.makeCurrent()) { - RetryHelper.runWithRetries( - new Callable() { - @Override - public Void call() throws DatastoreException { - datastoreRpc.rollback(requestPb); - return null; - } - }, - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - span.addEvent( - SPAN_NAME_ROLLBACK, - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_TRANSACTION_ID, requestPb.getTransaction().toStringUtf8()) - .build()); + void rollback(final RollbackRequest requestPb) { + runWithObservability( + () -> { + datastoreRpc.rollback(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + SPAN_NAME_ROLLBACK, + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_TRANSACTION_ID, requestPb.getTransaction().toStringUtf8()) + .build()); + } + return null; + }, + TelemetryConstants.METHOD_ROLLBACK, + SPAN_NAME_ROLLBACK, + EXCEPTION_HANDLER); + } + + private T runWithObservability( + Callable callable, + String methodName, + String spanName, + ResultRetryAlgorithm exceptionHandler) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + + // Gax already records operation and attempt metrics. Since Datastore HttpJson does not + // integrate with Gax, manually instrument these metrics when using HttpJson for parity + Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); + + DatastoreOptions options = getOptions(); + Callable attemptCallable = + TelemetryUtils.attemptMetricsCallable( + callable, metricsRecorder, options, isHttpTransport, methodName); + try (TraceUtil.Scope ignored = span.makeCurrent()) { + return RetryHelper.runWithRetries( + attemptCallable, retrySettings, exceptionHandler, options.getClock()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + TelemetryUtils.recordOperationMetrics( + metricsRecorder, + options, + isHttpTransport, + operationStopwatch, + methodName, + operationStatus); span.end(); } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index 630ddd225ce2..397015c5ef4e 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -18,11 +18,20 @@ import static com.google.cloud.BaseService.EXCEPTION_HANDLER; import com.google.api.core.InternalApi; +import com.google.api.core.ObsoleteApi; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.MetricsRecorder; +import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder; +import com.google.cloud.datastore.telemetry.TelemetryConstants; +import com.google.cloud.datastore.telemetry.TelemetryUtils; import com.google.cloud.datastore.telemetry.TraceUtil; +import com.google.cloud.http.HttpTransportOptions; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.datastore.v1.AllocateIdsRequest; import com.google.datastore.v1.AllocateIdsResponse; import com.google.datastore.v1.BeginTransactionRequest; @@ -53,7 +62,10 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc { private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil; private final RetrySettings retrySettings; private final DatastoreOptions datastoreOptions; + private final MetricsRecorder metricsRecorder; + private final boolean isHttpTransport; + @ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder") public RetryAndTraceDatastoreRpcDecorator( DatastoreRpc datastoreRpc, TraceUtil otelTraceUtil, @@ -63,6 +75,69 @@ public RetryAndTraceDatastoreRpcDecorator( this.retrySettings = retrySettings; this.datastoreOptions = datastoreOptions; this.otelTraceUtil = otelTraceUtil; + this.metricsRecorder = new NoOpMetricsRecorder(); + this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions; + } + + private RetryAndTraceDatastoreRpcDecorator(Builder builder) { + this.datastoreRpc = builder.datastoreRpc; + this.otelTraceUtil = builder.otelTraceUtil; + this.retrySettings = builder.retrySettings; + this.datastoreOptions = builder.datastoreOptions; + this.metricsRecorder = builder.metricsRecorder; + this.isHttpTransport = builder.isHttpTransport; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private DatastoreRpc datastoreRpc; + private TraceUtil otelTraceUtil; + private RetrySettings retrySettings; + private DatastoreOptions datastoreOptions; + + // Defaults configured for this class + private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder(); + private boolean isHttpTransport = false; + + private Builder() {} + + public Builder setDatastoreRpc(DatastoreRpc datastoreRpc) { + this.datastoreRpc = datastoreRpc; + return this; + } + + public Builder setTraceUtil(TraceUtil otelTraceUtil) { + this.otelTraceUtil = otelTraceUtil; + return this; + } + + public Builder setRetrySettings(RetrySettings retrySettings) { + this.retrySettings = retrySettings; + return this; + } + + public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) { + this.datastoreOptions = datastoreOptions; + return this; + } + + public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) { + Preconditions.checkNotNull(metricsRecorder, "metricsRecorder can not be null"); + this.metricsRecorder = metricsRecorder; + return this; + } + + public RetryAndTraceDatastoreRpcDecorator build() { + Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required"); + Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required"); + Preconditions.checkNotNull(retrySettings, "retrySettings is required"); + Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required"); + this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions; + return new RetryAndTraceDatastoreRpcDecorator(this); + } } @Override @@ -110,7 +185,10 @@ public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryReques ? com.google.cloud.datastore.telemetry.TraceUtil .SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY : com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY); - return invokeRpc(() -> datastoreRpc.runAggregationQuery(request), spanName); + return invokeRpc( + () -> datastoreRpc.runAggregationQuery(request), + spanName, + TelemetryConstants.METHOD_RUN_AGGREGATION_QUERY); } @Override @@ -124,14 +202,34 @@ public boolean isClosed() { } public O invokeRpc(Callable block, String startSpan) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); + return invokeRpc(block, startSpan, null); + } + + O invokeRpc(Callable block, String startSpan, String methodName) { + TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); + Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.UNKNOWN.toString(); + try (TraceUtil.Scope ignored = span.makeCurrent()) { + Callable callable = + TelemetryUtils.attemptMetricsCallable( + block, metricsRecorder, datastoreOptions, isHttpTransport, methodName); + O result = + RetryHelper.runWithRetries( + callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); + operationStatus = StatusCode.Code.OK.toString(); + return result; } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + TelemetryUtils.recordOperationMetrics( + metricsRecorder, + datastoreOptions, + isHttpTransport, + stopwatch, + methodName, + operationStatus); span.end(); } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java index 711e94020548..3dc8395d170b 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -208,10 +208,13 @@ public TraceUtil.Span setAttribute(String key, boolean value) { } @Override + @SuppressWarnings("MustBeClosedChecker") public Scope makeCurrent() { - try (io.opentelemetry.context.Scope scope = span.makeCurrent()) { - return new Scope(scope); - } + // span.makeCurrent() opens a ThreadLocal scope that binds this span to the current thread. + // We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned + // TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources + // to control when the ThreadLocal context is restored. + return new Scope(span.makeCurrent()); } } @@ -238,10 +241,14 @@ static class Context implements TraceUtil.Context { } @Override + @SuppressWarnings("MustBeClosedChecker") public Scope makeCurrent() { - try (io.opentelemetry.context.Scope scope = context.makeCurrent()) { - return new Scope(scope); - } + // context.makeCurrent() opens a ThreadLocal scope that binds this context to the current + // thread. + // We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned + // TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources + // to control when the ThreadLocal context is restored. + return new Scope(context.makeCurrent()); } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java index 6a0ea16b5dd6..a71cb9b7c25c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java @@ -38,6 +38,18 @@ public interface MetricsRecorder { /** Records the number of attempts a transaction took. */ void recordTransactionAttemptCount(long count, Map attributes); + /** Records the latency of a single RPC attempt in milliseconds. */ + void recordAttemptLatency(double latencyMs, Map attributes); + + /** Records the count of a single RPC attempt. */ + void recordAttemptCount(long count, Map attributes); + + /** Records the total latency of an operation (including retries) in milliseconds. */ + void recordOperationLatency(double latencyMs, Map attributes); + + /** Records the count of an operation. */ + void recordOperationCount(long count, Map attributes); + /** * Returns a {@link MetricsRecorder} instance based on the provided OpenTelemetry options. * diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java index 3ac5f31a023c..1523e569505f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java @@ -16,13 +16,19 @@ package com.google.cloud.datastore.telemetry; +import com.google.api.core.InternalApi; import java.util.Map; /** * Metrics recorder implementation, used to stub out metrics instrumentation when metrics are * disabled. + * + *

WARNING: This class is intended for internal use only. It was made public to be used across + * packages as a default. It should not be used by external customers and its API may change without + * notice. */ -class NoOpMetricsRecorder implements MetricsRecorder { +@InternalApi +public class NoOpMetricsRecorder implements MetricsRecorder { @Override public void recordTransactionLatency(double latencyMs, Map attributes) { @@ -33,4 +39,24 @@ public void recordTransactionLatency(double latencyMs, Map attri public void recordTransactionAttemptCount(long count, Map attributes) { /* No-Op OTel Operation */ } + + @Override + public void recordAttemptLatency(double latencyMs, Map attributes) { + /* No-Op OTel Operation */ + } + + @Override + public void recordAttemptCount(long count, Map attributes) { + /* No-Op OTel Operation */ + } + + @Override + public void recordOperationLatency(double latencyMs, Map attributes) { + /* No-Op OTel Operation */ + } + + @Override + public void recordOperationCount(long count, Map attributes) { + /* No-Op OTel Operation */ + } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java index e50edbd5f79f..6314bbc6413c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java @@ -33,6 +33,10 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder { private final DoubleHistogram transactionLatency; private final LongCounter transactionAttemptCount; + private final DoubleHistogram attemptLatency; + private final LongCounter attemptCount; + private final DoubleHistogram operationLatency; + private final LongCounter operationCount; OpenTelemetryMetricsRecorder(@Nonnull OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; @@ -51,6 +55,34 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder { .counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT) .setDescription("Number of attempts to commit a transaction") .build(); + + this.attemptLatency = + meter + .histogramBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY) + .setDescription("Latency of a single RPC attempt") + .setUnit("ms") + .build(); + + this.attemptCount = + meter + .counterBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT) + .setDescription("Number of RPC attempts") + .setUnit("1") + .build(); + + this.operationLatency = + meter + .histogramBuilder(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY) + .setDescription("Total latency of an operation including retries") + .setUnit("ms") + .build(); + + this.operationCount = + meter + .counterBuilder(TelemetryConstants.METRIC_NAME_OPERATION_COUNT) + .setDescription("Number of operations") + .setUnit("1") + .build(); } OpenTelemetry getOpenTelemetry() { @@ -67,6 +99,26 @@ public void recordTransactionAttemptCount(long count, Map attrib transactionAttemptCount.add(count, toOtelAttributes(attributes)); } + @Override + public void recordAttemptLatency(double latencyMs, Map attributes) { + attemptLatency.record(latencyMs, toOtelAttributes(attributes)); + } + + @Override + public void recordAttemptCount(long count, Map attributes) { + attemptCount.add(count, toOtelAttributes(attributes)); + } + + @Override + public void recordOperationLatency(double latencyMs, Map attributes) { + operationLatency.record(latencyMs, toOtelAttributes(attributes)); + } + + @Override + public void recordOperationCount(long count, Map attributes) { + operationCount.add(count, toOtelAttributes(attributes)); + } + private static Attributes toOtelAttributes(Map attributes) { AttributesBuilder builder = Attributes.builder(); if (attributes != null) { diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index 847949000512..cd98de7e28b4 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -68,19 +68,74 @@ public class TelemetryConstants { public static final String METRIC_NAME_TRANSACTION_ATTEMPT_COUNT = SERVICE_NAME + "/client/transaction_attempt_count"; - // Format is not SnakeCase to match the method name convention in Gax. + /** + * Metric name for the total latency of an operation (one full RPC call including retries). Note: + * This does not have the /client prefix to match Gax's format. + */ + public static final String METRIC_NAME_OPERATION_LATENCY = SERVICE_NAME + "/operation_latency"; + + /** + * Metric name for the latency of a single RPC attempt. Note: This does not have the /client + * prefix to match Gax's format. + */ + public static final String METRIC_NAME_ATTEMPT_LATENCY = SERVICE_NAME + "/attempt_latency"; + + /** + * Metric name for the count of operations. Note: This does not have the /client prefix to match + * Gax's format. + */ + public static final String METRIC_NAME_OPERATION_COUNT = SERVICE_NAME + "/operation_count"; + + /** + * Metric name for the count of RPC attempts. Note: This does not have the /client prefix to match + * Gax's format. + */ + public static final String METRIC_NAME_ATTEMPT_COUNT = SERVICE_NAME + "/attempt_count"; + + // This is intentionally different from the `SERVICE_NAME` constant as it matches Gax's logic for + // method name. + static final String METHOD_SERVICE_NAME = "Datastore"; + + // The follow method name formats are not in SnakeCase to match the method name convention in Gax. // The format is {ServiceName}.{MethodName}. For these methods, include `Transaction` // to denote that the metrics are related specifically to transactions. - public static final String METHOD_TRANSACTION_COMMIT = "Datastore.Transaction.Commit"; - public static final String METHOD_TRANSACTION_RUN = "Datastore.Transaction.Run"; + public static final String METHOD_ALLOCATE_IDS = METHOD_SERVICE_NAME + ".AllocateIds"; + public static final String METHOD_BEGIN_TRANSACTION = METHOD_SERVICE_NAME + ".BeginTransaction"; + public static final String METHOD_COMMIT = METHOD_SERVICE_NAME + ".Commit"; + public static final String METHOD_LOOKUP = METHOD_SERVICE_NAME + ".Lookup"; + public static final String METHOD_RESERVE_IDS = METHOD_SERVICE_NAME + ".ReserveIds"; + public static final String METHOD_ROLLBACK = METHOD_SERVICE_NAME + ".Rollback"; + public static final String METHOD_RUN_QUERY = METHOD_SERVICE_NAME + ".RunQuery"; + public static final String METHOD_RUN_AGGREGATION_QUERY = + METHOD_SERVICE_NAME + ".RunAggregationQuery"; + + // These metrics capture the specific transaction related + public static final String METHOD_TRANSACTION_COMMIT = + METHOD_SERVICE_NAME + ".Transaction.Commit"; + public static final String METHOD_TRANSACTION_RUN = METHOD_SERVICE_NAME + ".Transaction.Run"; + + public enum Transport { + GRPC("grpc"), + HTTP("http"); + + private final String transport; + + Transport(String transport) { + this.transport = transport; + } - private TelemetryConstants() {} + public String getTransport() { + return transport; + } + } public static String getTransportName(TransportOptions transportOptions) { if (transportOptions instanceof GrpcTransportOptions) { - return "grpc"; + return Transport.GRPC.getTransport(); } else { - return "http"; + return Transport.HTTP.getTransport(); } } + + private TelemetryConstants() {} } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java new file mode 100644 index 000000000000..d6c04abaf66f --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java @@ -0,0 +1,132 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.StatusCode; +import com.google.cloud.datastore.DatastoreException; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.common.base.Stopwatch; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for common telemetry operations in Datastore. + * + *

WARNING: This class is intended for internal use only. + */ +@InternalApi +public final class TelemetryUtils { + + private TelemetryUtils() {} + + /** + * Method to build a map of attributes to be used across both operation and attempt level metrics. + * + * @param datastoreOptions The DatastoreOptions object. + * @param methodName The name of the API method. + * @param status The status of the operation or attempt. + * @return The map of attributes. + */ + public static Map buildMetricAttributes( + DatastoreOptions datastoreOptions, String methodName, String status) { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, + TelemetryConstants.getTransportName(datastoreOptions.getTransportOptions())); + return attributes; + } + + /** + * Method to record operation level metrics for HttpJson transport. This method should be called + * after the entire operation across all retry attempts has completed. + * + * @param metricsRecorder The metrics recorder. + * @param datastoreOptions The DatastoreOptions object. + * @param isHttpTransport Whether the current transport is HTTP. + * @param operationStopwatch The stopwatch tracking the duration of the entire operation. + * @param methodName The name of the API method. + * @param status The final status of the operation after all retries. + */ + public static void recordOperationMetrics( + MetricsRecorder metricsRecorder, + DatastoreOptions datastoreOptions, + boolean isHttpTransport, + Stopwatch operationStopwatch, + String methodName, + String status) { + // Operation metrics are only recorded for HttpJson transport as Gax already records + // operation metrics for gRPC transport. This prevents metrics from being recorded twice + // for gRPC transport. + if (!isHttpTransport) { + return; + } + if (methodName != null) { + Map attributes = buildMetricAttributes(datastoreOptions, methodName, status); + metricsRecorder.recordOperationLatency( + operationStopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordOperationCount(1, attributes); + } + } + + /** + * Wraps a callable with logic to record attempt-level metrics for HttpJson transport. Attempt + * metrics are recorded for each individual execution of the callable, regardless of whether it + * succeeds or fails. + * + * @param callable The original callable to execute. + * @param metricsRecorder The metrics recorder. + * @param datastoreOptions The DatastoreOptions object. + * @param isHttpTransport Whether the current transport is HTTP. + * @param methodName The name of the API method. + * @param The return type of the callable. + * @return A wrapped callable that includes attempt-level metrics recording. + */ + public static Callable attemptMetricsCallable( + Callable callable, + MetricsRecorder metricsRecorder, + DatastoreOptions datastoreOptions, + boolean isHttpTransport, + String methodName) { + // Attempt metrics are already recorded by Gax for gRPC transport. This + // prevents the metrics from being recorded twice for gRPC transport. + if (!isHttpTransport) { + return callable; + } + return () -> { + Stopwatch stopwatch = Stopwatch.createStarted(); + String status = StatusCode.Code.OK.toString(); + try { + return callable.call(); + } catch (Exception e) { + status = DatastoreException.extractStatusCode(e); + throw e; + } finally { + Map attributes = + buildMetricAttributes(datastoreOptions, methodName, status); + metricsRecorder.recordAttemptLatency(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordAttemptCount(1, attributes); + } + }; + } +} diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index c8a0e079af7a..b8129af13d37 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -28,6 +28,8 @@ import com.google.datastore.v1.BeginTransactionResponse; import com.google.datastore.v1.CommitRequest; import com.google.datastore.v1.CommitResponse; +import com.google.datastore.v1.LookupRequest; +import com.google.datastore.v1.LookupResponse; import com.google.datastore.v1.RollbackRequest; import com.google.datastore.v1.RollbackResponse; import com.google.datastore.v1.TransactionOptions; @@ -39,33 +41,51 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Arrays; import java.util.Collection; +import java.util.List; import java.util.Optional; import org.easymock.EasyMock; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Tests for transaction metrics recording in {@link DatastoreImpl}. These tests verify that * transaction latency and per-attempt metrics are correctly recorded via the {@link * com.google.cloud.datastore.telemetry.MetricsRecorder}. */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class DatastoreImplMetricsTest { private static final String PROJECT_ID = "test-project"; private static final String DATABASE_ID = "test-database"; - private static InMemoryMetricReader metricReader; - private static DatastoreRpcFactory rpcFactoryMock; - private static DatastoreRpc rpcMock; - private static Datastore datastore; + private InMemoryMetricReader metricReader; + private DatastoreRpc rpcMock; + private Datastore datastore; + private final TelemetryConstants.Transport transport; + + /** + * We parameterize this test to run against both GRPC and HTTP transports. This ensures that + * Datastore-specific transaction and commit metrics are correctly recorded for both transports, + * while verifying that operation and attempt metrics are only manually recorded for HTTP (since + * GAX handles them natively for gRPC). Parameterizing allows us to automatically test both + * transports for any new metrics added. + */ + @Parameters(name = "transport={0}") + public static List data() { + return Arrays.asList(TelemetryConstants.Transport.GRPC, TelemetryConstants.Transport.HTTP); + } + + public DatastoreImplMetricsTest(TelemetryConstants.Transport transport) { + this.transport = transport; + } - @BeforeClass - public static void setUpClass() { + @Before + public void setUp() { // Use delta temporality so each collectAllMetrics() call returns only new data and resets. metricReader = InMemoryMetricReader.createDelta(); SdkMeterProvider meterProvider = @@ -73,12 +93,12 @@ public static void setUpClass() { OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); - rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); + DatastoreRpcFactory rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); // Use a regular (non-strict) mock for rpcMock so that anyTimes() expectations work without // enforcing call order — needed for retry tests with unpredictable call counts. rpcMock = EasyMock.createMock(DatastoreRpc.class); - DatastoreOptions datastoreOptions = + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() .setProjectId(PROJECT_ID) .setDatabaseId(DATABASE_ID) @@ -89,20 +109,22 @@ public static void setUpClass() { DatastoreOpenTelemetryOptions.newBuilder() .setMetricsEnabled(true) .setOpenTelemetry(openTelemetry) - .build()) - .build(); + .build()); + + if (TelemetryConstants.Transport.GRPC.equals(transport)) { + builder.setTransportOptions(com.google.cloud.grpc.GrpcTransportOptions.newBuilder().build()); + } else { + builder.setTransportOptions(com.google.cloud.http.HttpTransportOptions.newBuilder().build()); + } + + DatastoreOptions datastoreOptions = builder.build(); EasyMock.expect(rpcFactoryMock.create(datastoreOptions)).andReturn(rpcMock); EasyMock.replay(rpcFactoryMock); datastore = datastoreOptions.getService(); EasyMock.verify(rpcFactoryMock); - } - @Before - public void setUp() { - // Drain any metrics accumulated during @BeforeClass or previous tests. - metricReader.collectAllMetrics(); - EasyMock.reset(rpcMock); + metricReader.collectAllMetrics(); // drain initialization } @Test @@ -498,6 +520,160 @@ public void runInTransaction_withTransactionOptions_recordsMetrics() { EasyMock.verify(rpcMock); } + @Test + public void lookup_recordsOperationAndAttemptMetrics() { + // Use `lookup` as a simple RPC test to test that metrics are recorded for operation + // and attempt. Any RPC could have worked (there is no specific reason for lookup) + EasyMock.expect(rpcMock.lookup(EasyMock.anyObject(LookupRequest.class))) + .andReturn(LookupResponse.getDefaultInstance()); + EasyMock.replay(rpcMock); + + // Use get() which triggers lookup internally + datastore.get(Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build()); + + Collection metrics = metricReader.collectAllMetrics(); + + // Gax already records operation and attempt metrics natively for the gRPC transport. + // DatastoreImpl explicitly avoids recording them here to prevent double-counting. + // Since this unit test bypasses the GAX networking layer by mocking DatastoreRpc, + // we assert that no local duplicate metrics are emitted by DatastoreImpl for gRPC, + // and skip the rest of the assertions. + if (TelemetryConstants.Transport.GRPC.equals(transport)) { + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isFalse(); + EasyMock.verify(rpcMock); + return; + } + + // Verify operation latency + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isTrue(); + HistogramPointData opLatencyPoint = + operationLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertThat(opLatencyPoint).isNotNull(); + assertThat(opLatencyPoint.getCount()).isEqualTo(1); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + StatusCode.Code.OK.toString())) + .isTrue(); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + // Verify operation count + Optional operationCount = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_COUNT); + assertThat(operationCount.isPresent()).isTrue(); + + // Verify attempt latency + Optional attemptLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); + assertThat(attemptLatency.isPresent()).isTrue(); + HistogramPointData attLatencyPoint = + attemptLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertThat(attLatencyPoint).isNotNull(); + assertThat(attLatencyPoint.getCount()).isEqualTo(1); + + // Verify attempt count + Optional attemptCount = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT); + assertThat(attemptCount.isPresent()).isTrue(); + + EasyMock.verify(rpcMock); + } + + @Test + public void lookup_recordsFailureStatusOnError() { + // Use `lookup` as a simple RPC test to test that metrics are recorded for operation + // and attempt. Any RPC could have worked (there is no specific reason for lookup) + StatusCode.Code unavailableStatusCode = StatusCode.Code.UNAVAILABLE; + EasyMock.expect(rpcMock.lookup(EasyMock.anyObject(LookupRequest.class))) + .andThrow( + new DatastoreException( + 14, + unavailableStatusCode.toString(), + unavailableStatusCode.toString(), + false, + null)) + .anyTimes(); + EasyMock.replay(rpcMock); + + assertThrows( + DatastoreException.class, + () -> + datastore.get( + Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build())); + + Collection metrics = metricReader.collectAllMetrics(); + + // Gax already records operation and attempt metrics natively for the gRPC transport. + // DatastoreImpl explicitly avoids recording them here to prevent double-counting. + // Since this unit test bypasses the GAX networking layer by mocking DatastoreRpc, + // we assert that no local duplicate metrics are emitted by DatastoreImpl for gRPC, + // and skip the rest of the assertions. + if (TelemetryConstants.Transport.GRPC.equals(transport)) { + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isFalse(); + EasyMock.verify(rpcMock); + return; + } + + // Verify operation latency with UNAVAILABLE status + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isTrue(); + HistogramPointData opLatencyPoint = + operationLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertThat(opLatencyPoint).isNotNull(); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + unavailableStatusCode.toString())) + .isTrue(); + + // Verify attempt metrics were also recorded with UNAVAILABLE + Optional attemptCount = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT); + assertThat(attemptCount.isPresent()).isTrue(); + + EasyMock.verify(rpcMock); + } + private static Optional findMetric( Collection metrics, String metricName) { return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst(); diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java index d9c690680e74..1233e4f1bfac 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java @@ -162,4 +162,109 @@ public void recordTransactionLatency_nullAttributes() { Collection metrics = metricReader.collectAllMetrics(); assertThat(metrics).isNotEmpty(); } + + @Test + public void recordAttemptLatency_recordsHistogramWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_LOOKUP); + + recorder.recordAttemptLatency(42.0, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Latency of a single RPC attempt"); + assertThat(metric.getUnit()).isEqualTo("ms"); + + HistogramPointData point = + metric.getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getSum()).isEqualTo(42.0); + assertThat(point.getCount()).isEqualTo(1); + assertThat( + point + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_METHOD))) + .isEqualTo(TelemetryConstants.METHOD_LOOKUP); + } + + @Test + public void recordAttemptCount_recordsCounterWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_COMMIT); + + recorder.recordAttemptCount(1, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Number of RPC attempts"); + + LongPointData point = metric.getLongSumData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getValue()).isEqualTo(1); + } + + @Test + public void recordOperationLatency_recordsHistogramWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_RUN_QUERY); + + recorder.recordOperationLatency(200.0, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()) + .isEqualTo("Total latency of an operation including retries"); + assertThat(metric.getUnit()).isEqualTo("ms"); + + HistogramPointData point = + metric.getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getSum()).isEqualTo(200.0); + assertThat(point.getCount()).isEqualTo(1); + } + + @Test + public void recordOperationCount_recordsCounterWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_ALLOCATE_IDS); + + recorder.recordOperationCount(1, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_OPERATION_COUNT)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Number of operations"); + + LongPointData point = metric.getLongSumData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getValue()).isEqualTo(1); + } }