From 1697208059f7b942dbf2a06907cfc5656d0c0f1f Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 6 Mar 2026 13:13:33 -0500 Subject: [PATCH 1/6] feat: Record transaction metrics for Datastore --- .../cloud/datastore/DatastoreException.java | 29 ++ .../google/cloud/datastore/DatastoreImpl.java | 141 ++++---- .../cloud/datastore/DatastoreOptions.java | 8 + .../datastore/telemetry/MetricsRecorder.java | 11 +- .../OpenTelemetryMetricsRecorder.java | 2 +- .../telemetry/TelemetryConstants.java | 12 + .../datastore/DatastoreImplMetricsTest.java | 333 ++++++++++++++++++ .../telemetry/MetricsRecorderTest.java | 81 +++++ .../OpenTelemetryMetricsRecorderTest.java | 176 +++++++++ 9 files changed, 723 insertions(+), 70 deletions(-) create mode 100644 java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java create mode 100644 java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java create mode 100644 java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java index 44bde2c107ff..1be84b859c6c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java @@ -160,4 +160,33 @@ static DatastoreException throwInvalidRequest(String massage, Object... params) static DatastoreException propagateUserException(Exception ex) { throw new DatastoreException(BaseServiceException.UNKNOWN_CODE, ex.getMessage(), null, ex); } + + /** + * Extracts the status code name from the given throwable. Walks the exception cause chain looking + * for a {@link DatastoreException} that carries a reason string representing the status code + * (e.g. "ABORTED", "UNAVAILABLE"). The reason is set from {@link + * com.google.api.gax.rpc.StatusCode.Code} which is transport-neutral, supporting both gRPC and + * HttpJson. Falls back to "UNKNOWN" if the status cannot be determined. + * + *

Note: Some {@link DatastoreException} instances are constructed without a reason (e.g. via + * {@link DatastoreException#DatastoreException(int, String, Throwable)}). If all {@link + * DatastoreException} instances in the cause chain have a null or empty reason, this method + * returns "UNKNOWN" even if the underlying error carries a meaningful status. + * + * @param throwable the throwable to extract the status code from + * @return the status code name, or "UNKNOWN" if not determinable + */ + public static String extractStatusCode(Throwable throwable) { + Throwable current = throwable; + while (current != null) { + if (current instanceof DatastoreException) { + String reason = ((DatastoreException) current).getReason(); + if (!Strings.isNullOrEmpty(reason)) { + return reason; + } + } + current = current.getCause(); + } + return StatusCode.Code.UNKNOWN.toString(); + } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index 63d09b0a0ed4..a8c71d5e49c8 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -38,6 +38,7 @@ import com.google.api.core.BetaApi; import com.google.api.gax.retrying.RetrySettings; +import com.google.api.gax.rpc.StatusCode; import com.google.cloud.BaseService; import com.google.cloud.ExceptionHandler; import com.google.cloud.RetryHelper; @@ -45,10 +46,13 @@ import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.execution.AggregationQueryExecutor; import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.MetricsRecorder; +import com.google.cloud.datastore.telemetry.TelemetryConstants; import com.google.cloud.datastore.telemetry.TraceUtil; import com.google.cloud.datastore.telemetry.TraceUtil.Scope; import com.google.common.base.MoreObjects; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.collect.AbstractIterator; import com.google.common.collect.ImmutableList; import com.google.common.collect.ImmutableMap; @@ -61,10 +65,12 @@ import com.google.datastore.v1.RunQueryResponse; import com.google.datastore.v1.TransactionOptions; import com.google.protobuf.ByteString; +import io.grpc.Status; import io.opentelemetry.context.Context; import java.util.ArrayList; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.LinkedHashSet; @@ -73,6 +79,7 @@ import java.util.Optional; import java.util.Set; import java.util.concurrent.Callable; +import java.util.concurrent.TimeUnit; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -89,6 +96,7 @@ final class DatastoreImpl extends BaseService implements Datas private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil = getOptions().getTraceUtil(); + private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder(); private final ReadOptionProtoPreparer readOptionProtoPreparer; private final AggregationQueryExecutor aggregationQueryExecutor; @@ -122,63 +130,31 @@ public Transaction newTransaction() { return new TransactionImpl(this); } + /** + * A wrapper around {@link ReadWriteTransactionCallable} that adds OpenTelemetry tracing context + * propagation. All transaction logic (begin, run, commit, rollback, metrics recording) is + * delegated to the underlying {@link ReadWriteTransactionCallable}. + */ static class TracedReadWriteTransactionCallable implements Callable { - private final Datastore datastore; - private final TransactionCallable callable; - private volatile TransactionOptions options; - private volatile Transaction transaction; - + private final ReadWriteTransactionCallable delegate; private final TraceUtil.Span parentSpan; TracedReadWriteTransactionCallable( - Datastore datastore, - TransactionCallable callable, - TransactionOptions options, + ReadWriteTransactionCallable delegate, @Nullable com.google.cloud.datastore.telemetry.TraceUtil.Span parentSpan) { - this.datastore = datastore; - this.callable = callable; - this.options = options; - this.transaction = null; + this.delegate = delegate; this.parentSpan = parentSpan; } - Datastore getDatastore() { - return datastore; - } - - TransactionOptions getOptions() { - return options; - } - - Transaction getTransaction() { - return transaction; - } - - void setPrevTransactionId(ByteString transactionId) { - TransactionOptions.ReadWrite readWrite = - TransactionOptions.ReadWrite.newBuilder().setPreviousTransaction(transactionId).build(); - options = options.toBuilder().setReadWrite(readWrite).build(); + ReadWriteTransactionCallable getDelegate() { + return delegate; } @Override public T call() throws DatastoreException { try (io.opentelemetry.context.Scope ignored = Context.current().with(parentSpan.getSpan()).makeCurrent()) { - transaction = datastore.newTransaction(options); - T value = callable.run(transaction); - transaction.commit(); - return value; - } catch (Exception ex) { - transaction.rollback(); - throw DatastoreException.propagateUserException(ex); - } finally { - if (transaction.isActive()) { - transaction.rollback(); - } - if (options != null - && options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) { - setPrevTransactionId(transaction.getTransactionId()); - } + return delegate.call(); } } } @@ -200,14 +176,19 @@ public boolean isClosed() { static class ReadWriteTransactionCallable implements Callable { private final Datastore datastore; private final TransactionCallable callable; + private final MetricsRecorder metricsRecorder; private volatile TransactionOptions options; private volatile Transaction transaction; ReadWriteTransactionCallable( - Datastore datastore, TransactionCallable callable, TransactionOptions options) { + Datastore datastore, + TransactionCallable callable, + TransactionOptions options, + MetricsRecorder metricsRecorder) { this.datastore = datastore; this.callable = callable; this.options = options; + this.metricsRecorder = metricsRecorder; this.transaction = null; } @@ -231,15 +212,28 @@ void setPrevTransactionId(ByteString transactionId) { @Override public T call() throws DatastoreException { + String attemptStatus = StatusCode.Code.UNKNOWN.toString(); try { transaction = datastore.newTransaction(options); T value = callable.run(transaction); transaction.commit(); + attemptStatus = Status.Code.OK.toString(); return value; } catch (Exception ex) { - transaction.rollback(); + attemptStatus = DatastoreException.extractStatusCode(ex); + // An exception here can originate from either callable.run() (before commit was attempted) + // or from transaction.commit() itself. In both cases the transaction is still active. + // isActive() returns false if the transaction was already committed or rolled back, so + // it is safe to use as the sole guard here without tracking a separate committed flag. + if (transaction.isActive()) { + transaction.rollback(); + } throw DatastoreException.propagateUserException(ex); } finally { + recordAttempt(attemptStatus); + // transaction.isActive() returns false after both a successful commit or a completed + // rollback, so it already guards against rolling back a committed transaction or + // rolling back a transaction that has already been rolled back. if (transaction.isActive()) { transaction.rollback(); } @@ -249,28 +243,27 @@ public T call() throws DatastoreException { } } } + + /** + * Records a single transaction commit attempt with the given status code. This is called once + * per invocation of {@link #call()}, capturing the outcome of each individual commit attempt. + */ + private void recordAttempt(String status) { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastore.getOptions().getProjectId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastore.getOptions().getDatabaseId()); + metricsRecorder.recordTransactionAttemptCount(1, attributes); + } } @Override public T runInTransaction(final TransactionCallable callable) { - TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN); - Callable transactionCallable = - (getOptions().getOpenTelemetryOptions().isTracingEnabled() - ? new TracedReadWriteTransactionCallable( - this, callable, /* transactionOptions= */ null, span) - : new ReadWriteTransactionCallable(this, callable, /* transactionOptions= */ null)); - try (Scope ignored = span.makeCurrent()) { - return RetryHelper.runWithRetries( - transactionCallable, - retrySettings, - TRANSACTION_EXCEPTION_HANDLER, - getOptions().getClock()); - } catch (RetryHelperException e) { - span.end(e); - throw DatastoreException.translateAndThrow(e); - } finally { - span.end(); - } + return runInTransaction(callable, /* transactionOptions= */ null); } @Override @@ -278,11 +271,16 @@ public T runInTransaction( final TransactionCallable callable, TransactionOptions transactionOptions) { TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN); - Callable transactionCallable = - (getOptions().getOpenTelemetryOptions().isTracingEnabled() - ? new TracedReadWriteTransactionCallable(this, callable, transactionOptions, span) - : new ReadWriteTransactionCallable(this, callable, transactionOptions)); + ReadWriteTransactionCallable baseCallable = + new ReadWriteTransactionCallable<>(this, callable, transactionOptions, metricsRecorder); + + Callable transactionCallable = baseCallable; + if (getOptions().getOpenTelemetryOptions().isTracingEnabled()) { + transactionCallable = new TracedReadWriteTransactionCallable<>(baseCallable, span); + } + String status = StatusCode.Code.OK.toString(); + Stopwatch stopwatch = Stopwatch.createStarted(); try (Scope ignored = span.makeCurrent()) { return RetryHelper.runWithRetries( transactionCallable, @@ -290,9 +288,18 @@ public T runInTransaction( TRANSACTION_EXCEPTION_HANDLER, getOptions().getClock()); } catch (RetryHelperException e) { + status = DatastoreException.extractStatusCode(e); span.end(e); throw DatastoreException.translateAndThrow(e); } finally { + long latencyMs = stopwatch.elapsed(TimeUnit.MILLISECONDS); + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_RUN); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId()); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId()); + metricsRecorder.recordTransactionLatency(latencyMs, attributes); span.end(); } } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java index 242ce3b01776..1cd8e4038314 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java @@ -31,6 +31,7 @@ import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.spi.v1.GrpcDatastoreRpc; import com.google.cloud.datastore.spi.v1.HttpDatastoreRpc; +import com.google.cloud.datastore.telemetry.MetricsRecorder; import com.google.cloud.datastore.v1.DatastoreSettings; import com.google.cloud.grpc.GrpcTransportOptions; import com.google.cloud.http.HttpTransportOptions; @@ -64,6 +65,7 @@ public class DatastoreOptions extends ServiceOptions { private String namespace; @@ -216,6 +223,7 @@ private DatastoreOptions(Builder builder) { ? builder.openTelemetryOptions : DatastoreOpenTelemetryOptions.newBuilder().build(); this.traceUtil = com.google.cloud.datastore.telemetry.TraceUtil.getInstance(this); + this.metricsRecorder = MetricsRecorder.getInstance(openTelemetryOptions); namespace = MoreObjects.firstNonNull(builder.namespace, defaultNamespace()); databaseId = MoreObjects.firstNonNull(builder.databaseId, DEFAULT_DATABASE_ID); diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java index a8599e019b3c..ced2efc1da1f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java @@ -16,14 +16,21 @@ package com.google.cloud.datastore.telemetry; +import com.google.api.core.InternalExtensionOnly; import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; import io.opentelemetry.api.GlobalOpenTelemetry; import io.opentelemetry.api.OpenTelemetry; import java.util.Map; import javax.annotation.Nonnull; -/** Interface to record specific metric operations. */ -interface MetricsRecorder { +/** + * Interface to record specific metric operations. + * + *

Warning: This is an internal API and is not intended for external use. Do not implement + * or extend this interface. + */ +@InternalExtensionOnly +public interface MetricsRecorder { /** Records the total latency of a transaction in milliseconds. */ void recordTransactionLatency(double latencyMs, Map attributes); diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java index 470ffe118b5c..73225dca9971 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java @@ -42,7 +42,7 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder { this.transactionLatency = meter .histogramBuilder(TelemetryConstants.SERVICE_NAME + "/transaction_latency") - .setDescription("Total latency for successful transaction operations") + .setDescription("Total latency of transaction operations") .setUnit("ms") .build(); diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index 263b85526501..7c0b49f75037 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -33,6 +33,18 @@ public class TelemetryConstants { public static final String ATTRIBUTES_KEY_DEFERRED = "Deferred"; public static final String ATTRIBUTES_KEY_MORE_RESULTS = "more_results"; + /** Attribute key for the gRPC status code (e.g. "OK", "ABORTED", "UNAVAILABLE"). */ + public static final String ATTRIBUTES_KEY_STATUS = "status"; + + /** Attribute key for the RPC method name (e.g. "Transaction.Run"). */ + public static final String ATTRIBUTES_KEY_METHOD = "method"; + + /** Attribute key for the GCP project ID. */ + public static final String ATTRIBUTES_KEY_PROJECT_ID = "project_id"; + + /** Attribute key for the Datastore database ID. */ + public static final String ATTRIBUTES_KEY_DATABASE_ID = "database_id"; + /* TODO(lawrenceqiu): For now, these are a duplicate of method names in TraceUtil. Those will use these eventually */ // Format is not SnakeCase to keep backward compatibility with the existing values TraceUtil spans public static final String METHOD_ALLOCATE_IDS = "AllocateIds"; diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java new file mode 100644 index 000000000000..7879c0dbddb2 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -0,0 +1,333 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.datastore; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.cloud.NoCredentials; +import com.google.cloud.ServiceOptions; +import com.google.cloud.datastore.spi.DatastoreRpcFactory; +import com.google.cloud.datastore.spi.v1.DatastoreRpc; +import com.google.cloud.datastore.telemetry.TelemetryConstants; +import com.google.datastore.v1.BeginTransactionRequest; +import com.google.datastore.v1.BeginTransactionResponse; +import com.google.datastore.v1.CommitRequest; +import com.google.datastore.v1.CommitResponse; +import com.google.datastore.v1.RollbackRequest; +import com.google.datastore.v1.RollbackResponse; +import com.google.datastore.v1.TransactionOptions; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.Optional; +import org.easymock.EasyMock; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for transaction metrics recording in {@link DatastoreImpl}. These tests verify that + * transaction latency and per-attempt metrics are correctly recorded via the {@link + * com.google.cloud.datastore.telemetry.MetricsRecorder}. + */ +@RunWith(JUnit4.class) +public class DatastoreImplMetricsTest { + + private static final String PROJECT_ID = "test-project"; + private static final String LATENCY_METRIC_NAME = "datastore.googleapis.com/transaction_latency"; + private static final String ATTEMPT_COUNT_METRIC_NAME = + "datastore.googleapis.com/transaction_attempt_count"; + + private InMemoryMetricReader metricReader; + private DatastoreRpcFactory rpcFactoryMock; + private DatastoreRpc rpcMock; + private DatastoreOptions datastoreOptions; + + @Before + public void setUp() { + metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + OpenTelemetrySdk openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + + rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); + rpcMock = EasyMock.createStrictMock(DatastoreRpc.class); + + datastoreOptions = + DatastoreOptions.newBuilder() + .setProjectId(PROJECT_ID) + .setCredentials(NoCredentials.getInstance()) + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .setServiceRpcFactory(rpcFactoryMock) + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setMetricsEnabled(true) + .setOpenTelemetry(openTelemetry) + .build()) + .build(); + + EasyMock.expect(rpcFactoryMock.create(datastoreOptions)).andReturn(rpcMock); + } + + @Test + public void runInTransaction_recordsLatencyOnSuccess() { + // Mock a successful transaction: begin -> commit + EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + .andReturn(BeginTransactionResponse.getDefaultInstance()); + EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) + .andReturn(CommitResponse.newBuilder().build()); + EasyMock.replay(rpcFactoryMock, rpcMock); + + Datastore datastore = datastoreOptions.getService(); + datastore.runInTransaction(transaction -> null); + + // Verify latency metric was recorded with status OK + Optional latencyMetric = findMetric(LATENCY_METRIC_NAME); + assertThat(latencyMetric.isPresent()).isTrue(); + + HistogramPointData point = + latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getCount()).isEqualTo(1); + assertThat(point.getSum()).isAtLeast(0.0); + assertThat(point.getAttributes().get(AttributeKey.stringKey("status"))).isEqualTo("OK"); + assertThat(point.getAttributes().get(AttributeKey.stringKey("method"))) + .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_RUN); + assertThat(point.getAttributes().get(AttributeKey.stringKey("project_id"))) + .isEqualTo(PROJECT_ID); + assertThat(point.getAttributes().get(AttributeKey.stringKey("database_id"))).isNotNull(); + + EasyMock.verify(rpcFactoryMock, rpcMock); + } + + @Test + public void runInTransaction_recordsPerAttemptCountOnSuccess() { + // Mock a successful transaction: begin -> commit + EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + .andReturn(BeginTransactionResponse.getDefaultInstance()); + EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) + .andReturn(CommitResponse.newBuilder().build()); + EasyMock.replay(rpcFactoryMock, rpcMock); + + Datastore datastore = datastoreOptions.getService(); + datastore.runInTransaction(transaction -> null); + + // Verify attempt count was recorded once with status OK + Optional attemptMetric = findMetric(ATTEMPT_COUNT_METRIC_NAME); + assertThat(attemptMetric.isPresent()).isTrue(); + + LongPointData point = + attemptMetric.get().getLongSumData().getPoints().stream() + .filter(p -> "OK".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + .findFirst() + .orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getValue()).isEqualTo(1); + assertThat(point.getAttributes().get(AttributeKey.stringKey("method"))) + .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT); + + EasyMock.verify(rpcFactoryMock, rpcMock); + } + + @Test + public void runInTransaction_recordsPerAttemptOnRetry() { + // First attempt: begin -> ABORTED -> rollback + EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + .andReturn(BeginTransactionResponse.getDefaultInstance()); + EasyMock.expect(rpcMock.rollback(EasyMock.anyObject(RollbackRequest.class))) + .andReturn(RollbackResponse.getDefaultInstance()); + + // Second attempt: begin -> commit (success) + EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + .andReturn(BeginTransactionResponse.getDefaultInstance()); + EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) + .andReturn(CommitResponse.newBuilder().build()); + EasyMock.replay(rpcFactoryMock, rpcMock); + + Datastore datastore = datastoreOptions.getService(); + + TransactionOptions options = + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()) + .build(); + + Datastore.TransactionCallable callable = + new Datastore.TransactionCallable() { + private int attempts = 0; + + @Override + public Integer run(DatastoreReaderWriter transaction) { + attempts++; + if (attempts < 2) { + throw new DatastoreException(10, "", "ABORTED", false, null); + } + return attempts; + } + }; + + Integer result = datastore.runInTransaction(callable, options); + assertThat(result).isEqualTo(2); + + // Verify attempt count has two data points: one with ABORTED and one with OK + Optional attemptMetric = findMetric(ATTEMPT_COUNT_METRIC_NAME); + assertThat(attemptMetric.isPresent()).isTrue(); + assertThat(attemptMetric.get().getLongSumData().getPoints()).hasSize(2); + + // Verify ABORTED attempt + LongPointData abortedPoint = + attemptMetric.get().getLongSumData().getPoints().stream() + .filter(p -> "ABORTED".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + .findFirst() + .orElse(null); + assertThat(abortedPoint).isNotNull(); + assertThat(abortedPoint.getValue()).isEqualTo(1); + + // Verify OK attempt + LongPointData okPoint = + attemptMetric.get().getLongSumData().getPoints().stream() + .filter(p -> "OK".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + .findFirst() + .orElse(null); + assertThat(okPoint).isNotNull(); + assertThat(okPoint.getValue()).isEqualTo(1); + + // Verify latency was recorded with OK (overall transaction succeeded) + Optional latencyMetric = findMetric(LATENCY_METRIC_NAME); + assertThat(latencyMetric.isPresent()).isTrue(); + HistogramPointData latencyPoint = + latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(latencyPoint).isNotNull(); + assertThat(latencyPoint.getAttributes().get(AttributeKey.stringKey("status"))).isEqualTo("OK"); + + EasyMock.verify(rpcFactoryMock, rpcMock); + } + + @Test + public void runInTransaction_recordsGrpcStatusCodeOnFailure() { + // This test uses a separate set of nice mocks since the retry loop makes + // multiple begin/rollback calls whose exact count depends on retry settings. + DatastoreRpcFactory localRpcFactoryMock = EasyMock.createNiceMock(DatastoreRpcFactory.class); + DatastoreRpc localRpcMock = EasyMock.createNiceMock(DatastoreRpc.class); + + InMemoryMetricReader localMetricReader = InMemoryMetricReader.create(); + SdkMeterProvider localMeterProvider = + SdkMeterProvider.builder().registerMetricReader(localMetricReader).build(); + OpenTelemetrySdk localOpenTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(localMeterProvider).build(); + + DatastoreOptions localOptions = + DatastoreOptions.newBuilder() + .setProjectId(PROJECT_ID) + .setCredentials(NoCredentials.getInstance()) + .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) + .setServiceRpcFactory(localRpcFactoryMock) + .setOpenTelemetryOptions( + DatastoreOpenTelemetryOptions.newBuilder() + .setMetricsEnabled(true) + .setOpenTelemetry(localOpenTelemetry) + .build()) + .build(); + + EasyMock.expect(localRpcFactoryMock.create(localOptions)).andReturn(localRpcMock); + EasyMock.expect( + localRpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + .andReturn(BeginTransactionResponse.getDefaultInstance()) + .anyTimes(); + EasyMock.expect(localRpcMock.rollback(EasyMock.anyObject(RollbackRequest.class))) + .andReturn(RollbackResponse.getDefaultInstance()) + .anyTimes(); + EasyMock.replay(localRpcFactoryMock, localRpcMock); + + Datastore datastore = localOptions.getService(); + + Datastore.TransactionCallable callable = + transaction -> { + throw new DatastoreException(10, "ABORTED", "ABORTED", false, null); + }; + + assertThrows(DatastoreException.class, () -> datastore.runInTransaction(callable)); + + // Verify that attempt count was recorded with ABORTED status + Optional attemptMetric = + localMetricReader.collectAllMetrics().stream() + .filter(m -> m.getName().equals(ATTEMPT_COUNT_METRIC_NAME)) + .findFirst(); + assertThat(attemptMetric.isPresent()).isTrue(); + + LongPointData abortedPoint = + attemptMetric.get().getLongSumData().getPoints().stream() + .filter(p -> "ABORTED".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + .findFirst() + .orElse(null); + assertThat(abortedPoint).isNotNull(); + assertThat(abortedPoint.getValue()).isAtLeast(1); + + // Verify that latency was recorded with the failure status code + Optional latencyMetric = + localMetricReader.collectAllMetrics().stream() + .filter(m -> m.getName().equals(LATENCY_METRIC_NAME)) + .findFirst(); + assertThat(latencyMetric.isPresent()).isTrue(); + HistogramPointData latencyPoint = + latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(latencyPoint).isNotNull(); + assertThat(latencyPoint.getAttributes().get(AttributeKey.stringKey("status"))) + .isEqualTo("ABORTED"); + } + + @Test + public void runInTransaction_withTransactionOptions_recordsMetrics() { + // Verify metrics are recorded when calling the overload with TransactionOptions + EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + .andReturn(BeginTransactionResponse.getDefaultInstance()); + EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) + .andReturn(CommitResponse.newBuilder().build()); + EasyMock.replay(rpcFactoryMock, rpcMock); + + Datastore datastore = datastoreOptions.getService(); + + TransactionOptions options = + TransactionOptions.newBuilder() + .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance()) + .build(); + datastore.runInTransaction(transaction -> null, options); + + // Verify both metrics were recorded + assertThat(findMetric(LATENCY_METRIC_NAME).isPresent()).isTrue(); + assertThat(findMetric(ATTEMPT_COUNT_METRIC_NAME).isPresent()).isTrue(); + + EasyMock.verify(rpcFactoryMock, rpcMock); + } + + /** + * Finds a specific metric by name from the in-memory metric reader. + * + * @param metricName The fully qualified name of the metric to find. + * @return An {@link Optional} containing the {@link MetricData} if found. + */ + private Optional findMetric(String metricName) { + Collection metrics = metricReader.collectAllMetrics(); + return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst(); + } +} diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java new file mode 100644 index 000000000000..c2543dd2cfb6 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java @@ -0,0 +1,81 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.cloud.datastore.DatastoreOpenTelemetryOptions; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +/** + * Tests for {@link MetricsRecorder#getInstance(DatastoreOpenTelemetryOptions)}. + * + *

Note: Since {@code setMetricsEnabled()} is package-private on {@link + * DatastoreOpenTelemetryOptions.Builder}, these tests can only verify the default (metrics + * disabled) behavior and the behavior when an explicit OpenTelemetry instance is provided. The + * metrics-enabled paths are exercised through the {@link DatastoreImplMetricsTest} which operates + * in the {@code com.google.cloud.datastore} package. + */ +@RunWith(JUnit4.class) +public class MetricsRecorderTest { + + @Test + public void defaultOptionsReturnNoOp() { + DatastoreOpenTelemetryOptions options = DatastoreOpenTelemetryOptions.newBuilder().build(); + + MetricsRecorder recorder = MetricsRecorder.getInstance(options); + + assertThat(recorder).isInstanceOf(NoOpMetricsRecorder.class); + } + + @Test + public void tracingEnabledButMetricsDisabledReturnsNoOp() { + // Enabling tracing alone should not enable metrics + DatastoreOpenTelemetryOptions options = + DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build(); + + MetricsRecorder recorder = MetricsRecorder.getInstance(options); + + assertThat(recorder).isInstanceOf(NoOpMetricsRecorder.class); + } + + @Test + public void noOpRecorderDoesNotThrow() { + // Verify NoOpMetricsRecorder methods do not throw + NoOpMetricsRecorder recorder = new NoOpMetricsRecorder(); + recorder.recordTransactionLatency(100.0, null); + recorder.recordTransactionAttemptCount(1, null); + } + + @Test + public void openTelemetryRecorderCreatedWithExplicitOpenTelemetry() { + InMemoryMetricReader metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + + OpenTelemetryMetricsRecorder recorder = new OpenTelemetryMetricsRecorder(openTelemetry); + + assertThat(recorder.getOpenTelemetry()).isSameInstanceAs(openTelemetry); + } +} diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java new file mode 100644 index 000000000000..85a449827851 --- /dev/null +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java @@ -0,0 +1,176 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.google.cloud.datastore.telemetry; + +import static com.google.common.truth.Truth.assertThat; + +import com.google.api.gax.rpc.StatusCode; +import io.opentelemetry.api.OpenTelemetry; +import io.opentelemetry.api.common.AttributeKey; +import io.opentelemetry.sdk.OpenTelemetrySdk; +import io.opentelemetry.sdk.metrics.SdkMeterProvider; +import io.opentelemetry.sdk.metrics.data.HistogramPointData; +import io.opentelemetry.sdk.metrics.data.LongPointData; +import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class OpenTelemetryMetricsRecorderTest { + + private InMemoryMetricReader metricReader; + private OpenTelemetryMetricsRecorder recorder; + + @Before + public void setUp() { + metricReader = InMemoryMetricReader.create(); + SdkMeterProvider meterProvider = + SdkMeterProvider.builder().registerMetricReader(metricReader).build(); + OpenTelemetry openTelemetry = + OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); + recorder = new OpenTelemetryMetricsRecorder(openTelemetry); + } + + @Test + public void recordTransactionLatency_recordsHistogramWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT); + + recorder.recordTransactionLatency(150.0, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData latencyMetric = + metrics.stream() + .filter( + m -> m.getName().equals(TelemetryConstants.SERVICE_NAME + "/transaction_latency")) + .findFirst() + .orElse(null); + + assertThat(latencyMetric).isNotNull(); + assertThat(latencyMetric.getDescription()).isEqualTo("Total latency of transaction operations"); + assertThat(latencyMetric.getUnit()).isEqualTo("ms"); + + HistogramPointData point = + latencyMetric.getHistogramData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getSum()).isEqualTo(150.0); + assertThat(point.getCount()).isEqualTo(1); + assertThat( + point + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS))) + .isEqualTo(StatusCode.Code.OK.toString()); + assertThat( + point + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_METHOD))) + .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT); + } + + @Test + public void recordTransactionAttemptCount_recordsCounterWithAttributes() { + Map attributes = new HashMap<>(); + attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.ABORTED.toString()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT); + + recorder.recordTransactionAttemptCount(1, attributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData attemptMetric = + metrics.stream() + .filter( + m -> + m.getName() + .equals(TelemetryConstants.SERVICE_NAME + "/transaction_attempt_count")) + .findFirst() + .orElse(null); + + assertThat(attemptMetric).isNotNull(); + assertThat(attemptMetric.getDescription()) + .isEqualTo("Number of attempts to commit a transaction"); + + LongPointData point = + attemptMetric.getLongSumData().getPoints().stream().findFirst().orElse(null); + assertThat(point).isNotNull(); + assertThat(point.getValue()).isEqualTo(1); + assertThat( + point + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS))) + .isEqualTo(StatusCode.Code.ABORTED.toString()); + assertThat( + point + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_METHOD))) + .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT); + } + + @Test + public void recordTransactionAttemptCount_multipleAttempts_accumulates() { + Map abortedAttributes = new HashMap<>(); + abortedAttributes.put( + TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.ABORTED.toString()); + abortedAttributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT); + + Map okAttributes = new HashMap<>(); + okAttributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()); + okAttributes.put( + TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT); + + // Simulate a retry scenario: first attempt ABORTED, second attempt OK + recorder.recordTransactionAttemptCount(1, abortedAttributes); + recorder.recordTransactionAttemptCount(1, okAttributes); + + Collection metrics = metricReader.collectAllMetrics(); + MetricData attemptMetric = + metrics.stream() + .filter( + m -> + m.getName() + .equals(TelemetryConstants.SERVICE_NAME + "/transaction_attempt_count")) + .findFirst() + .orElse(null); + + assertThat(attemptMetric).isNotNull(); + + // There should be two data points — one for ABORTED and one for OK + assertThat(attemptMetric.getLongSumData().getPoints()).hasSize(2); + } + + @Test + public void recordTransactionLatency_nullAttributes_doesNotThrow() { + // Should not throw even when attributes are null + recorder.recordTransactionLatency(100.0, null); + + Collection metrics = metricReader.collectAllMetrics(); + assertThat(metrics).isNotEmpty(); + } + + @Test + public void getOpenTelemetry_returnsConfiguredInstance() { + assertThat(recorder.getOpenTelemetry()).isNotNull(); + } +} From dd59c366a2f81d19ff27f4c1cf3e53ee9489a02b Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 6 Mar 2026 14:12:27 -0500 Subject: [PATCH 2/6] chore: Refactor DatastoreImplMetricsTest to check the attributes --- .../google/cloud/datastore/DatastoreImpl.java | 37 +- .../datastore/telemetry/MetricsRecorder.java | 5 +- .../OpenTelemetryMetricsRecorder.java | 4 +- .../telemetry/TelemetryConstants.java | 8 + .../datastore/DatastoreImplMetricsTest.java | 404 +++++++++++++----- 5 files changed, 315 insertions(+), 143 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index a8c71d5e49c8..dcf142c81916 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -131,9 +131,10 @@ public Transaction newTransaction() { } /** - * A wrapper around {@link ReadWriteTransactionCallable} that adds OpenTelemetry tracing context - * propagation. All transaction logic (begin, run, commit, rollback, metrics recording) is - * delegated to the underlying {@link ReadWriteTransactionCallable}. + * A Tracing callable that adds OpenTelemetry tracing context. Intended to be used for + * transactions and wraps {@link ReadWriteTransactionCallable} as the delegate. All transaction + * logic (begin, run, commit, rollback, metrics recording) is handled in the delegate (this solely + * handles tracing). */ static class TracedReadWriteTransactionCallable implements Callable { private final ReadWriteTransactionCallable delegate; @@ -146,10 +147,6 @@ static class TracedReadWriteTransactionCallable implements Callable { this.parentSpan = parentSpan; } - ReadWriteTransactionCallable getDelegate() { - return delegate; - } - @Override public T call() throws DatastoreException { try (io.opentelemetry.context.Scope ignored = @@ -192,18 +189,6 @@ static class ReadWriteTransactionCallable implements Callable { this.transaction = null; } - Datastore getDatastore() { - return datastore; - } - - TransactionOptions getOptions() { - return options; - } - - Transaction getTransaction() { - return transaction; - } - void setPrevTransactionId(ByteString transactionId) { TransactionOptions.ReadWrite readWrite = TransactionOptions.ReadWrite.newBuilder().setPreviousTransaction(transactionId).build(); @@ -221,19 +206,19 @@ public T call() throws DatastoreException { return value; } catch (Exception ex) { attemptStatus = DatastoreException.extractStatusCode(ex); - // An exception here can originate from either callable.run() (before commit was attempted) - // or from transaction.commit() itself. In both cases the transaction is still active. - // isActive() returns false if the transaction was already committed or rolled back, so - // it is safe to use as the sole guard here without tracking a separate committed flag. + // An exception here can stem from either `callable.run()` (before commit was attempted) + // or from `transaction.commit()`. If there is an exception thrown from either call site, + // then the transaction is still active. Check if it is still active (e.g. not commited) + // and roll back the transaction. if (transaction.isActive()) { transaction.rollback(); } throw DatastoreException.propagateUserException(ex); } finally { recordAttempt(attemptStatus); - // transaction.isActive() returns false after both a successful commit or a completed - // rollback, so it already guards against rolling back a committed transaction or - // rolling back a transaction that has already been rolled back. + // If the transaction is active, then commit the rollback. If it was already successfully + // rolled back, the transaction is inactive (prevents rolling back an already rolled back + // transaction). if (transaction.isActive()) { transaction.rollback(); } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java index ced2efc1da1f..6a0ea16b5dd6 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java @@ -26,8 +26,9 @@ /** * Interface to record specific metric operations. * - *

Warning: This is an internal API and is not intended for external use. Do not implement - * or extend this interface. + *

Warning: This is intended to be an internal API and is not intended for external use. + * This is public solely for implementation purposes and does not promise any backwards + * compatibility. */ @InternalExtensionOnly public interface MetricsRecorder { diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java index 73225dca9971..e50edbd5f79f 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java @@ -41,14 +41,14 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder { this.transactionLatency = meter - .histogramBuilder(TelemetryConstants.SERVICE_NAME + "/transaction_latency") + .histogramBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY) .setDescription("Total latency of transaction operations") .setUnit("ms") .build(); this.transactionAttemptCount = meter - .counterBuilder(TelemetryConstants.SERVICE_NAME + "/transaction_attempt_count") + .counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT) .setDescription("Number of attempts to commit a transaction") .build(); } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index 7c0b49f75037..32c08de88e5c 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -45,6 +45,14 @@ public class TelemetryConstants { /** Attribute key for the Datastore database ID. */ public static final String ATTRIBUTES_KEY_DATABASE_ID = "database_id"; + /** Metric name for the total latency of a transaction. */ + public static final String METRIC_NAME_TRANSACTION_LATENCY = + SERVICE_NAME + "/transaction_latency"; + + /** Metric name for the number of attempts a transaction took. */ + public static final String METRIC_NAME_TRANSACTION_ATTEMPT_COUNT = + SERVICE_NAME + "/transaction_attempt_count"; + /* TODO(lawrenceqiu): For now, these are a duplicate of method names in TraceUtil. Those will use these eventually */ // Format is not SnakeCase to keep backward compatibility with the existing values TraceUtil spans public static final String METHOD_ALLOCATE_IDS = "AllocateIds"; diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java index 7879c0dbddb2..c8a0e079af7a 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java @@ -18,6 +18,7 @@ import static com.google.common.truth.Truth.assertThat; import static org.junit.Assert.assertThrows; +import com.google.api.gax.rpc.StatusCode; import com.google.cloud.NoCredentials; import com.google.cloud.ServiceOptions; import com.google.cloud.datastore.spi.DatastoreRpcFactory; @@ -36,11 +37,13 @@ import io.opentelemetry.sdk.metrics.data.HistogramPointData; import io.opentelemetry.sdk.metrics.data.LongPointData; import io.opentelemetry.sdk.metrics.data.MetricData; +import io.opentelemetry.sdk.metrics.data.PointData; import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader; import java.util.Collection; import java.util.Optional; import org.easymock.EasyMock; import org.junit.Before; +import org.junit.BeforeClass; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -54,29 +57,31 @@ public class DatastoreImplMetricsTest { private static final String PROJECT_ID = "test-project"; - private static final String LATENCY_METRIC_NAME = "datastore.googleapis.com/transaction_latency"; - private static final String ATTEMPT_COUNT_METRIC_NAME = - "datastore.googleapis.com/transaction_attempt_count"; + private static final String DATABASE_ID = "test-database"; - private InMemoryMetricReader metricReader; - private DatastoreRpcFactory rpcFactoryMock; - private DatastoreRpc rpcMock; - private DatastoreOptions datastoreOptions; + private static InMemoryMetricReader metricReader; + private static DatastoreRpcFactory rpcFactoryMock; + private static DatastoreRpc rpcMock; + private static Datastore datastore; - @Before - public void setUp() { - metricReader = InMemoryMetricReader.create(); + @BeforeClass + public static void setUpClass() { + // Use delta temporality so each collectAllMetrics() call returns only new data and resets. + metricReader = InMemoryMetricReader.createDelta(); SdkMeterProvider meterProvider = SdkMeterProvider.builder().registerMetricReader(metricReader).build(); OpenTelemetrySdk openTelemetry = OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build(); rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class); - rpcMock = EasyMock.createStrictMock(DatastoreRpc.class); + // Use a regular (non-strict) mock for rpcMock so that anyTimes() expectations work without + // enforcing call order — needed for retry tests with unpredictable call counts. + rpcMock = EasyMock.createMock(DatastoreRpc.class); - datastoreOptions = + DatastoreOptions datastoreOptions = DatastoreOptions.newBuilder() .setProjectId(PROJECT_ID) + .setDatabaseId(DATABASE_ID) .setCredentials(NoCredentials.getInstance()) .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) .setServiceRpcFactory(rpcFactoryMock) @@ -88,70 +93,108 @@ public void setUp() { .build(); EasyMock.expect(rpcFactoryMock.create(datastoreOptions)).andReturn(rpcMock); + EasyMock.replay(rpcFactoryMock); + datastore = datastoreOptions.getService(); + EasyMock.verify(rpcFactoryMock); + } + + @Before + public void setUp() { + // Drain any metrics accumulated during @BeforeClass or previous tests. + metricReader.collectAllMetrics(); + EasyMock.reset(rpcMock); } @Test public void runInTransaction_recordsLatencyOnSuccess() { - // Mock a successful transaction: begin -> commit EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) .andReturn(BeginTransactionResponse.getDefaultInstance()); EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) .andReturn(CommitResponse.newBuilder().build()); - EasyMock.replay(rpcFactoryMock, rpcMock); + EasyMock.replay(rpcMock); - Datastore datastore = datastoreOptions.getService(); datastore.runInTransaction(transaction -> null); - // Verify latency metric was recorded with status OK - Optional latencyMetric = findMetric(LATENCY_METRIC_NAME); + Collection metrics = metricReader.collectAllMetrics(); + + Optional latencyMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY); assertThat(latencyMetric.isPresent()).isTrue(); HistogramPointData point = latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null); assertThat(point).isNotNull(); assertThat(point.getCount()).isEqualTo(1); - assertThat(point.getSum()).isAtLeast(0.0); - assertThat(point.getAttributes().get(AttributeKey.stringKey("status"))).isEqualTo("OK"); - assertThat(point.getAttributes().get(AttributeKey.stringKey("method"))) - .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_RUN); - assertThat(point.getAttributes().get(AttributeKey.stringKey("project_id"))) - .isEqualTo(PROJECT_ID); - assertThat(point.getAttributes().get(AttributeKey.stringKey("database_id"))).isNotNull(); - - EasyMock.verify(rpcFactoryMock, rpcMock); + + assertThat( + dataContainsStringAttribute( + point, TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString())) + .isTrue(); + assertThat( + dataContainsStringAttribute( + point, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_RUN)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + point, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + point, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + EasyMock.verify(rpcMock); } @Test public void runInTransaction_recordsPerAttemptCountOnSuccess() { - // Mock a successful transaction: begin -> commit EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) .andReturn(BeginTransactionResponse.getDefaultInstance()); EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) .andReturn(CommitResponse.newBuilder().build()); - EasyMock.replay(rpcFactoryMock, rpcMock); + EasyMock.replay(rpcMock); - Datastore datastore = datastoreOptions.getService(); datastore.runInTransaction(transaction -> null); - // Verify attempt count was recorded once with status OK - Optional attemptMetric = findMetric(ATTEMPT_COUNT_METRIC_NAME); + Collection metrics = metricReader.collectAllMetrics(); + + Optional attemptMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT); assertThat(attemptMetric.isPresent()).isTrue(); LongPointData point = - attemptMetric.get().getLongSumData().getPoints().stream() - .filter(p -> "OK".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) - .findFirst() - .orElse(null); + attemptMetric.get().getLongSumData().getPoints().stream().findFirst().orElse(null); assertThat(point).isNotNull(); assertThat(point.getValue()).isEqualTo(1); - assertThat(point.getAttributes().get(AttributeKey.stringKey("method"))) - .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT); - EasyMock.verify(rpcFactoryMock, rpcMock); + assertThat( + dataContainsStringAttribute( + point, TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString())) + .isTrue(); + assertThat( + dataContainsStringAttribute( + point, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_COMMIT)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + point, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + point, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + EasyMock.verify(rpcMock); } @Test public void runInTransaction_recordsPerAttemptOnRetry() { + String abortedStatusCodeString = StatusCode.Code.ABORTED.toString(); + String okStatusCodeString = StatusCode.Code.OK.toString(); // First attempt: begin -> ABORTED -> rollback EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) .andReturn(BeginTransactionResponse.getDefaultInstance()); @@ -163,9 +206,7 @@ public void runInTransaction_recordsPerAttemptOnRetry() { .andReturn(BeginTransactionResponse.getDefaultInstance()); EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) .andReturn(CommitResponse.newBuilder().build()); - EasyMock.replay(rpcFactoryMock, rpcMock); - - Datastore datastore = datastoreOptions.getService(); + EasyMock.replay(rpcMock); TransactionOptions options = TransactionOptions.newBuilder() @@ -180,7 +221,7 @@ public void runInTransaction_recordsPerAttemptOnRetry() { public Integer run(DatastoreReaderWriter transaction) { attempts++; if (attempts < 2) { - throw new DatastoreException(10, "", "ABORTED", false, null); + throw new DatastoreException(10, "", abortedStatusCodeString, false, null); } return attempts; } @@ -189,123 +230,255 @@ public Integer run(DatastoreReaderWriter transaction) { Integer result = datastore.runInTransaction(callable, options); assertThat(result).isEqualTo(2); + Collection metrics = metricReader.collectAllMetrics(); + // Verify attempt count has two data points: one with ABORTED and one with OK - Optional attemptMetric = findMetric(ATTEMPT_COUNT_METRIC_NAME); + Optional attemptMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT); assertThat(attemptMetric.isPresent()).isTrue(); assertThat(attemptMetric.get().getLongSumData().getPoints()).hasSize(2); - // Verify ABORTED attempt + // Find the first point (this would match to ABORTED) LongPointData abortedPoint = attemptMetric.get().getLongSumData().getPoints().stream() - .filter(p -> "ABORTED".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + .filter( + p -> + abortedStatusCodeString.equals( + p.getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS)))) .findFirst() .orElse(null); assertThat(abortedPoint).isNotNull(); assertThat(abortedPoint.getValue()).isEqualTo(1); - // Verify OK attempt + assertThat( + dataContainsStringAttribute( + abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, abortedStatusCodeString)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + abortedPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_COMMIT)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + LongPointData okPoint = attemptMetric.get().getLongSumData().getPoints().stream() - .filter(p -> "OK".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + .filter( + p -> + okStatusCodeString.equals( + p.getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS)))) .findFirst() .orElse(null); assertThat(okPoint).isNotNull(); assertThat(okPoint.getValue()).isEqualTo(1); + assertThat( + dataContainsStringAttribute( + okPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, okStatusCodeString)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + okPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_COMMIT)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + okPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + okPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + // Verify latency was recorded with OK (overall transaction succeeded) - Optional latencyMetric = findMetric(LATENCY_METRIC_NAME); + Optional latencyMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY); assertThat(latencyMetric.isPresent()).isTrue(); HistogramPointData latencyPoint = latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null); assertThat(latencyPoint).isNotNull(); - assertThat(latencyPoint.getAttributes().get(AttributeKey.stringKey("status"))).isEqualTo("OK"); - - EasyMock.verify(rpcFactoryMock, rpcMock); + assertThat( + latencyPoint + .getAttributes() + .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS))) + .isEqualTo(okStatusCodeString); + + assertThat( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, okStatusCodeString)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + latencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_RUN)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + EasyMock.verify(rpcMock); } @Test - public void runInTransaction_recordsGrpcStatusCodeOnFailure() { - // This test uses a separate set of nice mocks since the retry loop makes - // multiple begin/rollback calls whose exact count depends on retry settings. - DatastoreRpcFactory localRpcFactoryMock = EasyMock.createNiceMock(DatastoreRpcFactory.class); - DatastoreRpc localRpcMock = EasyMock.createNiceMock(DatastoreRpc.class); - - InMemoryMetricReader localMetricReader = InMemoryMetricReader.create(); - SdkMeterProvider localMeterProvider = - SdkMeterProvider.builder().registerMetricReader(localMetricReader).build(); - OpenTelemetrySdk localOpenTelemetry = - OpenTelemetrySdk.builder().setMeterProvider(localMeterProvider).build(); - - DatastoreOptions localOptions = - DatastoreOptions.newBuilder() - .setProjectId(PROJECT_ID) - .setCredentials(NoCredentials.getInstance()) - .setRetrySettings(ServiceOptions.getDefaultRetrySettings()) - .setServiceRpcFactory(localRpcFactoryMock) - .setOpenTelemetryOptions( - DatastoreOpenTelemetryOptions.newBuilder() - .setMetricsEnabled(true) - .setOpenTelemetry(localOpenTelemetry) - .build()) - .build(); + public void runInTransaction_recordsStatusCodeOnFailure() { + String abortedStatusCodeString = StatusCode.Code.ABORTED.toString(); + String cancelledStatusCodeString = StatusCode.Code.CANCELLED.toString(); - EasyMock.expect(localRpcFactoryMock.create(localOptions)).andReturn(localRpcMock); - EasyMock.expect( - localRpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) + // The retry loop makes multiple begin/rollback calls with unpredictable counts, so use + // anyTimes() rather than exact expectations. + EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) .andReturn(BeginTransactionResponse.getDefaultInstance()) .anyTimes(); - EasyMock.expect(localRpcMock.rollback(EasyMock.anyObject(RollbackRequest.class))) + EasyMock.expect(rpcMock.rollback(EasyMock.anyObject(RollbackRequest.class))) .andReturn(RollbackResponse.getDefaultInstance()) .anyTimes(); - EasyMock.replay(localRpcFactoryMock, localRpcMock); + EasyMock.replay(rpcMock); - Datastore datastore = localOptions.getService(); + Datastore.TransactionCallable callable = + new Datastore.TransactionCallable() { + private int attempts = 0; - Datastore.TransactionCallable callable = - transaction -> { - throw new DatastoreException(10, "ABORTED", "ABORTED", false, null); + @Override + public Integer run(DatastoreReaderWriter transaction) { + attempts++; + if (attempts < 2) { + throw new DatastoreException(10, "", abortedStatusCodeString, false, null); + } + throw new DatastoreException(1, "", cancelledStatusCodeString, false, null); + } }; assertThrows(DatastoreException.class, () -> datastore.runInTransaction(callable)); - // Verify that attempt count was recorded with ABORTED status + Collection metrics = metricReader.collectAllMetrics(); + + // Verify attempt count was recorded with ABORTED status Optional attemptMetric = - localMetricReader.collectAllMetrics().stream() - .filter(m -> m.getName().equals(ATTEMPT_COUNT_METRIC_NAME)) - .findFirst(); + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT); assertThat(attemptMetric.isPresent()).isTrue(); + Collection transactionAttemptCountData = + attemptMetric.get().getLongSumData().getPoints(); LongPointData abortedPoint = - attemptMetric.get().getLongSumData().getPoints().stream() - .filter(p -> "ABORTED".equals(p.getAttributes().get(AttributeKey.stringKey("status")))) + transactionAttemptCountData.stream() + .filter( + p -> + dataContainsStringAttribute( + p, TelemetryConstants.ATTRIBUTES_KEY_STATUS, abortedStatusCodeString)) .findFirst() .orElse(null); assertThat(abortedPoint).isNotNull(); assertThat(abortedPoint.getValue()).isAtLeast(1); - // Verify that latency was recorded with the failure status code + assertThat( + dataContainsStringAttribute( + abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, abortedStatusCodeString)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + abortedPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_COMMIT)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + LongPointData cancelledPoint = + transactionAttemptCountData.stream() + .filter( + p -> + dataContainsStringAttribute( + p, TelemetryConstants.ATTRIBUTES_KEY_STATUS, cancelledStatusCodeString)) + .findFirst() + .orElse(null); + assertThat(cancelledPoint).isNotNull(); + assertThat(cancelledPoint.getValue()).isAtLeast(1); + + assertThat( + dataContainsStringAttribute( + cancelledPoint, + TelemetryConstants.ATTRIBUTES_KEY_STATUS, + cancelledStatusCodeString)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + cancelledPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_COMMIT)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + cancelledPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + cancelledPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); + + // Verify latency was recorded with the failure status code Optional latencyMetric = - localMetricReader.collectAllMetrics().stream() - .filter(m -> m.getName().equals(LATENCY_METRIC_NAME)) - .findFirst(); + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY); assertThat(latencyMetric.isPresent()).isTrue(); HistogramPointData latencyPoint = - latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null); + latencyMetric.get().getHistogramData().getPoints().stream() + .filter( + p -> + dataContainsStringAttribute( + p, TelemetryConstants.ATTRIBUTES_KEY_STATUS, cancelledStatusCodeString)) + .findFirst() + .orElse(null); assertThat(latencyPoint).isNotNull(); - assertThat(latencyPoint.getAttributes().get(AttributeKey.stringKey("status"))) - .isEqualTo("ABORTED"); + + assertThat( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, cancelledStatusCodeString)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + latencyPoint, + TelemetryConstants.ATTRIBUTES_KEY_METHOD, + TelemetryConstants.METHOD_TRANSACTION_RUN)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID)) + .isTrue(); + assertThat( + dataContainsStringAttribute( + latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID)) + .isTrue(); } @Test public void runInTransaction_withTransactionOptions_recordsMetrics() { - // Verify metrics are recorded when calling the overload with TransactionOptions EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class))) .andReturn(BeginTransactionResponse.getDefaultInstance()); EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class))) .andReturn(CommitResponse.newBuilder().build()); - EasyMock.replay(rpcFactoryMock, rpcMock); - - Datastore datastore = datastoreOptions.getService(); + EasyMock.replay(rpcMock); TransactionOptions options = TransactionOptions.newBuilder() @@ -313,21 +486,26 @@ public void runInTransaction_withTransactionOptions_recordsMetrics() { .build(); datastore.runInTransaction(transaction -> null, options); - // Verify both metrics were recorded - assertThat(findMetric(LATENCY_METRIC_NAME).isPresent()).isTrue(); - assertThat(findMetric(ATTEMPT_COUNT_METRIC_NAME).isPresent()).isTrue(); + Collection metrics = metricReader.collectAllMetrics(); + + Optional transactionLatencyMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY); + assertThat(transactionLatencyMetric.isPresent()).isTrue(); + Optional transactionAttemptCountMetric = + findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT); + assertThat(transactionAttemptCountMetric.isPresent()).isTrue(); - EasyMock.verify(rpcFactoryMock, rpcMock); + EasyMock.verify(rpcMock); } - /** - * Finds a specific metric by name from the in-memory metric reader. - * - * @param metricName The fully qualified name of the metric to find. - * @return An {@link Optional} containing the {@link MetricData} if found. - */ - private Optional findMetric(String metricName) { - Collection metrics = metricReader.collectAllMetrics(); + private static Optional findMetric( + Collection metrics, String metricName) { return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst(); } + + private static boolean dataContainsStringAttribute( + PointData p, String attributeKey, String matchedAttributeValue) { + String attributeValue = p.getAttributes().get(AttributeKey.stringKey(attributeKey)); + return matchedAttributeValue.equals(attributeValue); + } } From 2266f7e34ecfab16ce8a3040766ca0b6478d028a Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 6 Mar 2026 14:23:06 -0500 Subject: [PATCH 3/6] chore: Refactor tests to create Otel objects --- .../telemetry/TelemetryConstants.java | 8 +++++- .../telemetry/MetricsRecorderTest.java | 26 ++++--------------- .../OpenTelemetryMetricsRecorderTest.java | 19 +++----------- 3 files changed, 16 insertions(+), 37 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index 32c08de88e5c..b352a700ee15 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -18,7 +18,13 @@ import com.google.api.core.InternalApi; -/** Internal telemetry constants shared between OpenTelemetry tracing and metrics. */ +/** + * Internal telemetry constants shared between OpenTelemetry tracing and metrics. + * + *

Warning: This is intended to be an internal API and is not intended for external use. + * This is public solely for implementation purposes and does not promise any backwards + * compatibility. + */ @InternalApi public class TelemetryConstants { static final String SERVICE_NAME = "datastore.googleapis.com"; diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java index c2543dd2cfb6..51c24b8df30f 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java @@ -26,24 +26,16 @@ import org.junit.runner.RunWith; import org.junit.runners.JUnit4; -/** - * Tests for {@link MetricsRecorder#getInstance(DatastoreOpenTelemetryOptions)}. - * - *

Note: Since {@code setMetricsEnabled()} is package-private on {@link - * DatastoreOpenTelemetryOptions.Builder}, these tests can only verify the default (metrics - * disabled) behavior and the behavior when an explicit OpenTelemetry instance is provided. The - * metrics-enabled paths are exercised through the {@link DatastoreImplMetricsTest} which operates - * in the {@code com.google.cloud.datastore} package. - */ +/** Tests for {@link MetricsRecorder#getInstance(DatastoreOpenTelemetryOptions)}. */ @RunWith(JUnit4.class) public class MetricsRecorderTest { + // TODO(lawrenceqiu): For now, the default behavior is no-op. Add a test for + // instance being OpenTelemetryMetricsRecorder later (visibility changes) @Test public void defaultOptionsReturnNoOp() { DatastoreOpenTelemetryOptions options = DatastoreOpenTelemetryOptions.newBuilder().build(); - MetricsRecorder recorder = MetricsRecorder.getInstance(options); - assertThat(recorder).isInstanceOf(NoOpMetricsRecorder.class); } @@ -52,20 +44,12 @@ public void tracingEnabledButMetricsDisabledReturnsNoOp() { // Enabling tracing alone should not enable metrics DatastoreOpenTelemetryOptions options = DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build(); - MetricsRecorder recorder = MetricsRecorder.getInstance(options); - assertThat(recorder).isInstanceOf(NoOpMetricsRecorder.class); } - @Test - public void noOpRecorderDoesNotThrow() { - // Verify NoOpMetricsRecorder methods do not throw - NoOpMetricsRecorder recorder = new NoOpMetricsRecorder(); - recorder.recordTransactionLatency(100.0, null); - recorder.recordTransactionAttemptCount(1, null); - } - + // TODO(lawrenceqiu): Temporary test to ensure that OpenTelemetryMetricsRecorder can + // be created by the DatastoreOpenTelemetryOptions and creates with Otel object @Test public void openTelemetryRecorderCreatedWithExplicitOpenTelemetry() { InMemoryMetricReader metricReader = InMemoryMetricReader.create(); diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java index 85a449827851..d9c690680e74 100644 --- a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java +++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java @@ -62,8 +62,7 @@ public void recordTransactionLatency_recordsHistogramWithAttributes() { Collection metrics = metricReader.collectAllMetrics(); MetricData latencyMetric = metrics.stream() - .filter( - m -> m.getName().equals(TelemetryConstants.SERVICE_NAME + "/transaction_latency")) + .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY)) .findFirst() .orElse(null); @@ -101,9 +100,7 @@ public void recordTransactionAttemptCount_recordsCounterWithAttributes() { MetricData attemptMetric = metrics.stream() .filter( - m -> - m.getName() - .equals(TelemetryConstants.SERVICE_NAME + "/transaction_attempt_count")) + m -> m.getName().equals(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT)) .findFirst() .orElse(null); @@ -148,9 +145,7 @@ public void recordTransactionAttemptCount_multipleAttempts_accumulates() { MetricData attemptMetric = metrics.stream() .filter( - m -> - m.getName() - .equals(TelemetryConstants.SERVICE_NAME + "/transaction_attempt_count")) + m -> m.getName().equals(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT)) .findFirst() .orElse(null); @@ -161,16 +156,10 @@ public void recordTransactionAttemptCount_multipleAttempts_accumulates() { } @Test - public void recordTransactionLatency_nullAttributes_doesNotThrow() { - // Should not throw even when attributes are null + public void recordTransactionLatency_nullAttributes() { recorder.recordTransactionLatency(100.0, null); Collection metrics = metricReader.collectAllMetrics(); assertThat(metrics).isNotEmpty(); } - - @Test - public void getOpenTelemetry_returnsConfiguredInstance() { - assertThat(recorder.getOpenTelemetry()).isNotNull(); - } } From 6a981e401e58a127d2cb25104528224092854a03 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Fri, 6 Mar 2026 14:33:48 -0500 Subject: [PATCH 4/6] chore: Make extractStatus method package-private --- .../java/com/google/cloud/datastore/DatastoreException.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java index 1be84b859c6c..6312bc76bb6e 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java @@ -176,7 +176,7 @@ static DatastoreException propagateUserException(Exception ex) { * @param throwable the throwable to extract the status code from * @return the status code name, or "UNKNOWN" if not determinable */ - public static String extractStatusCode(Throwable throwable) { + static String extractStatusCode(Throwable throwable) { Throwable current = throwable; while (current != null) { if (current instanceof DatastoreException) { From 8edac53d95fc5c99fe0f43b0f08ef2b1aa69e5c9 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 9 Mar 2026 12:47:03 -0400 Subject: [PATCH 5/6] chore: Add transport as an attribute --- .../google/cloud/datastore/DatastoreImpl.java | 11 +++++++++-- .../telemetry/TelemetryConstants.java | 18 ++++++++++++++++-- 2 files changed, 25 insertions(+), 4 deletions(-) diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java index dcf142c81916..39447c7eeb41 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java @@ -44,6 +44,7 @@ import com.google.cloud.RetryHelper; import com.google.cloud.RetryHelper.RetryHelperException; import com.google.cloud.ServiceOptions; +import com.google.cloud.TransportOptions; import com.google.cloud.datastore.execution.AggregationQueryExecutor; import com.google.cloud.datastore.spi.v1.DatastoreRpc; import com.google.cloud.datastore.telemetry.MetricsRecorder; @@ -215,7 +216,7 @@ public T call() throws DatastoreException { } throw DatastoreException.propagateUserException(ex); } finally { - recordAttempt(attemptStatus); + recordAttempt(attemptStatus, datastore.getOptions().getTransportOptions()); // If the transaction is active, then commit the rollback. If it was already successfully // rolled back, the transaction is inactive (prevents rolling back an already rolled back // transaction). @@ -233,7 +234,7 @@ public T call() throws DatastoreException { * Records a single transaction commit attempt with the given status code. This is called once * per invocation of {@link #call()}, capturing the outcome of each individual commit attempt. */ - private void recordAttempt(String status) { + private void recordAttempt(String status, TransportOptions transportOptions) { Map attributes = new HashMap<>(); attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status); attributes.put( @@ -242,6 +243,9 @@ private void recordAttempt(String status) { TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastore.getOptions().getProjectId()); attributes.put( TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastore.getOptions().getDatabaseId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, + TelemetryConstants.getTransportName(transportOptions)); metricsRecorder.recordTransactionAttemptCount(1, attributes); } } @@ -284,6 +288,9 @@ public T runInTransaction( TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_RUN); attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId()); attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId()); + attributes.put( + TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT, + TelemetryConstants.getTransportName(getOptions().getTransportOptions())); metricsRecorder.recordTransactionLatency(latencyMs, attributes); span.end(); } diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java index b352a700ee15..f948631d6ac2 100644 --- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java +++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java @@ -17,6 +17,8 @@ package com.google.cloud.datastore.telemetry; import com.google.api.core.InternalApi; +import com.google.cloud.TransportOptions; +import com.google.cloud.grpc.GrpcTransportOptions; /** * Internal telemetry constants shared between OpenTelemetry tracing and metrics. @@ -51,13 +53,17 @@ public class TelemetryConstants { /** Attribute key for the Datastore database ID. */ public static final String ATTRIBUTES_KEY_DATABASE_ID = "database_id"; + public static final String ATTRIBUTES_KEY_LIBRARY_VERSION = "library_version"; + + public static final String ATTRIBUTES_KEY_TRANSPORT = "transport"; + /** Metric name for the total latency of a transaction. */ public static final String METRIC_NAME_TRANSACTION_LATENCY = - SERVICE_NAME + "/transaction_latency"; + SERVICE_NAME + "/client/transaction_latency"; /** Metric name for the number of attempts a transaction took. */ public static final String METRIC_NAME_TRANSACTION_ATTEMPT_COUNT = - SERVICE_NAME + "/transaction_attempt_count"; + SERVICE_NAME + "/client/transaction_attempt_count"; /* TODO(lawrenceqiu): For now, these are a duplicate of method names in TraceUtil. Those will use these eventually */ // Format is not SnakeCase to keep backward compatibility with the existing values TraceUtil spans @@ -81,4 +87,12 @@ public class TelemetryConstants { public static final String METHOD_SUBMIT = "submit"; private TelemetryConstants() {} + + public static String getTransportName(TransportOptions transportOptions) { + if (transportOptions instanceof GrpcTransportOptions) { + return "grpc"; + } else { + return "http"; + } + } } From edb1b867d6f02539b338f42b9d571b2f095d2160 Mon Sep 17 00:00:00 2001 From: Lawrence Qiu Date: Mon, 9 Mar 2026 14:50:18 -0400 Subject: [PATCH 6/6] chore: Add opentelemetry-sdk-metrics as test-scope dep --- java-datastore/google-cloud-datastore/pom.xml | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/java-datastore/google-cloud-datastore/pom.xml b/java-datastore/google-cloud-datastore/pom.xml index f286775f096d..6057cf0e8314 100644 --- a/java-datastore/google-cloud-datastore/pom.xml +++ b/java-datastore/google-cloud-datastore/pom.xml @@ -213,6 +213,11 @@ opentelemetry-sdk-trace test + + io.opentelemetry + opentelemetry-sdk-metrics + test + io.opentelemetry opentelemetry-semconv