From 33fc6966cdca31ea814a254208f8be1ada286244 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 12:32:57 -0400 Subject: [PATCH 01/15] chore: Add operation and attempt metrics for HttpJson --- .../google/cloud/datastore/DatastoreImpl.java | 158 +++++++++--- .../RetryAndTraceDatastoreRpcDecorator.java | 79 +++++- .../datastore/telemetry/MetricsRecorder.java | 12 + .../telemetry/NoOpMetricsRecorder.java | 20 ++ .../OpenTelemetryMetricsRecorder.java | 52 ++++ .../telemetry/TelemetryConstants.java | 76 ++++-- .../datastore/DatastoreImplMetricsTest.java | 227 ++++++++++++++++++ .../OpenTelemetryMetricsRecorderTest.java | 106 ++++++++ 8 files changed, 667 insertions(+), 63 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 39447c7eeb41..d9661826d5ac 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -51,6 +51,7 @@ import com.google.cloud.datastore.telemetry.TelemetryConstants; import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.cloud.datastore.telemetry.TraceUtil.Scope; +import com.google.cloud.http.HttpTransportOptions; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; @@ -98,6 +99,7 @@ final class DatastoreImpl extends BaseService implements Datas private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil = getOptions().getTraceUtil(); private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder(); + private final boolean isHttpTransport; private final ReadOptionProtoPreparer readOptionProtoPreparer; private final AggregationQueryExecutor aggregationQueryExecutor; @@ -105,6 +107,7 
@@ final class DatastoreImpl extends BaseService implements Datas DatastoreImpl(DatastoreOptions options) { super(options); this.datastoreRpc = options.getDatastoreRpcV1(); + this.isHttpTransport = options.getTransportOptions() instanceof HttpTransportOptions; retrySettings = MoreObjects.firstNonNull(options.getRetrySettings(), ServiceOptions.getNoRetrySettings()); @@ -112,7 +115,8 @@ final class DatastoreImpl extends BaseService implements Datas aggregationQueryExecutor = new AggregationQueryExecutor( new RetryAndTraceDatastoreRpcDecorator( - datastoreRpc, otelTraceUtil, retrySettings, options), + datastoreRpc, otelTraceUtil, retrySettings, options, metricsRecorder, + isHttpTransport), options); } @@ -354,11 +358,15 @@ com.google.datastore.v1.RunQueryResponse runQuery( boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_RUN_QUERY : SPAN_NAME_RUN_QUERY); com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { RunQueryResponse response = RetryHelper.runWithRetries( - () -> datastoreRpc.runQuery(requestPb), + withAttemptMetrics( + () -> datastoreRpc.runQuery(requestPb), + TelemetryConstants.METHOD_RUN_QUERY), retrySettings, requestPb.getReadOptions().getTransaction().isEmpty() ? 
EXCEPTION_HANDLER @@ -379,9 +387,12 @@ com.google.datastore.v1.RunQueryResponse runQuery( .build()); return response; } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics( + operationStopwatch, TelemetryConstants.METHOD_RUN_QUERY, operationStatus); span.end(); } } @@ -426,21 +437,23 @@ private com.google.datastore.v1.AllocateIdsResponse allocateIds( final com.google.datastore.v1.AllocateIdsRequest requestPb) { com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_ALLOCATE_IDS); + Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new Callable() { - @Override - public com.google.datastore.v1.AllocateIdsResponse call() throws DatastoreException { - return datastoreRpc.allocateIds(requestPb); - } - }, + withAttemptMetrics( + () -> datastoreRpc.allocateIds(requestPb), + TelemetryConstants.METHOD_ALLOCATE_IDS), retrySettings, EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics( + operationStopwatch, TelemetryConstants.METHOD_ALLOCATE_IDS, operationStatus); span.end(); } } @@ -588,33 +601,39 @@ com.google.datastore.v1.LookupResponse lookup( boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_LOOKUP : SPAN_NAME_LOOKUP); com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - () -> { - com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); - span.addEvent( - spanName + " complete.", - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_RECEIVED, response.getFoundCount()) - .put(ATTRIBUTES_KEY_MISSING, response.getMissingCount()) - .put(ATTRIBUTES_KEY_DEFERRED, response.getDeferredCount()) - .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) - .put( - ATTRIBUTES_KEY_TRANSACTION_ID, - isTransactional ? readOptions.getTransaction().toStringUtf8() : "") - .build()); - return response; - }, + withAttemptMetrics( + () -> { + com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_RECEIVED, response.getFoundCount()) + .put(ATTRIBUTES_KEY_MISSING, response.getMissingCount()) + .put(ATTRIBUTES_KEY_DEFERRED, response.getDeferredCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + isTransactional ? readOptions.getTransaction().toStringUtf8() : "") + .build()); + return response; + }, + TelemetryConstants.METHOD_LOOKUP), retrySettings, requestPb.getReadOptions().getTransaction().isEmpty() ? 
EXCEPTION_HANDLER : TRANSACTION_OPERATION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics(operationStopwatch, TelemetryConstants.METHOD_LOOKUP, operationStatus); span.end(); } } @@ -641,21 +660,23 @@ com.google.datastore.v1.ReserveIdsResponse reserveIds( final com.google.datastore.v1.ReserveIdsRequest requestPb) { com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_RESERVE_IDS); + Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - new Callable() { - @Override - public com.google.datastore.v1.ReserveIdsResponse call() throws DatastoreException { - return datastoreRpc.reserveIds(requestPb); - } - }, + withAttemptMetrics( + () -> datastoreRpc.reserveIds(requestPb), + TelemetryConstants.METHOD_RESERVE_IDS), retrySettings, EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics( + operationStopwatch, TelemetryConstants.METHOD_RESERVE_IDS, operationStatus); span.end(); } } @@ -754,10 +775,14 @@ com.google.datastore.v1.CommitResponse commit( requestPb.hasTransaction() || requestPb.hasSingleUseTransaction(); final String spanName = isTransactional ? SPAN_NAME_TRANSACTION_COMMIT : SPAN_NAME_COMMIT; com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { CommitResponse response = RetryHelper.runWithRetries( - () -> datastoreRpc.commit(requestPb), + withAttemptMetrics( + () -> datastoreRpc.commit(requestPb), + TelemetryConstants.METHOD_COMMIT), retrySettings, requestPb.getTransaction().isEmpty() ? EXCEPTION_HANDLER @@ -774,9 +799,12 @@ com.google.datastore.v1.CommitResponse commit( .build()); return response; } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics( + operationStopwatch, TelemetryConstants.METHOD_COMMIT, operationStatus); span.end(); } } @@ -790,16 +818,23 @@ com.google.datastore.v1.BeginTransactionResponse beginTransaction( final com.google.datastore.v1.BeginTransactionRequest requestPb) { com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_BEGIN_TRANSACTION); + Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) { return RetryHelper.runWithRetries( - () -> datastoreRpc.beginTransaction(requestPb), + withAttemptMetrics( + () -> datastoreRpc.beginTransaction(requestPb), + TelemetryConstants.METHOD_BEGIN_TRANSACTION), retrySettings, EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics( + operationStopwatch, TelemetryConstants.METHOD_BEGIN_TRANSACTION, operationStatus); span.end(); } } @@ -816,15 +851,16 @@ void rollbackTransaction(ByteString transaction) { void rollback(final com.google.datastore.v1.RollbackRequest requestPb) { com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_ROLLBACK); + Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (Scope scope = span.makeCurrent()) { RetryHelper.runWithRetries( - new Callable() { - @Override - public Void call() throws DatastoreException { - datastoreRpc.rollback(requestPb); - return null; - } - }, + withAttemptMetrics( + () -> { + datastoreRpc.rollback(requestPb); + return null; + }, + TelemetryConstants.METHOD_ROLLBACK), retrySettings, EXCEPTION_HANDLER, getOptions().getClock()); @@ -834,10 +870,56 @@ public Void call() throws DatastoreException { .put(ATTRIBUTES_KEY_TRANSACTION_ID, requestPb.getTransaction().toStringUtf8()) .build()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + recordOperationMetrics( + operationStopwatch, TelemetryConstants.METHOD_ROLLBACK, operationStatus); span.end(); } } + + private Map buildMetricAttributes(String methodName, String status) { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, + TelemetryConstants.getTransportName(getOptions().getTransportOptions())); + return attributes; + } + + + private void recordOperationMetrics( + Stopwatch operationStopwatch, String methodName, String status) { + if (isHttpTransport) { + Map attributes = buildMetricAttributes(methodName, status); + metricsRecorder.recordOperationLatency( + operationStopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordOperationCount(1, attributes); + } + } + + private Callable withAttemptMetrics(Callable callable, String methodName) { + if 
(!isHttpTransport) { + return callable; + } + return () -> { + Stopwatch sw = Stopwatch.createStarted(); + String status = StatusCode.Code.OK.toString(); + try { + return callable.call(); + } catch (Exception e) { + status = DatastoreException.extractStatusCode(e); + throw e; + } finally { + Map attributes = buildMetricAttributes(methodName, status); + metricsRecorder.recordAttemptLatency(sw.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordAttemptCount(1, attributes); + } + }; + } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index 630ddd225ce2..a4b74bbdc7e2 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -18,11 +18,17 @@ import static com.google.cloud.BaseService.EXCEPTION_HANDLER; import com.google.api.core.InternalApi; +import com.google.api.core.ObsoleteApi; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.MetricsRecorder; +import com.google.cloud.datastore.telemetry.TelemetryConstants; import com.google.cloud.datastore.telemetry.TraceUtil; +import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.datastore.v1.AllocateIdsRequest; import com.google.datastore.v1.AllocateIdsResponse; import com.google.datastore.v1.BeginTransactionRequest; @@ -40,7 +46,10 @@ import com.google.datastore.v1.RunAggregationQueryResponse; import 
com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; /** * An implementation of {@link DatastoreRpc} which acts as a Decorator and decorates the underlying @@ -53,16 +62,30 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc { private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil; private final RetrySettings retrySettings; private final DatastoreOptions datastoreOptions; + private final MetricsRecorder metricsRecorder; + private final boolean isHttpTransport; public RetryAndTraceDatastoreRpcDecorator( DatastoreRpc datastoreRpc, TraceUtil otelTraceUtil, RetrySettings retrySettings, DatastoreOptions datastoreOptions) { + this(datastoreRpc, otelTraceUtil, retrySettings, datastoreOptions, datastoreOptions.getMetricsRecorder(), false); + } + + public RetryAndTraceDatastoreRpcDecorator( + DatastoreRpc datastoreRpc, + TraceUtil otelTraceUtil, + RetrySettings retrySettings, + DatastoreOptions datastoreOptions, + MetricsRecorder metricsRecorder, + boolean isHttpTransport) { this.datastoreRpc = datastoreRpc; this.retrySettings = retrySettings; this.datastoreOptions = datastoreOptions; this.otelTraceUtil = otelTraceUtil; + this.metricsRecorder = Preconditions.checkNotNull(metricsRecorder); + this.isHttpTransport = isHttpTransport; } @Override @@ -110,7 +133,10 @@ public RunAggregationQueryResponse runAggregationQuery(RunAggregationQueryReques ? 
com.google.cloud.datastore.telemetry.TraceUtil .SPAN_NAME_TRANSACTION_RUN_AGGREGATION_QUERY : com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_RUN_AGGREGATION_QUERY); - return invokeRpc(() -> datastoreRpc.runAggregationQuery(request), spanName); + return invokeRpc( + () -> datastoreRpc.runAggregationQuery(request), + spanName, + TelemetryConstants.METHOD_RUN_AGGREGATION_QUERY); } @Override @@ -124,15 +150,64 @@ public boolean isClosed() { } public O invokeRpc(Callable block, String startSpan) { + return invokeRpc(block, startSpan, null); + } + + O invokeRpc(Callable block, String startSpan, String methodName) { com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); + Stopwatch operationStopwatch = + isHttpTransport && methodName != null ? Stopwatch.createStarted() : null; + String operationStatus = StatusCode.Code.OK.toString(); try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + Callable callable = block; + if (isHttpTransport && methodName != null) { + callable = withAttemptMetrics(block, methodName); + } return RetryHelper.runWithRetries( - block, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); + callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); } catch (RetryHelperException e) { + operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + if (isHttpTransport && methodName != null) { + Map attrs = buildMetricAttributes(methodName, operationStatus); + metricsRecorder.recordOperationLatency( + operationStopwatch.elapsed(TimeUnit.MILLISECONDS), attrs); + metricsRecorder.recordOperationCount(1, attrs); + } span.end(); } } + + private Callable withAttemptMetrics(Callable callable, String methodName) { + return () -> { + Stopwatch sw = Stopwatch.createStarted(); + String status = StatusCode.Code.OK.toString(); + try { + return callable.call(); + } 
catch (Exception e) { + status = DatastoreException.extractStatusCode(e); + throw e; + } finally { + Map attrs = buildMetricAttributes(methodName, status); + metricsRecorder.recordAttemptLatency(sw.elapsed(TimeUnit.MILLISECONDS), attrs); + metricsRecorder.recordAttemptCount(1, attrs); + } + }; + } + + private Map buildMetricAttributes(String methodName, String status) { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, + TelemetryConstants.getTransportName(datastoreOptions.getTransportOptions())); + return attributes; + } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java index 6a0ea16b5dd6..a71cb9b7c25c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java @@ -38,6 +38,18 @@ public interface MetricsRecorder { /** Records the number of attempts a transaction took. */ void recordTransactionAttemptCount(long count, Map attributes); + /** Records the latency of a single RPC attempt in milliseconds. */ + void recordAttemptLatency(double latencyMs, Map attributes); + + /** Records the count of a single RPC attempt. */ + void recordAttemptCount(long count, Map attributes); + + /** Records the total latency of an operation (including retries) in milliseconds. 
*/ + void recordOperationLatency(double latencyMs, Map attributes); + + /** Records the count of an operation. */ + void recordOperationCount(long count, Map attributes); + /** * Returns a {@link MetricsRecorder} instance based on the provided OpenTelemetry options. * diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java index 3ac5f31a023c..28fa4eb257d7 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java @@ -33,4 +33,24 @@ public void recordTransactionLatency(double latencyMs, Map attri public void recordTransactionAttemptCount(long count, Map attributes) { /* No-Op OTel Operation */ } + + @Override + public void recordAttemptLatency(double latencyMs, Map attributes) { + /* No-Op OTel Operation */ + } + + @Override + public void recordAttemptCount(long count, Map attributes) { + /* No-Op OTel Operation */ + } + + @Override + public void recordOperationLatency(double latencyMs, Map attributes) { + /* No-Op OTel Operation */ + } + + @Override + public void recordOperationCount(long count, Map attributes) { + /* No-Op OTel Operation */ + } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java index e50edbd5f79f..6314bbc6413c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java 
@@ -33,6 +33,10 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder { private final DoubleHistogram transactionLatency; private final LongCounter transactionAttemptCount; + private final DoubleHistogram attemptLatency; + private final LongCounter attemptCount; + private final DoubleHistogram operationLatency; + private final LongCounter operationCount; OpenTelemetryMetricsRecorder(@Nonnull OpenTelemetry openTelemetry) { this.openTelemetry = openTelemetry; @@ -51,6 +55,34 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder { .counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT) .setDescription("Number of attempts to commit a transaction") .build(); + + this.attemptLatency = + meter + .histogramBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY) + .setDescription("Latency of a single RPC attempt") + .setUnit("ms") + .build(); + + this.attemptCount = + meter + .counterBuilder(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT) + .setDescription("Number of RPC attempts") + .setUnit("1") + .build(); + + this.operationLatency = + meter + .histogramBuilder(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY) + .setDescription("Total latency of an operation including retries") + .setUnit("ms") + .build(); + + this.operationCount = + meter + .counterBuilder(TelemetryConstants.METRIC_NAME_OPERATION_COUNT) + .setDescription("Number of operations") + .setUnit("1") + .build(); } OpenTelemetry getOpenTelemetry() { @@ -67,6 +99,26 @@ public void recordTransactionAttemptCount(long count, Map attrib transactionAttemptCount.add(count, toOtelAttributes(attributes)); } + @Override + public void recordAttemptLatency(double latencyMs, Map attributes) { + attemptLatency.record(latencyMs, toOtelAttributes(attributes)); + } + + @Override + public void recordAttemptCount(long count, Map attributes) { + attemptCount.add(count, toOtelAttributes(attributes)); + } + + @Override + public void recordOperationLatency(double latencyMs, Map attributes) { + 
operationLatency.record(latencyMs, toOtelAttributes(attributes)); + } + + @Override + public void recordOperationCount(long count, Map attributes) { + operationCount.add(count, toOtelAttributes(attributes)); + } + private static Attributes toOtelAttributes(Map attributes) { AttributesBuilder builder = Attributes.builder(); if (attributes != null) { diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index f948631d6ac2..9b4cfc5e518e 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -65,34 +65,64 @@ public class TelemetryConstants { public static final String METRIC_NAME_TRANSACTION_ATTEMPT_COUNT = SERVICE_NAME + "/client/transaction_attempt_count"; - /* TODO(lawrenceqiu): For now, these are a duplicate of method names in TraceUtil. 
Those will use these eventually */ - // Format is not SnakeCase to keep backward compatibility with the existing values TraceUtil spans - public static final String METHOD_ALLOCATE_IDS = "AllocateIds"; - public static final String METHOD_BEGIN_TRANSACTION = "Transaction.Begin"; - public static final String METHOD_COMMIT = "Commit"; - public static final String METHOD_LOOKUP = "Lookup"; - public static final String METHOD_RESERVE_IDS = "ReserveIds"; - public static final String METHOD_RUN_QUERY = "RunQuery"; - public static final String METHOD_TRANSACTION_COMMIT = "Transaction.Commit"; - public static final String METHOD_TRANSACTION_LOOKUP = "Transaction.Lookup"; - public static final String METHOD_TRANSACTION_RUN = "Transaction.Run"; - public static final String METHOD_TRANSACTION_RUN_QUERY = "Transaction.RunQuery"; - public static final String METHOD_TRANSACTION_ROLLBACK = "Transaction.Rollback"; - public static final String METHOD_TRANSACTION_RUN_AGGREGATION_QUERY = - "Transaction.RunAggregationQuery"; - public static final String METHOD_ADD = "add"; - public static final String METHOD_PUT = "put"; - public static final String METHOD_UPDATE = "update"; - public static final String METHOD_DELETE = "delete"; - public static final String METHOD_SUBMIT = "submit"; + /** Metric name for the total latency of an operation (one full RPC call including retries). */ + public static final String METRIC_NAME_OPERATION_LATENCY = + SERVICE_NAME + "/operation_latency"; - private TelemetryConstants() {} + /** Metric name for the latency of a single RPC attempt. */ + public static final String METRIC_NAME_ATTEMPT_LATENCY = + SERVICE_NAME + "/attempt_latency"; + + /** Metric name for the count of operations. */ + public static final String METRIC_NAME_OPERATION_COUNT = + SERVICE_NAME + "/operation_count"; + + /** Metric name for the count of RPC attempts. 
*/ + public static final String METRIC_NAME_ATTEMPT_COUNT = + SERVICE_NAME + "/attempt_count"; + + + // This is intentionally different from the `SERVICE_NAME` constant as it matches Gax's logic for + // method name. + static final String METHOD_SERVICE_NAME = "Datastore"; + + // RPC method names reported as the method attribute on operation and attempt metrics. + public static final String METHOD_ALLOCATE_IDS = METHOD_SERVICE_NAME + ".AllocateIds"; + public static final String METHOD_BEGIN_TRANSACTION = METHOD_SERVICE_NAME + ".BeginTransaction"; + public static final String METHOD_COMMIT = METHOD_SERVICE_NAME + ".Commit"; + public static final String METHOD_LOOKUP = METHOD_SERVICE_NAME + ".Lookup"; + public static final String METHOD_RESERVE_IDS = METHOD_SERVICE_NAME + ".ReserveIds"; + public static final String METHOD_ROLLBACK = METHOD_SERVICE_NAME + ".Rollback"; + public static final String METHOD_RUN_QUERY = METHOD_SERVICE_NAME + ".RunQuery"; + public static final String METHOD_RUN_AGGREGATION_QUERY = METHOD_SERVICE_NAME + ".RunAggregationQuery"; + + // Method names for transaction-level metrics (commit and run of a whole transaction). + public static final String METHOD_TRANSACTION_COMMIT = METHOD_SERVICE_NAME + ".Transaction.Commit"; + public static final String METHOD_TRANSACTION_RUN = METHOD_SERVICE_NAME + ".Transaction.Run"; + + + public enum Transport { + GRPC("grpc"), + HTTP("http"); + + private final String transport; + + Transport(String transport) { + this.transport = transport; + } + + public String getTransport() { + return transport; + } + } public static String getTransportName(TransportOptions transportOptions) { if (transportOptions instanceof GrpcTransportOptions) { - return "grpc"; + return Transport.GRPC.getTransport(); } else { - return "http"; + return Transport.HTTP.getTransport(); } } + + private TelemetryConstants() {} } diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java 
b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index c8a0e079af7a..7f7141c6dc99 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -24,10 +24,14 @@ import com.google.cloud.datastore.spi.DatastoreRpcFactory; import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.telemetry.TelemetryConstants; +import com.google.datastore.v1.AllocateIdsRequest; +import com.google.datastore.v1.AllocateIdsResponse; import com.google.datastore.v1.BeginTransactionRequest; import com.google.datastore.v1.BeginTransactionResponse; import com.google.datastore.v1.CommitRequest; import com.google.datastore.v1.CommitResponse; +import com.google.datastore.v1.LookupRequest; +import com.google.datastore.v1.LookupResponse; import com.google.datastore.v1.RollbackRequest; import com.google.datastore.v1.RollbackResponse; import com.google.datastore.v1.TransactionOptions; @@ -498,6 +502,229 @@ public void runInTransaction_withTransactionOptions_recordsMetrics() { EasyMock.verify(rpcMock); } + @Test + public void lookup_recordsOperationAndAttemptMetrics() { + EasyMock.expect(rpcMock.lookup(EasyMock.anyObject(LookupRequest.class))) + .andReturn(LookupResponse.getDefaultInstance()); + EasyMock.replay(rpcMock); + + // Use get() which triggers lookup internally + datastore.get( + Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build()); + + Collection metrics = metricReader.collectAllMetrics(); + + // Verify operation latency + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isTrue(); + HistogramPointData opLatencyPoint = + operationLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + 
dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertThat(opLatencyPoint).isNotNull(); + assertThat(opLatencyPoint.getCount()).isEqualTo(1); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + StatusCode.Code.OK.toString())) + .isTrue(); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + // Verify operation count + Optional operationCount = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_COUNT); + assertThat(operationCount.isPresent()).isTrue(); + + // Verify attempt latency + Optional attemptLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); + assertThat(attemptLatency.isPresent()).isTrue(); + HistogramPointData attLatencyPoint = + attemptLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertThat(attLatencyPoint).isNotNull(); + assertThat(attLatencyPoint.getCount()).isEqualTo(1); + + // Verify attempt count + Optional attemptCount = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT); + assertThat(attemptCount.isPresent()).isTrue(); + + EasyMock.verify(rpcMock); + } + + @Test + public void commit_recordsOperationAndAttemptMetrics() { + EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) + .andReturn(CommitResponse.newBuilder().build()); + EasyMock.replay(rpcMock); + + Key key = Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build(); + datastore.delete(key); + + Collection metrics = 
metricReader.collectAllMetrics(); + + // Verify operation latency for commit + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isTrue(); + HistogramPointData opLatencyPoint = + operationLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_COMMIT)) + .findFirst() + .orElse(null); + assertThat(opLatencyPoint).isNotNull(); + assertThat(opLatencyPoint.getCount()).isEqualTo(1); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + StatusCode.Code.OK.toString())) + .isTrue(); + + // Verify attempt latency for commit + Optional attemptLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); + assertThat(attemptLatency.isPresent()).isTrue(); + HistogramPointData attLatencyPoint = + attemptLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_COMMIT)) + .findFirst() + .orElse(null); + assertThat(attLatencyPoint).isNotNull(); + + EasyMock.verify(rpcMock); + } + + @Test + public void lookup_recordsFailureStatusOnError() { + EasyMock.expect(rpcMock.lookup(EasyMock.anyObject(LookupRequest.class))) + .andThrow( + new DatastoreException( + 14, "Unavailable", StatusCode.Code.UNAVAILABLE.toString(), false, null)) + .anyTimes(); + EasyMock.replay(rpcMock); + + assertThrows( + DatastoreException.class, + () -> + datastore.get( + Key.newBuilder(PROJECT_ID, "Kind", "name") + .setDatabaseId(DATABASE_ID) + .build())); + + Collection metrics = metricReader.collectAllMetrics(); + + // Verify operation latency with UNAVAILABLE status + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + 
assertThat(operationLatency.isPresent()).isTrue(); + HistogramPointData opLatencyPoint = + operationLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_LOOKUP)) + .findFirst() + .orElse(null); + assertThat(opLatencyPoint).isNotNull(); + assertThat( + dataContainsStringAttribute( + opLatencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + StatusCode.Code.UNAVAILABLE.toString())) + .isTrue(); + + // Verify attempt metrics were also recorded with UNAVAILABLE + Optional attemptCount = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT); + assertThat(attemptCount.isPresent()).isTrue(); + + EasyMock.verify(rpcMock); + } + + @Test + public void allocateIds_recordsOperationAndAttemptMetrics() { + EasyMock.expect(rpcMock.allocateIds(EasyMock.anyObject(AllocateIdsRequest.class))) + .andReturn( + AllocateIdsResponse.newBuilder() + .addKeys( + com.google.datastore.v1.Key.newBuilder() + .addPath( + com.google.datastore.v1.Key.PathElement.newBuilder() + .setKind("Kind") + .setId(123))) + .build()); + EasyMock.replay(rpcMock); + + IncompleteKey key = + IncompleteKey.newBuilder(PROJECT_ID, "Kind").setDatabaseId(DATABASE_ID).build(); + datastore.allocateId(key); + + Collection metrics = metricReader.collectAllMetrics(); + + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isTrue(); + HistogramPointData point = + operationLatency.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_ALLOCATE_IDS)) + .findFirst() + .orElse(null); + assertThat(point).isNotNull(); + assertThat( + dataContainsStringAttribute( + point, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + StatusCode.Code.OK.toString())) + .isTrue(); + + Optional 
attemptLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); + assertThat(attemptLatency.isPresent()).isTrue(); + + EasyMock.verify(rpcMock); + } + private static Optional findMetric( Collection metrics, String metricName) { return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst(); diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java index d9c690680e74..5c2b44694409 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java @@ -162,4 +162,110 @@ public void recordTransactionLatency_nullAttributes() { Collection metrics = metricReader.collectAllMetrics(); assertThat(metrics).isNotEmpty(); } + + @Test + public void recordAttemptLatency_recordsHistogramWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_LOOKUP); + + recorder.recordAttemptLatency(42.0, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Latency of a single RPC attempt"); + assertThat(metric.getUnit()).isEqualTo("ms"); + + HistogramPointData point = + metric.getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getSum()).isEqualTo(42.0); 
+ assertThat(point.getCount()).isEqualTo(1); + assertThat( + point + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_METHOD))) + .isEqualTo(TelemetryConstants.METHOD_LOOKUP); + } + + @Test + public void recordAttemptCount_recordsCounterWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_COMMIT); + + recorder.recordAttemptCount(1, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_ATTEMPT_COUNT)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Number of RPC attempts"); + + LongPointData point = + metric.getLongSumData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getValue()).isEqualTo(1); + } + + @Test + public void recordOperationLatency_recordsHistogramWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_RUN_QUERY); + + recorder.recordOperationLatency(200.0, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_OPERATION_LATENCY)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Total latency of an operation including retries"); + assertThat(metric.getUnit()).isEqualTo("ms"); + + HistogramPointData point = + metric.getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + 
assertThat(point.getSum()).isEqualTo(200.0); + assertThat(point.getCount()).isEqualTo(1); + } + + @Test + public void recordOperationCount_recordsCounterWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_ALLOCATE_IDS); + + recorder.recordOperationCount(1, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData metric = + metrics.stream() + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_OPERATION_COUNT)) + .findFirst() + .orElse(null); + + assertThat(metric).isNotNull(); + assertThat(metric.getDescription()).isEqualTo("Number of operations"); + + LongPointData point = + metric.getLongSumData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getValue()).isEqualTo(1); + } } From b3a6022d8bee3346d8ca64a5e3f3f912b1ec0d69 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 14:14:45 -0400 Subject: [PATCH 02/15] chore: Migrate to a common runWithObservability helper --- .../google/cloud/datastore/DatastoreImpl.java | 333 ++++++++---------- .../RetryAndTraceDatastoreRpcDecorator.java | 89 ++++- .../telemetry/NoOpMetricsRecorder.java | 8 +- .../telemetry/TelemetryConstants.java | 20 +- .../datastore/DatastoreImplMetricsTest.java | 11 +- .../OpenTelemetryMetricsRecorderTest.java | 9 +- 6 files changed, 233 insertions(+), 237 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index d9661826d5ac..fec1dacf917e 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java 
@@ -37,6 +37,7 @@ import static com.google.cloud.datastore.telemetry.TraceUtil.SPAN_NAME_TRANSACTION_RUN_QUERY; import com.google.api.core.BetaApi; +import com.google.api.gax.retrying.ResultRetryAlgorithm; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.BaseService; @@ -64,6 +65,7 @@ import com.google.datastore.v1.ExplainOptions; import com.google.datastore.v1.ReadOptions; import com.google.datastore.v1.ReserveIdsRequest; +import com.google.datastore.v1.RollbackRequest; import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.TransactionOptions; import com.google.protobuf.ByteString; @@ -114,9 +116,14 @@ final class DatastoreImpl extends BaseService implements Datas readOptionProtoPreparer = new ReadOptionProtoPreparer(); aggregationQueryExecutor = new AggregationQueryExecutor( - new RetryAndTraceDatastoreRpcDecorator( - datastoreRpc, otelTraceUtil, retrySettings, options, metricsRecorder, - isHttpTransport), + RetryAndTraceDatastoreRpcDecorator.newBuilder() + .setDatastoreRpc(datastoreRpc) + .setTraceUtil(otelTraceUtil) + .setRetrySettings(retrySettings) + .setDatastoreOptions(options) + .setMetricsRecorder(metricsRecorder) + .setIsHttpTransport(isHttpTransport) + .build(), options); } @@ -357,44 +364,37 @@ com.google.datastore.v1.RunQueryResponse runQuery( ReadOptions readOptions = requestPb.getReadOptions(); boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); String spanName = (isTransactional ? SPAN_NAME_TRANSACTION_RUN_QUERY : SPAN_NAME_RUN_QUERY); - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); - Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - RunQueryResponse response = - RetryHelper.runWithRetries( - withAttemptMetrics( - () -> datastoreRpc.runQuery(requestPb), - TelemetryConstants.METHOD_RUN_QUERY), - retrySettings, - requestPb.getReadOptions().getTransaction().isEmpty() - ? EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); - span.addEvent( - spanName + " complete.", - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getBatch().getEntityResultsCount()) - .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) - .put(ATTRIBUTES_KEY_READ_CONSISTENCY, readOptions.getReadConsistency().toString()) - .put( - ATTRIBUTES_KEY_TRANSACTION_ID, - (isTransactional - ? requestPb.getReadOptions().getTransaction().toStringUtf8() - : "")) - .put(ATTRIBUTES_KEY_MORE_RESULTS, response.getBatch().getMoreResults().toString()) - .build()); - return response; - } catch (RetryHelperException e) { - operationStatus = DatastoreException.extractStatusCode(e); - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - recordOperationMetrics( - operationStopwatch, TelemetryConstants.METHOD_RUN_QUERY, operationStatus); - span.end(); - } + return runWithObservability( + () -> { + RunQueryResponse response = datastoreRpc.runQuery(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getBatch().getEntityResultsCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_READ_CONSISTENCY, + readOptions.getReadConsistency().toString()) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + (isTransactional + ? 
requestPb.getReadOptions().getTransaction().toStringUtf8() + : "")) + .put( + ATTRIBUTES_KEY_MORE_RESULTS, + response.getBatch().getMoreResults().toString()) + .build()); + } + return response; + }, + TelemetryConstants.METHOD_RUN_QUERY, + spanName, + requestPb.getReadOptions().getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER); } @Override @@ -435,27 +435,11 @@ public List allocateId(IncompleteKey... keys) { private com.google.datastore.v1.AllocateIdsResponse allocateIds( final com.google.datastore.v1.AllocateIdsRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_ALLOCATE_IDS); - Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - withAttemptMetrics( - () -> datastoreRpc.allocateIds(requestPb), - TelemetryConstants.METHOD_ALLOCATE_IDS), - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - operationStatus = DatastoreException.extractStatusCode(e); - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - recordOperationMetrics( - operationStopwatch, TelemetryConstants.METHOD_ALLOCATE_IDS, operationStatus); - span.end(); - } + return runWithObservability( + () -> datastoreRpc.allocateIds(requestPb), + TelemetryConstants.METHOD_ALLOCATE_IDS, + SPAN_NAME_ALLOCATE_IDS, + EXCEPTION_HANDLER); } private IncompleteKey trimNameOrId(IncompleteKey key) { @@ -600,42 +584,31 @@ com.google.datastore.v1.LookupResponse lookup( ReadOptions readOptions = requestPb.getReadOptions(); boolean isTransactional = readOptions.hasTransaction() || readOptions.hasNewTransaction(); String spanName = (isTransactional ? 
SPAN_NAME_TRANSACTION_LOOKUP : SPAN_NAME_LOOKUP); - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); - Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - withAttemptMetrics( - () -> { - com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); - span.addEvent( - spanName + " complete.", - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_RECEIVED, response.getFoundCount()) - .put(ATTRIBUTES_KEY_MISSING, response.getMissingCount()) - .put(ATTRIBUTES_KEY_DEFERRED, response.getDeferredCount()) - .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) - .put( - ATTRIBUTES_KEY_TRANSACTION_ID, - isTransactional ? readOptions.getTransaction().toStringUtf8() : "") - .build()); - return response; - }, - TelemetryConstants.METHOD_LOOKUP), - retrySettings, - requestPb.getReadOptions().getTransaction().isEmpty() - ? 
EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - operationStatus = DatastoreException.extractStatusCode(e); - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - recordOperationMetrics(operationStopwatch, TelemetryConstants.METHOD_LOOKUP, operationStatus); - span.end(); - } + return runWithObservability( + () -> { + com.google.datastore.v1.LookupResponse response = datastoreRpc.lookup(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_RECEIVED, response.getFoundCount()) + .put(ATTRIBUTES_KEY_MISSING, response.getMissingCount()) + .put(ATTRIBUTES_KEY_DEFERRED, response.getDeferredCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + isTransactional ? readOptions.getTransaction().toStringUtf8() : "") + .build()); + } + return response; + }, + TelemetryConstants.METHOD_LOOKUP, + spanName, + requestPb.getReadOptions().getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER); } @Override @@ -658,27 +631,11 @@ public List reserveIds(Key... keys) { com.google.datastore.v1.ReserveIdsResponse reserveIds( final com.google.datastore.v1.ReserveIdsRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_RESERVE_IDS); - Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - withAttemptMetrics( - () -> datastoreRpc.reserveIds(requestPb), - TelemetryConstants.METHOD_RESERVE_IDS), - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - operationStatus = DatastoreException.extractStatusCode(e); - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - recordOperationMetrics( - operationStopwatch, TelemetryConstants.METHOD_RESERVE_IDS, operationStatus); - span.end(); - } + return runWithObservability( + () -> datastoreRpc.reserveIds(requestPb), + TelemetryConstants.METHOD_RESERVE_IDS, + SPAN_NAME_RESERVE_IDS, + EXCEPTION_HANDLER); } @Override @@ -774,39 +731,29 @@ com.google.datastore.v1.CommitResponse commit( final boolean isTransactional = requestPb.hasTransaction() || requestPb.hasSingleUseTransaction(); final String spanName = isTransactional ? SPAN_NAME_TRANSACTION_COMMIT : SPAN_NAME_COMMIT; - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); - Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { - CommitResponse response = - RetryHelper.runWithRetries( - withAttemptMetrics( - () -> datastoreRpc.commit(requestPb), - TelemetryConstants.METHOD_COMMIT), - retrySettings, - requestPb.getTransaction().isEmpty() - ? 
EXCEPTION_HANDLER - : TRANSACTION_OPERATION_EXCEPTION_HANDLER, - getOptions().getClock()); - span.addEvent( - spanName + " complete.", - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getMutationResultsCount()) - .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) - .put( - ATTRIBUTES_KEY_TRANSACTION_ID, - isTransactional ? requestPb.getTransaction().toStringUtf8() : "") - .build()); - return response; - } catch (RetryHelperException e) { - operationStatus = DatastoreException.extractStatusCode(e); - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - recordOperationMetrics( - operationStopwatch, TelemetryConstants.METHOD_COMMIT, operationStatus); - span.end(); - } + + return runWithObservability( + () -> { + CommitResponse response = datastoreRpc.commit(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + spanName + " complete.", + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_DOCUMENT_COUNT, response.getMutationResultsCount()) + .put(ATTRIBUTES_KEY_TRANSACTIONAL, isTransactional) + .put( + ATTRIBUTES_KEY_TRANSACTION_ID, + isTransactional ? requestPb.getTransaction().toStringUtf8() : "") + .build()); + } + return response; + }, + TelemetryConstants.METHOD_COMMIT, + spanName, + requestPb.getTransaction().isEmpty() + ? EXCEPTION_HANDLER + : TRANSACTION_OPERATION_EXCEPTION_HANDLER); } ByteString requestTransactionId( @@ -816,27 +763,11 @@ ByteString requestTransactionId( com.google.datastore.v1.BeginTransactionResponse beginTransaction( final com.google.datastore.v1.BeginTransactionRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_BEGIN_TRANSACTION); - Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope scope = span.makeCurrent()) { - return RetryHelper.runWithRetries( - withAttemptMetrics( - () -> datastoreRpc.beginTransaction(requestPb), - TelemetryConstants.METHOD_BEGIN_TRANSACTION), - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - operationStatus = DatastoreException.extractStatusCode(e); - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - recordOperationMetrics( - operationStopwatch, TelemetryConstants.METHOD_BEGIN_TRANSACTION, operationStatus); - span.end(); - } + return runWithObservability( + () -> datastoreRpc.beginTransaction(requestPb), + TelemetryConstants.METHOD_BEGIN_TRANSACTION, + SPAN_NAME_BEGIN_TRANSACTION, + EXCEPTION_HANDLER); } void rollbackTransaction(ByteString transaction) { @@ -848,34 +779,49 @@ void rollbackTransaction(ByteString transaction) { rollback(requestPb.build()); } - void rollback(final com.google.datastore.v1.RollbackRequest requestPb) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = - otelTraceUtil.startSpan(SPAN_NAME_ROLLBACK); + void rollback(final RollbackRequest requestPb) { + runWithObservability( + () -> { + datastoreRpc.rollback(requestPb); + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.getCurrentSpan(); + if (span != null) { + span.addEvent( + SPAN_NAME_ROLLBACK, + new ImmutableMap.Builder() + .put(ATTRIBUTES_KEY_TRANSACTION_ID, requestPb.getTransaction().toStringUtf8()) + .build()); + } + return null; + }, + TelemetryConstants.METHOD_ROLLBACK, + SPAN_NAME_ROLLBACK, + EXCEPTION_HANDLER); + } + + private T runWithObservability( + Callable callable, + String methodName, + String spanName, + ResultRetryAlgorithm exceptionHandler) { + com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(spanName); + + // Gax already 
records operation and attempt metrics. Since Datastore HttpJson does not + // integrate with Gax, manually instrument these metrics when using HttpJson for parity Stopwatch operationStopwatch = isHttpTransport ? Stopwatch.createStarted() : null; String operationStatus = StatusCode.Code.OK.toString(); - try (Scope scope = span.makeCurrent()) { - RetryHelper.runWithRetries( - withAttemptMetrics( - () -> { - datastoreRpc.rollback(requestPb); - return null; - }, - TelemetryConstants.METHOD_ROLLBACK), - retrySettings, - EXCEPTION_HANDLER, - getOptions().getClock()); - span.addEvent( - SPAN_NAME_ROLLBACK, - new ImmutableMap.Builder() - .put(ATTRIBUTES_KEY_TRANSACTION_ID, requestPb.getTransaction().toStringUtf8()) - .build()); + + Callable attemptMetricsCallable = attemptMetricsCallable(callable, methodName); + try (TraceUtil.Scope ignored = span.makeCurrent()) { + return RetryHelper.runWithRetries( + attemptMetricsCallable, retrySettings, exceptionHandler, getOptions().getClock()); } catch (RetryHelperException e) { operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - recordOperationMetrics( - operationStopwatch, TelemetryConstants.METHOD_ROLLBACK, operationStatus); + if (isHttpTransport) { + recordOperationMetrics(operationStopwatch, methodName, operationStatus); + } span.end(); } } @@ -892,7 +838,6 @@ private Map buildMetricAttributes(String methodName, String stat return attributes; } - private void recordOperationMetrics( Stopwatch operationStopwatch, String methodName, String status) { if (isHttpTransport) { @@ -903,7 +848,7 @@ private void recordOperationMetrics( } } - private Callable withAttemptMetrics(Callable callable, String methodName) { + private Callable attemptMetricsCallable(Callable callable, String methodName) { if (!isHttpTransport) { return callable; } diff --git 
a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index a4b74bbdc7e2..788a0917201d 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -18,13 +18,13 @@ import static com.google.cloud.BaseService.EXCEPTION_HANDLER; import com.google.api.core.InternalApi; -import com.google.api.core.ObsoleteApi; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.telemetry.MetricsRecorder; +import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder; import com.google.cloud.datastore.telemetry.TelemetryConstants; import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.common.base.Preconditions; @@ -65,27 +65,84 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc { private final MetricsRecorder metricsRecorder; private final boolean isHttpTransport; + /** + * @deprecated use {@link Builder} + */ + @Deprecated public RetryAndTraceDatastoreRpcDecorator( DatastoreRpc datastoreRpc, TraceUtil otelTraceUtil, RetrySettings retrySettings, DatastoreOptions datastoreOptions) { - this(datastoreRpc, otelTraceUtil, retrySettings, datastoreOptions, null, false); - } - - public RetryAndTraceDatastoreRpcDecorator( - DatastoreRpc datastoreRpc, - TraceUtil otelTraceUtil, - RetrySettings retrySettings, - DatastoreOptions datastoreOptions, - MetricsRecorder metricsRecorder, - boolean isHttpTransport) { this.datastoreRpc = datastoreRpc; 
this.retrySettings = retrySettings; this.datastoreOptions = datastoreOptions; this.otelTraceUtil = otelTraceUtil; - this.metricsRecorder = Preconditions.checkNotNull(metricsRecorder); - this.isHttpTransport = isHttpTransport; + this.metricsRecorder = new NoOpMetricsRecorder(); + this.isHttpTransport = false; + } + + private RetryAndTraceDatastoreRpcDecorator(Builder builder) { + this.datastoreRpc = builder.datastoreRpc; + this.otelTraceUtil = builder.otelTraceUtil; + this.retrySettings = builder.retrySettings; + this.datastoreOptions = builder.datastoreOptions; + this.metricsRecorder = builder.metricsRecorder; + this.isHttpTransport = builder.isHttpTransport; + } + + public static Builder newBuilder() { + return new Builder(); + } + + public static class Builder { + private DatastoreRpc datastoreRpc; + private TraceUtil otelTraceUtil; + private RetrySettings retrySettings; + private DatastoreOptions datastoreOptions; + private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder(); + private boolean isHttpTransport = false; + + private Builder() {} + + public Builder setDatastoreRpc(DatastoreRpc datastoreRpc) { + Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required"); + this.datastoreRpc = datastoreRpc; + return this; + } + + public Builder setTraceUtil(TraceUtil otelTraceUtil) { + Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required"); + this.otelTraceUtil = otelTraceUtil; + return this; + } + + public Builder setRetrySettings(RetrySettings retrySettings) { + Preconditions.checkNotNull(retrySettings, "retrySettings is required"); + this.retrySettings = retrySettings; + return this; + } + + public Builder setDatastoreOptions(DatastoreOptions datastoreOptions) { + Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required"); + this.datastoreOptions = datastoreOptions; + return this; + } + + public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) { + Preconditions.checkNotNull(metricsRecorder, 
"metricsRecorder can not be null"); + this.metricsRecorder = metricsRecorder; + return this; + } + + public Builder setIsHttpTransport(boolean isHttpTransport) { + this.isHttpTransport = isHttpTransport; + return this; + } + + public RetryAndTraceDatastoreRpcDecorator build() { + return new RetryAndTraceDatastoreRpcDecorator(this); + } } @Override @@ -201,10 +258,8 @@ private Map buildMetricAttributes(String methodName, String stat Map attributes = new HashMap<>(); attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); - attributes.put( - TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId()); - attributes.put( - TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId()); attributes.put( TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, TelemetryConstants.getTransportName(datastoreOptions.getTransportOptions())); diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java index 28fa4eb257d7..1523e569505f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/NoOpMetricsRecorder.java @@ -16,13 +16,19 @@ package com.google.cloud.datastore.telemetry; +import com.google.api.core.InternalApi; import java.util.Map; /** * Metrics recorder implementation, used to stub out metrics instrumentation when metrics are * disabled. + * + *

WARNING: This class is intended for internal use only. It was made public to be used across + * packages as a default. It should not be used by external customers and its API may change without + * notice. */ -class NoOpMetricsRecorder implements MetricsRecorder { +@InternalApi +public class NoOpMetricsRecorder implements MetricsRecorder { @Override public void recordTransactionLatency(double latencyMs, Map attributes) { diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index 9b4cfc5e518e..99eb801ccbc3 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -66,21 +66,16 @@ public class TelemetryConstants { SERVICE_NAME + "/client/transaction_attempt_count"; /** Metric name for the total latency of an operation (one full RPC call including retries). */ - public static final String METRIC_NAME_OPERATION_LATENCY = - SERVICE_NAME + "/operation_latency"; + public static final String METRIC_NAME_OPERATION_LATENCY = SERVICE_NAME + "/operation_latency"; /** Metric name for the latency of a single RPC attempt. */ - public static final String METRIC_NAME_ATTEMPT_LATENCY = - SERVICE_NAME + "/attempt_latency"; + public static final String METRIC_NAME_ATTEMPT_LATENCY = SERVICE_NAME + "/attempt_latency"; /** Metric name for the count of operations. */ - public static final String METRIC_NAME_OPERATION_COUNT = - SERVICE_NAME + "/operation_count"; + public static final String METRIC_NAME_OPERATION_COUNT = SERVICE_NAME + "/operation_count"; /** Metric name for the count of RPC attempts. 
*/ - public static final String METRIC_NAME_ATTEMPT_COUNT = - SERVICE_NAME + "/attempt_count"; - + public static final String METRIC_NAME_ATTEMPT_COUNT = SERVICE_NAME + "/attempt_count"; // This is intentionally different from the `SERVICE_NAME` constant as it matches Gax's logic for // method name. @@ -94,13 +89,14 @@ public class TelemetryConstants { public static final String METHOD_RESERVE_IDS = METHOD_SERVICE_NAME + ".ReserveIds"; public static final String METHOD_ROLLBACK = METHOD_SERVICE_NAME + ".Rollback"; public static final String METHOD_RUN_QUERY = METHOD_SERVICE_NAME + ".RunQuery"; - public static final String METHOD_RUN_AGGREGATION_QUERY = METHOD_SERVICE_NAME + ".RunAggregationQuery"; + public static final String METHOD_RUN_AGGREGATION_QUERY = + METHOD_SERVICE_NAME + ".RunAggregationQuery"; // These metrics capture the specific transaction related - public static final String METHOD_TRANSACTION_COMMIT = METHOD_SERVICE_NAME + ".Transaction.Commit"; + public static final String METHOD_TRANSACTION_COMMIT = + METHOD_SERVICE_NAME + ".Transaction.Commit"; public static final String METHOD_TRANSACTION_RUN = METHOD_SERVICE_NAME + ".Transaction.Run"; - public enum Transport { GRPC("grpc"), HTTP("http"); diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index 7f7141c6dc99..0f0f50beb544 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -509,8 +509,7 @@ public void lookup_recordsOperationAndAttemptMetrics() { EasyMock.replay(rpcMock); // Use get() which triggers lookup internally - datastore.get( - Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build()); + 
datastore.get(Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build()); Collection metrics = metricReader.collectAllMetrics(); @@ -641,9 +640,7 @@ public void lookup_recordsFailureStatusOnError() { DatastoreException.class, () -> datastore.get( - Key.newBuilder(PROJECT_ID, "Kind", "name") - .setDatabaseId(DATABASE_ID) - .build())); + Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build())); Collection metrics = metricReader.collectAllMetrics(); @@ -713,9 +710,7 @@ public void allocateIds_recordsOperationAndAttemptMetrics() { assertThat(point).isNotNull(); assertThat( dataContainsStringAttribute( - point, - TelemetryConstants.ATTRIBUTES_KEY_STATUS, - StatusCode.Code.OK.toString())) + point, TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString())) .isTrue(); Optional attemptLatency = diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java index 5c2b44694409..1233e4f1bfac 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java @@ -212,8 +212,7 @@ public void recordAttemptCount_recordsCounterWithAttributes() { assertThat(metric).isNotNull(); assertThat(metric.getDescription()).isEqualTo("Number of RPC attempts"); - LongPointData point = - metric.getLongSumData().getPoints().stream().findFirst().orElse(null); + LongPointData point = metric.getLongSumData().getPoints().stream().findFirst().orElse(null); assertThat(point).isNotNull(); assertThat(point.getValue()).isEqualTo(1); } @@ -234,7 +233,8 @@ public void recordOperationLatency_recordsHistogramWithAttributes() { .orElse(null); 
assertThat(metric).isNotNull(); - assertThat(metric.getDescription()).isEqualTo("Total latency of an operation including retries"); + assertThat(metric.getDescription()) + .isEqualTo("Total latency of an operation including retries"); assertThat(metric.getUnit()).isEqualTo("ms"); HistogramPointData point = @@ -263,8 +263,7 @@ public void recordOperationCount_recordsCounterWithAttributes() { assertThat(metric).isNotNull(); assertThat(metric.getDescription()).isEqualTo("Number of operations"); - LongPointData point = - metric.getLongSumData().getPoints().stream().findFirst().orElse(null); + LongPointData point = metric.getLongSumData().getPoints().stream().findFirst().orElse(null); assertThat(point).isNotNull(); assertThat(point.getValue()).isEqualTo(1); } From fc4ddaee6a439585f9b5b77f36075dd1f6f10a2a Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 14:29:47 -0400 Subject: [PATCH 03/15] chore: Add comments for the helper methods --- .../google/cloud/datastore/DatastoreImpl.java | 30 +++++++++++++++++-- 1 file changed, 28 insertions(+), 2 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index fec1dacf917e..3f5ceac1c18e 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -826,6 +826,14 @@ private T runWithObservability( } } + /** + * Method to build a map of attributes to be used across both operation and attempt level + * metrics. + * + * @param methodName The name of the API method. + * @param status The status of the operation or attempt. + * @return The map of attributes. 
+ */ private Map buildMetricAttributes(String methodName, String status) { Map attributes = new HashMap<>(); attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); @@ -838,6 +846,14 @@ private Map buildMetricAttributes(String methodName, String stat return attributes; } + /** + * Method to record operation level metrics for HttpJson transport. This method should + * be called after the entire operation across all retry attempts has completed. + * + * @param operationStopwatch The stopwatch tracking the duration of the entire operation. + * @param methodName The name of the API method. + * @param status The final status of the operation after all retries. + */ private void recordOperationMetrics( Stopwatch operationStopwatch, String methodName, String status) { if (isHttpTransport) { @@ -848,12 +864,22 @@ private void recordOperationMetrics( } } + /** + * Wraps a callable with logic to record attempt-level metrics for HttpJson transport. Attempt + * metrics are recorded for each individual execution of the callable, regardless of whether it + * succeeds or fails. + * + * @param callable The original callable to execute. + * @param methodName The name of the API method. + * @param The return type of the callable. + * @return A wrapped callable that includes attempt-level metrics recording. 
+ */ private Callable attemptMetricsCallable(Callable callable, String methodName) { if (!isHttpTransport) { return callable; } return () -> { - Stopwatch sw = Stopwatch.createStarted(); + Stopwatch stopwatch = Stopwatch.createStarted(); String status = StatusCode.Code.OK.toString(); try { return callable.call(); @@ -862,7 +888,7 @@ private Callable attemptMetricsCallable(Callable callable, String meth throw e; } finally { Map attributes = buildMetricAttributes(methodName, status); - metricsRecorder.recordAttemptLatency(sw.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordAttemptLatency(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); metricsRecorder.recordAttemptCount(1, attributes); } }; From 84fc8d876186cc9ac5904f4d3bc7024c0a5b4934 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 14:35:55 -0400 Subject: [PATCH 04/15] chore: Refactor RetryAndTraceDatastoreRpcDecorator to check for valid input --- .../RetryAndTraceDatastoreRpcDecorator.java | 25 ++++++++----------- 1 file changed, 11 insertions(+), 14 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index 788a0917201d..33bbdcde546c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -18,6 +18,7 @@ import static com.google.cloud.BaseService.EXCEPTION_HANDLER; import com.google.api.core.InternalApi; +import com.google.api.core.ObsoleteApi; import com.google.api.gax.retrying.RetrySettings; import com.google.api.gax.rpc.StatusCode; import com.google.cloud.RetryHelper; @@ -27,6 +28,7 @@ import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder; import 
com.google.cloud.datastore.telemetry.TelemetryConstants; import com.google.cloud.datastore.telemetry.TraceUtil; +import com.google.cloud.http.HttpTransportOptions; import com.google.common.base.Preconditions; import com.google.common.base.Stopwatch; import com.google.datastore.v1.AllocateIdsRequest; @@ -65,10 +67,7 @@ public class RetryAndTraceDatastoreRpcDecorator implements DatastoreRpc { private final MetricsRecorder metricsRecorder; private final boolean isHttpTransport; - /** - * @deprecated use {@link Builder} - */ - @Deprecated + @ObsoleteApi("Prefer to create RetryAndTraceDatastoreRpcDecorator via the Builder") public RetryAndTraceDatastoreRpcDecorator( DatastoreRpc datastoreRpc, TraceUtil otelTraceUtil, @@ -79,7 +78,7 @@ public RetryAndTraceDatastoreRpcDecorator( this.datastoreOptions = datastoreOptions; this.otelTraceUtil = otelTraceUtil; this.metricsRecorder = new NoOpMetricsRecorder(); - this.isHttpTransport = false; + this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions; } private RetryAndTraceDatastoreRpcDecorator(Builder builder) { @@ -100,31 +99,29 @@ public static class Builder { private TraceUtil otelTraceUtil; private RetrySettings retrySettings; private DatastoreOptions datastoreOptions; + + // Defaults configured for this class private MetricsRecorder metricsRecorder = new NoOpMetricsRecorder(); private boolean isHttpTransport = false; private Builder() {} public Builder setDatastoreRpc(DatastoreRpc datastoreRpc) { - Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required"); this.datastoreRpc = datastoreRpc; return this; } public Builder setTraceUtil(TraceUtil otelTraceUtil) { - Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required"); this.otelTraceUtil = otelTraceUtil; return this; } public Builder setRetrySettings(RetrySettings retrySettings) { - Preconditions.checkNotNull(retrySettings, "retrySettings is required"); this.retrySettings = retrySettings; return this; } public 
Builder setDatastoreOptions(DatastoreOptions datastoreOptions) { - Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required"); this.datastoreOptions = datastoreOptions; return this; } @@ -135,12 +132,12 @@ public Builder setMetricsRecorder(MetricsRecorder metricsRecorder) { return this; } - public Builder setIsHttpTransport(boolean isHttpTransport) { - this.isHttpTransport = isHttpTransport; - return this; - } - public RetryAndTraceDatastoreRpcDecorator build() { + Preconditions.checkNotNull(datastoreRpc, "datastoreRpc is required"); + Preconditions.checkNotNull(otelTraceUtil, "otelTraceUtil is required"); + Preconditions.checkNotNull(retrySettings, "retrySettings is required"); + Preconditions.checkNotNull(datastoreOptions, "datastoreOptions is required"); + this.isHttpTransport = datastoreOptions.getTransportOptions() instanceof HttpTransportOptions; return new RetryAndTraceDatastoreRpcDecorator(this); } } From 938761958b2ac139d6ce570e1e7735ec27772deb Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 15:17:41 -0400 Subject: [PATCH 05/15] chore: Refactor to a shared spot for operation and attempt metrics --- .../cloud/datastore/DatastoreException.java | 4 +- .../google/cloud/datastore/DatastoreImpl.java | 85 +++---------------- .../RetryAndTraceDatastoreRpcDecorator.java | 59 ++++--------- 3 files changed, 31 insertions(+), 117 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java index 6312bc76bb6e..5591cbb2f46f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java @@ -16,6 +16,7 @@ package com.google.cloud.datastore; +import com.google.api.core.InternalApi; import 
com.google.api.gax.grpc.GrpcStatusCode; import com.google.api.gax.rpc.ApiException; import com.google.api.gax.rpc.StatusCode; @@ -176,7 +177,8 @@ static DatastoreException propagateUserException(Exception ex) { * @param throwable the throwable to extract the status code from * @return the status code name, or "UNKNOWN" if not determinable */ - static String extractStatusCode(Throwable throwable) { + @InternalApi + public static String extractStatusCode(Throwable throwable) { Throwable current = throwable; while (current != null) { if (current instanceof DatastoreException) { diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index beb7f0d62a85..1aacf9e6b4b6 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -50,6 +50,7 @@ import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.telemetry.MetricsRecorder; import com.google.cloud.datastore.telemetry.TelemetryConstants; +import com.google.cloud.datastore.telemetry.TelemetryUtils; import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.cloud.datastore.telemetry.TraceUtil.Scope; import com.google.cloud.http.HttpTransportOptions; @@ -809,86 +810,26 @@ private T runWithObservability( Stopwatch operationStopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; String operationStatus = StatusCode.Code.OK.toString(); - Callable attemptMetricsCallable = attemptMetricsCallable(callable, methodName); + DatastoreOptions options = getOptions(); + callable = + TelemetryUtils.attemptMetricsCallable( + callable, metricsRecorder, options, isHttpTransport, methodName); try (TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - attemptMetricsCallable, retrySettings, exceptionHandler, getOptions().getClock()); + callable, retrySettings, exceptionHandler, options.getClock()); } catch (RetryHelperException e) { operationStatus = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - if (isHttpTransport) { - recordOperationMetrics(operationStopwatch, methodName, operationStatus); - } + TelemetryUtils.recordOperationMetrics( + metricsRecorder, + options, + isHttpTransport, + operationStopwatch, + methodName, + operationStatus); span.end(); } } - - /** - * Method to build a map of attributes to be used across both operation and attempt level metrics. - * - * @param methodName The name of the API method. - * @param status The status of the operation or attempt. - * @return The map of attributes. - */ - private Map buildMetricAttributes(String methodName, String status) { - Map attributes = new HashMap<>(); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId()); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId()); - attributes.put( - TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, - TelemetryConstants.getTransportName(getOptions().getTransportOptions())); - return attributes; - } - - /** - * Method to record operation level metrics for HttpJson transport. 
This method should be called - * after the entire operation across all retry attempts has completed. - * - * @param operationStopwatch The stopwatch tracking the duration of the entire operation. - * @param methodName The name of the API method. - * @param status The final status of the operation after all retries. - */ - private void recordOperationMetrics( - Stopwatch operationStopwatch, String methodName, String status) { - if (isHttpTransport) { - Map attributes = buildMetricAttributes(methodName, status); - metricsRecorder.recordOperationLatency( - operationStopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); - metricsRecorder.recordOperationCount(1, attributes); - } - } - - /** - * Wraps a callable with logic to record attempt-level metrics for HttpJson transport. Attempt - * metrics are recorded for each individual execution of the callable, regardless of whether it - * succeeds or fails. - * - * @param callable The original callable to execute. - * @param methodName The name of the API method. - * @param The return type of the callable. - * @return A wrapped callable that includes attempt-level metrics recording. 
- */ - private Callable attemptMetricsCallable(Callable callable, String methodName) { - if (!isHttpTransport) { - return callable; - } - return () -> { - Stopwatch stopwatch = Stopwatch.createStarted(); - String status = StatusCode.Code.OK.toString(); - try { - return callable.call(); - } catch (Exception e) { - status = DatastoreException.extractStatusCode(e); - throw e; - } finally { - Map attributes = buildMetricAttributes(methodName, status); - metricsRecorder.recordAttemptLatency(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); - metricsRecorder.recordAttemptCount(1, attributes); - } - }; - } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index 33bbdcde546c..739a2c86ccc7 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -27,6 +27,7 @@ import com.google.cloud.datastore.telemetry.MetricsRecorder; import com.google.cloud.datastore.telemetry.NoOpMetricsRecorder; import com.google.cloud.datastore.telemetry.TelemetryConstants; +import com.google.cloud.datastore.telemetry.TelemetryUtils; import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.cloud.http.HttpTransportOptions; import com.google.common.base.Preconditions; @@ -48,10 +49,7 @@ import com.google.datastore.v1.RunAggregationQueryResponse; import com.google.datastore.v1.RunQueryRequest; import com.google.datastore.v1.RunQueryResponse; -import java.util.HashMap; -import java.util.Map; import java.util.concurrent.Callable; -import java.util.concurrent.TimeUnit; /** * An implementation of {@link DatastoreRpc} which acts as a Decorator and decorates the underlying @@ -208,14 
+206,15 @@ public O invokeRpc(Callable block, String startSpan) { } O invokeRpc(Callable block, String startSpan, String methodName) { - com.google.cloud.datastore.telemetry.TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); - Stopwatch operationStopwatch = - isHttpTransport && methodName != null ? Stopwatch.createStarted() : null; + TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); + Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null; String operationStatus = StatusCode.Code.OK.toString(); - try (com.google.cloud.datastore.telemetry.TraceUtil.Scope ignored = span.makeCurrent()) { + try (TraceUtil.Scope ignored = span.makeCurrent()) { Callable callable = block; - if (isHttpTransport && methodName != null) { - callable = withAttemptMetrics(block, methodName); + if (isHttpTransport) { + callable = + TelemetryUtils.attemptMetricsCallable( + block, metricsRecorder, datastoreOptions, isHttpTransport, methodName); } return RetryHelper.runWithRetries( callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); @@ -224,42 +223,14 @@ O invokeRpc(Callable block, String startSpan, String methodName) { span.end(e); throw DatastoreException.translateAndThrow(e); } finally { - if (isHttpTransport && methodName != null) { - Map attrs = buildMetricAttributes(methodName, operationStatus); - metricsRecorder.recordOperationLatency( - operationStopwatch.elapsed(TimeUnit.MILLISECONDS), attrs); - metricsRecorder.recordOperationCount(1, attrs); - } + TelemetryUtils.recordOperationMetrics( + metricsRecorder, + datastoreOptions, + isHttpTransport, + stopwatch, + methodName, + operationStatus); span.end(); } } - - private Callable withAttemptMetrics(Callable callable, String methodName) { - return () -> { - Stopwatch sw = Stopwatch.createStarted(); - String status = StatusCode.Code.OK.toString(); - try { - return callable.call(); - } catch (Exception e) { - status = DatastoreException.extractStatusCode(e); - throw e; - } finally { 
- Map attrs = buildMetricAttributes(methodName, status); - metricsRecorder.recordAttemptLatency(sw.elapsed(TimeUnit.MILLISECONDS), attrs); - metricsRecorder.recordAttemptCount(1, attrs); - } - }; - } - - private Map buildMetricAttributes(String methodName, String status) { - Map attributes = new HashMap<>(); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId()); - attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId()); - attributes.put( - TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, - TelemetryConstants.getTransportName(datastoreOptions.getTransportOptions())); - return attributes; - } } From b1dbf299e299cd7972e825f1a0906a798292133b Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 15:38:47 -0400 Subject: [PATCH 06/15] chore: Fix TelemetryUtils visibility --- .../datastore/telemetry/TelemetryUtils.java | 132 ++++++++++++++++++ 1 file changed, 132 insertions(+) create mode 100644 java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java new file mode 100644 index 000000000000..37da829793c8 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java @@ -0,0 +1,132 @@ +/* + * Copyright 2024 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.datastore.telemetry; + +import com.google.api.core.InternalApi; +import com.google.api.gax.rpc.StatusCode; +import com.google.cloud.datastore.DatastoreException; +import com.google.cloud.datastore.DatastoreOptions; +import com.google.common.base.Stopwatch; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; + +/** + * Utility class for common telemetry operations in Datastore. + * + *

WARNING: This class is intended for internal use only. + */ +@InternalApi +public final class TelemetryUtils { + + private TelemetryUtils() {} + + /** + * Method to build a map of attributes to be used across both operation and attempt level metrics. + * + * @param datastoreOptions The DatastoreOptions object. + * @param methodName The name of the API method. + * @param status The status of the operation or attempt. + * @return The map of attributes. + */ + public static Map buildMetricAttributes( + DatastoreOptions datastoreOptions, String methodName, String status) { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_METHOD, methodName); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastoreOptions.getProjectId()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastoreOptions.getDatabaseId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, + TelemetryConstants.getTransportName(datastoreOptions.getTransportOptions())); + return attributes; + } + + /** + * Method to record operation level metrics for HttpJson transport. This method should be called + * after the entire operation across all retry attempts has completed. + * + * @param metricsRecorder The metrics recorder. + * @param datastoreOptions The DatastoreOptions object. + * @param isHttpTransport Whether the current transport is HTTP. + * @param operationStopwatch The stopwatch tracking the duration of the entire operation. + * @param methodName The name of the API method. + * @param status The final status of the operation after all retries. 
+ */ + public static void recordOperationMetrics( + MetricsRecorder metricsRecorder, + DatastoreOptions datastoreOptions, + boolean isHttpTransport, + Stopwatch operationStopwatch, + String methodName, + String status) { + // Operation metrics are only recorded for HttpJson transport as Gax already records + // operation metrics for gRPC transport. This prevents metrics from being recorded twice + // for gRPC transport. + if (!isHttpTransport) { + return; + } + if (methodName != null) { + Map attributes = buildMetricAttributes(datastoreOptions, methodName, status); + metricsRecorder.recordOperationLatency( + operationStopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordOperationCount(1, attributes); + } + } + + /** + * Wraps a callable with logic to record attempt-level metrics for HttpJson transport. Attempt + * metrics are recorded for each individual execution of the callable, regardless of whether it + * succeeds or fails. + * + * @param callable The original callable to execute. + * @param metricsRecorder The metrics recorder. + * @param datastoreOptions The DatastoreOptions object. + * @param isHttpTransport Whether the current transport is HTTP. + * @param methodName The name of the API method. + * @param The return type of the callable. + * @return A wrapped callable that includes attempt-level metrics recording. + */ + public static Callable attemptMetricsCallable( + Callable callable, + MetricsRecorder metricsRecorder, + DatastoreOptions datastoreOptions, + boolean isHttpTransport, + String methodName) { + // Attempt metrics are already recorded by Gax for gRPC transport. This + // prevents the metrics from being recorded twice for gRPC transport. 
+ if (!isHttpTransport) { + return callable; + } + return () -> { + Stopwatch stopwatch = Stopwatch.createStarted(); + String status = StatusCode.Code.OK.toString(); + try { + return callable.call(); + } catch (Exception e) { + status = DatastoreException.extractStatusCode(e); + throw e; + } finally { + Map attributes = + buildMetricAttributes(datastoreOptions, methodName, status); + metricsRecorder.recordAttemptLatency(stopwatch.elapsed(TimeUnit.MILLISECONDS), attributes); + metricsRecorder.recordAttemptCount(1, attributes); + } + }; + } +} From dd1cda72f2bcde4ee1e9be27b1221aeff7cdd903 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 15:40:25 -0400 Subject: [PATCH 07/15] chore: Update the copyright year --- .../com/google/cloud/datastore/telemetry/TelemetryUtils.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java index 37da829793c8..d6c04abaf66f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryUtils.java @@ -1,5 +1,5 @@ /* - * Copyright 2024 Google LLC + * Copyright 2026 Google LLC * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. 
From 4cc3b9b8233ba068e97b664653391c312c1499eb Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Tue, 10 Mar 2026 16:19:36 -0400 Subject: [PATCH 08/15] chore: Remove redundant transport check --- .../datastore/RetryAndTraceDatastoreRpcDecorator.java | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index 739a2c86ccc7..3f2d773fef22 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -210,12 +210,9 @@ O invokeRpc(Callable block, String startSpan, String methodName) { Stopwatch stopwatch = isHttpTransport ? Stopwatch.createStarted() : null; String operationStatus = StatusCode.Code.OK.toString(); try (TraceUtil.Scope ignored = span.makeCurrent()) { - Callable callable = block; - if (isHttpTransport) { - callable = - TelemetryUtils.attemptMetricsCallable( - block, metricsRecorder, datastoreOptions, isHttpTransport, methodName); - } + Callable callable = + TelemetryUtils.attemptMetricsCallable( + block, metricsRecorder, datastoreOptions, isHttpTransport, methodName); return RetryHelper.runWithRetries( callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); } catch (RetryHelperException e) { From f3c3db0423416e5fffa1afbec905700b8b4c1fe3 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 13:19:51 -0400 Subject: [PATCH 09/15] chore: Fix ITs --- .../main/java/com/google/cloud/datastore/DatastoreImpl.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java 
b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 1aacf9e6b4b6..d4e64fd0e39b 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -811,12 +811,12 @@ private T runWithObservability( String operationStatus = StatusCode.Code.OK.toString(); DatastoreOptions options = getOptions(); - callable = + Callable attemptCallable = TelemetryUtils.attemptMetricsCallable( callable, metricsRecorder, options, isHttpTransport, methodName); try (TraceUtil.Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( - callable, retrySettings, exceptionHandler, options.getClock()); + attemptCallable, retrySettings, exceptionHandler, options.getClock()); } catch (RetryHelperException e) { operationStatus = DatastoreException.extractStatusCode(e); span.end(e); From e739053225aca1c8bcd3b0e0efffa5dd9437058f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 14:08:56 -0400 Subject: [PATCH 10/15] chore: Fix makeCurrent logic --- .../datastore/telemetry/EnabledTraceUtil.java | 18 ++++++++++++------ 1 file changed, 12 insertions(+), 6 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java index 711e94020548..bfcdaf8d23a8 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -208,10 +208,13 @@ public TraceUtil.Span setAttribute(String key, boolean value) { } @Override + @SuppressWarnings("MustBeClosedChecker") public Scope makeCurrent() { - try 
(io.opentelemetry.context.Scope scope = span.makeCurrent()) { - return new Scope(scope); - } + // span.makeCurrent() opens a ThreadLocal scope that binds this span to the current thread. + // We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned + // TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources + // to control when the ThreadLocal context is restored. + return new Scope(span.makeCurrent()); } } @@ -238,10 +241,13 @@ static class Context implements TraceUtil.Context { } @Override + @SuppressWarnings("MustBeClosedChecker") public Scope makeCurrent() { - try (io.opentelemetry.context.Scope scope = context.makeCurrent()) { - return new Scope(scope); - } + // context.makeCurrent() opens a ThreadLocal scope that binds this context to the current thread. + // We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned + // TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources + // to control when the ThreadLocal context is restored. 
+ return new Scope(context.makeCurrent()); } } From ee3e45e2a5b05120ad3c065ccca0860d4500c441 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 14:22:33 -0400 Subject: [PATCH 11/15] chore: Fix lint issues --- .../com/google/cloud/datastore/telemetry/EnabledTraceUtil.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java index bfcdaf8d23a8..3dc8395d170b 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/EnabledTraceUtil.java @@ -243,7 +243,8 @@ static class Context implements TraceUtil.Context { @Override @SuppressWarnings("MustBeClosedChecker") public Scope makeCurrent() { - // context.makeCurrent() opens a ThreadLocal scope that binds this context to the current thread. + // context.makeCurrent() opens a ThreadLocal scope that binds this context to the current + // thread. // We explicitly leave this unclosed and suppress MustBeClosedChecker so that the returned // TraceUtil.Scope can manage the lifecycle instead, allowing the caller's try-with-resources // to control when the ThreadLocal context is restored. 
From 3843f9b9afa9a2a20d3f7f40b29feaedccb58fe3 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 14:27:41 -0400 Subject: [PATCH 12/15] chore: Remove duplicate tests --- .../datastore/DatastoreImplMetricsTest.java | 117 ++---------------- 1 file changed, 12 insertions(+), 105 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index 0f0f50beb544..3d949dbb2dac 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -24,8 +24,6 @@ import com.google.cloud.datastore.spi.DatastoreRpcFactory; import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.telemetry.TelemetryConstants; -import com.google.datastore.v1.AllocateIdsRequest; -import com.google.datastore.v1.AllocateIdsResponse; import com.google.datastore.v1.BeginTransactionRequest; import com.google.datastore.v1.BeginTransactionResponse; import com.google.datastore.v1.CommitRequest; @@ -64,7 +62,6 @@ public class DatastoreImplMetricsTest { private static final String DATABASE_ID = "test-database"; private static InMemoryMetricReader metricReader; - private static DatastoreRpcFactory rpcFactoryMock; private static DatastoreRpc rpcMock; private static Datastore datastore; @@ -77,7 +74,7 @@ public static void setUpClass() { OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); - rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); + DatastoreRpcFactory rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); // Use a regular (non-strict) mock for rpcMock so that anyTimes() expectations work without // enforcing call order — 
needed for retry tests with unpredictable call counts. rpcMock = EasyMock.createMock(DatastoreRpc.class); @@ -504,6 +501,8 @@ public void runInTransaction_withTransactionOptions_recordsMetrics() { @Test public void lookup_recordsOperationAndAttemptMetrics() { + // Use `lookup` as a simple RPC test to test that metrics are recorded for operation + // and attempt. Any RPC could have worked (there is no specific reason for lookup) EasyMock.expect(rpcMock.lookup(EasyMock.anyObject(LookupRequest.class))) .andReturn(LookupResponse.getDefaultInstance()); EasyMock.replay(rpcMock); @@ -574,65 +573,19 @@ public void lookup_recordsOperationAndAttemptMetrics() { EasyMock.verify(rpcMock); } - @Test - public void commit_recordsOperationAndAttemptMetrics() { - EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) - .andReturn(CommitResponse.newBuilder().build()); - EasyMock.replay(rpcMock); - - Key key = Key.newBuilder(PROJECT_ID, "Kind", "name").setDatabaseId(DATABASE_ID).build(); - datastore.delete(key); - - Collection metrics = metricReader.collectAllMetrics(); - - // Verify operation latency for commit - Optional operationLatency = - findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); - assertThat(operationLatency.isPresent()).isTrue(); - HistogramPointData opLatencyPoint = - operationLatency.get().getHistogramData().getPoints().stream() - .filter( - p -> - dataContainsStringAttribute( - p, - TelemetryConstants.ATTRIBUTES_KEY_METHOD, - TelemetryConstants.METHOD_COMMIT)) - .findFirst() - .orElse(null); - assertThat(opLatencyPoint).isNotNull(); - assertThat(opLatencyPoint.getCount()).isEqualTo(1); - assertThat( - dataContainsStringAttribute( - opLatencyPoint, - TelemetryConstants.ATTRIBUTES_KEY_STATUS, - StatusCode.Code.OK.toString())) - .isTrue(); - - // Verify attempt latency for commit - Optional attemptLatency = - findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); - assertThat(attemptLatency.isPresent()).isTrue(); - 
HistogramPointData attLatencyPoint = - attemptLatency.get().getHistogramData().getPoints().stream() - .filter( - p -> - dataContainsStringAttribute( - p, - TelemetryConstants.ATTRIBUTES_KEY_METHOD, - TelemetryConstants.METHOD_COMMIT)) - .findFirst() - .orElse(null); - assertThat(attLatencyPoint).isNotNull(); - - EasyMock.verify(rpcMock); - } - @Test public void lookup_recordsFailureStatusOnError() { + // Use `lookup` as a simple RPC test to test that metrics are recorded for operation + // and attempt. Any RPC could have worked (there is no specific reason for lookup) + StatusCode.Code unavailableStatusCode = StatusCode.Code.UNAVAILABLE; EasyMock.expect(rpcMock.lookup(EasyMock.anyObject(LookupRequest.class))) .andThrow( new DatastoreException( - 14, "Unavailable", StatusCode.Code.UNAVAILABLE.toString(), false, null)) + 14, + unavailableStatusCode.toString(), + unavailableStatusCode.toString(), + false, + null)) .anyTimes(); EasyMock.replay(rpcMock); @@ -663,7 +616,7 @@ public void lookup_recordsFailureStatusOnError() { dataContainsStringAttribute( opLatencyPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, - StatusCode.Code.UNAVAILABLE.toString())) + unavailableStatusCode.toString())) .isTrue(); // Verify attempt metrics were also recorded with UNAVAILABLE @@ -674,52 +627,6 @@ public void lookup_recordsFailureStatusOnError() { EasyMock.verify(rpcMock); } - @Test - public void allocateIds_recordsOperationAndAttemptMetrics() { - EasyMock.expect(rpcMock.allocateIds(EasyMock.anyObject(AllocateIdsRequest.class))) - .andReturn( - AllocateIdsResponse.newBuilder() - .addKeys( - com.google.datastore.v1.Key.newBuilder() - .addPath( - com.google.datastore.v1.Key.PathElement.newBuilder() - .setKind("Kind") - .setId(123))) - .build()); - EasyMock.replay(rpcMock); - - IncompleteKey key = - IncompleteKey.newBuilder(PROJECT_ID, "Kind").setDatabaseId(DATABASE_ID).build(); - datastore.allocateId(key); - - Collection metrics = metricReader.collectAllMetrics(); - - Optional 
operationLatency = - findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); - assertThat(operationLatency.isPresent()).isTrue(); - HistogramPointData point = - operationLatency.get().getHistogramData().getPoints().stream() - .filter( - p -> - dataContainsStringAttribute( - p, - TelemetryConstants.ATTRIBUTES_KEY_METHOD, - TelemetryConstants.METHOD_ALLOCATE_IDS)) - .findFirst() - .orElse(null); - assertThat(point).isNotNull(); - assertThat( - dataContainsStringAttribute( - point, TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString())) - .isTrue(); - - Optional attemptLatency = - findMetric(metrics, TelemetryConstants.METRIC_NAME_ATTEMPT_LATENCY); - assertThat(attemptLatency.isPresent()).isTrue(); - - EasyMock.verify(rpcMock); - } - private static Optional findMetric( Collection metrics, String metricName) { return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst(); From 2e610177c39bddb543aa417d9b90ae61aec076f2 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 15:35:45 -0400 Subject: [PATCH 13/15] chore: Add conditional logic for the transports in the test --- .../datastore/DatastoreImplMetricsTest.java | 82 +++++++++++++++---- 1 file changed, 64 insertions(+), 18 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index 3d949dbb2dac..be5725165361 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -41,32 +41,51 @@ import io.opentelemetry.sdk.metrics.data.MetricData; import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Arrays; import 
java.util.Collection; +import java.util.List; import java.util.Optional; import org.easymock.EasyMock; import org.junit.Before; -import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; -import org.junit.runners.JUnit4; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; /** * Tests for transaction metrics recording in {@link DatastoreImpl}. These tests verify that * transaction latency and per-attempt metrics are correctly recorded via the {@link * com.google.cloud.datastore.telemetry.MetricsRecorder}. */ -@RunWith(JUnit4.class) +@RunWith(Parameterized.class) public class DatastoreImplMetricsTest { private static final String PROJECT_ID = "test-project"; private static final String DATABASE_ID = "test-database"; - private static InMemoryMetricReader metricReader; - private static DatastoreRpc rpcMock; - private static Datastore datastore; + private InMemoryMetricReader metricReader; + private DatastoreRpc rpcMock; + private Datastore datastore; + private final TelemetryConstants.Transport transport; + + /** + * We parameterize this test to run against both GRPC and HTTP transports. + * This ensures that Datastore-specific transaction and commit metrics are correctly + * recorded for both transports, while verifying that operation and attempt metrics + * are only manually recorded for HTTP (since GAX handles them natively for gRPC). + * Parameterizing allows us to automatically test both transports for any new metrics added. + */ + @Parameters(name = "transport={0}") + public static List data() { + return Arrays.asList(TelemetryConstants.Transport.GRPC, TelemetryConstants.Transport.HTTP); + } + + public DatastoreImplMetricsTest(TelemetryConstants.Transport transport) { + this.transport = transport; + } - @BeforeClass - public static void setUpClass() { + @Before + public void setUp() { // Use delta temporality so each collectAllMetrics() call returns only new data and resets. 
metricReader = InMemoryMetricReader.createDelta(); SdkMeterProvider meterProvider = @@ -79,8 +98,7 @@ public static void setUpClass() { // enforcing call order — needed for retry tests with unpredictable call counts. rpcMock = EasyMock.createMock(DatastoreRpc.class); - DatastoreOptions datastoreOptions = - DatastoreOptions.newBuilder() + DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() .setProjectId(PROJECT_ID) .setDatabaseId(DATABASE_ID) .setCredentials(NoCredentials.getInstance()) @@ -90,20 +108,22 @@ public static void setUpClass() { DatastoreOpenTelemetryOptions.newBuilder() .setMetricsEnabled(true) .setOpenTelemetry(openTelemetry) - .build()) - .build(); + .build()); + + if (TelemetryConstants.Transport.GRPC.equals(transport)) { + builder.setTransportOptions(com.google.cloud.grpc.GrpcTransportOptions.newBuilder().build()); + } else { + builder.setTransportOptions(com.google.cloud.http.HttpTransportOptions.newBuilder().build()); + } + + DatastoreOptions datastoreOptions = builder.build(); EasyMock.expect(rpcFactoryMock.create(datastoreOptions)).andReturn(rpcMock); EasyMock.replay(rpcFactoryMock); datastore = datastoreOptions.getService(); EasyMock.verify(rpcFactoryMock); - } - @Before - public void setUp() { - // Drain any metrics accumulated during @BeforeClass or previous tests. - metricReader.collectAllMetrics(); - EasyMock.reset(rpcMock); + metricReader.collectAllMetrics(); // drain initialization } @Test @@ -512,6 +532,19 @@ public void lookup_recordsOperationAndAttemptMetrics() { Collection metrics = metricReader.collectAllMetrics(); + // Gax already records operation and attempt metrics natively for the gRPC transport. + // DatastoreImpl explicitly avoids recording them here to prevent double-counting. + // Since this unit test bypasses the GAX networking layer by mocking DatastoreRpc, + // we assert that no local duplicate metrics are emitted by DatastoreImpl for gRPC, + // and skip the rest of the assertions. 
+ if (TelemetryConstants.Transport.GRPC.equals(transport)) { + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isFalse(); + EasyMock.verify(rpcMock); + return; + } + // Verify operation latency Optional operationLatency = findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); @@ -597,6 +630,19 @@ public void lookup_recordsFailureStatusOnError() { Collection metrics = metricReader.collectAllMetrics(); + // Gax already records operation and attempt metrics natively for the gRPC transport. + // DatastoreImpl explicitly avoids recording them here to prevent double-counting. + // Since this unit test bypasses the GAX networking layer by mocking DatastoreRpc, + // we assert that no local duplicate metrics are emitted by DatastoreImpl for gRPC, + // and skip the rest of the assertions. + if (TelemetryConstants.Transport.GRPC.equals(transport)) { + Optional operationLatency = + findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); + assertThat(operationLatency.isPresent()).isFalse(); + EasyMock.verify(rpcMock); + return; + } + // Verify operation latency with UNAVAILABLE status Optional operationLatency = findMetric(metrics, TelemetryConstants.METRIC_NAME_OPERATION_LATENCY); From aa0bbbf21704462ff3cf0a406024167b68849e09 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 15:36:20 -0400 Subject: [PATCH 14/15] chore: Fix lint issues --- .../cloud/datastore/DatastoreImplMetricsTest.java | 13 +++++++------ 1 file changed, 7 insertions(+), 6 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index be5725165361..b8129af13d37 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java 
+++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -69,11 +69,11 @@ public class DatastoreImplMetricsTest { private final TelemetryConstants.Transport transport; /** - * We parameterize this test to run against both GRPC and HTTP transports. - * This ensures that Datastore-specific transaction and commit metrics are correctly - * recorded for both transports, while verifying that operation and attempt metrics - * are only manually recorded for HTTP (since GAX handles them natively for gRPC). - * Parameterizing allows us to automatically test both transports for any new metrics added. + * We parameterize this test to run against both GRPC and HTTP transports. This ensures that + * Datastore-specific transaction and commit metrics are correctly recorded for both transports, + * while verifying that operation and attempt metrics are only manually recorded for HTTP (since + * GAX handles them natively for gRPC). Parameterizing allows us to automatically test both + * transports for any new metrics added. */ @Parameters(name = "transport={0}") public static List data() { @@ -98,7 +98,8 @@ public void setUp() { // enforcing call order — needed for retry tests with unpredictable call counts. 
rpcMock = EasyMock.createMock(DatastoreRpc.class); - DatastoreOptions.Builder builder = DatastoreOptions.newBuilder() + DatastoreOptions.Builder builder = + DatastoreOptions.newBuilder() .setProjectId(PROJECT_ID) .setDatabaseId(DATABASE_ID) .setCredentials(NoCredentials.getInstance()) From 95ca5be904fcad6b9cd6624ff4ea92ee1b489f81 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Wed, 11 Mar 2026 17:28:36 -0400 Subject: [PATCH 15/15] chore: Address PR comments --- .../datastore/RetryAndTraceDatastoreRpcDecorator.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java index 3f2d773fef22..397015c5ef4e 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/RetryAndTraceDatastoreRpcDecorator.java @@ -208,13 +208,16 @@ public O invokeRpc(Callable block, String startSpan) { O invokeRpc(Callable block, String startSpan, String methodName) { TraceUtil.Span span = otelTraceUtil.startSpan(startSpan); Stopwatch stopwatch = isHttpTransport ? 
Stopwatch.createStarted() : null; - String operationStatus = StatusCode.Code.OK.toString(); + String operationStatus = StatusCode.Code.UNKNOWN.toString(); try (TraceUtil.Scope ignored = span.makeCurrent()) { Callable callable = TelemetryUtils.attemptMetricsCallable( block, metricsRecorder, datastoreOptions, isHttpTransport, methodName); - return RetryHelper.runWithRetries( - callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); + O result = + RetryHelper.runWithRetries( + callable, this.retrySettings, EXCEPTION_HANDLER, this.datastoreOptions.getClock()); + operationStatus = StatusCode.Code.OK.toString(); + return result; } catch (RetryHelperException e) { operationStatus = DatastoreException.extractStatusCode(e); span.end(e);