io.opentelemetry
opentelemetry-semconv
diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java
index 44bde2c107ff..6312bc76bb6e 100644
--- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java
+++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreException.java
@@ -160,4 +160,33 @@ static DatastoreException throwInvalidRequest(String massage, Object... params)
static DatastoreException propagateUserException(Exception ex) {
throw new DatastoreException(BaseServiceException.UNKNOWN_CODE, ex.getMessage(), null, ex);
}
+
+ /**
+ * Extracts the status code name from the given throwable. Walks the exception cause chain looking
+ * for a {@link DatastoreException} that carries a reason string representing the status code
+ * (e.g. "ABORTED", "UNAVAILABLE"). The reason is set from {@link
+ * com.google.api.gax.rpc.StatusCode.Code} which is transport-neutral, supporting both gRPC and
+ * HttpJson. Falls back to "UNKNOWN" if the status cannot be determined.
+ *
+ * Note: Some {@link DatastoreException} instances are constructed without a reason (e.g. via
+ * {@link DatastoreException#DatastoreException(int, String, Throwable)}). If all {@link
+ * DatastoreException} instances in the cause chain have a null or empty reason, this method
+ * returns "UNKNOWN" even if the underlying error carries a meaningful status.
+ *
+ * @param throwable the throwable to extract the status code from
+ * @return the status code name, or "UNKNOWN" if not determinable
+ */
+ static String extractStatusCode(Throwable throwable) {
+ Throwable current = throwable;
+ while (current != null) {
+ if (current instanceof DatastoreException) {
+ String reason = ((DatastoreException) current).getReason();
+ if (!Strings.isNullOrEmpty(reason)) {
+ return reason;
+ }
+ }
+ current = current.getCause();
+ }
+ return StatusCode.Code.UNKNOWN.toString();
+ }
}
diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java
index 63d09b0a0ed4..39447c7eeb41 100644
--- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java
+++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreImpl.java
@@ -38,17 +38,22 @@
import com.google.api.core.BetaApi;
import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.rpc.StatusCode;
import com.google.cloud.BaseService;
import com.google.cloud.ExceptionHandler;
import com.google.cloud.RetryHelper;
import com.google.cloud.RetryHelper.RetryHelperException;
import com.google.cloud.ServiceOptions;
+import com.google.cloud.TransportOptions;
import com.google.cloud.datastore.execution.AggregationQueryExecutor;
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
+import com.google.cloud.datastore.telemetry.MetricsRecorder;
+import com.google.cloud.datastore.telemetry.TelemetryConstants;
import com.google.cloud.datastore.telemetry.TraceUtil;
import com.google.cloud.datastore.telemetry.TraceUtil.Scope;
import com.google.common.base.MoreObjects;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.AbstractIterator;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
@@ -61,10 +66,12 @@
import com.google.datastore.v1.RunQueryResponse;
import com.google.datastore.v1.TransactionOptions;
import com.google.protobuf.ByteString;
+import io.grpc.Status;
import io.opentelemetry.context.Context;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -73,6 +80,7 @@
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.Callable;
+import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -89,6 +97,7 @@ final class DatastoreImpl extends BaseService implements Datas
private final com.google.cloud.datastore.telemetry.TraceUtil otelTraceUtil =
getOptions().getTraceUtil();
+ private final MetricsRecorder metricsRecorder = getOptions().getMetricsRecorder();
private final ReadOptionProtoPreparer readOptionProtoPreparer;
private final AggregationQueryExecutor aggregationQueryExecutor;
@@ -122,63 +131,28 @@ public Transaction newTransaction() {
return new TransactionImpl(this);
}
+ /**
+ * A Tracing callable that adds OpenTelemetry tracing context. Intended to be used for
+ * transactions and wraps {@link ReadWriteTransactionCallable} as the delegate. All transaction
+ * logic (begin, run, commit, rollback, metrics recording) is handled in the delegate (this solely
+ * handles tracing).
+ */
static class TracedReadWriteTransactionCallable implements Callable {
- private final Datastore datastore;
- private final TransactionCallable callable;
- private volatile TransactionOptions options;
- private volatile Transaction transaction;
-
+ private final ReadWriteTransactionCallable delegate;
private final TraceUtil.Span parentSpan;
TracedReadWriteTransactionCallable(
- Datastore datastore,
- TransactionCallable callable,
- TransactionOptions options,
+ ReadWriteTransactionCallable delegate,
@Nullable com.google.cloud.datastore.telemetry.TraceUtil.Span parentSpan) {
- this.datastore = datastore;
- this.callable = callable;
- this.options = options;
- this.transaction = null;
+ this.delegate = delegate;
this.parentSpan = parentSpan;
}
- Datastore getDatastore() {
- return datastore;
- }
-
- TransactionOptions getOptions() {
- return options;
- }
-
- Transaction getTransaction() {
- return transaction;
- }
-
- void setPrevTransactionId(ByteString transactionId) {
- TransactionOptions.ReadWrite readWrite =
- TransactionOptions.ReadWrite.newBuilder().setPreviousTransaction(transactionId).build();
- options = options.toBuilder().setReadWrite(readWrite).build();
- }
-
@Override
public T call() throws DatastoreException {
try (io.opentelemetry.context.Scope ignored =
Context.current().with(parentSpan.getSpan()).makeCurrent()) {
- transaction = datastore.newTransaction(options);
- T value = callable.run(transaction);
- transaction.commit();
- return value;
- } catch (Exception ex) {
- transaction.rollback();
- throw DatastoreException.propagateUserException(ex);
- } finally {
- if (transaction.isActive()) {
- transaction.rollback();
- }
- if (options != null
- && options.getModeCase().equals(TransactionOptions.ModeCase.READ_WRITE)) {
- setPrevTransactionId(transaction.getTransactionId());
- }
+ return delegate.call();
}
}
}
@@ -200,29 +174,22 @@ public boolean isClosed() {
static class ReadWriteTransactionCallable implements Callable {
private final Datastore datastore;
private final TransactionCallable callable;
+ private final MetricsRecorder metricsRecorder;
private volatile TransactionOptions options;
private volatile Transaction transaction;
ReadWriteTransactionCallable(
- Datastore datastore, TransactionCallable callable, TransactionOptions options) {
+ Datastore datastore,
+ TransactionCallable callable,
+ TransactionOptions options,
+ MetricsRecorder metricsRecorder) {
this.datastore = datastore;
this.callable = callable;
this.options = options;
+ this.metricsRecorder = metricsRecorder;
this.transaction = null;
}
- Datastore getDatastore() {
- return datastore;
- }
-
- TransactionOptions getOptions() {
- return options;
- }
-
- Transaction getTransaction() {
- return transaction;
- }
-
void setPrevTransactionId(ByteString transactionId) {
TransactionOptions.ReadWrite readWrite =
TransactionOptions.ReadWrite.newBuilder().setPreviousTransaction(transactionId).build();
@@ -231,15 +198,28 @@ void setPrevTransactionId(ByteString transactionId) {
@Override
public T call() throws DatastoreException {
+ String attemptStatus = StatusCode.Code.UNKNOWN.toString();
try {
transaction = datastore.newTransaction(options);
T value = callable.run(transaction);
transaction.commit();
+ attemptStatus = Status.Code.OK.toString();
return value;
} catch (Exception ex) {
- transaction.rollback();
+ attemptStatus = DatastoreException.extractStatusCode(ex);
+ // An exception here can stem from either `callable.run()` (before commit was attempted)
+ // or from `transaction.commit()`. If there is an exception thrown from either call site,
+ // then the transaction is still active. Check if it is still active (e.g. not commited)
+ // and roll back the transaction.
+ if (transaction.isActive()) {
+ transaction.rollback();
+ }
throw DatastoreException.propagateUserException(ex);
} finally {
+ recordAttempt(attemptStatus, datastore.getOptions().getTransportOptions());
+ // If the transaction is active, then commit the rollback. If it was already successfully
+ // rolled back, the transaction is inactive (prevents rolling back an already rolled back
+ // transaction).
if (transaction.isActive()) {
transaction.rollback();
}
@@ -249,28 +229,30 @@ public T call() throws DatastoreException {
}
}
}
+
+ /**
+ * Records a single transaction commit attempt with the given status code. This is called once
+ * per invocation of {@link #call()}, capturing the outcome of each individual commit attempt.
+ */
+ private void recordAttempt(String status, TransportOptions transportOptions) {
+ Map attributes = new HashMap<>();
+ attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, datastore.getOptions().getProjectId());
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, datastore.getOptions().getDatabaseId());
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
+ TelemetryConstants.getTransportName(transportOptions));
+ metricsRecorder.recordTransactionAttemptCount(1, attributes);
+ }
}
@Override
public T runInTransaction(final TransactionCallable callable) {
- TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN);
- Callable transactionCallable =
- (getOptions().getOpenTelemetryOptions().isTracingEnabled()
- ? new TracedReadWriteTransactionCallable(
- this, callable, /* transactionOptions= */ null, span)
- : new ReadWriteTransactionCallable(this, callable, /* transactionOptions= */ null));
- try (Scope ignored = span.makeCurrent()) {
- return RetryHelper.runWithRetries(
- transactionCallable,
- retrySettings,
- TRANSACTION_EXCEPTION_HANDLER,
- getOptions().getClock());
- } catch (RetryHelperException e) {
- span.end(e);
- throw DatastoreException.translateAndThrow(e);
- } finally {
- span.end();
- }
+ return runInTransaction(callable, /* transactionOptions= */ null);
}
@Override
@@ -278,11 +260,16 @@ public T runInTransaction(
final TransactionCallable callable, TransactionOptions transactionOptions) {
TraceUtil.Span span = otelTraceUtil.startSpan(SPAN_NAME_TRANSACTION_RUN);
- Callable transactionCallable =
- (getOptions().getOpenTelemetryOptions().isTracingEnabled()
- ? new TracedReadWriteTransactionCallable(this, callable, transactionOptions, span)
- : new ReadWriteTransactionCallable(this, callable, transactionOptions));
+ ReadWriteTransactionCallable baseCallable =
+ new ReadWriteTransactionCallable<>(this, callable, transactionOptions, metricsRecorder);
+
+ Callable transactionCallable = baseCallable;
+ if (getOptions().getOpenTelemetryOptions().isTracingEnabled()) {
+ transactionCallable = new TracedReadWriteTransactionCallable<>(baseCallable, span);
+ }
+ String status = StatusCode.Code.OK.toString();
+ Stopwatch stopwatch = Stopwatch.createStarted();
try (Scope ignored = span.makeCurrent()) {
return RetryHelper.runWithRetries(
transactionCallable,
@@ -290,9 +277,21 @@ public T runInTransaction(
TRANSACTION_EXCEPTION_HANDLER,
getOptions().getClock());
} catch (RetryHelperException e) {
+ status = DatastoreException.extractStatusCode(e);
span.end(e);
throw DatastoreException.translateAndThrow(e);
} finally {
+ long latencyMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
+ Map attributes = new HashMap<>();
+ attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, status);
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_RUN);
+ attributes.put(TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, getOptions().getProjectId());
+ attributes.put(TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, getOptions().getDatabaseId());
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_TRANSPORT,
+ TelemetryConstants.getTransportName(getOptions().getTransportOptions()));
+ metricsRecorder.recordTransactionLatency(latencyMs, attributes);
span.end();
}
}
diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java
index 242ce3b01776..1cd8e4038314 100644
--- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java
+++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/DatastoreOptions.java
@@ -31,6 +31,7 @@
import com.google.cloud.datastore.spi.v1.DatastoreRpc;
import com.google.cloud.datastore.spi.v1.GrpcDatastoreRpc;
import com.google.cloud.datastore.spi.v1.HttpDatastoreRpc;
+import com.google.cloud.datastore.telemetry.MetricsRecorder;
import com.google.cloud.datastore.v1.DatastoreSettings;
import com.google.cloud.grpc.GrpcTransportOptions;
import com.google.cloud.http.HttpTransportOptions;
@@ -64,6 +65,7 @@ public class DatastoreOptions extends ServiceOptions {
private String namespace;
@@ -216,6 +223,7 @@ private DatastoreOptions(Builder builder) {
? builder.openTelemetryOptions
: DatastoreOpenTelemetryOptions.newBuilder().build();
this.traceUtil = com.google.cloud.datastore.telemetry.TraceUtil.getInstance(this);
+ this.metricsRecorder = MetricsRecorder.getInstance(openTelemetryOptions);
namespace = MoreObjects.firstNonNull(builder.namespace, defaultNamespace());
databaseId = MoreObjects.firstNonNull(builder.databaseId, DEFAULT_DATABASE_ID);
diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java
index a8599e019b3c..6a0ea16b5dd6 100644
--- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java
+++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/MetricsRecorder.java
@@ -16,14 +16,22 @@
package com.google.cloud.datastore.telemetry;
+import com.google.api.core.InternalExtensionOnly;
import com.google.cloud.datastore.DatastoreOpenTelemetryOptions;
import io.opentelemetry.api.GlobalOpenTelemetry;
import io.opentelemetry.api.OpenTelemetry;
import java.util.Map;
import javax.annotation.Nonnull;
-/** Interface to record specific metric operations. */
-interface MetricsRecorder {
+/**
+ * Interface to record specific metric operations.
+ *
+ * Warning: This is intended to be an internal API and is not intended for external use.
+ * This is public solely for implementation purposes and does not promise any backwards
+ * compatibility.
+ */
+@InternalExtensionOnly
+public interface MetricsRecorder {
/** Records the total latency of a transaction in milliseconds. */
void recordTransactionLatency(double latencyMs, Map attributes);
diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java
index 470ffe118b5c..e50edbd5f79f 100644
--- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java
+++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorder.java
@@ -41,14 +41,14 @@ class OpenTelemetryMetricsRecorder implements MetricsRecorder {
this.transactionLatency =
meter
- .histogramBuilder(TelemetryConstants.SERVICE_NAME + "/transaction_latency")
- .setDescription("Total latency for successful transaction operations")
+ .histogramBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY)
+ .setDescription("Total latency of transaction operations")
.setUnit("ms")
.build();
this.transactionAttemptCount =
meter
- .counterBuilder(TelemetryConstants.SERVICE_NAME + "/transaction_attempt_count")
+ .counterBuilder(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT)
.setDescription("Number of attempts to commit a transaction")
.build();
}
diff --git a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java
index 263b85526501..f948631d6ac2 100644
--- a/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java
+++ b/java-datastore/google-cloud-datastore/src/main/java/com/google/cloud/datastore/telemetry/TelemetryConstants.java
@@ -17,8 +17,16 @@
package com.google.cloud.datastore.telemetry;
import com.google.api.core.InternalApi;
+import com.google.cloud.TransportOptions;
+import com.google.cloud.grpc.GrpcTransportOptions;
-/** Internal telemetry constants shared between OpenTelemetry tracing and metrics. */
+/**
+ * Internal telemetry constants shared between OpenTelemetry tracing and metrics.
+ *
+ * Warning: This is intended to be an internal API and is not intended for external use.
+ * This is public solely for implementation purposes and does not promise any backwards
+ * compatibility.
+ */
@InternalApi
public class TelemetryConstants {
static final String SERVICE_NAME = "datastore.googleapis.com";
@@ -33,6 +41,30 @@ public class TelemetryConstants {
public static final String ATTRIBUTES_KEY_DEFERRED = "Deferred";
public static final String ATTRIBUTES_KEY_MORE_RESULTS = "more_results";
+ /** Attribute key for the gRPC status code (e.g. "OK", "ABORTED", "UNAVAILABLE"). */
+ public static final String ATTRIBUTES_KEY_STATUS = "status";
+
+ /** Attribute key for the RPC method name (e.g. "Transaction.Run"). */
+ public static final String ATTRIBUTES_KEY_METHOD = "method";
+
+ /** Attribute key for the GCP project ID. */
+ public static final String ATTRIBUTES_KEY_PROJECT_ID = "project_id";
+
+ /** Attribute key for the Datastore database ID. */
+ public static final String ATTRIBUTES_KEY_DATABASE_ID = "database_id";
+
+ public static final String ATTRIBUTES_KEY_LIBRARY_VERSION = "library_version";
+
+ public static final String ATTRIBUTES_KEY_TRANSPORT = "transport";
+
+ /** Metric name for the total latency of a transaction. */
+ public static final String METRIC_NAME_TRANSACTION_LATENCY =
+ SERVICE_NAME + "/client/transaction_latency";
+
+ /** Metric name for the number of attempts a transaction took. */
+ public static final String METRIC_NAME_TRANSACTION_ATTEMPT_COUNT =
+ SERVICE_NAME + "/client/transaction_attempt_count";
+
/* TODO(lawrenceqiu): For now, these are a duplicate of method names in TraceUtil. Those will use these eventually */
// Format is not SnakeCase to keep backward compatibility with the existing values TraceUtil spans
public static final String METHOD_ALLOCATE_IDS = "AllocateIds";
@@ -55,4 +87,12 @@ public class TelemetryConstants {
public static final String METHOD_SUBMIT = "submit";
private TelemetryConstants() {}
+
+ public static String getTransportName(TransportOptions transportOptions) {
+ if (transportOptions instanceof GrpcTransportOptions) {
+ return "grpc";
+ } else {
+ return "http";
+ }
+ }
}
diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java
new file mode 100644
index 000000000000..c8a0e079af7a
--- /dev/null
+++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/DatastoreImplMetricsTest.java
@@ -0,0 +1,511 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore;
+
+import static com.google.common.truth.Truth.assertThat;
+import static org.junit.Assert.assertThrows;
+
+import com.google.api.gax.rpc.StatusCode;
+import com.google.cloud.NoCredentials;
+import com.google.cloud.ServiceOptions;
+import com.google.cloud.datastore.spi.DatastoreRpcFactory;
+import com.google.cloud.datastore.spi.v1.DatastoreRpc;
+import com.google.cloud.datastore.telemetry.TelemetryConstants;
+import com.google.datastore.v1.BeginTransactionRequest;
+import com.google.datastore.v1.BeginTransactionResponse;
+import com.google.datastore.v1.CommitRequest;
+import com.google.datastore.v1.CommitResponse;
+import com.google.datastore.v1.RollbackRequest;
+import com.google.datastore.v1.RollbackResponse;
+import com.google.datastore.v1.TransactionOptions;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.metrics.data.PointData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Collection;
+import java.util.Optional;
+import org.easymock.EasyMock;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Tests for transaction metrics recording in {@link DatastoreImpl}. These tests verify that
+ * transaction latency and per-attempt metrics are correctly recorded via the {@link
+ * com.google.cloud.datastore.telemetry.MetricsRecorder}.
+ */
+@RunWith(JUnit4.class)
+public class DatastoreImplMetricsTest {
+
+ private static final String PROJECT_ID = "test-project";
+ private static final String DATABASE_ID = "test-database";
+
+ private static InMemoryMetricReader metricReader;
+ private static DatastoreRpcFactory rpcFactoryMock;
+ private static DatastoreRpc rpcMock;
+ private static Datastore datastore;
+
+ @BeforeClass
+ public static void setUpClass() {
+ // Use delta temporality so each collectAllMetrics() call returns only new data and resets.
+ metricReader = InMemoryMetricReader.createDelta();
+ SdkMeterProvider meterProvider =
+ SdkMeterProvider.builder().registerMetricReader(metricReader).build();
+ OpenTelemetrySdk openTelemetry =
+ OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
+
+ rpcFactoryMock = EasyMock.createStrictMock(DatastoreRpcFactory.class);
+ // Use a regular (non-strict) mock for rpcMock so that anyTimes() expectations work without
+ // enforcing call order — needed for retry tests with unpredictable call counts.
+ rpcMock = EasyMock.createMock(DatastoreRpc.class);
+
+ DatastoreOptions datastoreOptions =
+ DatastoreOptions.newBuilder()
+ .setProjectId(PROJECT_ID)
+ .setDatabaseId(DATABASE_ID)
+ .setCredentials(NoCredentials.getInstance())
+ .setRetrySettings(ServiceOptions.getDefaultRetrySettings())
+ .setServiceRpcFactory(rpcFactoryMock)
+ .setOpenTelemetryOptions(
+ DatastoreOpenTelemetryOptions.newBuilder()
+ .setMetricsEnabled(true)
+ .setOpenTelemetry(openTelemetry)
+ .build())
+ .build();
+
+ EasyMock.expect(rpcFactoryMock.create(datastoreOptions)).andReturn(rpcMock);
+ EasyMock.replay(rpcFactoryMock);
+ datastore = datastoreOptions.getService();
+ EasyMock.verify(rpcFactoryMock);
+ }
+
+ @Before
+ public void setUp() {
+ // Drain any metrics accumulated during @BeforeClass or previous tests.
+ metricReader.collectAllMetrics();
+ EasyMock.reset(rpcMock);
+ }
+
+ @Test
+ public void runInTransaction_recordsLatencyOnSuccess() {
+ EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class)))
+ .andReturn(BeginTransactionResponse.getDefaultInstance());
+ EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class)))
+ .andReturn(CommitResponse.newBuilder().build());
+ EasyMock.replay(rpcMock);
+
+ datastore.runInTransaction(transaction -> null);
+
+ Collection metrics = metricReader.collectAllMetrics();
+
+ Optional latencyMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY);
+ assertThat(latencyMetric.isPresent()).isTrue();
+
+ HistogramPointData point =
+ latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null);
+ assertThat(point).isNotNull();
+ assertThat(point.getCount()).isEqualTo(1);
+
+ assertThat(
+ dataContainsStringAttribute(
+ point, TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ point,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_RUN))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ point, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ point, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ EasyMock.verify(rpcMock);
+ }
+
+ @Test
+ public void runInTransaction_recordsPerAttemptCountOnSuccess() {
+ EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class)))
+ .andReturn(BeginTransactionResponse.getDefaultInstance());
+ EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class)))
+ .andReturn(CommitResponse.newBuilder().build());
+ EasyMock.replay(rpcMock);
+
+ datastore.runInTransaction(transaction -> null);
+
+ Collection metrics = metricReader.collectAllMetrics();
+
+ Optional attemptMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT);
+ assertThat(attemptMetric.isPresent()).isTrue();
+
+ LongPointData point =
+ attemptMetric.get().getLongSumData().getPoints().stream().findFirst().orElse(null);
+ assertThat(point).isNotNull();
+ assertThat(point.getValue()).isEqualTo(1);
+
+ assertThat(
+ dataContainsStringAttribute(
+ point, TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString()))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ point,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_COMMIT))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ point, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ point, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ EasyMock.verify(rpcMock);
+ }
+
+ @Test
+ public void runInTransaction_recordsPerAttemptOnRetry() {
+ String abortedStatusCodeString = StatusCode.Code.ABORTED.toString();
+ String okStatusCodeString = StatusCode.Code.OK.toString();
+ // First attempt: begin -> ABORTED -> rollback
+ EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class)))
+ .andReturn(BeginTransactionResponse.getDefaultInstance());
+ EasyMock.expect(rpcMock.rollback(EasyMock.anyObject(RollbackRequest.class)))
+ .andReturn(RollbackResponse.getDefaultInstance());
+
+ // Second attempt: begin -> commit (success)
+ EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class)))
+ .andReturn(BeginTransactionResponse.getDefaultInstance());
+ EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class)))
+ .andReturn(CommitResponse.newBuilder().build());
+ EasyMock.replay(rpcMock);
+
+ TransactionOptions options =
+ TransactionOptions.newBuilder()
+ .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
+ .build();
+
+ Datastore.TransactionCallable callable =
+ new Datastore.TransactionCallable() {
+ private int attempts = 0;
+
+ @Override
+ public Integer run(DatastoreReaderWriter transaction) {
+ attempts++;
+ if (attempts < 2) {
+ throw new DatastoreException(10, "", abortedStatusCodeString, false, null);
+ }
+ return attempts;
+ }
+ };
+
+ Integer result = datastore.runInTransaction(callable, options);
+ assertThat(result).isEqualTo(2);
+
+ Collection metrics = metricReader.collectAllMetrics();
+
+ // Verify attempt count has two data points: one with ABORTED and one with OK
+ Optional attemptMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT);
+ assertThat(attemptMetric.isPresent()).isTrue();
+ assertThat(attemptMetric.get().getLongSumData().getPoints()).hasSize(2);
+
+ // Find the first point (this would match to ABORTED)
+ LongPointData abortedPoint =
+ attemptMetric.get().getLongSumData().getPoints().stream()
+ .filter(
+ p ->
+ abortedStatusCodeString.equals(
+ p.getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS))))
+ .findFirst()
+ .orElse(null);
+ assertThat(abortedPoint).isNotNull();
+ assertThat(abortedPoint.getValue()).isEqualTo(1);
+
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, abortedStatusCodeString))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_COMMIT))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ LongPointData okPoint =
+ attemptMetric.get().getLongSumData().getPoints().stream()
+ .filter(
+ p ->
+ okStatusCodeString.equals(
+ p.getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS))))
+ .findFirst()
+ .orElse(null);
+ assertThat(okPoint).isNotNull();
+ assertThat(okPoint.getValue()).isEqualTo(1);
+
+ assertThat(
+ dataContainsStringAttribute(
+ okPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, okStatusCodeString))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ okPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_COMMIT))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ okPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ okPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ // Verify latency was recorded with OK (overall transaction succeeded)
+ Optional latencyMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY);
+ assertThat(latencyMetric.isPresent()).isTrue();
+ HistogramPointData latencyPoint =
+ latencyMetric.get().getHistogramData().getPoints().stream().findFirst().orElse(null);
+ assertThat(latencyPoint).isNotNull();
+ assertThat(
+ latencyPoint
+ .getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS)))
+ .isEqualTo(okStatusCodeString);
+
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, okStatusCodeString))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_RUN))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ EasyMock.verify(rpcMock);
+ }
+
+ @Test
+ public void runInTransaction_recordsStatusCodeOnFailure() {
+ String abortedStatusCodeString = StatusCode.Code.ABORTED.toString();
+ String cancelledStatusCodeString = StatusCode.Code.CANCELLED.toString();
+
+ // The retry loop makes multiple begin/rollback calls with unpredictable counts, so use
+ // anyTimes() rather than exact expectations.
+ EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class)))
+ .andReturn(BeginTransactionResponse.getDefaultInstance())
+ .anyTimes();
+ EasyMock.expect(rpcMock.rollback(EasyMock.anyObject(RollbackRequest.class)))
+ .andReturn(RollbackResponse.getDefaultInstance())
+ .anyTimes();
+ EasyMock.replay(rpcMock);
+
+ Datastore.TransactionCallable callable =
+ new Datastore.TransactionCallable() {
+ private int attempts = 0;
+
+ @Override
+ public Integer run(DatastoreReaderWriter transaction) {
+ attempts++;
+ if (attempts < 2) {
+ throw new DatastoreException(10, "", abortedStatusCodeString, false, null);
+ }
+ throw new DatastoreException(1, "", cancelledStatusCodeString, false, null);
+ }
+ };
+
+ assertThrows(DatastoreException.class, () -> datastore.runInTransaction(callable));
+
+ Collection metrics = metricReader.collectAllMetrics();
+
+ // Verify attempt count was recorded with ABORTED status
+ Optional attemptMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT);
+ assertThat(attemptMetric.isPresent()).isTrue();
+
+ Collection transactionAttemptCountData =
+ attemptMetric.get().getLongSumData().getPoints();
+ LongPointData abortedPoint =
+ transactionAttemptCountData.stream()
+ .filter(
+ p ->
+ dataContainsStringAttribute(
+ p, TelemetryConstants.ATTRIBUTES_KEY_STATUS, abortedStatusCodeString))
+ .findFirst()
+ .orElse(null);
+ assertThat(abortedPoint).isNotNull();
+ assertThat(abortedPoint.getValue()).isAtLeast(1);
+
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, abortedStatusCodeString))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_COMMIT))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ abortedPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ LongPointData cancelledPoint =
+ transactionAttemptCountData.stream()
+ .filter(
+ p ->
+ dataContainsStringAttribute(
+ p, TelemetryConstants.ATTRIBUTES_KEY_STATUS, cancelledStatusCodeString))
+ .findFirst()
+ .orElse(null);
+ assertThat(cancelledPoint).isNotNull();
+ assertThat(cancelledPoint.getValue()).isAtLeast(1);
+
+ assertThat(
+ dataContainsStringAttribute(
+ cancelledPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_STATUS,
+ cancelledStatusCodeString))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ cancelledPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_COMMIT))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ cancelledPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ cancelledPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+
+ // Verify latency was recorded with the failure status code
+ Optional latencyMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY);
+ assertThat(latencyMetric.isPresent()).isTrue();
+ HistogramPointData latencyPoint =
+ latencyMetric.get().getHistogramData().getPoints().stream()
+ .filter(
+ p ->
+ dataContainsStringAttribute(
+ p, TelemetryConstants.ATTRIBUTES_KEY_STATUS, cancelledStatusCodeString))
+ .findFirst()
+ .orElse(null);
+ assertThat(latencyPoint).isNotNull();
+
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_STATUS, cancelledStatusCodeString))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint,
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD,
+ TelemetryConstants.METHOD_TRANSACTION_RUN))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_PROJECT_ID, PROJECT_ID))
+ .isTrue();
+ assertThat(
+ dataContainsStringAttribute(
+ latencyPoint, TelemetryConstants.ATTRIBUTES_KEY_DATABASE_ID, DATABASE_ID))
+ .isTrue();
+ }
+
+ @Test
+ public void runInTransaction_withTransactionOptions_recordsMetrics() {
+ EasyMock.expect(rpcMock.beginTransaction(EasyMock.anyObject(BeginTransactionRequest.class)))
+ .andReturn(BeginTransactionResponse.getDefaultInstance());
+ EasyMock.expect(rpcMock.commit(EasyMock.anyObject(CommitRequest.class)))
+ .andReturn(CommitResponse.newBuilder().build());
+ EasyMock.replay(rpcMock);
+
+ TransactionOptions options =
+ TransactionOptions.newBuilder()
+ .setReadWrite(TransactionOptions.ReadWrite.getDefaultInstance())
+ .build();
+ datastore.runInTransaction(transaction -> null, options);
+
+ Collection metrics = metricReader.collectAllMetrics();
+
+ Optional transactionLatencyMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY);
+ assertThat(transactionLatencyMetric.isPresent()).isTrue();
+ Optional transactionAttemptCountMetric =
+ findMetric(metrics, TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT);
+ assertThat(transactionAttemptCountMetric.isPresent()).isTrue();
+
+ EasyMock.verify(rpcMock);
+ }
+
+ private static Optional findMetric(
+ Collection metrics, String metricName) {
+ return metrics.stream().filter(m -> m.getName().equals(metricName)).findFirst();
+ }
+
+ private static boolean dataContainsStringAttribute(
+ PointData p, String attributeKey, String matchedAttributeValue) {
+ String attributeValue = p.getAttributes().get(AttributeKey.stringKey(attributeKey));
+ return matchedAttributeValue.equals(attributeValue);
+ }
+}
diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java
new file mode 100644
index 000000000000..51c24b8df30f
--- /dev/null
+++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/MetricsRecorderTest.java
@@ -0,0 +1,65 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.telemetry;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.cloud.datastore.DatastoreOpenTelemetryOptions;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for {@link MetricsRecorder#getInstance(DatastoreOpenTelemetryOptions)}. */
+@RunWith(JUnit4.class)
+public class MetricsRecorderTest {
+
+ // TODO(lawrenceqiu): For now, the default behavior is no-op. Add a test for
+ // instance being OpenTelemetryMetricsRecorder later (visibility changes)
+ @Test
+ public void defaultOptionsReturnNoOp() {
+ DatastoreOpenTelemetryOptions options = DatastoreOpenTelemetryOptions.newBuilder().build();
+ MetricsRecorder recorder = MetricsRecorder.getInstance(options);
+ assertThat(recorder).isInstanceOf(NoOpMetricsRecorder.class);
+ }
+
+ @Test
+ public void tracingEnabledButMetricsDisabledReturnsNoOp() {
+ // Enabling tracing alone should not enable metrics
+ DatastoreOpenTelemetryOptions options =
+ DatastoreOpenTelemetryOptions.newBuilder().setTracingEnabled(true).build();
+ MetricsRecorder recorder = MetricsRecorder.getInstance(options);
+ assertThat(recorder).isInstanceOf(NoOpMetricsRecorder.class);
+ }
+
+ // TODO(lawrenceqiu): Temporary test to ensure that OpenTelemetryMetricsRecorder can
+ // be created by the DatastoreOpenTelemetryOptions and creates with Otel object
+ @Test
+ public void openTelemetryRecorderCreatedWithExplicitOpenTelemetry() {
+ InMemoryMetricReader metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider =
+ SdkMeterProvider.builder().registerMetricReader(metricReader).build();
+ OpenTelemetry openTelemetry =
+ OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
+
+ OpenTelemetryMetricsRecorder recorder = new OpenTelemetryMetricsRecorder(openTelemetry);
+
+ assertThat(recorder.getOpenTelemetry()).isSameInstanceAs(openTelemetry);
+ }
+}
diff --git a/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java
new file mode 100644
index 000000000000..d9c690680e74
--- /dev/null
+++ b/java-datastore/google-cloud-datastore/src/test/java/com/google/cloud/datastore/telemetry/OpenTelemetryMetricsRecorderTest.java
@@ -0,0 +1,165 @@
+/*
+ * Copyright 2026 Google LLC
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package com.google.cloud.datastore.telemetry;
+
+import static com.google.common.truth.Truth.assertThat;
+
+import com.google.api.gax.rpc.StatusCode;
+import io.opentelemetry.api.OpenTelemetry;
+import io.opentelemetry.api.common.AttributeKey;
+import io.opentelemetry.sdk.OpenTelemetrySdk;
+import io.opentelemetry.sdk.metrics.SdkMeterProvider;
+import io.opentelemetry.sdk.metrics.data.HistogramPointData;
+import io.opentelemetry.sdk.metrics.data.LongPointData;
+import io.opentelemetry.sdk.metrics.data.MetricData;
+import io.opentelemetry.sdk.testing.exporter.InMemoryMetricReader;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public class OpenTelemetryMetricsRecorderTest {
+
+ private InMemoryMetricReader metricReader;
+ private OpenTelemetryMetricsRecorder recorder;
+
+ @Before
+ public void setUp() {
+ metricReader = InMemoryMetricReader.create();
+ SdkMeterProvider meterProvider =
+ SdkMeterProvider.builder().registerMetricReader(metricReader).build();
+ OpenTelemetry openTelemetry =
+ OpenTelemetrySdk.builder().setMeterProvider(meterProvider).build();
+ recorder = new OpenTelemetryMetricsRecorder(openTelemetry);
+ }
+
+ @Test
+ public void recordTransactionLatency_recordsHistogramWithAttributes() {
+ Map attributes = new HashMap<>();
+ attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString());
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+
+ recorder.recordTransactionLatency(150.0, attributes);
+
+ Collection metrics = metricReader.collectAllMetrics();
+ MetricData latencyMetric =
+ metrics.stream()
+ .filter(m -> m.getName().equals(TelemetryConstants.METRIC_NAME_TRANSACTION_LATENCY))
+ .findFirst()
+ .orElse(null);
+
+ assertThat(latencyMetric).isNotNull();
+ assertThat(latencyMetric.getDescription()).isEqualTo("Total latency of transaction operations");
+ assertThat(latencyMetric.getUnit()).isEqualTo("ms");
+
+ HistogramPointData point =
+ latencyMetric.getHistogramData().getPoints().stream().findFirst().orElse(null);
+ assertThat(point).isNotNull();
+ assertThat(point.getSum()).isEqualTo(150.0);
+ assertThat(point.getCount()).isEqualTo(1);
+ assertThat(
+ point
+ .getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS)))
+ .isEqualTo(StatusCode.Code.OK.toString());
+ assertThat(
+ point
+ .getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_METHOD)))
+ .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+ }
+
+ @Test
+ public void recordTransactionAttemptCount_recordsCounterWithAttributes() {
+ Map attributes = new HashMap<>();
+ attributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.ABORTED.toString());
+ attributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+
+ recorder.recordTransactionAttemptCount(1, attributes);
+
+ Collection metrics = metricReader.collectAllMetrics();
+ MetricData attemptMetric =
+ metrics.stream()
+ .filter(
+ m -> m.getName().equals(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT))
+ .findFirst()
+ .orElse(null);
+
+ assertThat(attemptMetric).isNotNull();
+ assertThat(attemptMetric.getDescription())
+ .isEqualTo("Number of attempts to commit a transaction");
+
+ LongPointData point =
+ attemptMetric.getLongSumData().getPoints().stream().findFirst().orElse(null);
+ assertThat(point).isNotNull();
+ assertThat(point.getValue()).isEqualTo(1);
+ assertThat(
+ point
+ .getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_STATUS)))
+ .isEqualTo(StatusCode.Code.ABORTED.toString());
+ assertThat(
+ point
+ .getAttributes()
+ .get(AttributeKey.stringKey(TelemetryConstants.ATTRIBUTES_KEY_METHOD)))
+ .isEqualTo(TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+ }
+
+ @Test
+ public void recordTransactionAttemptCount_multipleAttempts_accumulates() {
+ Map abortedAttributes = new HashMap<>();
+ abortedAttributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.ABORTED.toString());
+ abortedAttributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+
+ Map okAttributes = new HashMap<>();
+ okAttributes.put(TelemetryConstants.ATTRIBUTES_KEY_STATUS, StatusCode.Code.OK.toString());
+ okAttributes.put(
+ TelemetryConstants.ATTRIBUTES_KEY_METHOD, TelemetryConstants.METHOD_TRANSACTION_COMMIT);
+
+ // Simulate a retry scenario: first attempt ABORTED, second attempt OK
+ recorder.recordTransactionAttemptCount(1, abortedAttributes);
+ recorder.recordTransactionAttemptCount(1, okAttributes);
+
+ Collection metrics = metricReader.collectAllMetrics();
+ MetricData attemptMetric =
+ metrics.stream()
+ .filter(
+ m -> m.getName().equals(TelemetryConstants.METRIC_NAME_TRANSACTION_ATTEMPT_COUNT))
+ .findFirst()
+ .orElse(null);
+
+ assertThat(attemptMetric).isNotNull();
+
+ // There should be two data points — one for ABORTED and one for OK
+ assertThat(attemptMetric.getLongSumData().getPoints()).hasSize(2);
+ }
+
+ @Test
+ public void recordTransactionLatency_nullAttributes() {
+ recorder.recordTransactionLatency(100.0, null);
+
+ Collection metrics = metricReader.collectAllMetrics();
+ assertThat(metrics).isNotEmpty();
+ }
+}