From 2c9da0da95d2cfc02e0cc30b817385284fa70af7 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Thu, 8 Jan 2026 08:45:46 +0530 Subject: [PATCH 1/3] feat: add ChannelFinder server interfaces This commit adds the server abstraction interfaces for location-aware routing: - ChannelFinderServer: Interface representing a Spanner server endpoint with address, health check, and channel access - ChannelFinderServerFactory: Factory interface for creating and caching server connections - GrpcChannelFinderServerFactory: gRPC implementation that creates and manages gRPC channels for different server endpoints These interfaces enable the client to maintain connections to multiple Spanner servers and route requests directly to the appropriate server based on key location information. This is part of the experimental location-aware routing for improved latency. --- .../spanner/spi/v1/ChannelFinderServer.java | 28 ++++++ .../spi/v1/ChannelFinderServerFactory.java | 24 +++++ .../v1/GrpcChannelFinderServerFactory.java | 98 +++++++++++++++++++ 3 files changed, 150 insertions(+) create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java create mode 100644 google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java new file mode 100644 index 0000000000..27a0b5d31a --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java @@ -0,0 +1,28 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import io.grpc.ManagedChannel; + +/** Represents a Spanner server endpoint for location-aware routing. */ +public interface ChannelFinderServer { + String getAddress(); + + boolean isHealthy(); + + ManagedChannel getChannel(); // Added to get the underlying channel for RPC calls +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java new file mode 100644 index 0000000000..c81cf82c0d --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java @@ -0,0 +1,24 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +/** Factory for creating and caching server connections for location-aware routing. */ +public interface ChannelFinderServerFactory { + ChannelFinderServer defaultServer(); + + ChannelFinderServer create(String address); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java new file mode 100644 index 0000000000..8c120f0773 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java @@ -0,0 +1,98 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import io.grpc.ManagedChannel; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +class GrpcChannelFinderServerFactory implements ChannelFinderServerFactory { + private final InstantiatingGrpcChannelProvider.Builder channelBuilder; + private final Map servers = new ConcurrentHashMap<>(); + private final GrpcChannelFinderServer defaultServer; + + public GrpcChannelFinderServerFactory(InstantiatingGrpcChannelProvider.Builder channelBuilder) + throws IOException { + this.channelBuilder = channelBuilder; + // The "default" server will use the original endpoint from the builder. + this.defaultServer = + new GrpcChannelFinderServer(this.channelBuilder.getEndpoint(), channelBuilder.build()); + this.servers.put(this.defaultServer.getAddress(), this.defaultServer); + } + + @Override + public ChannelFinderServer defaultServer() { + return defaultServer; + } + + @Override + public ChannelFinderServer create(String address) { + return servers.computeIfAbsent( + address, + addr -> { + try { + // Modify the builder to use the new address + synchronized (channelBuilder) { + InstantiatingGrpcChannelProvider.Builder newBuilder = + channelBuilder.setEndpoint(addr); + return new GrpcChannelFinderServer(addr, newBuilder.build()); + } + } catch (IOException e) { + throw new RuntimeException("Failed to create channel for address: " + addr, e); + } + }); + } + + static class GrpcChannelFinderServer implements ChannelFinderServer { + private final String address; + private final ManagedChannel channel; + + public GrpcChannelFinderServer(String address, InstantiatingGrpcChannelProvider provider) + throws IOException { + this.address = address; + // It's assumed that getTransportChannel() returns a ManagedChannel or can be cast to one. + // For this example, GrpcTransportChannel is used as in KeyAwareChannel. + GrpcTransportChannel transportChannel = (GrpcTransportChannel) provider.getTransportChannel(); + this.channel = (ManagedChannel) transportChannel.getChannel(); + } + + // Constructor for the default server that already has a channel + public GrpcChannelFinderServer(String address, ManagedChannel channel) { + this.address = address; + this.channel = channel; + } + + @Override + public String getAddress() { + return address; + } + + @Override + public boolean isHealthy() { + // A simple health check. In a real scenario, this might involve a ping or other checks. + return !channel.isShutdown() && !channel.isTerminated(); + } + + @Override + public ManagedChannel getChannel() { + return channel; + } + } +} From cdf2f368e3238e0928fc6a260af8631a16f37193 Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Fri, 16 Jan 2026 13:44:04 +0530 Subject: [PATCH 2/3] incorporate changes --- .../spanner/spi/v1/ChannelFinderServer.java | 42 +++++- .../spi/v1/ChannelFinderServerFactory.java | 57 +++++++- .../v1/GrpcChannelFinderServerFactory.java | 132 +++++++++++++++--- 3 files changed, 206 insertions(+), 25 deletions(-) diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java index 27a0b5d31a..df57bac75d 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java @@ -16,13 +16,51 @@ package com.google.cloud.spanner.spi.v1; +import com.google.api.core.InternalApi; import io.grpc.ManagedChannel; -/** Represents a Spanner server endpoint for location-aware routing. */ +/** + * Represents a Spanner server endpoint for location-aware routing. + * + *

Each instance wraps a gRPC {@link ManagedChannel} connected to a specific Spanner server. The + * {@link ChannelFinderServerFactory} creates and caches these instances. + * + *

Implementations must be thread-safe as instances may be shared across multiple concurrent + * operations. + * + * @see ChannelFinderServerFactory + */ +@InternalApi public interface ChannelFinderServer { + + /** + * Returns the network address of this server. + * + * @return the server address in "host:port" format + */ String getAddress(); + /** + * Returns whether this server is ready to accept RPCs. + * + *

A server is considered unhealthy if: + * + *

    + *
  • The underlying channel is shutdown or terminated + *
  • The channel is in a transient failure state + *
+ * + * @return true if the server is healthy and ready to accept RPCs + */ boolean isHealthy(); - ManagedChannel getChannel(); // Added to get the underlying channel for RPC calls + /** + * Returns the gRPC channel for making RPCs to this server. + * + *

The returned channel is managed by the {@link ChannelFinderServerFactory} and should not be + * shut down directly by callers. + * + * @return the managed channel for this server + */ + ManagedChannel getChannel(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java index c81cf82c0d..4ebbfe5e00 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java @@ -16,9 +16,64 @@ package com.google.cloud.spanner.spi.v1; -/** Factory for creating and caching server connections for location-aware routing. */ +import com.google.api.core.InternalApi; + +/** + * Factory for creating and caching server connections for location-aware routing. + * + *

Implementations are expected to cache {@link ChannelFinderServer} instances such that repeated + * calls with the same address return the same instance. This allows the {@link + * com.google.cloud.spanner.spi.v1.KeyRangeCache} to efficiently manage server references. + * + *

Implementations must be thread-safe. Multiple threads may concurrently call {@link + * #create(String)} with different addresses. + */ +@InternalApi public interface ChannelFinderServerFactory { + + /** + * Returns the default server endpoint. + * + *

The default server is the original endpoint configured in {@link + * com.google.cloud.spanner.SpannerOptions}. It is used as a fallback when the location cache does + * not have routing information for a request. + * + * @return the default server, never null + */ ChannelFinderServer defaultServer(); + /** + * Creates or retrieves a cached server for the given address. + * + *

If a server for this address already exists in the cache, the cached instance is returned. + * Otherwise, a new server connection is created and cached. + * + * @param address the server address in "host:port" format + * @return a server instance for the address, never null + * @throws com.google.cloud.spanner.SpannerException if the channel cannot be created + */ ChannelFinderServer create(String address); + + /** + * Evicts a server from the cache and gracefully shuts down its channel. + * + *

This method should be called when a server becomes unhealthy or is no longer needed. The + * channel shutdown is graceful: existing RPCs are allowed to complete, but new RPCs will not be + * accepted on this channel. + * + *

If the address is not in the cache, this method does nothing. + * + * @param address the server address to evict + */ + void evict(String address); + + /** + * Shuts down all cached server connections. + * + *

This method should be called when the Spanner client is closed to release all resources. + * Each channel is shut down gracefully, allowing in-flight RPCs to complete. + * + *

After calling this method, the factory should not be used to create new connections. + */ + void shutdown(); } diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java index 8c120f0773..98357df2fd 100644 --- a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java @@ -16,25 +16,51 @@ package com.google.cloud.spanner.spi.v1; +import com.google.api.core.InternalApi; import com.google.api.gax.grpc.GrpcTransportChannel; import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ConnectivityState; import io.grpc.ManagedChannel; import java.io.IOException; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; +/** + * gRPC implementation of {@link ChannelFinderServerFactory}. + * + *

This factory creates and caches gRPC channels per address. It uses {@link + * InstantiatingGrpcChannelProvider#withEndpoint(String)} to create new channels with the same + * configuration but different endpoints, avoiding race conditions. + */ +@InternalApi class GrpcChannelFinderServerFactory implements ChannelFinderServerFactory { - private final InstantiatingGrpcChannelProvider.Builder channelBuilder; + + /** Timeout for graceful channel shutdown. */ + private static final long SHUTDOWN_TIMEOUT_SECONDS = 5; + + private final InstantiatingGrpcChannelProvider baseProvider; private final Map servers = new ConcurrentHashMap<>(); private final GrpcChannelFinderServer defaultServer; + private volatile boolean isShutdown = false; - public GrpcChannelFinderServerFactory(InstantiatingGrpcChannelProvider.Builder channelBuilder) + /** + * Creates a new factory with the given channel provider. + * + * @param channelProvider the base provider used to create channels. New channels for different + * endpoints are created using {@link InstantiatingGrpcChannelProvider#withEndpoint(String)}. + * @throws IOException if the default channel cannot be created + */ + public GrpcChannelFinderServerFactory(InstantiatingGrpcChannelProvider channelProvider) throws IOException { - this.channelBuilder = channelBuilder; - // The "default" server will use the original endpoint from the builder. - this.defaultServer = - new GrpcChannelFinderServer(this.channelBuilder.getEndpoint(), channelBuilder.build()); - this.servers.put(this.defaultServer.getAddress(), this.defaultServer); + this.baseProvider = channelProvider; + String defaultEndpoint = channelProvider.getEndpoint(); + this.defaultServer = new GrpcChannelFinderServer(defaultEndpoint, channelProvider); + this.servers.put(defaultEndpoint, this.defaultServer); } @Override @@ -44,37 +70,95 @@ public ChannelFinderServer defaultServer() { @Override public ChannelFinderServer create(String address) { + if (isShutdown) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, "ChannelFinderServerFactory has been shut down"); + } + return servers.computeIfAbsent( address, addr -> { try { - // Modify the builder to use the new address - synchronized (channelBuilder) { - InstantiatingGrpcChannelProvider.Builder newBuilder = - channelBuilder.setEndpoint(addr); - return new GrpcChannelFinderServer(addr, newBuilder.build()); - } + // Create a new provider with the same config but different endpoint. + // This is thread-safe as withEndpoint() returns a new provider instance. + TransportChannelProvider newProvider = baseProvider.withEndpoint(addr); + return new GrpcChannelFinderServer(addr, newProvider); } catch (IOException e) { - throw new RuntimeException("Failed to create channel for address: " + addr, e); + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e); } }); } + @Override + public void evict(String address) { + if (defaultServer.getAddress().equals(address)) { + return; + } + GrpcChannelFinderServer server = servers.remove(address); + if (server != null) { + shutdownServerGracefully(server); + } + } + + @Override + public void shutdown() { + isShutdown = true; + for (GrpcChannelFinderServer server : servers.values()) { + shutdownServerGracefully(server); + } + servers.clear(); + } + + /** + * Gracefully shuts down a server's channel. + * + *

First attempts a graceful shutdown, waiting for in-flight RPCs to complete. If the timeout + * is exceeded, forces immediate shutdown. + */ + private void shutdownServerGracefully(GrpcChannelFinderServer server) { + ManagedChannel channel = server.getChannel(); + if (channel.isShutdown()) { + return; + } + + channel.shutdown(); + try { + if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + channel.shutdownNow(); + } + } catch (InterruptedException e) { + channel.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** gRPC implementation of {@link ChannelFinderServer}. */ static class GrpcChannelFinderServer implements ChannelFinderServer { private final String address; private final ManagedChannel channel; - public GrpcChannelFinderServer(String address, InstantiatingGrpcChannelProvider provider) - throws IOException { + /** + * Creates a server from a channel provider. + * + * @param address the server address + * @param provider the channel provider (must be a gRPC provider) + * @throws IOException if the channel cannot be created + */ + GrpcChannelFinderServer(String address, TransportChannelProvider provider) throws IOException { this.address = address; - // It's assumed that getTransportChannel() returns a ManagedChannel or can be cast to one. - // For this example, GrpcTransportChannel is used as in KeyAwareChannel. GrpcTransportChannel transportChannel = (GrpcTransportChannel) provider.getTransportChannel(); this.channel = (ManagedChannel) transportChannel.getChannel(); } - // Constructor for the default server that already has a channel - public GrpcChannelFinderServer(String address, ManagedChannel channel) { + /** + * Creates a server with an existing channel. Primarily for testing. + * + * @param address the server address + * @param channel the managed channel + */ + @VisibleForTesting + GrpcChannelFinderServer(String address, ManagedChannel channel) { this.address = address; this.channel = channel; } @@ -86,8 +170,12 @@ public String getAddress() { @Override public boolean isHealthy() { - // A simple health check. In a real scenario, this might involve a ping or other checks. - return !channel.isShutdown() && !channel.isTerminated(); + if (channel.isShutdown() || channel.isTerminated()) { + return false; + } + // Check connectivity state without triggering a connection attempt + ConnectivityState state = channel.getState(false); + return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE; } @Override From 212f23478d980c0c8916b474b95c2313721b7a8e Mon Sep 17 00:00:00 2001 From: Rahul Yadav Date: Fri, 16 Jan 2026 13:47:49 +0530 Subject: [PATCH 3/3] add tests --- .../GrpcChannelFinderServerFactoryTest.java | 119 ++++++++++++++++++ 1 file changed, 119 insertions(+) create mode 100644 google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java new file mode 100644 index 0000000000..4a57764b19 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.cloud.spanner.SpannerException; +import io.grpc.ManagedChannelBuilder; +import org.junit.Test; + +public class GrpcChannelFinderServerFactoryTest { + + private static InstantiatingGrpcChannelProvider createProvider(String endpoint) { + return InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); + } + + @Test + public void defaultServerIsCached() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer defaultServer = factory.defaultServer(); + ChannelFinderServer server = factory.create(defaultServer.getAddress()); + assertThat(server).isSameInstanceAs(defaultServer); + } finally { + factory.shutdown(); + } + } + + @Test + public void createCachesPerAddress() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer first = factory.create("localhost:1111"); + ChannelFinderServer second = factory.create("localhost:1111"); + ChannelFinderServer third = factory.create("localhost:2222"); + + assertThat(second).isSameInstanceAs(first); + assertThat(third).isNotSameInstanceAs(first); + } finally { + factory.shutdown(); + } + } + + @Test + public void evictRemovesNonDefaultServer() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer first = factory.create("localhost:1111"); + factory.evict("localhost:1111"); + ChannelFinderServer second = factory.create("localhost:1111"); + + assertThat(second).isNotSameInstanceAs(first); + } finally { + factory.shutdown(); + } + } + + @Test + public void evictIgnoresDefaultServer() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer defaultServer = factory.defaultServer(); + factory.evict(defaultServer.getAddress()); + ChannelFinderServer server = factory.create(defaultServer.getAddress()); + + assertThat(server).isSameInstanceAs(defaultServer); + } finally { + factory.shutdown(); + } + } + + @Test + public void shutdownPreventsNewServers() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + factory.shutdown(); + + assertThrows(SpannerException.class, () -> factory.create("localhost:1111")); + assertThat(factory.defaultServer().getChannel().isShutdown()).isTrue(); + } + + @Test + public void healthReflectsChannelShutdown() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer server = factory.create("localhost:1111"); + assertThat(server.isHealthy()).isTrue(); + + server.getChannel().shutdownNow(); + assertThat(server.isHealthy()).isFalse(); + } finally { + factory.shutdown(); + } + } +}