splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
Truth.assertThat(splits).isNotEmpty();
splits.forEach(
@@ -91,4 +103,85 @@ public void testQuerySplitterWithDb() throws DatastoreException {
Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter);
});
}
+
+ /**
+ * A generic helper method that executes a {@link Callable} with retries using the GAX retrying
+ * framework.
+ *
+ * It configures a {@link DirectRetryingExecutor} with the provided {@link RetrySettings} and
+ * the custom {@link ResultRetryAlgorithmWithContext}.
+ *
+ * @param callable the action to execute
+ * @param retrySettings the retry configuration (backoff, max attempts, timeouts)
+ * @param resultRetryAlgorithm the algorithm to determine if a failed attempt should be retried
+ * @return the result of the callable execution
+ * @throws Exception if the execution fails after all retry attempts.
+ */
+ private static V runWithRetry(
+ Callable callable,
+ RetrySettings retrySettings,
+ ResultRetryAlgorithmWithContext resultRetryAlgorithm)
+ throws Exception {
+ ApiClock clock = NanoClock.getDefaultClock();
+ // We must wrap the result algorithm and timed algorithm into a RetryAlgorithm
+ // as required by DirectRetryingExecutor.
+ RetryAlgorithm retryAlgorithm =
+ new RetryAlgorithm<>(
+ resultRetryAlgorithm, new ExponentialRetryAlgorithm(retrySettings, clock));
+
+ DirectRetryingExecutor executor = new DirectRetryingExecutor<>(retryAlgorithm);
+ RetryingFuture future = executor.createFuture(callable);
+
+ ApiFuture submittedFuture = executor.submit(future);
+
+ try {
+ return submittedFuture.get();
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ // submittedFuture.get() wraps any exception thrown during execution in an ExecutionException.
+ // We unwrap and rethrow the actual cause (Exception or Error) directly so that test failures
+ // report the root cause (e.g., DatastoreException or AssertionError) instead of the wrapper.
+ if (cause instanceof Exception) {
+ throw (Exception) cause;
+ }
+ if (cause instanceof Error) {
+ throw (Error) cause;
+ }
+ throw e;
+ } catch (InterruptedException e) {
+ // Restore the interrupted status before rethrowing, as per Java concurrency best practices.
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+
+ // This low-level Datastore client (proto-over-HTTP) does not have built-in retry logic
+ // (unlike the high-level google-cloud-datastore gRPC client). We must explicitly retry
+ // here to handle transient backend errors (such as Code.INTERNAL auth issues).
+ // We reuse GAX retrying utilities here in the test to implement this backoff/retry.
+ private static List getSplitsWithRetry(
+ Query query, PartitionId partition, int numSplits, Datastore datastore) throws Exception {
+ // Fail fast configuration to avoid long wait times during test failures
+ RetrySettings retrySettings =
+ RetrySettings.newBuilder()
+ .setMaxAttempts(3)
+ .setInitialRetryDelayDuration(Duration.ofMillis(200))
+ .setRetryDelayMultiplier(1.5)
+ .setMaxRetryDelayDuration(Duration.ofMillis(500))
+ .setTotalTimeoutDuration(Duration.ofSeconds(2))
+ .build();
+ return runWithRetry(
+ () -> DatastoreHelper.getQuerySplitter().getSplits(query, partition, numSplits, datastore),
+ retrySettings,
+ new BasicResultRetryAlgorithm>() {
+ @Override
+ public boolean shouldRetry(Throwable prevThrowable, List prevResult) {
+ if (prevThrowable instanceof DatastoreException) {
+ DatastoreException de = (DatastoreException) prevThrowable;
+ return de.getCode() == Code.INTERNAL;
+ }
+ return false;
+ }
+ });
+ }
}
diff --git a/java-datastore/google-cloud-datastore-utils/pom.xml b/java-datastore/google-cloud-datastore-utils/pom.xml
index b0533059163a..e5de55f6139e 100644
--- a/java-datastore/google-cloud-datastore-utils/pom.xml
+++ b/java-datastore/google-cloud-datastore-utils/pom.xml
@@ -84,6 +84,11 @@
+
+ com.google.api
+ gax
+ test
+
diff --git a/java-datastore/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java b/java-datastore/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java
index d30c1cbdc598..3b7776b941f1 100644
--- a/java-datastore/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java
+++ b/java-datastore/google-cloud-datastore-utils/src/test/java/com/google/datastore/utils/it/ITDatastoreProtoClientTest.java
@@ -18,14 +18,28 @@
import static com.google.datastore.utils.DatastoreHelper.makeFilter;
import static com.google.datastore.utils.DatastoreHelper.makeValue;
+import com.google.api.core.ApiClock;
+import com.google.api.core.ApiFuture;
+import com.google.api.core.NanoClock;
+import com.google.api.gax.retrying.BasicResultRetryAlgorithm;
+import com.google.api.gax.retrying.DirectRetryingExecutor;
+import com.google.api.gax.retrying.ExponentialRetryAlgorithm;
+import com.google.api.gax.retrying.ResultRetryAlgorithmWithContext;
+import com.google.api.gax.retrying.RetryAlgorithm;
+import com.google.api.gax.retrying.RetrySettings;
+import com.google.api.gax.retrying.RetryingFuture;
import com.google.common.truth.Truth;
import com.google.datastore.utils.Datastore;
import com.google.datastore.utils.DatastoreException;
import com.google.datastore.utils.DatastoreHelper;
import com.google.datastore.v1.*;
+import com.google.rpc.Code;
import java.io.IOException;
import java.security.GeneralSecurityException;
+import java.time.Duration;
import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ExecutionException;
import org.junit.Before;
import org.junit.Test;
@@ -44,7 +58,7 @@ public void setUp() throws GeneralSecurityException, IOException {
}
@Test
- public void testQuerySplitterWithDefaultDb() throws DatastoreException {
+ public void testQuerySplitterWithDefaultDb() throws Exception {
Filter propertyFilter =
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
Query query =
@@ -55,8 +69,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).build();
- List splits =
- DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
+ List splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
Truth.assertThat(splits).isNotEmpty();
splits.forEach(
split -> {
@@ -66,7 +79,7 @@ public void testQuerySplitterWithDefaultDb() throws DatastoreException {
}
@Test
- public void testQuerySplitterWithDb() throws DatastoreException {
+ public void testQuerySplitterWithDb() throws Exception {
Filter propertyFilter =
makeFilter("foo", PropertyFilter.Operator.EQUAL, makeValue("value")).build();
Query query =
@@ -77,8 +90,7 @@ public void testQuerySplitterWithDb() throws DatastoreException {
PARTITION = PartitionId.newBuilder().setProjectId(PROJECT_ID).setDatabaseId("test-db").build();
- List splits =
- DatastoreHelper.getQuerySplitter().getSplits(query, PARTITION, 2, DATASTORE);
+ List splits = getSplitsWithRetry(query, PARTITION, 2, DATASTORE);
Truth.assertThat(splits).isNotEmpty();
splits.forEach(
@@ -87,4 +99,85 @@ public void testQuerySplitterWithDb() throws DatastoreException {
Truth.assertThat(split.getFilter()).isEqualTo(propertyFilter);
});
}
+
+ /**
+ * A generic helper method that executes a {@link Callable} with retries using the GAX retrying
+ * framework.
+ *
+ * It configures a {@link DirectRetryingExecutor} with the provided {@link RetrySettings} and
+ * the custom {@link ResultRetryAlgorithmWithContext}.
+ *
+ * @param callable the action to execute
+ * @param retrySettings the retry configuration (backoff, max attempts, timeouts)
+ * @param resultRetryAlgorithm the algorithm to determine if a failed attempt should be retried
+ * @return the result of the callable execution
+ * @throws Exception if the execution fails after all retry attempts.
+ */
+ private static V runWithRetry(
+ Callable callable,
+ RetrySettings retrySettings,
+ ResultRetryAlgorithmWithContext resultRetryAlgorithm)
+ throws Exception {
+ ApiClock clock = NanoClock.getDefaultClock();
+ // We must wrap the result algorithm and timed algorithm into a RetryAlgorithm
+ // as required by DirectRetryingExecutor.
+ RetryAlgorithm retryAlgorithm =
+ new RetryAlgorithm<>(
+ resultRetryAlgorithm, new ExponentialRetryAlgorithm(retrySettings, clock));
+
+ DirectRetryingExecutor executor = new DirectRetryingExecutor<>(retryAlgorithm);
+ RetryingFuture future = executor.createFuture(callable);
+
+ ApiFuture submittedFuture = executor.submit(future);
+
+ try {
+ return submittedFuture.get();
+ } catch (ExecutionException e) {
+ Throwable cause = e.getCause();
+ // submittedFuture.get() wraps any exception thrown during execution in an ExecutionException.
+ // We unwrap and rethrow the actual cause (Exception or Error) directly so that test failures
+ // report the root cause (e.g., DatastoreException or AssertionError) instead of the wrapper.
+ if (cause instanceof Exception) {
+ throw (Exception) cause;
+ }
+ if (cause instanceof Error) {
+ throw (Error) cause;
+ }
+ throw e;
+ } catch (InterruptedException e) {
+ // Restore the interrupted status before rethrowing, as per Java concurrency best practices.
+ Thread.currentThread().interrupt();
+ throw e;
+ }
+ }
+
+ // This low-level Datastore client (proto-over-HTTP) does not have built-in retry logic
+ // (unlike the high-level google-cloud-datastore gRPC client). We must explicitly retry
+ // here to handle transient backend errors (such as Code.INTERNAL auth issues).
+ // We reuse GAX retrying utilities here in the test to implement this backoff/retry.
+ private static List getSplitsWithRetry(
+ Query query, PartitionId partition, int numSplits, Datastore datastore) throws Exception {
+ // Fail fast configuration to avoid long wait times during test failures
+ RetrySettings retrySettings =
+ RetrySettings.newBuilder()
+ .setMaxAttempts(3)
+ .setInitialRetryDelayDuration(Duration.ofMillis(200))
+ .setRetryDelayMultiplier(1.5)
+ .setMaxRetryDelayDuration(Duration.ofMillis(500))
+ .setTotalTimeoutDuration(Duration.ofSeconds(2))
+ .build();
+ return runWithRetry(
+ () -> DatastoreHelper.getQuerySplitter().getSplits(query, partition, numSplits, datastore),
+ retrySettings,
+ new BasicResultRetryAlgorithm>() {
+ @Override
+ public boolean shouldRetry(Throwable prevThrowable, List prevResult) {
+ if (prevThrowable instanceof DatastoreException) {
+ DatastoreException de = (DatastoreException) prevThrowable;
+ return de.getCode() == Code.INTERNAL;
+ }
+ return false;
+ }
+ });
+ }
}