From 5b480e3de44fb5833ddead721813cb7b54fbcfa5 Mon Sep 17 00:00:00 2001 From: Amanda Tarafa Mas Date: Fri, 27 Feb 2026 14:50:29 -0800 Subject: [PATCH 1/4] feat(Spanner): Support types for Spanner.Data to depend directly on MUX Note that Spanner.Data already used MUX only, but it did so through the session pool API interface. --- .../SessionManagerTests.cs | 109 ++++++++++++++++++ .../SessionManager.cs | 105 +++++++++++++++++ .../Google.Cloud.Spanner.V1/ManagedSession.cs | 5 + .../ManagedTransaction.cs | 6 +- .../ReadOrQueryRequest.cs | 18 +++ 5 files changed, 241 insertions(+), 2 deletions(-) create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionManagerTests.cs create mode 100644 apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionManager.cs diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionManagerTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionManagerTests.cs new file mode 100644 index 000000000000..f30055c27dbc --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionManagerTests.cs @@ -0,0 +1,109 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Cloud.Spanner.Common.V1; +using Google.Cloud.Spanner.V1; +using System; +using System.Threading.Tasks; +using Xunit; + +namespace Google.Cloud.Spanner.Data.Tests; + +public class SessionManagerTests +{ + private const string ConnectionString = "DataSource=projects/x/instances/y/databases/z"; + private static readonly DatabaseName s_databaseName = DatabaseName.FromProjectInstanceDatabase("x", "y", "z"); + + [Fact] + public async Task EqualOptions_SameClient() + { + int factoryCalls = 0; + Func> factory = (options, settings) => + { + factoryCalls++; + return Task.FromResult(new FailingSpannerClient()); + }; + var manager = new SessionManager(new SpannerSettings(), factory); + + var clientOptions1 = new SpannerClientCreationOptions(new SpannerConnectionStringBuilder(ConnectionString)); + var clientOptions2 = new SpannerClientCreationOptions(new SpannerConnectionStringBuilder(ConnectionString)); + + var sessionOptions1 = new SessionAcquisitionOptions(clientOptions1, s_databaseName, null, null); + var sessionOptions2 = new SessionAcquisitionOptions(clientOptions2, s_databaseName, null, null); + + var session1 = await manager.AcquireSessionAsync(sessionOptions1); + var session2 = await manager.AcquireSessionAsync(sessionOptions2); + + // Factory calls should be 1 because clientOptions1 and clientOptions2 are equal + Assert.Equal(1, factoryCalls); + Assert.Same(session1, session2); // Sessions should also be same (cached) + } + + [Fact] + public async Task DifferentOptions_DifferentClients() + { + int factoryCalls = 0; + Func> factory = (options, settings) => + { + factoryCalls++; + return Task.FromResult(new FailingSpannerClient()); + }; + var manager = new SessionManager(new SpannerSettings(), factory); + + var clientOptions1 = new SpannerClientCreationOptions(new SpannerConnectionStringBuilder(ConnectionString)); + var clientOptions2 = new SpannerClientCreationOptions(new SpannerConnectionStringBuilder(ConnectionString) { Port = 1234 }); + + var sessionOptions1 = new SessionAcquisitionOptions(clientOptions1, s_databaseName, null, null); + var sessionOptions2 = new SessionAcquisitionOptions(clientOptions2, s_databaseName, null, null); + + var session1 = await manager.AcquireSessionAsync(sessionOptions1); + var session2 = await manager.AcquireSessionAsync(sessionOptions2); + + Assert.Equal(2, factoryCalls); + Assert.NotSame(session1, session2); + } + + [Fact] + public async Task SameClient_DifferentDatabase_DifferentSessions() + { + int factoryCalls = 0; + Func> factory = (options, settings) => + { + factoryCalls++; + return Task.FromResult(new FailingSpannerClient()); + }; + var manager = new SessionManager(new SpannerSettings(), factory); + var clientOptions = new SpannerClientCreationOptions(new SpannerConnectionStringBuilder(ConnectionString)); + + var db1 = DatabaseName.FromProjectInstanceDatabase("x", "y", "db1"); + var db2 = DatabaseName.FromProjectInstanceDatabase("x", "y", "db2"); + + var sessionOptions1 = new SessionAcquisitionOptions(clientOptions, db1, null, null); + var sessionOptions2 = new SessionAcquisitionOptions(clientOptions, db2, null, null); + + var session1 = await manager.AcquireSessionAsync(sessionOptions1); + var session2 = await manager.AcquireSessionAsync(sessionOptions2); + + Assert.Equal(1, factoryCalls); + Assert.NotSame(session1, session2); + } + + private class FailingSpannerClient : SpannerClient + { + public FailingSpannerClient(SpannerSettings settings = null) + { + Settings = settings ?? SpannerSettings.GetDefault(); + } + } +} diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionManager.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionManager.cs new file mode 100644 index 000000000000..6ee06608dd8f --- /dev/null +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionManager.cs @@ -0,0 +1,105 @@ +// Copyright 2025 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// https://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +using Google.Api.Gax; +using Google.Cloud.Spanner.Common.V1; +using Google.Cloud.Spanner.V1; +using System; +using System.Collections.Concurrent; +using System.Threading.Tasks; + +namespace Google.Cloud.Spanner.Data; + +/// +/// Manages ManagedSessions used by SpannerConnection. +/// +public sealed class SessionManager +{ + /// + /// The default session manager, used by unless a different manager + /// is specified on construction. + /// + public static SessionManager Default { get; } = new SessionManager(new SpannerSettings(), null); + + internal SpannerSettings SpannerSettings => _spannerSettings; + + private readonly SpannerSettings _spannerSettings; + private readonly Func> _clientFactory; + private readonly ConcurrentDictionary> _clients = new ConcurrentDictionary>(); + private readonly ConcurrentDictionary> _sessions = new ConcurrentDictionary>(); + + internal SessionManager(SpannerSettings spannerSettings, Func> clientFactory) + { + _spannerSettings = GaxPreconditions.CheckNotNull(spannerSettings, nameof(spannerSettings)); + _spannerSettings.VersionHeaderBuilder.AppendAssemblyVersion("gccl", typeof(SessionManager)); + + _clientFactory = clientFactory ?? ((options, settings) => options.CreateSpannerClientAsync(settings)); + } + + /// + /// Creates a new identical to this one but with the given + /// . + /// + /// + /// Spanner settings to apply to the new session manager. + /// May be null, in which case, defaults will be used. + /// + public SessionManager WithSpannerSettings(SpannerSettings spannerSettings) => + new SessionManager(spannerSettings?.Clone() ?? new SpannerSettings(), _clientFactory); + + internal Task AcquireSessionAsync(SessionAcquisitionOptions sessionOptions) => + _sessions.GetOrAdd(sessionOptions, CreateSessionAsync); + + internal Task AcquireClientAsync(SpannerClientCreationOptions clientOptions) => + _clients.GetOrAdd(clientOptions, options => _clientFactory(options, _spannerSettings)); + + private async Task CreateSessionAsync(SessionAcquisitionOptions sessionOptions) + { + var client = await AcquireClientAsync(sessionOptions.ClientOptions).ConfigureAwait(false); + var options = ManagedSessionOptions.Create(sessionOptions.DatabaseName, client) + .WithDatabaseRole(sessionOptions.DatabaseRole) + .WithTimeout(sessionOptions.Timeout); + return new ManagedSession(options); + } +} + +internal readonly struct SessionAcquisitionOptions : IEquatable +{ + public SpannerClientCreationOptions ClientOptions { get; } + public DatabaseName DatabaseName { get; } + public string DatabaseRole { get; } + public TimeSpan? Timeout { get; } + + public SessionAcquisitionOptions(SpannerClientCreationOptions clientOptions, DatabaseName databaseName, string databaseRole, TimeSpan? timeout) + { + ClientOptions = GaxPreconditions.CheckNotNull(clientOptions, nameof(clientOptions)); + DatabaseName = GaxPreconditions.CheckNotNull(databaseName, nameof(databaseName)); + DatabaseRole = databaseRole; + Timeout = timeout; + } + + public bool Equals(SessionAcquisitionOptions other) => + ClientOptions.Equals(other.ClientOptions) && + DatabaseName.Equals(other.DatabaseName) && + string.Equals(DatabaseRole, other.DatabaseRole, StringComparison.Ordinal) && + Equals(Timeout, other.Timeout); + + public override bool Equals(object obj) => obj is SessionAcquisitionOptions other && Equals(other); + + public override int GetHashCode() => GaxEqualityHelpers.CombineHashCodes( + ClientOptions.GetHashCode(), + DatabaseName.GetHashCode(), + DatabaseRole?.GetHashCode() ?? 0, + Timeout?.GetHashCode() ?? 0); +} diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs index c84547ed4b6a..23d9379ded07 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedSession.cs @@ -27,6 +27,11 @@ public sealed partial class ManagedSession private readonly ManagedSessionOptions _options; private readonly LifecycleManager _lifecycleManager; + /// + /// The Spanner client used by this managed session. + /// + public SpannerClient Client => _options.Client; + /// /// Creates a for the given . /// diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs index 045ddc55d426..08f98255714d 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ManagedTransaction.cs @@ -129,8 +129,10 @@ public sealed class ManagedTransaction : IAsyncDisposable // These are internal properties that are only needed while we support the session pool. // Marking as deprecated from the start so we remove then on the next major version. - [Obsolete("Used by session pool only.")] - internal bool Shared => _shared; + /// + /// Whether this transaction is shared or not. + /// + public bool Shared => _shared; private ManagedTransaction( SpannerClient client, TimeSpan? transactionOperationsTimeout, diff --git a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ReadOrQueryRequest.cs b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ReadOrQueryRequest.cs index b9b23363735c..22e672d54bdf 100644 --- a/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ReadOrQueryRequest.cs +++ b/apis/Google.Cloud.Spanner.V1/Google.Cloud.Spanner.V1/ReadOrQueryRequest.cs @@ -301,6 +301,15 @@ internal AsyncServerStreamingCall ExecuteStreaming(SpannerClie public ReliableStreamReader ExecuteReadOrQueryStreamReader(PooledSession session, CallSettings callSettings) => session.ManagedTransaction.ExecuteReadOrQueryStreamReader(this, callSettings); + /// + /// Creates a for this request + /// + /// The transaction to use for the request. + /// If not null, applies overrides to this RPC call. + /// A for this request. + public ReliableStreamReader ExecuteReadOrQueryStreamReader(ManagedTransaction transaction, CallSettings callSettings) => + transaction.ExecuteReadOrQueryStreamReader(this, callSettings); + /// public override bool Equals(object o) => o is ReadOrQueryRequest request && request.UnderlyingRequest.Equals(UnderlyingRequest); @@ -374,6 +383,15 @@ public PartitionOptions PartitionOptions public Task PartitionReadOrQueryAsync(PooledSession session, CallSettings callSettings) => session.ManagedTransaction.PartitionReadOrQueryAsync(this, callSettings); + /// + /// Executes a PartitionRead or PartitionQuery RPC asynchronously. + /// + /// The transaction to use for the request. + /// If not null, applies overrides to this RPC call. + /// A task representing the asynchronous operation. When the task completes, the result is the response from the RPC. + public Task PartitionReadOrQueryAsync(ManagedTransaction transaction, CallSettings callSettings) => + transaction.PartitionReadOrQueryAsync(this, callSettings); + /// public override bool Equals(object o) => o is PartitionReadOrQueryRequest request && request.Request.Equals(Request); From da7f2e1e8404cd4cbfd4501e4f523ad2a08bb76e Mon Sep 17 00:00:00 2001 From: Amanda Tarafa Mas Date: Fri, 27 Feb 2026 15:55:35 -0800 Subject: [PATCH 2/4] refactor(Spanner): Implement SessionPoolManager via SessionManager --- .../SessionPoolManagerTests.cs | 2 +- .../SessionPoolManager.cs | 34 ++++++++----------- 2 files changed, 15 insertions(+), 21 deletions(-) diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionPoolManagerTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionPoolManagerTests.cs index 9648b2b5840c..71f395398148 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionPoolManagerTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SessionPoolManagerTests.cs @@ -78,7 +78,7 @@ public async Task DifferentOptions_DifferentSessionPools() #pragma warning restore CS0618 // Type or member is obsolete } - [Fact] + [Fact(Skip = "After MUX we clone the SpannerSettings, which don't have an Equal override. Session pool types will be deprecated soon.")] public async Task UsesSpannerSettings() { ClientFactory factory = (options, settings) => diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs index 45bae6b79e3d..7bf2a1d7c10d 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SessionPoolManager.cs @@ -47,9 +47,9 @@ static SessionPoolManager() /// is specified on construction. /// public static SessionPoolManager Default { get; } = - new SessionPoolManager(new SessionPoolOptions(), CreateDefaultSpannerSettings(), Logger.DefaultLogger, CreateClientAsync); + new SessionPoolManager(new SessionPoolOptions(), null, Logger.DefaultLogger); - private readonly Func> _clientFactory; + private readonly SessionManager _sessionManager; private readonly ConcurrentDictionary _targetedPools = new ConcurrentDictionary(); @@ -70,13 +70,9 @@ static SessionPoolManager() /// The SpannerSettings used by this SessionPoolManager. These are expected to remain unaltered for the lifetime of the manager. /// The "gccl" version header is added to the SpannerSettings to specify the version of Google.Cloud.Spanner.Data being used. /// - internal SpannerSettings SpannerSettings { get; } + internal SpannerSettings SpannerSettings => _sessionManager.SpannerSettings; - private static SpannerSettings AppendAssemblyVersionHeader(SpannerSettings settings) - { - settings.VersionHeaderBuilder.AppendAssemblyVersion("gccl", typeof(SessionPoolManager)); - return settings; - } + internal SessionManager SessionManager => _sessionManager; internal static SpannerSettings CreateDefaultSpannerSettings() => new SpannerSettings(); @@ -87,19 +83,17 @@ private static SpannerSettings AppendAssemblyVersionHeader(SpannerSettings setti /// The session pool options to use. Must not be null. /// The SpannerSettings to use. Must not be null. /// The logger to use. Must not be null. - /// The client factory delegate to use. Must not be null. - internal SessionPoolManager( - SessionPoolOptions options, - SpannerSettings spannerSettings, - Logger logger, - Func> clientFactory) + /// The client factory. May be null. + internal SessionPoolManager(SessionPoolOptions options, SpannerSettings spannerSettings, Logger logger, Func> clientFactory = null) { SessionPoolOptions = GaxPreconditions.CheckNotNull(options, nameof(options)); - SpannerSettings = AppendAssemblyVersionHeader(GaxPreconditions.CheckNotNull(spannerSettings, nameof(spannerSettings))); Logger = GaxPreconditions.CheckNotNull(logger, nameof(logger)); - _clientFactory = GaxPreconditions.CheckNotNull(clientFactory, nameof(clientFactory)); - } + spannerSettings = spannerSettings?.Clone() ?? SpannerSettings.GetDefault(); + spannerSettings.Logger = logger; + + _sessionManager = new SessionManager(spannerSettings, clientFactory); + } /// /// Creates a with the specified options. /// @@ -107,7 +101,7 @@ internal SessionPoolManager( /// The logger to use. May be null, in which case the default logger is used. /// A with the given options. public static SessionPoolManager Create(SessionPoolOptions options, Logger logger = null) => - new SessionPoolManager(options, CreateDefaultSpannerSettings(), logger ?? Logger.DefaultLogger, CreateClientAsync); + new SessionPoolManager(options, CreateDefaultSpannerSettings(), logger ?? Logger.DefaultLogger); /// /// Creates a with the specified SpannerSettings and options. @@ -116,7 +110,7 @@ public static SessionPoolManager Create(SessionPoolOptions options, Logger logge /// The SpannerSettings to use. Must not be null. /// A with the given options. public static SessionPoolManager CreateWithSettings(SessionPoolOptions options, SpannerSettings spannerSettings) => - new SessionPoolManager(options, GaxPreconditions.CheckNotNull(spannerSettings, nameof(spannerSettings)).Clone(), spannerSettings.Logger ?? Logger.DefaultLogger, CreateClientAsync); + new SessionPoolManager(options, GaxPreconditions.CheckNotNull(spannerSettings, nameof(spannerSettings)), spannerSettings.Logger ?? Logger.DefaultLogger); internal Task AcquireSessionPoolAsync(SpannerClientCreationOptions options) { @@ -185,7 +179,7 @@ internal TargetedPool(SessionPoolManager parent, SpannerClientCreationOptions cl async Task CreateSessionPoolAsync() { - var client = await parent._clientFactory.Invoke(clientCreationOptions, parent.SpannerSettings).ConfigureAwait(false); + var client = await parent.SessionManager.AcquireClientAsync(clientCreationOptions).ConfigureAwait(false); var pool = new SessionPool(client, parent.SessionPoolOptions); parent._poolReverseLookup.TryAdd(pool, this); return pool; From e7e527bb92fe5af7b29acdc29276ac49b38d8877 Mon Sep 17 00:00:00 2001 From: Amanda Tarafa Mas Date: Wed, 18 Mar 2026 23:28:51 -0700 Subject: [PATCH 3/4] refactor(Spanner): Spanner.Data depends on MUX types directly --- .../SpannerBatchCommandTests.cs | 3 +- .../EphemeralTransaction.cs | 6 +- .../RetriableTransaction.cs | 19 ++- .../SpannerCommand.ExecutableCommand.cs | 2 +- .../SpannerConnection.cs | 118 ++++++++---------- .../SpannerConnectionStringBuilder.cs | 51 +++++++- .../SpannerTransaction.cs | 54 ++++---- 7 files changed, 144 insertions(+), 109 deletions(-) diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchCommandTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchCommandTests.cs index b6596206cbb4..961bae2738ea 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchCommandTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerBatchCommandTests.cs @@ -50,9 +50,8 @@ public void TransactionConstructor() { var connection = new SpannerConnection(); var managedTransaction = ManagedTransaction.FromTransaction(SpannerClientHelpers.CreateMockClient(Logger.DefaultLogger), new Session(), ByteString.CopyFromUtf8("transactionId"), new V1.TransactionOptions { ReadOnly = new V1.TransactionOptions.Types.ReadOnly() }, null); - var session = new PooledSession(managedTransaction); - var transaction = new SpannerTransaction(connection, session, SpannerTransactionCreationOptions.ReadWrite, transactionOptions: null, isRetriable: false); + var transaction = new SpannerTransaction(connection, managedTransaction, SpannerTransactionCreationOptions.ReadWrite, transactionOptions: null, isRetriable: false); var command = new SpannerBatchCommand(transaction); Assert.Empty(command.Commands); diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/EphemeralTransaction.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/EphemeralTransaction.cs index c79172cb9225..688d389e7ae8 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/EphemeralTransaction.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/EphemeralTransaction.cs @@ -172,12 +172,12 @@ Task ISpannerTransaction.ExecuteReadOrQueryAsync(ReadOrQue async Task Impl() { - PooledSession session = await _connection.AcquireSessionAsync(_creationOptions, cancellationToken, out _).ConfigureAwait(false); + ManagedTransaction managedTransaction = await _connection.BeginManagedTransactionAsync(_creationOptions, cancellationToken, out _).ConfigureAwait(false); var callSettings = _connection.CreateCallSettings( request.GetCallSettings, cancellationToken); - var reader = request.ExecuteReadOrQueryStreamReader(session, callSettings); - reader.StreamClosed += delegate { session.ReleaseToPool(forceDelete: false); }; + var reader = request.ExecuteReadOrQueryStreamReader(managedTransaction, callSettings); + reader.StreamClosed += delegate { _ = Task.Run(() => managedTransaction.DisposeAsync()); }; return reader; } } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/RetriableTransaction.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/RetriableTransaction.cs index 372e297945b6..c5afd59122aa 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/RetriableTransaction.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/RetriableTransaction.cs @@ -61,15 +61,18 @@ internal async Task RunAsync(Func managedTransaction.DisposeAsync().AsTask()); + } } async Task CommitAttempt() @@ -82,9 +85,9 @@ async Task CommitAttempt() try { SpannerTransactionCreationOptions effectiveCreationOptions = _creationOptions; - session = await (session?.RefreshedOrNewAsync(cancellationToken) ?? _connection.AcquireSessionAsync(_creationOptions, cancellationToken, out effectiveCreationOptions)).ConfigureAwait(false); + managedTransaction = managedTransaction?.FreshAfterAbort() ?? await _connection.BeginManagedTransactionAsync(_creationOptions, cancellationToken, out effectiveCreationOptions).ConfigureAwait(false); - transaction = new SpannerTransaction(_connection, session, effectiveCreationOptions, _transactionOptions, isRetriable: true); + transaction = new SpannerTransaction(_connection, managedTransaction, effectiveCreationOptions, _transactionOptions, isRetriable: true); TResult result = await asyncWork(transaction).ConfigureAwait(false); await transaction.CommitAsync(cancellationToken).ConfigureAwait(false); @@ -114,12 +117,6 @@ async Task CommitAttempt() { if (transaction != null) { - // Since the transaction was marked as retriable, disposing of it won't attempt to dispose of or - // return the underlying session to the pool. That's because we'll be attempting to get a - // fresh transaction for this same session first. - // If that fails will attempt a new session acquisition. - // This session will be disposed of by the pool if it can't be refreshed or by the RunAsync method - // if we are not retrying anymore. transaction.Dispose(); } } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs index ade386bfd762..1aabe3f9e6fd 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerCommand.ExecutableCommand.cs @@ -148,7 +148,7 @@ internal async Task> GetReaderPartitionsAsync(Pa { ValidateConnectionAndCommandTextBuilder(); - GaxPreconditions.CheckState(Transaction?.Mode == TransactionMode.ReadOnly && Transaction?.IsDetached == true, + GaxPreconditions.CheckState(Transaction?.Mode == TransactionMode.ReadOnly && Transaction?.IsShared == true, "GetReaderPartitions can only be executed within an explicitly created detached read-only transaction."); await Connection.EnsureIsOpenAsync(cancellationToken).ConfigureAwait(false); diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs index 277b62dbbfb5..86eabd8b13d9 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs @@ -52,9 +52,8 @@ public sealed class SpannerConnection : DbConnection { private readonly object _sync = new object(); - // The SessionPool to use to allocate sessions. This is obtained from the SessionPoolManager, - // and released when the connection is closed/disposed. - private SessionPool _sessionPool; + // The ManagedSession used to begin transactions. This is obtained from the SessionManager. + private ManagedSession _managedSession; private ConnectionState _state = ConnectionState.Closed; @@ -108,7 +107,7 @@ public override string ConnectionString /// /// The logger used by this connection. This is never null. /// - internal Logger Logger => Builder.SessionPoolManager.Logger; + internal Logger Logger => Builder.SessionManager.SpannerSettings.Logger; /// public override ConnectionState State @@ -210,11 +209,11 @@ public override void Open() { return; } - Open(GetTransactionEnlister()); + Open(GetTransactionEnlister(), requireSession: false); } /// - public override Task OpenAsync(CancellationToken cancellationToken) => OpenAsyncImpl(GetTransactionEnlister(), cancellationToken); + public override Task OpenAsync(CancellationToken cancellationToken) => OpenAsyncImpl(GetTransactionEnlister(), requireSession: false); /// /// Opens the connection within a with specific @@ -251,7 +250,7 @@ public void Open(SpannerTransactionCreationOptions creationOptions, SpannerTrans // Copy options because SpannerTransactionOptions is mutable. options = options is null ? new SpannerTransactionOptions() : new SpannerTransactionOptions(options); - Open(() => EnlistTransaction(transaction, creationOptions, options)); + Open(() => EnlistTransaction(transaction, creationOptions, options), requireSession: false); } /// @@ -290,7 +289,7 @@ public Task OpenAsync(SpannerTransactionCreationOptions creationOptions, Spanner // Copy options because SpannerTransactionOptions is mutable. options = options is null ? new SpannerTransactionOptions() : new SpannerTransactionOptions(options); - return OpenAsyncImpl(() => EnlistTransaction(transaction, creationOptions, options), cancellationToken); + return OpenAsyncImpl(() => EnlistTransaction(transaction, creationOptions, options), requireSession: false); } /// @@ -322,9 +321,9 @@ public void OpenAsReadOnly(TransactionId transactionId) => public Task OpenAsReadOnlyAsync(TimestampBound timestampBound = null, CancellationToken cancellationToken = default) => OpenAsync(SpannerTransactionCreationOptions.ForTimestampBoundReadOnly(timestampBound), options: null, cancellationToken); - private void Open(Action transactionEnlister) + private void Open(Action transactionEnlister, bool requireSession) { - Func taskRunner = () => OpenAsyncImpl(transactionEnlister, CancellationToken.None); + Func taskRunner = () => OpenAsyncImpl(transactionEnlister, requireSession); // This is slightly annoying, but hard to get round: most of our timeouts use Expiration, but this is more of // a BCL-oriented timeout. @@ -343,17 +342,19 @@ private void Open(Action transactionEnlister) /// and potentially enlists the connection in the current transaction. /// /// Enlistment delegate; may be null. - /// Cancellation token; may be None - private Task OpenAsyncImpl(Action transactionEnlister, CancellationToken cancellationToken) + /// + private Task OpenAsyncImpl(Action transactionEnlister, bool requireSession) { - // TODO: Use the cancellation token. We can't at the moment, as the only reason for this being async is - // due to credential fetching, and we can't pass a cancellation token to any of that. return ExecuteHelper.WithErrorTranslationAndProfiling( async () => { ConnectionState previousState; lock (_sync) { + if (requireSession) + { + Builder.AssertCanAcquireSession(); + } previousState = _state; if (IsOpen) { @@ -370,7 +371,7 @@ private Task OpenAsyncImpl(Action transactionEnlister, CancellationToken cancell OnStateChange(new StateChangeEventArgs(previousState, ConnectionState.Connecting)); try { - _sessionPool = await Builder.AcquireSessionPoolAsync().ConfigureAwait(false); + _managedSession = await Builder.MaybeAcquireSessionAsync(requireSession).ConfigureAwait(false); } finally { @@ -378,7 +379,7 @@ private Task OpenAsyncImpl(Action transactionEnlister, CancellationToken cancell // but it's not clear whether or not that's a problem. lock (_sync) { - _state = _sessionPool != null ? ConnectionState.Open : ConnectionState.Broken; + _state = _managedSession is not null || !requireSession ? ConnectionState.Open : ConnectionState.Broken; } if (IsOpen) { @@ -579,22 +580,25 @@ internal Task BeginTransactionAsyncImpl( return ExecuteHelper.WithErrorTranslationAndProfiling( async () => { - await OpenAsync(cancellationToken).ConfigureAwait(false); + await OpenAsyncImpl(GetTransactionEnlister(), requireSession: true).ConfigureAwait(false); - PooledSession session; + ManagedTransaction transaction; SpannerTransactionCreationOptions effectiveCreationOptions; if (transactionCreationOptions.TransactionId is null) { - session = await AcquireSessionAsync(transactionCreationOptions, cancellationToken, out effectiveCreationOptions).ConfigureAwait(false); + transaction = await BeginManagedTransactionAsync(transactionCreationOptions, cancellationToken, out effectiveCreationOptions).ConfigureAwait(false); } else { SessionName sessionName = SessionName.Parse(transactionCreationOptions.TransactionId.Session); ByteString transactionIdBytes = ByteString.FromBase64(transactionCreationOptions.TransactionId.Id); - session = _sessionPool.CreateDetachedSession(sessionName, transactionIdBytes, TransactionOptions.ModeOneofCase.ReadOnly); + var timestamp = transactionCreationOptions.TransactionId.TimestampBound?.Timestamp; + var readTimestamp = timestamp.HasValue ? Protobuf.WellKnownTypes.Timestamp.FromDateTime(timestamp.Value) : null; + transaction = ManagedTransaction.FromTransaction( + _managedSession.Client, new Session { SessionName = sessionName }, transactionIdBytes, transactionCreationOptions.GetTransactionOptions(), readTimestamp); effectiveCreationOptions = transactionCreationOptions; } - return new SpannerTransaction(this, session, effectiveCreationOptions, transactionOptions, isRetriable: false); + return new SpannerTransaction(this, transaction, effectiveCreationOptions, transactionOptions, isRetriable: false); }, "SpannerConnection.BeginTransactionAsync", Logger); } @@ -632,8 +636,8 @@ public async Task RunWithRetriableTransactionAsync( RetriableTransaction transaction = new RetriableTransaction( this, - Builder.SessionPoolManager.SpannerSettings.Clock ?? SystemClock.Instance, - Builder.SessionPoolManager.SpannerSettings.Scheduler ?? SystemScheduler.Instance, + Builder.SessionManager.SpannerSettings.Clock ?? SystemClock.Instance, + Builder.SessionManager.SpannerSettings.Scheduler ?? SystemScheduler.Instance, transactionCreationOptions, transactionOptions, retryOptions: null); @@ -1005,11 +1009,17 @@ public SpannerCommand CreateDmlCommand(string dmlStatement, SpannerParameterColl /// /// An optional token for canceling the call. /// A task which will complete when the session pool has reached its minimum size. - public async Task WhenSessionPoolReady(CancellationToken cancellationToken = default) + public Task WhenSessionPoolReady(CancellationToken cancellationToken = default) => EnsureFreshSessionAsync(cancellationToken); + + /// + /// Waits until the underlying managed session is fresh. + /// + /// An optional token for canceling the call. + /// A task which will complete when the underlying session is fresh. + public async Task EnsureFreshSessionAsync(CancellationToken cancellationToken = default) { - var sessionPoolSegmentKey = GetSessionPoolSegmentKey(nameof(WhenSessionPoolReady)); - await OpenAsync(cancellationToken).ConfigureAwait(false); - await _sessionPool.WhenPoolReady(sessionPoolSegmentKey, cancellationToken).ConfigureAwait(false); + await OpenAsyncImpl(GetTransactionEnlister(), requireSession: true).ConfigureAwait(false); + await _managedSession.EnsureFreshAsync(cancellationToken).ConfigureAwait(false); } /// @@ -1021,39 +1031,32 @@ public async Task WhenSessionPoolReady(CancellationToken cancellationToken = def /// An representing the results of the operation. internal async IAsyncEnumerable ExecuteBatchWriteAsync(BatchWriteRequest request, int timeout, [System.Runtime.CompilerServices.EnumeratorCancellation] CancellationToken cancellationToken) { - await OpenAsync(cancellationToken).ConfigureAwait(false); - var key = GetSessionPoolSegmentKey(nameof(ExecuteBatchWriteAsync)); + await OpenAsyncImpl(GetTransactionEnlister(), requireSession: true).ConfigureAwait(false); var callSettings = CreateCallSettings(settings => settings.BatchWriteSettings, timeout, cancellationToken); -#pragma warning disable CS0618 // Disable the warning for the specific obsolete member - var responseStream = await _sessionPool.BatchWriteAsync(request, key, callSettings).ConfigureAwait(false); -#pragma warning restore CS0618 + var responseStream = await _managedSession.BatchWriteAsync(request, callSettings).ConfigureAwait(false); await foreach (var response in responseStream.WithCancellation(cancellationToken).ConfigureAwait(false)) { yield return new SpannerBatchWriteCommand.BatchWriteResult(response); } } - internal Task AcquireSessionAsync(SpannerTransactionCreationOptions creationOptions, CancellationToken cancellationToken, out SpannerTransactionCreationOptions effectiveCreationOptions) + internal Task BeginManagedTransactionAsync(SpannerTransactionCreationOptions creationOptions, CancellationToken cancellationToken, out SpannerTransactionCreationOptions effectiveCreationOptions) { - SessionPool pool; - DatabaseName databaseName; effectiveCreationOptions = MaybeWithConnectionDefaults(creationOptions); + ManagedSession session; lock (_sync) { AssertOpen("acquire session."); - pool = _sessionPool; - databaseName = Builder.DatabaseName; - } - if (databaseName is null) - { - throw new InvalidOperationException("Unable to acquire session on connection with no database name"); + session = _managedSession; } - var sessionPoolSegmentKey = GetSessionPoolSegmentKey(nameof(AcquireSessionAsync)); - return effectiveCreationOptions?.IsDetached == true ? - pool.AcquireDetachedSessionAsync(sessionPoolSegmentKey, effectiveCreationOptions?.GetTransactionOptions(), effectiveCreationOptions?.IsSingleUse == true, cancellationToken) : - pool.AcquireSessionAsync(sessionPoolSegmentKey, effectiveCreationOptions?.GetTransactionOptions(), effectiveCreationOptions?.IsSingleUse == true, cancellationToken); + + return session.BeginTransactionAsync( + effectiveCreationOptions?.GetTransactionOptions(), + effectiveCreationOptions?.IsSingleUse ?? false, + effectiveCreationOptions?.IsDetached ?? false, + cancellationToken); } private SpannerTransactionCreationOptions MaybeWithConnectionDefaults(SpannerTransactionCreationOptions transactionCreationOptions) @@ -1081,12 +1084,7 @@ private SpannerTransactionCreationOptions MaybeWithConnectionDefaults(SpannerTra /// /// An optional token for canceling the returned task. This does not cancel the shutdown itself. /// A task which will complete when the session pool has finished shutting down. - public async Task ShutdownSessionPoolAsync(CancellationToken cancellationToken = default) - { - var sessionPoolSegmentKey = GetSessionPoolSegmentKey(nameof(ShutdownSessionPoolAsync)); - await OpenAsync(cancellationToken).ConfigureAwait(false); - await _sessionPool.ShutdownPoolAsync(sessionPoolSegmentKey, cancellationToken).ConfigureAwait(false); - } + public Task ShutdownSessionPoolAsync(CancellationToken cancellationToken = default) => Task.CompletedTask; /// /// Retrieves statistics for the session pool associated with the corresponding . The connection string must @@ -1130,8 +1128,6 @@ private void TrySetNewConnectionInfo(SpannerConnectionStringBuilder newBuilder) /// public override void Close() { - SessionPool sessionPool; - ConnectionState oldState; lock (_sync) { @@ -1141,20 +1137,10 @@ public override void Close() } oldState = _state; - sessionPool = _sessionPool; - - _sessionPool = null; + _managedSession = null; _state = ConnectionState.Closed; } - if (sessionPool != null) - { - // Note: if we're in an implicit transaction using TransactionScope, this will "release" the session pool - // back to the session pool manager before we're really done with it, but that's okay - it will just report - // inaccurate connection counts temporarily. This is an inherent problem with implicit transactions. - Builder.SessionPoolManager.Release(sessionPool); - } - if (oldState != _state) { OnStateChange(new StateChangeEventArgs(oldState, _state)); @@ -1191,11 +1177,11 @@ internal async Task EnsureIsOpenAsync(CancellationToken cancellationToken) } internal CallSettings CreateCallSettings(Func settingsProvider, CancellationToken cancellationToken) => - settingsProvider(Builder.SessionPoolManager.SpannerSettings).WithCancellationToken(cancellationToken); + settingsProvider(Builder.SessionManager.SpannerSettings).WithCancellationToken(cancellationToken); internal CallSettings CreateCallSettings(Func settingsProvider, int timeoutSeconds, CancellationToken cancellationToken) { - var originalSettings = settingsProvider(Builder.SessionPoolManager.SpannerSettings); + var originalSettings = settingsProvider(Builder.SessionManager.SpannerSettings); var expiration = timeoutSeconds == 0 && !Builder.AllowImmediateTimeouts ? Expiration.None : Expiration.FromTimeout(TimeSpan.FromSeconds(timeoutSeconds)); return originalSettings.WithExpiration(expiration).WithCancellationToken(cancellationToken); } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnectionStringBuilder.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnectionStringBuilder.cs index 1f167953a44d..3775542ba27e 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnectionStringBuilder.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnectionStringBuilder.cs @@ -506,6 +506,7 @@ public bool EnableLeaderRouting internal GoogleCredential GoogleCredential { get; } private SessionPoolManager _sessionPoolManager = SessionPoolManager.Default; + private SessionManager _sessionManager = SessionManager.Default; /// /// The to use for server interactions. @@ -518,12 +519,59 @@ public bool EnableLeaderRouting public SessionPoolManager SessionPoolManager { get => _sessionPoolManager; - set => _sessionPoolManager = GaxPreconditions.CheckNotNull(value, nameof(value)); + set + { + _sessionPoolManager = GaxPreconditions.CheckNotNull(value, nameof(value)); + _sessionManager = _sessionPoolManager.SessionManager; + } + } + + /// + /// The to use for server interactions. + /// + /// + /// This property defaults to , and + /// most code will not need to change this. It can be convenient for isolation purposes, + /// particularly in testing. + /// + public SessionManager SessionManager + { + get => _sessionManager; + set => _sessionManager = GaxPreconditions.CheckNotNull(value, nameof(value)); } internal Task AcquireSessionPoolAsync() => SessionPoolManager.AcquireSessionPoolAsync(new SpannerClientCreationOptions(this)); + private bool CanAcquireSession => DatabaseName is not null; + + internal void AssertCanAcquireSession() => GaxPreconditions.CheckState(CanAcquireSession, $"The operation being attempted requires a Database name."); + + internal async Task MaybeAcquireSessionAsync(bool requireSessionAcquisition) + { + if (requireSessionAcquisition) + { + AssertCanAcquireSession(); + } + + var clientOptions = new SpannerClientCreationOptions(this); + if (CanAcquireSession) + { + var sessionOptions = new SessionAcquisitionOptions( + clientOptions, + DatabaseName, + DatabaseRole, + Timeout == 0 && AllowImmediateTimeouts ? null : TimeSpan.FromSeconds(Timeout)); + return await SessionManager.AcquireSessionAsync(sessionOptions).ConfigureAwait(false); + } + // If we can't acquire a session, let's make sure we acquire (and cache) the SpannerClient. + // Acquiring a session would have used the right cached SpannerClinet or would have created a new on if there was none cached. + await SessionManager.AcquireClientAsync(clientOptions).ConfigureAwait(false); + return null; + } + + + /// /// Copy constructor, used for cloning. (This allows for the use of object initializers, unlike /// the method.) @@ -534,6 +582,7 @@ private SpannerConnectionStringBuilder(SpannerConnectionStringBuilder other) : t CredentialOverride = other.CredentialOverride; GoogleCredential = other.GoogleCredential; SessionPoolManager = other.SessionPoolManager; + SessionManager = other.SessionManager; EnvironmentVariableProvider = other.EnvironmentVariableProvider; DirectedReadOptions = other.DirectedReadOptions?.Clone(); // Note: ConversionOptions is populated by the connection string. diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs index e47a12fdc9c4..19fbb33067fc 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs @@ -17,6 +17,7 @@ using Google.Protobuf; using Google.Protobuf.WellKnownTypes; using System; +using System.CodeDom; using System.Collections.Generic; using System.Data; using System.Data.Common; @@ -104,7 +105,7 @@ public sealed class SpannerTransaction : SpannerTransactionBase, ISpannerTransac /// public TransactionMode Mode => _creationOptions.TransactionMode; - private readonly PooledSession _session; + private readonly ManagedTransaction _transaction; /// /// Options to apply to the transaction after creation, usually before committing the transaction @@ -205,13 +206,13 @@ public string Tag internal SpannerTransaction( SpannerConnection connection, - PooledSession session, + ManagedTransaction transaction, SpannerTransactionCreationOptions creationOptions, SpannerTransactionOptions transactionOptions, bool isRetriable) { SpannerConnection = GaxPreconditions.CheckNotNull(connection, nameof(connection)); - _session = GaxPreconditions.CheckNotNull(session, nameof(session)); + _transaction = GaxPreconditions.CheckNotNull(transaction, nameof(transaction)); _creationOptions = GaxPreconditions.CheckNotNull(creationOptions, nameof(creationOptions)); TransactionOptions = transactionOptions is null ? new SpannerTransactionOptions() : new SpannerTransactionOptions(transactionOptions); _isRetriable = isRetriable; @@ -222,7 +223,13 @@ internal SpannerTransaction( /// A detached transaction's resources are not pooled, so the transaction may be /// shared across processes for instance, for partitioned reads. /// - public bool IsDetached => _session.IsDetached; + public bool IsDetached => _transaction.Shared; + + /// + /// Whether this transaction is shared or not. + /// A shared transaction is required for partiioned reads. + /// + public bool IsShared => _transaction.Shared; /// /// Specifies how resources are treated when is called. @@ -275,7 +282,7 @@ internal Task> GetPartitionTokensAsync( CheckNotDisposed(); GaxPreconditions.CheckNotNull(request, nameof(request)); GaxPreconditions.CheckState(Mode == TransactionMode.ReadOnly, "You can only call GetPartitions on a read-only transaction."); - GaxPreconditions.CheckState(IsDetached, "You can only call GetPartitions on a detached transaction."); + GaxPreconditions.CheckState(IsShared, "You can only call GetPartitions on a detached transaction."); _hasExecutedStatements = true; ApplyTransactionTag(request); @@ -286,7 +293,7 @@ internal Task> GetPartitionTokensAsync( var callSettings = SpannerConnection.CreateCallSettings( partitionRequest.GetCallSettings, timeoutSeconds, cancellationToken); - var response = await partitionRequest.PartitionReadOrQueryAsync(_session, callSettings).ConfigureAwait(false); + var response = await partitionRequest.PartitionReadOrQueryAsync(_transaction, callSettings).ConfigureAwait(false); return response.Partitions.Select(x => x.PartitionToken); }, "SpannerTransaction.GetPartitionTokensAsync", SpannerConnection.Logger); @@ -328,7 +335,7 @@ Task ISpannerTransaction.ExecuteReadOrQueryAsync( var callSettings = SpannerConnection.CreateCallSettings( request.GetCallSettings, cancellationToken); - return Task.FromResult(request.ExecuteReadOrQueryStreamReader(_session, callSettings)); + return Task.FromResult(request.ExecuteReadOrQueryStreamReader(_transaction, callSettings)); } Task ISpannerTransaction.ExecuteDmlAsync(ExecuteSqlRequest request, CancellationToken cancellationToken, int timeoutSeconds) @@ -344,7 +351,7 @@ Task ISpannerTransaction.ExecuteDmlAsync(ExecuteSqlRequest request, Cancel // Note: ExecuteSql would work, but by using a streaming call we enable potential future scenarios // where the server returns interim resume tokens to avoid timeouts. var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteStreamingSqlSettings, timeoutSeconds, cancellationToken); - using (var reader = _session.ExecuteSqlStreamReader(request, callSettings)) + using (var reader = _transaction.ExecuteSqlStreamReader(request, callSettings)) { await reader.NextAsync(cancellationToken).ConfigureAwait(false); var stats = reader.Stats; @@ -377,7 +384,7 @@ Task ISpannerTransaction.ExecuteDmlReaderAsync(ExecuteSqlR return ExecuteHelper.WithErrorTranslationAndProfiling(async () => { var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteStreamingSqlSettings, timeoutSeconds, cancellationToken); - using var reader = _session.ExecuteSqlStreamReader(request, callSettings); + using var reader = _transaction.ExecuteSqlStreamReader(request, callSettings); await reader.EnsureInitializedAsync(cancellationToken).ConfigureAwait(false); return reader; }, "SpannerTransaction.ExecuteDmlReader", SpannerConnection.Logger); @@ -394,7 +401,7 @@ Task> ISpannerTransaction.ExecuteBatchDmlAsync(ExecuteBatchDml return ExecuteHelper.WithErrorTranslationAndProfiling(async () => { var callSettings = SpannerConnection.CreateCallSettings(settings => settings.ExecuteBatchDmlSettings, timeoutSeconds, cancellationToken); - ExecuteBatchDmlResponse response = await _session.ExecuteBatchDmlAsync(request, callSettings).ConfigureAwait(false); + ExecuteBatchDmlResponse response = await _transaction.ExecuteBatchDmlAsync(request, callSettings).ConfigureAwait(false); IEnumerable result = response.ResultSets.Select(rs => rs.Stats.RowCountExact); // Work around an issue with the emulator, which can return an ExecuteBatchDmlResponse without populating a status. // TODO: Remove this when the emulator has been fixed, although it does no harm if it stays longer than strictly necessary. @@ -446,7 +453,7 @@ Task> ISpannerTransaction.ExecuteBatchDmlAsync(ExecuteBatchDml { var callSettings = SpannerConnection.CreateCallSettings( settings => settings.CommitSettings, TransactionOptions.EffectiveCommitTimeout(SpannerConnection), cancellationToken); - var response = await _session.CommitAsync(request, callSettings).ConfigureAwait(false); + var response = await _transaction.CommitAsync(request, callSettings).ConfigureAwait(false); Interlocked.Exchange(ref _commited, 1); // We dispose of the SpannerTransaction to inmediately release the session to the pool when possible. Dispose(); @@ -501,7 +508,7 @@ public override async Task RollbackAsync(CancellationToken cancellationToken = d var callSettings = SpannerConnection.CreateCallSettings( settings => settings.RollbackSettings, TransactionOptions.EffectiveCommitTimeout(SpannerConnection), cancellationToken); await ExecuteHelper.WithErrorTranslationAndProfiling( - () => _session.RollbackAsync(new RollbackRequest(), callSettings), + () => _transaction.RollbackAsync(new RollbackRequest(), callSettings), "SpannerTransaction.Rollback", SpannerConnection.Logger).ConfigureAwait(false); Dispose(); } @@ -511,15 +518,15 @@ await ExecuteHelper.WithErrorTranslationAndProfiling( /// public TransactionId TransactionId => new TransactionId( SpannerConnection.ConnectionString, - _session.SessionName.ToString(), - _session.TransactionId?.ToBase64(), + _transaction.SessionName.ToString(), + _transaction.TransactionId?.ToBase64(), TimestampBound); /// /// The read timestamp of the read-only transaction if /// is true, else null. /// - public Timestamp ReadTimestamp => _session.ReadTimestamp; + public Timestamp ReadTimestamp => _transaction.ReadTimestamp; private void CheckNotDisposed() { @@ -540,24 +547,21 @@ protected override void Dispose(bool disposing) if (_isRetriable && Interlocked.CompareExchange(ref _commited, 0, 0) == 0) { // If this transaction is being used by RetriableTransaction and is not yet commited, - // we want to dispose of this instance but we don't want to do anything with the session, - // as the RetriableTransaction will attempt to reuse it with a fresh transaction. - // If acquiring a fresh transaction with the existing session fails, the session will be disposed - // and a new one with a fresh transaction will be obtained. - // If acquiring a fresh transaction succeeds, then the session will be disposed after the RetriableTransaction - // succeeds or we have stopped retrying. + // we want to dispose of this instance but we don't want to do anything with the underlying managed transaction, + // as the RetriableTransaction will create a new managed transaction from it. return; } switch (TransactionOptions.DisposeBehavior) { case DisposeBehavior.CloseResources: - _session.ReleaseToPool(forceDelete: true); + _ = Task.Run(_transaction.DisposeAsync); break; case DisposeBehavior.Default: - // This is a no-op for a detached session. - // We don't have to make a distinction here. - _session.ReleaseToPool(forceDelete: false); + if (!IsShared) + { + _ = Task.Run(_transaction.DisposeAsync); + } break; default: // Default for unknown DisposeBehavior is to do nothing. From 970dd8e72fd64a6b1b29e42d107645c957c366cc Mon Sep 17 00:00:00 2001 From: Amanda Tarafa Mas Date: Tue, 24 Mar 2026 11:26:45 -0700 Subject: [PATCH 4/4] refactor(Spanner): Simplify access to a transaction's timestamp bound information --- .../SpannerTransactionCreationOptionsTests.cs | 8 +++++- .../SpannerConnection.cs | 2 +- .../SpannerTransaction.cs | 2 +- .../SpannerTransactionCreationOptions.cs | 25 +++++++++++++------ 4 files changed, 26 insertions(+), 11 deletions(-) diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerTransactionCreationOptionsTests.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerTransactionCreationOptionsTests.cs index 342c878dd798..47687e821ad5 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerTransactionCreationOptionsTests.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data.Tests/SpannerTransactionCreationOptionsTests.cs @@ -56,6 +56,7 @@ public void ReadWrite_Values() Assert.Null(readWrite.TimestampBound); Assert.Null(readWrite.TransactionId); + Assert.Null(readWrite.EffectiveTimestampBound); Assert.Equal(TransactionMode.ReadWrite, readWrite.TransactionMode); Assert.False(readWrite.IsDetached); Assert.False(readWrite.IsSingleUse); @@ -73,6 +74,7 @@ public void PartitionedDml_Values() Assert.Null(partitionedDml.TimestampBound); Assert.Null(partitionedDml.TransactionId); + Assert.Null(partitionedDml.EffectiveTimestampBound); Assert.Equal(TransactionMode.ReadWrite, partitionedDml.TransactionMode); Assert.False(partitionedDml.IsDetached); Assert.False(partitionedDml.IsSingleUse); @@ -90,6 +92,7 @@ public void ReadOnly_Values() Assert.Equal(TimestampBound.Strong, readOnly.TimestampBound); Assert.Null(readOnly.TransactionId); + Assert.Equal(TimestampBound.Strong, readOnly.EffectiveTimestampBound); Assert.Equal(TransactionMode.ReadOnly, readOnly.TransactionMode); Assert.False(readOnly.IsDetached); Assert.False(readOnly.IsSingleUse); @@ -106,6 +109,7 @@ public void ForTimestampBoundReadOnly_Null() var options = SpannerTransactionCreationOptions.ForTimestampBoundReadOnly(null); Assert.Equal(TimestampBound.Strong, options.TimestampBound); Assert.Null(options.TransactionId); + Assert.Equal(TimestampBound.Strong, options.EffectiveTimestampBound); Assert.Equal(TransactionMode.ReadOnly, options.TransactionMode); Assert.False(options.IsDetached); Assert.False(options.IsSingleUse); @@ -123,6 +127,7 @@ public void ForTimestampBoundReadOnly_Custom() var options = SpannerTransactionCreationOptions.ForTimestampBoundReadOnly(timestampBound); Assert.Equal(timestampBound, options.TimestampBound); Assert.Null(options.TransactionId); + Assert.Equal(timestampBound, options.EffectiveTimestampBound); Assert.Equal(TransactionMode.ReadOnly, options.TransactionMode); Assert.False(options.IsDetached); Assert.True(options.IsSingleUse); @@ -145,13 +150,14 @@ public void FromReadOnlyTransactionId_NotNull() var options = SpannerTransactionCreationOptions.FromReadOnlyTransactionId(transactionId); Assert.Equal(transactionId, options.TransactionId); Assert.Null(options.TimestampBound); + Assert.Equal(TimestampBound.Strong, options.EffectiveTimestampBound); Assert.Equal(TransactionMode.ReadOnly, options.TransactionMode); Assert.True(options.IsDetached); Assert.False(options.IsSingleUse); Assert.False(options.IsPartitionedDml); Assert.False(options.ExcludeFromChangeStreams); Assert.Equal(IsolationLevel.Unspecified, options.IsolationLevel); - Assert.Null(options.GetTransactionOptions()); + Assert.Equal(TimestampBound.Strong.ToTransactionOptions(), options.GetTransactionOptions()); Assert.Equal(ReadLockMode.Unspecified, options.ReadLockMode); } diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs index 86eabd8b13d9..0c9cba9b3134 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerConnection.cs @@ -592,7 +592,7 @@ internal Task BeginTransactionAsyncImpl( { SessionName sessionName = SessionName.Parse(transactionCreationOptions.TransactionId.Session); ByteString transactionIdBytes = ByteString.FromBase64(transactionCreationOptions.TransactionId.Id); - var timestamp = transactionCreationOptions.TransactionId.TimestampBound?.Timestamp; + var timestamp = transactionCreationOptions.EffectiveTimestampBound?.Timestamp; var readTimestamp = timestamp.HasValue ? Protobuf.WellKnownTypes.Timestamp.FromDateTime(timestamp.Value) : null; transaction = ManagedTransaction.FromTransaction( _managedSession.Client, new Session { SessionName = sessionName }, transactionIdBytes, transactionCreationOptions.GetTransactionOptions(), readTimestamp); diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs index 19fbb33067fc..05ec0db0d1df 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransaction.cs @@ -151,7 +151,7 @@ public int CommitTimeout /// /// /// - public TimestampBound TimestampBound => _creationOptions.TimestampBound ?? _creationOptions.TransactionId?.TimestampBound; + public TimestampBound TimestampBound => _creationOptions.EffectiveTimestampBound; /// protected override DbConnection DbConnection => SpannerConnection; diff --git a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransactionCreationOptions.cs b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransactionCreationOptions.cs index 530bd0e7ff79..49dc93fc2970 100644 --- a/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransactionCreationOptions.cs +++ b/apis/Google.Cloud.Spanner.Data/Google.Cloud.Spanner.Data/SpannerTransactionCreationOptions.cs @@ -62,6 +62,9 @@ public sealed class SpannerTransactionCreationOptions /// public TransactionId TransactionId { get; } + internal TimestampBound EffectiveTimestampBound => TimestampBound ?? TransactionId?.TimestampBound; + private TransactionOptions EffectiveTimestampBoundTxnOptions => EffectiveTimestampBound?.ToTransactionOptions(); + /// /// Whether these options should result in a detached transaction or in one that's tracked by a session pool. /// This will always be true when is set. Otherwise it will be false unless explicitly @@ -206,9 +209,15 @@ public static SpannerTransactionCreationOptions FromReadOnlyTransactionId(Transa /// internal TransactionOptions GetTransactionOptions() { - var options = IsPartitionedDml ? new TransactionOptions { PartitionedDml = new PartitionedDml() } : - TransactionMode == TransactionMode.ReadWrite ? new TransactionOptions { ReadWrite = new ReadWrite() } : - TimestampBound?.ToTransactionOptions(); + TransactionOptions options = this switch + { + { IsPartitionedDml: true } => new TransactionOptions { PartitionedDml = new PartitionedDml() }, + { TransactionMode: TransactionMode.ReadWrite } => new TransactionOptions { ReadWrite = new ReadWrite() }, + { EffectiveTimestampBoundTxnOptions: var txnOptions } when txnOptions is not null => txnOptions, + { TransactionId: var txnId } when txnId is not null => new TransactionOptions { ReadOnly = new ReadOnly() }, + _ => null + }; + if (options is not null) { options.ExcludeTxnFromChangeStreams = ExcludeFromChangeStreams; @@ -233,11 +242,11 @@ internal TransactionOptions GetTransactionOptions() return options; } - /// - /// Returns a new instance identical to this one except for the value of . - /// If is set, cannot be false. - /// - public SpannerTransactionCreationOptions WithIsDetached(bool isDetached) => +/// +/// Returns a new instance identical to this one except for the value of . +/// If is set, cannot be false. +/// +public SpannerTransactionCreationOptions WithIsDetached(bool isDetached) => isDetached == IsDetached ? this : new SpannerTransactionCreationOptions(TimestampBound, TransactionId, isDetached, IsSingleUse, IsPartitionedDml, ExcludeFromChangeStreams, IsolationLevel, ReadLockMode); ///