diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java new file mode 100644 index 0000000000..df57bac75d --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServer.java @@ -0,0 +1,66 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; +import io.grpc.ManagedChannel; + +/** + * Represents a Spanner server endpoint for location-aware routing. + * + *

Each instance wraps a gRPC {@link ManagedChannel} connected to a specific Spanner server. The + * {@link ChannelFinderServerFactory} creates and caches these instances. + * + *

Implementations must be thread-safe as instances may be shared across multiple concurrent + * operations. + * + * @see ChannelFinderServerFactory + */ +@InternalApi +public interface ChannelFinderServer { + + /** + * Returns the network address of this server. + * + * @return the server address in "host:port" format + */ + String getAddress(); + + /** + * Returns whether this server is ready to accept RPCs. + * + *

A server is considered unhealthy if: + * + *

+ * + * @return true if the server is healthy and ready to accept RPCs + */ + boolean isHealthy(); + + /** + * Returns the gRPC channel for making RPCs to this server. + * + *

The returned channel is managed by the {@link ChannelFinderServerFactory} and should not be + * shut down directly by callers. + * + * @return the managed channel for this server + */ + ManagedChannel getChannel(); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java new file mode 100644 index 0000000000..4ebbfe5e00 --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/ChannelFinderServerFactory.java @@ -0,0 +1,79 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; + +/** + * Factory for creating and caching server connections for location-aware routing. + * + *

Implementations are expected to cache {@link ChannelFinderServer} instances such that repeated + * calls with the same address return the same instance. This allows the {@link + * com.google.cloud.spanner.spi.v1.KeyRangeCache} to efficiently manage server references. + * + *

Implementations must be thread-safe. Multiple threads may concurrently call {@link + * #create(String)} with different addresses. + */ +@InternalApi +public interface ChannelFinderServerFactory { + + /** + * Returns the default server endpoint. + * + *

The default server is the original endpoint configured in {@link + * com.google.cloud.spanner.SpannerOptions}. It is used as a fallback when the location cache does + * not have routing information for a request. + * + * @return the default server, never null + */ + ChannelFinderServer defaultServer(); + + /** + * Creates or retrieves a cached server for the given address. + * + *

If a server for this address already exists in the cache, the cached instance is returned. + * Otherwise, a new server connection is created and cached. + * + * @param address the server address in "host:port" format + * @return a server instance for the address, never null + * @throws com.google.cloud.spanner.SpannerException if the channel cannot be created + */ + ChannelFinderServer create(String address); + + /** + * Evicts a server from the cache and gracefully shuts down its channel. + * + *

This method should be called when a server becomes unhealthy or is no longer needed. The + * channel shutdown is graceful: existing RPCs are allowed to complete, but new RPCs will not be + * accepted on this channel. + * + *

If the address is not in the cache, this method does nothing. + * + * @param address the server address to evict + */ + void evict(String address); + + /** + * Shuts down all cached server connections. + * + *

This method should be called when the Spanner client is closed to release all resources. + * Each channel is shut down gracefully, allowing in-flight RPCs to complete. + * + *

After calling this method, the factory should not be used to create new connections. + */ + void shutdown(); +} diff --git a/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java new file mode 100644 index 0000000000..98357df2fd --- /dev/null +++ b/google-cloud-spanner/src/main/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactory.java @@ -0,0 +1,186 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import com.google.api.core.InternalApi; +import com.google.api.gax.grpc.GrpcTransportChannel; +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.api.gax.rpc.TransportChannelProvider; +import com.google.cloud.spanner.ErrorCode; +import com.google.cloud.spanner.SpannerExceptionFactory; +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ConnectivityState; +import io.grpc.ManagedChannel; +import java.io.IOException; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +/** + * gRPC implementation of {@link ChannelFinderServerFactory}. + * + *

This factory creates and caches gRPC channels per address. It uses {@link + * InstantiatingGrpcChannelProvider#withEndpoint(String)} to create new channels with the same + * configuration but different endpoints, avoiding race conditions. + */ +@InternalApi +class GrpcChannelFinderServerFactory implements ChannelFinderServerFactory { + + /** Timeout for graceful channel shutdown. */ + private static final long SHUTDOWN_TIMEOUT_SECONDS = 5; + + private final InstantiatingGrpcChannelProvider baseProvider; + private final Map servers = new ConcurrentHashMap<>(); + private final GrpcChannelFinderServer defaultServer; + private volatile boolean isShutdown = false; + + /** + * Creates a new factory with the given channel provider. + * + * @param channelProvider the base provider used to create channels. New channels for different + * endpoints are created using {@link InstantiatingGrpcChannelProvider#withEndpoint(String)}. + * @throws IOException if the default channel cannot be created + */ + public GrpcChannelFinderServerFactory(InstantiatingGrpcChannelProvider channelProvider) + throws IOException { + this.baseProvider = channelProvider; + String defaultEndpoint = channelProvider.getEndpoint(); + this.defaultServer = new GrpcChannelFinderServer(defaultEndpoint, channelProvider); + this.servers.put(defaultEndpoint, this.defaultServer); + } + + @Override + public ChannelFinderServer defaultServer() { + return defaultServer; + } + + @Override + public ChannelFinderServer create(String address) { + if (isShutdown) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.FAILED_PRECONDITION, "ChannelFinderServerFactory has been shut down"); + } + + return servers.computeIfAbsent( + address, + addr -> { + try { + // Create a new provider with the same config but different endpoint. + // This is thread-safe as withEndpoint() returns a new provider instance. + TransportChannelProvider newProvider = baseProvider.withEndpoint(addr); + return new GrpcChannelFinderServer(addr, newProvider); + } catch (IOException e) { + throw SpannerExceptionFactory.newSpannerException( + ErrorCode.INTERNAL, "Failed to create channel for address: " + addr, e); + } + }); + } + + @Override + public void evict(String address) { + if (defaultServer.getAddress().equals(address)) { + return; + } + GrpcChannelFinderServer server = servers.remove(address); + if (server != null) { + shutdownServerGracefully(server); + } + } + + @Override + public void shutdown() { + isShutdown = true; + for (GrpcChannelFinderServer server : servers.values()) { + shutdownServerGracefully(server); + } + servers.clear(); + } + + /** + * Gracefully shuts down a server's channel. + * + *

First attempts a graceful shutdown, waiting for in-flight RPCs to complete. If the timeout + * is exceeded, forces immediate shutdown. + */ + private void shutdownServerGracefully(GrpcChannelFinderServer server) { + ManagedChannel channel = server.getChannel(); + if (channel.isShutdown()) { + return; + } + + channel.shutdown(); + try { + if (!channel.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) { + channel.shutdownNow(); + } + } catch (InterruptedException e) { + channel.shutdownNow(); + Thread.currentThread().interrupt(); + } + } + + /** gRPC implementation of {@link ChannelFinderServer}. */ + static class GrpcChannelFinderServer implements ChannelFinderServer { + private final String address; + private final ManagedChannel channel; + + /** + * Creates a server from a channel provider. + * + * @param address the server address + * @param provider the channel provider (must be a gRPC provider) + * @throws IOException if the channel cannot be created + */ + GrpcChannelFinderServer(String address, TransportChannelProvider provider) throws IOException { + this.address = address; + GrpcTransportChannel transportChannel = (GrpcTransportChannel) provider.getTransportChannel(); + this.channel = (ManagedChannel) transportChannel.getChannel(); + } + + /** + * Creates a server with an existing channel. Primarily for testing. + * + * @param address the server address + * @param channel the managed channel + */ + @VisibleForTesting + GrpcChannelFinderServer(String address, ManagedChannel channel) { + this.address = address; + this.channel = channel; + } + + @Override + public String getAddress() { + return address; + } + + @Override + public boolean isHealthy() { + if (channel.isShutdown() || channel.isTerminated()) { + return false; + } + // Check connectivity state without triggering a connection attempt + ConnectivityState state = channel.getState(false); + return state != ConnectivityState.SHUTDOWN && state != ConnectivityState.TRANSIENT_FAILURE; + } + + @Override + public ManagedChannel getChannel() { + return channel; + } + } +} diff --git a/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java new file mode 100644 index 0000000000..4a57764b19 --- /dev/null +++ b/google-cloud-spanner/src/test/java/com/google/cloud/spanner/spi/v1/GrpcChannelFinderServerFactoryTest.java @@ -0,0 +1,119 @@ +/* + * Copyright 2026 Google LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.google.cloud.spanner.spi.v1; + +import static com.google.common.truth.Truth.assertThat; +import static org.junit.Assert.assertThrows; + +import com.google.api.gax.grpc.InstantiatingGrpcChannelProvider; +import com.google.cloud.spanner.SpannerException; +import io.grpc.ManagedChannelBuilder; +import org.junit.Test; + +public class GrpcChannelFinderServerFactoryTest { + + private static InstantiatingGrpcChannelProvider createProvider(String endpoint) { + return InstantiatingGrpcChannelProvider.newBuilder() + .setEndpoint(endpoint) + .setChannelConfigurator(ManagedChannelBuilder::usePlaintext) + .build(); + } + + @Test + public void defaultServerIsCached() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer defaultServer = factory.defaultServer(); + ChannelFinderServer server = factory.create(defaultServer.getAddress()); + assertThat(server).isSameInstanceAs(defaultServer); + } finally { + factory.shutdown(); + } + } + + @Test + public void createCachesPerAddress() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer first = factory.create("localhost:1111"); + ChannelFinderServer second = factory.create("localhost:1111"); + ChannelFinderServer third = factory.create("localhost:2222"); + + assertThat(second).isSameInstanceAs(first); + assertThat(third).isNotSameInstanceAs(first); + } finally { + factory.shutdown(); + } + } + + @Test + public void evictRemovesNonDefaultServer() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer first = factory.create("localhost:1111"); + factory.evict("localhost:1111"); + ChannelFinderServer second = factory.create("localhost:1111"); + + assertThat(second).isNotSameInstanceAs(first); + } finally { + factory.shutdown(); + } + } + + @Test + public void evictIgnoresDefaultServer() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer defaultServer = factory.defaultServer(); + factory.evict(defaultServer.getAddress()); + ChannelFinderServer server = factory.create(defaultServer.getAddress()); + + assertThat(server).isSameInstanceAs(defaultServer); + } finally { + factory.shutdown(); + } + } + + @Test + public void shutdownPreventsNewServers() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + factory.shutdown(); + + assertThrows(SpannerException.class, () -> factory.create("localhost:1111")); + assertThat(factory.defaultServer().getChannel().isShutdown()).isTrue(); + } + + @Test + public void healthReflectsChannelShutdown() throws Exception { + GrpcChannelFinderServerFactory factory = + new GrpcChannelFinderServerFactory(createProvider("localhost:1234")); + try { + ChannelFinderServer server = factory.create("localhost:1111"); + assertThat(server.isHealthy()).isTrue(); + + server.getChannel().shutdownNow(); + assertThat(server.isHealthy()).isFalse(); + } finally { + factory.shutdown(); + } + } +}