Skip to content
This repository was archived by the owner on Apr 7, 2026. It is now read-only.

Commit f2659a6

Browse files
fix: Retry multiplexed session failures
1 parent 3d585cf commit f2659a6

4 files changed

Lines changed: 231 additions & 30 deletions

File tree

google-cloud-spanner/src/main/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClient.java

Lines changed: 63 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,7 @@
5353
import java.util.concurrent.atomic.AtomicInteger;
5454
import java.util.concurrent.atomic.AtomicLong;
5555
import java.util.concurrent.atomic.AtomicReference;
56+
import java.util.concurrent.locks.ReentrantLock;
5657

5758
/**
5859
* {@link TransactionRunner} that automatically handles "UNIMPLEMENTED" errors with the message
@@ -315,6 +316,18 @@ public void close() {
315316
*/
316317
private final AtomicBoolean unimplemented = new AtomicBoolean(false);
317318

319+
/**
320+
* This flag is set to true if create session RPC is in progress. This flag prevents application
321+
* from firing two requests concurrently
322+
*/
323+
private final AtomicBoolean retryingSessionCreation = new AtomicBoolean(false);
324+
325+
/**
326+
* This lock is used to prevent two threads from retrying createSession RPC requests in
327+
* concurrently.
328+
*/
329+
private final ReentrantLock sessionCreationLock = new ReentrantLock();
330+
318331
/**
319332
* This flag is set to true if the server return UNIMPLEMENTED when a read-write transaction is
320333
* executed on a multiplexed session. TODO: Remove once this is guaranteed to be available.
@@ -358,11 +371,20 @@ public void close() {
358371
SettableApiFuture.create();
359372
this.readWriteBeginTransactionReferenceFuture = SettableApiFuture.create();
360373
this.multiplexedSessionReference = new AtomicReference<>(initialSessionReferenceFuture);
374+
asyncCreateMultiplexedSession(initialSessionReferenceFuture);
375+
maybeWaitForSessionCreation(
376+
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
377+
initialSessionReferenceFuture);
378+
}
379+
380+
private void asyncCreateMultiplexedSession(
381+
SettableApiFuture<SessionReference> sessionReferenceFuture) {
361382
this.sessionClient.asyncCreateMultiplexedSession(
362383
new SessionConsumer() {
363384
@Override
364385
public void onSessionReady(SessionImpl session) {
365-
initialSessionReferenceFuture.set(session.getSessionReference());
386+
retryingSessionCreation.set(false);
387+
sessionReferenceFuture.set(session.getSessionReference());
366388
// only start the maintainer if we actually managed to create a session in the first
367389
// place.
368390
maintainer.start();
@@ -394,13 +416,11 @@ public void onSessionReady(SessionImpl session) {
394416
public void onSessionCreateFailure(Throwable t, int createFailureForSessionCount) {
395417
// Mark multiplexes sessions as unimplemented and fall back to regular sessions if
396418
// UNIMPLEMENTED is returned.
419+
retryingSessionCreation.set(false);
397420
maybeMarkUnimplemented(t);
398-
initialSessionReferenceFuture.setException(t);
421+
sessionReferenceFuture.setException(t);
399422
}
400423
});
401-
maybeWaitForSessionCreation(
402-
sessionClient.getSpanner().getOptions().getSessionPoolOptions(),
403-
initialSessionReferenceFuture);
404424
}
405425

406426
void setPool(SessionPool pool) {
@@ -546,10 +566,35 @@ MultiplexedSessionMaintainer getMaintainer() {
546566
return this.maintainer;
547567
}
548568

569+
ApiFuture<SessionReference> getCurrentSessionReferenceFuture() {
570+
return ApiFutures.catchingAsync(
571+
this.multiplexedSessionReference.get(),
572+
Throwable.class,
573+
(throwable) -> {
574+
maybeRetrySessionCreation();
575+
return this.multiplexedSessionReference.get();
576+
},
577+
MoreExecutors.directExecutor());
578+
}
579+
580+
private void maybeRetrySessionCreation() {
581+
sessionCreationLock.lock();
582+
try {
583+
if (isValid() && isMultiplexedSessionsSupported()
584+
&& retryingSessionCreation.compareAndSet(false, true)) {
585+
SettableApiFuture<SessionReference> settableApiFuture = SettableApiFuture.create();
586+
asyncCreateMultiplexedSession(settableApiFuture);
587+
multiplexedSessionReference.set(settableApiFuture);
588+
}
589+
} finally {
590+
sessionCreationLock.unlock();
591+
}
592+
}
593+
549594
@VisibleForTesting
550595
SessionReference getCurrentSessionReference() {
551596
try {
552-
return this.multiplexedSessionReference.get().get();
597+
return getCurrentSessionReferenceFuture().get();
553598
} catch (ExecutionException executionException) {
554599
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
555600
} catch (InterruptedException interruptedException) {
@@ -587,28 +632,22 @@ private DatabaseClient createMultiplexedSessionTransaction(boolean singleUse) {
587632

588633
private MultiplexedSessionTransaction createDirectMultiplexedSessionTransaction(
589634
boolean singleUse) {
590-
try {
591-
return new MultiplexedSessionTransaction(
592-
this,
593-
tracer.getCurrentSpan(),
594-
// Getting the result of the SettableApiFuture that contains the multiplexed session will
595-
// also automatically propagate any error that happened during the creation of the
596-
// session, such as for example a DatabaseNotFound exception. We therefore do not need
597-
// any special handling of such errors.
598-
multiplexedSessionReference.get().get(),
599-
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
600-
singleUse,
601-
this.pool);
602-
} catch (ExecutionException executionException) {
603-
throw SpannerExceptionFactory.asSpannerException(executionException.getCause());
604-
} catch (InterruptedException interruptedException) {
605-
throw SpannerExceptionFactory.propagateInterrupt(interruptedException);
606-
}
635+
return new MultiplexedSessionTransaction(
636+
this,
637+
tracer.getCurrentSpan(),
638+
// Getting the result of the SettableApiFuture that contains the multiplexed session will
639+
// also automatically propagate any error that happened during the creation of the
640+
// session, such as for example a DatabaseNotFound exception. We therefore do not need
641+
// any special handling of such errors.
642+
getCurrentSessionReference(),
643+
singleUse ? getSingleUseChannelHint() : NO_CHANNEL_HINT,
644+
singleUse,
645+
this.pool);
607646
}
608647

609648
private DelayedMultiplexedSessionTransaction createDelayedMultiplexSessionTransaction() {
610649
return new DelayedMultiplexedSessionTransaction(
611-
this, tracer.getCurrentSpan(), multiplexedSessionReference.get(), this.pool);
650+
this, tracer.getCurrentSpan(), getCurrentSessionReferenceFuture(), this.pool);
612651
}
613652

614653
private int getSingleUseChannelHint() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/DatabaseClientImplTest.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -3202,7 +3202,8 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
32023202
mockSpanner.unfreeze();
32033203
assertThrows(ResourceNotFoundException.class, rs::next);
32043204
// The server should only receive one BatchCreateSessions request.
3205-
assertThat(mockSpanner.getRequests()).hasSize(1);
3205+
// If multiplexed session used, it will be retried once so 2 CreateSession requests
3206+
assertThat(mockSpanner.getRequests()).hasSize(useMultiplexedSession ? 2 : 1);
32063207
}
32073208
assertThrows(
32083209
ResourceNotFoundException.class,
@@ -3221,9 +3222,9 @@ public void testDatabaseOrInstanceDoesNotExistOnCreate() {
32213222
} else {
32223223
// Note that in case of the use of regular sessions, then we have 1 request:
32233224
// BatchCreateSessions for the session pool.
3224-
// Note that in case of the use of multiplexed sessions for read-write, then we have 1
3225-
// request: CreateSession for the multiplexed session.
3226-
assertThat(mockSpanner.getRequests()).hasSize(1);
3225+
// Note that in case of the use of multiplexed sessions for read-write, then we have 3
3226+
// requests: CreateSession for the multiplexed session.
3227+
assertThat(mockSpanner.getRequests()).hasSize(useMultiplexedSession ? 3 : 1);
32273228
}
32283229
}
32293230
}
@@ -3413,7 +3414,7 @@ public void testGetInvalidatedClientMultipleTimes() {
34133414
if (spanner.getOptions().getSessionPoolOptions().getUseMultiplexedSession()) {
34143415
// We should only receive 1 CreateSession request. The query should never be executed,
34153416
// as the session creation fails before it gets to executing a query.
3416-
assertEquals(1, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
3417+
assertEquals(2 + (2 * run) + useClient, mockSpanner.countRequestsOfType(CreateSessionRequest.class));
34173418
assertEquals(0, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
34183419
} else {
34193420
// The server should only receive one BatchCreateSessions request for each run as we

google-cloud-spanner/src/test/java/com/google/cloud/spanner/MultiplexedSessionDatabaseClientMockServerTest.java

Lines changed: 161 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,8 @@
5454
import java.time.Duration;
5555
import java.util.*;
5656
import java.util.concurrent.CountDownLatch;
57+
import java.util.concurrent.ExecutorService;
58+
import java.util.concurrent.Executors;
5759
import java.util.concurrent.TimeUnit;
5860
import java.util.concurrent.atomic.AtomicInteger;
5961
import java.util.concurrent.atomic.AtomicReference;
@@ -245,6 +247,165 @@ public void testUnimplementedErrorOnCreation_fallsBackToRegularSessions() {
245247
assertEquals(0L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
246248
}
247249

250+
@Test
251+
public void testDeadlineExceededErrorWithOneRetry() {
252+
// Setting up two exceptions
253+
mockSpanner.setCreateSessionExecutionTime(
254+
SimulatedExecutionTime.ofExceptions(
255+
Arrays.asList(
256+
Status.DEADLINE_EXCEEDED
257+
.withDescription(
258+
"CallOptions deadline exceeded after 22.986872393s. "
259+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
260+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
261+
.asRuntimeException(),
262+
Status.DEADLINE_EXCEEDED
263+
.withDescription(
264+
"CallOptions deadline exceeded after 22.986872393s. "
265+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
266+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
267+
.asRuntimeException())));
268+
DatabaseClientImpl client =
269+
(DatabaseClientImpl) spanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
270+
assertNotNull(client.multiplexedSessionDatabaseClient);
271+
272+
// initial fetch call fails with exception
273+
// this call will try to fetch it again which again throws an exception
274+
assertThrows(
275+
SpannerException.class,
276+
() -> {
277+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
278+
//noinspection StatementWithEmptyBody
279+
while (resultSet.next()) {
280+
// ignore
281+
}
282+
}
283+
});
284+
285+
// When third request comes it should succeed
286+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
287+
//noinspection StatementWithEmptyBody
288+
while (resultSet.next()) {
289+
// ignore
290+
}
291+
}
292+
293+
// Verify that we received one ExecuteSqlRequest, and that it used a multiplexed session.
294+
assertEquals(1, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
295+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
296+
297+
Session session = mockSpanner.getSession(requests.get(0).getSession());
298+
assertNotNull(session);
299+
assertTrue(session.getMultiplexed());
300+
301+
assertNotNull(client.multiplexedSessionDatabaseClient);
302+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
303+
assertEquals(1L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
304+
}
305+
306+
@Test
307+
public void testDeadlineExceededErrorWithOneRetryWithParallelRequests()
308+
throws InterruptedException {
309+
mockSpanner.setCreateSessionExecutionTime(
310+
SimulatedExecutionTime.ofExceptions(
311+
Arrays.asList(
312+
Status.DEADLINE_EXCEEDED
313+
.withDescription(
314+
"CallOptions deadline exceeded after 22.986872393s. "
315+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
316+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
317+
.asRuntimeException(),
318+
Status.DEADLINE_EXCEEDED
319+
.withDescription(
320+
"CallOptions deadline exceeded after 22.986872393s. "
321+
+ "Name resolution delay 6.911918521 seconds. [closed=[], "
322+
+ "open=[[connecting_and_lb_delay=32445014148ns, was_still_waiting]]]")
323+
.asRuntimeException())));
324+
325+
SpannerOptions spannerOptions = SpannerOptions.newBuilder()
326+
.setCredentials(NoCredentials.getInstance())
327+
.setChannelProvider(channelProvider)
328+
.setSessionPoolOption(SessionPoolOptions.newBuilder()
329+
.setMultiplexedSessionMaintenanceDuration(Duration.ofMinutes(10))
330+
.build())
331+
.build();
332+
333+
Spanner testSpanner = spannerOptions.getService();
334+
DatabaseClientImpl client =
335+
(DatabaseClientImpl) testSpanner.getDatabaseClient(DatabaseId.of("p", "i", "d"));
336+
assertNotNull(client.multiplexedSessionDatabaseClient);
337+
338+
ExecutorService executor = Executors.newCachedThreadPool();
339+
340+
// First set of request should fail with an error
341+
CountDownLatch failureCountDownLatch = new CountDownLatch(3);
342+
for (int i = 0; i < 3; i++) {
343+
if (i == 0) {
344+
mockSpanner.freeze();
345+
}
346+
executor.submit(
347+
() -> {
348+
try {
349+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
350+
//noinspection StatementWithEmptyBody
351+
while (resultSet.next()) {
352+
// ignore
353+
}
354+
}
355+
} catch (SpannerException e) {
356+
failureCountDownLatch.countDown();
357+
}
358+
});
359+
if (i == 2) {
360+
mockSpanner.unfreeze();
361+
}
362+
}
363+
364+
assertTrue(failureCountDownLatch.await(2, TimeUnit.SECONDS));
365+
assertEquals(0, failureCountDownLatch.getCount());
366+
367+
// Second set of requests should pass
368+
CountDownLatch countDownLatch = new CountDownLatch(3);
369+
for (int i = 0; i < 3; i++) {
370+
if (i == 0) {
371+
mockSpanner.freeze();
372+
}
373+
executor.submit(
374+
() -> {
375+
try {
376+
try (ResultSet resultSet = client.singleUse().executeQuery(STATEMENT)) {
377+
//noinspection StatementWithEmptyBody
378+
while (resultSet.next()) {
379+
// ignore
380+
}
381+
}
382+
} catch (SpannerException e) {
383+
countDownLatch.countDown();
384+
}
385+
});
386+
if (i == 2) {
387+
mockSpanner.unfreeze();
388+
}
389+
}
390+
391+
assertFalse(countDownLatch.await(2, TimeUnit.SECONDS));
392+
assertEquals(3, countDownLatch.getCount());
393+
394+
// Verify that we received 3 ExecuteSqlRequest, and that it used a multiplexed session.
395+
assertEquals(3, mockSpanner.countRequestsOfType(ExecuteSqlRequest.class));
396+
List<ExecuteSqlRequest> requests = mockSpanner.getRequestsOfType(ExecuteSqlRequest.class);
397+
398+
Session session = mockSpanner.getSession(requests.get(0).getSession());
399+
assertNotNull(session);
400+
assertTrue(session.getMultiplexed());
401+
402+
assertNotNull(client.multiplexedSessionDatabaseClient);
403+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsAcquired().get());
404+
assertEquals(3L, client.multiplexedSessionDatabaseClient.getNumSessionsReleased().get());
405+
406+
testSpanner.close();
407+
}
408+
248409
@Test
249410
public void
250411
testUnimplementedErrorOnCreation_firstReceivesError_secondFallsBackToRegularSessions() {

google-cloud-spanner/src/test/java/com/google/cloud/spanner/OpenTelemetryBuiltInMetricsTracerTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -339,7 +339,7 @@ public void testNoNetworkConnection() {
339339

340340
// Attempt count should have a failed metric point for CreateSession.
341341
assertEquals(
342-
1, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
342+
2, getAggregatedValue(attemptCountMetricData, expectedAttributesCreateSessionFailed), 0);
343343
assertTrue(
344344
checkIfMetricExists(metricReader, BuiltInMetricsConstant.GFE_CONNECTIVITY_ERROR_NAME));
345345
assertTrue(

0 commit comments

Comments
 (0)