From db819bb80857a735a3f47990e37f1bf119506ee2 Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Thu, 26 Feb 2026 17:40:04 +0100 Subject: [PATCH 01/10] Merge --- .../Core/Misc/BatchableSource.cs | 8 +- .../Core/Operations/AggregateOperation.cs | 16 +- .../Operations/BulkMixedWriteOperation.cs | 4 +- .../BulkUnmixedWriteOperationBase.cs | 6 +- .../Core/Operations/ChangeStreamOperation.cs | 8 +- .../Operations/ClientBulkWriteOperation.cs | 8 +- .../Core/Operations/CommandOperationBase.cs | 11 +- .../Core/Operations/CountOperation.cs | 14 +- .../Core/Operations/DistinctOperation.cs | 14 +- .../EstimatedDocumentCountOperation.cs | 6 +- .../Core/Operations/FindOperation.cs | 9 +- .../Core/Operations/ICommandCreator.cs | 39 +++++ .../Operations/ListCollectionsOperation.cs | 4 +- .../Core/Operations/ListIndexesOperation.cs | 4 +- .../ListIndexesUsingCommandOperation.cs | 4 +- .../Core/Operations/ReadCommandOperation.cs | 29 +++- .../RetryableDeleteCommandOperation.cs | 2 +- .../RetryableInsertCommandOperation.cs | 2 +- .../Core/Operations/RetryableReadContext.cs | 138 ++++++++------- .../RetryableReadOperationExecutor.cs | 87 ++++------ .../RetryableUpdateCommandOperation.cs | 4 +- .../RetryableWriteCommandOperationBase.cs | 4 +- .../Core/Operations/RetryableWriteContext.cs | 137 ++++++++------- .../RetryableWriteOperationExecutor.cs | 160 +++++++++--------- .../Core/LoadBalancingIntergationTests.cs | 26 ++- .../Operations/CommandOperationBaseTests.cs | 13 -- .../RetryableWriteOperationExecutorTests.cs | 3 +- 27 files changed, 428 insertions(+), 332 deletions(-) create mode 100644 src/MongoDB.Driver/Core/Operations/ICommandCreator.cs diff --git a/src/MongoDB.Driver/Core/Misc/BatchableSource.cs b/src/MongoDB.Driver/Core/Misc/BatchableSource.cs index 4452dac3a99..efa725f6868 100644 --- a/src/MongoDB.Driver/Core/Misc/BatchableSource.cs +++ b/src/MongoDB.Driver/Core/Misc/BatchableSource.cs @@ -88,12 +88,18 @@ public BatchableSource(IReadOnlyList items, bool canBeSplit = false) /// The count. /// if set to true the batch can be split. public BatchableSource(IReadOnlyList items, int offset, int count, bool canBeSplit) + : this(items, offset, count, 0, canBeSplit) + { + + } + + internal BatchableSource(IReadOnlyList items, int offset, int count, int processedCount, bool canBeSplit) { _items = Ensure.IsNotNull(items, nameof(items)); _offset = Ensure.IsBetween(offset, 0, items.Count, nameof(offset)); _count = Ensure.IsBetween(count, 0, items.Count - offset, nameof(count)); + _processedCount = Ensure.IsBetween(processedCount, 0, count, nameof(processedCount)); _canBeSplit = canBeSplit; - _processedCount = 0; } // public properties diff --git a/src/MongoDB.Driver/Core/Operations/AggregateOperation.cs b/src/MongoDB.Driver/Core/Operations/AggregateOperation.cs index 3b6b1b2972a..7252ec107ae 100644 --- a/src/MongoDB.Driver/Core/Operations/AggregateOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/AggregateOperation.cs @@ -29,7 +29,7 @@ namespace MongoDB.Driver.Core.Operations { - internal sealed class AggregateOperation : IReadOperation>, IExecutableInRetryableReadContext> + internal sealed class AggregateOperation : IReadOperation>, IExecutableInRetryableReadContext>, ICommandCreator { // fields private bool? _allowDiskUse; @@ -279,7 +279,7 @@ public IAsyncCursor Execute(OperationContext operationContext, IReadBin Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -308,7 +308,7 @@ public async Task> ExecuteAsync(OperationContext operation Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } @@ -331,7 +331,7 @@ public async Task> ExecuteAsync(OperationContext operation } } - internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSession session, ConnectionDescription connectionDescription) + public BsonDocument CreateCommand(OperationContext operationContext, ICoreSession session, ConnectionDescription connectionDescription) { var readConcern = ReadConcernHelper.GetReadConcernForCommand(session, connectionDescription, _readConcern); var command = new BsonDocument @@ -362,9 +362,13 @@ internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSess private ReadCommandOperation CreateOperation(OperationContext operationContext, RetryableReadContext context) { var databaseNamespace = _collectionNamespace == null ? _databaseNamespace : _collectionNamespace.DatabaseNamespace; - var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); var serializer = new AggregateResultDeserializer(_resultSerializer); - return new ReadCommandOperation(databaseNamespace, command, serializer, MessageEncoderSettings, OperationName) + return new ReadCommandOperation( + databaseNamespace, + this, + serializer, + MessageEncoderSettings, + OperationName) { RetryRequested = _retryRequested // might be overridden by retryable read context }; diff --git a/src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs b/src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs index cafaaacd934..5114553e9e1 100644 --- a/src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/BulkMixedWriteOperation.cs @@ -145,7 +145,7 @@ public WriteConcern WriteConcern public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding) { using (BeginOperation()) - using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable())) + using (var context = new RetryableWriteContext(binding, IsOperationRetryable())) { EnsureHintIsSupportedIfAnyRequestHasHint(); var helper = new BatchHelper(_requests, _isOrdered, _writeConcern); @@ -160,7 +160,7 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, IWrit public async Task ExecuteAsync(OperationContext operationContext, IWriteBinding binding) { using (BeginOperation()) - using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false)) + using (var context = new RetryableWriteContext(binding, IsOperationRetryable())) { EnsureHintIsSupportedIfAnyRequestHasHint(); var helper = new BatchHelper(_requests, _isOrdered, _writeConcern); diff --git a/src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs b/src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs index cc82f25b1d2..09d0c8a0a23 100644 --- a/src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs +++ b/src/MongoDB.Driver/Core/Operations/BulkUnmixedWriteOperationBase.cs @@ -131,7 +131,7 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, Retry public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding) { using (BeginOperation()) - using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable())) + using (var context = new RetryableWriteContext(binding, IsOperationRetryable())) { return Execute(operationContext, context); } @@ -147,7 +147,7 @@ public Task ExecuteAsync(OperationContext operationCon public async Task ExecuteAsync(OperationContext operationContext, IWriteBinding binding) { using (BeginOperation()) - using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false)) + using (var context = new RetryableWriteContext(binding, IsOperationRetryable())) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } @@ -286,7 +286,7 @@ public BulkWriteOperationResult CreateFinalResultOrThrow(IChannelHandle channel) { var combiner = new BulkWriteBatchResultCombiner(_batchResults, _writeConcern.IsAcknowledged); var remainingRequests = _requests.GetUnprocessedItems(); - return combiner.CreateResultOrThrowIfHasErrors(channel.ConnectionDescription.ConnectionId, remainingRequests); + return combiner.CreateResultOrThrowIfHasErrors(channel?.ConnectionDescription.ConnectionId, remainingRequests); } // private methods diff --git a/src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs b/src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs index 7306f5d196a..82db288a6c0 100644 --- a/src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs @@ -266,7 +266,7 @@ public IChangeStreamCursor Execute(OperationContext operationContext, I IAsyncCursor cursor; ICursorBatchInfo cursorBatchInfo; BsonTimestamp initialOperationTime; - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { cursor = ExecuteAggregateOperation(operationContext, context); cursorBatchInfo = (ICursorBatchInfo)cursor; @@ -301,7 +301,7 @@ public async Task> ExecuteAsync(OperationContext op IAsyncCursor cursor; ICursorBatchInfo cursorBatchInfo; BsonTimestamp initialOperationTime; - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { cursor = await ExecuteAggregateOperationAsync(operationContext, context).ConfigureAwait(false); cursorBatchInfo = (ICursorBatchInfo)cursor; @@ -326,7 +326,7 @@ public async Task> ExecuteAsync(OperationContext op /// public IAsyncCursor Resume(OperationContext operationContext, IReadBinding binding) { - using (var context = RetryableReadContext.Create(operationContext, binding, retryRequested: false)) + using (var context = new RetryableReadContext(binding, retryRequested: false)) { return ExecuteAggregateOperation(operationContext, context); } @@ -335,7 +335,7 @@ public IAsyncCursor Resume(OperationContext operationContext, I /// public async Task> ResumeAsync(OperationContext operationContext, IReadBinding binding) { - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, retryRequested: false).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, retryRequested: false)) { return await ExecuteAggregateOperationAsync(operationContext, context).ConfigureAwait(false); } diff --git a/src/MongoDB.Driver/Core/Operations/ClientBulkWriteOperation.cs b/src/MongoDB.Driver/Core/Operations/ClientBulkWriteOperation.cs index d0360d2b230..01a50ffdac3 100644 --- a/src/MongoDB.Driver/Core/Operations/ClientBulkWriteOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ClientBulkWriteOperation.cs @@ -96,7 +96,7 @@ protected override IEnumerable CreateCommandPayl var bulkWriteResults = new BulkWriteRawResult(); while (true) { - using var context = RetryableWriteContext.Create(operationContext, binding, GetEffectiveRetryRequested()); + using var context = new RetryableWriteContext(binding, GetEffectiveRetryRequested()); BsonDocument serverResponse = null; try { @@ -112,7 +112,7 @@ protected override IEnumerable CreateCommandPayl bulkWriteResults.TopLevelException = commandException; serverResponse = commandException.Result; } - catch (Exception exception) + catch (Exception exception) when (context.Channel is not null) { bulkWriteResults.TopLevelException = exception; } @@ -154,7 +154,7 @@ protected override IEnumerable CreateCommandPayl var bulkWriteResults = new BulkWriteRawResult(); while (true) { - using var context = RetryableWriteContext.Create(operationContext, binding, GetEffectiveRetryRequested()); + using var context = new RetryableWriteContext(binding, GetEffectiveRetryRequested()); BsonDocument serverResponse = null; try { @@ -170,7 +170,7 @@ protected override IEnumerable CreateCommandPayl bulkWriteResults.TopLevelException = commandException; serverResponse = commandException.Result; } - catch (Exception exception) + catch (Exception exception) when (context.Channel is not null) { bulkWriteResults.TopLevelException = exception; } diff --git a/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs b/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs index e5f94ebe200..a1294e95624 100644 --- a/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs +++ b/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs @@ -41,7 +41,7 @@ protected CommandOperationBase( MessageEncoderSettings messageEncoderSettings) { _databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace)); - _command = Ensure.IsNotNull(command, nameof(command)); + _command = command; //can be null _resultSerializer = Ensure.IsNotNull(resultSerializer, nameof(resultSerializer)); _messageEncoderSettings = messageEncoderSettings; } @@ -146,6 +146,15 @@ protected async Task ExecuteProtocolAsync( } } + /// + /// Sets the command to be executed. This is used by derived classes that build commands dynamically. + /// + /// The command. + protected void SetCommand(BsonDocument command) + { + _command = Ensure.IsNotNull(command, nameof(command)); + } + private BsonDocument GetEffectiveAdditionalOptions() { if (_additionalOptions == null && _comment == null) diff --git a/src/MongoDB.Driver/Core/Operations/CountOperation.cs b/src/MongoDB.Driver/Core/Operations/CountOperation.cs index d7a79a904a3..69f564a3768 100644 --- a/src/MongoDB.Driver/Core/Operations/CountOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/CountOperation.cs @@ -25,7 +25,7 @@ namespace MongoDB.Driver.Core.Operations { - internal sealed class CountOperation : IReadOperation, IExecutableInRetryableReadContext + internal sealed class CountOperation : IReadOperation, IExecutableInRetryableReadContext, ICommandCreator { private Collation _collation; private readonly CollectionNamespace _collectionNamespace; @@ -133,7 +133,7 @@ public long Execute(OperationContext operationContext, IReadBinding binding) Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -151,7 +151,7 @@ public async Task ExecuteAsync(OperationContext operationContext, IReadBin Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } @@ -168,8 +168,12 @@ public async Task ExecuteAsync(OperationContext operationContext, Retryabl private ReadCommandOperation CreateOperation(OperationContext operationContext, RetryableReadContext context) { - var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); - return new ReadCommandOperation(_collectionNamespace.DatabaseNamespace, command, BsonDocumentSerializer.Instance, _messageEncoderSettings, OperationName) + return new ReadCommandOperation( + _collectionNamespace.DatabaseNamespace, + this, + BsonDocumentSerializer.Instance, + _messageEncoderSettings, + OperationName) { RetryRequested = _retryRequested // might be overridden by retryable read context }; diff --git a/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs b/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs index 094a81ef35e..5f1b578cf37 100644 --- a/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs @@ -27,7 +27,7 @@ namespace MongoDB.Driver.Core.Operations { - internal sealed class DistinctOperation : IReadOperation> + internal sealed class DistinctOperation : IReadOperation>, ICommandCreator { private Collation _collation; private CollectionNamespace _collectionNamespace; @@ -111,7 +111,7 @@ public IAsyncCursor Execute(OperationContext operationContext, IReadBind Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { var operation = CreateOperation(operationContext, context); var result = operation.Execute(operationContext, context); @@ -127,7 +127,7 @@ public async Task> ExecuteAsync(OperationContext operationC Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { var operation = CreateOperation(operationContext, context); var result = await operation.ExecuteAsync(operationContext, context).ConfigureAwait(false); @@ -157,10 +157,14 @@ public BsonDocument CreateCommand(OperationContext operationContext, ICoreSessio private ReadCommandOperation CreateOperation(OperationContext operationContext, RetryableReadContext context) { - var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); var serializer = new DistinctResultDeserializer(_valueSerializer); - return new ReadCommandOperation(_collectionNamespace.DatabaseNamespace, command, serializer, _messageEncoderSettings, OperationName) + return new ReadCommandOperation( + _collectionNamespace.DatabaseNamespace, + this, + serializer, + _messageEncoderSettings, + OperationName) { RetryRequested = _retryRequested // might be overridden by retryable read context }; diff --git a/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs b/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs index f1972574c35..6ff1642dae0 100644 --- a/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs @@ -1,4 +1,4 @@ -/* Copyright 2021-present MongoDB Inc. +/* Copyright 2021-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -73,7 +73,7 @@ public long Execute(OperationContext operationContext, IReadBinding binding) Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { var operation = CreateCountOperation(); @@ -86,7 +86,7 @@ public async Task ExecuteAsync(OperationContext operationContext, IReadBin Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { var operation = CreateCountOperation(); diff --git a/src/MongoDB.Driver/Core/Operations/FindOperation.cs b/src/MongoDB.Driver/Core/Operations/FindOperation.cs index 9ad0c2fb057..9498d38d643 100644 --- a/src/MongoDB.Driver/Core/Operations/FindOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/FindOperation.cs @@ -28,7 +28,7 @@ namespace MongoDB.Driver.Core.Operations { - internal sealed class FindOperation : IReadOperation>, IExecutableInRetryableReadContext> + internal sealed class FindOperation : IReadOperation>, IExecutableInRetryableReadContext>, ICommandCreator { #region static // private static fields @@ -287,7 +287,7 @@ public IAsyncCursor Execute(OperationContext operationContext, IReadB Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -310,7 +310,7 @@ public async Task> ExecuteAsync(OperationContext operati Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } @@ -369,10 +369,9 @@ private CursorBatch CreateFirstCursorBatch(BsonDocument cursorDocumen private ReadCommandOperation CreateOperation(OperationContext operationContext, RetryableReadContext context) { - var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); var operation = new ReadCommandOperation( _collectionNamespace.DatabaseNamespace, - command, + this, __findCommandResultSerializer, _messageEncoderSettings, OperationName) diff --git a/src/MongoDB.Driver/Core/Operations/ICommandCreator.cs b/src/MongoDB.Driver/Core/Operations/ICommandCreator.cs new file mode 100644 index 00000000000..44e541893fc --- /dev/null +++ b/src/MongoDB.Driver/Core/Operations/ICommandCreator.cs @@ -0,0 +1,39 @@ +/* Copyright 2010-present MongoDB Inc. +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +using MongoDB.Bson; +using MongoDB.Driver.Core.Bindings; +using MongoDB.Driver.Core.Connections; + +namespace MongoDB.Driver.Core.Operations +{ + /// + /// Interface for operations that create commands dynamically based on session and connection information. + /// + internal interface ICommandCreator + { + /// + /// Creates a command to be executed. + /// + /// The operation context. + /// The session. + /// The connection description. + /// The command document. + BsonDocument CreateCommand( + OperationContext operationContext, + ICoreSession session, + ConnectionDescription connectionDescription); + } +} diff --git a/src/MongoDB.Driver/Core/Operations/ListCollectionsOperation.cs b/src/MongoDB.Driver/Core/Operations/ListCollectionsOperation.cs index ae0e2d3f340..f4c66846379 100644 --- a/src/MongoDB.Driver/Core/Operations/ListCollectionsOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ListCollectionsOperation.cs @@ -98,7 +98,7 @@ public IAsyncCursor Execute(OperationContext operationContext, IRe using (BeginOperation()) { - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -123,7 +123,7 @@ public async Task> ExecuteAsync(OperationContext oper using (BeginOperation()) { - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } diff --git a/src/MongoDB.Driver/Core/Operations/ListIndexesOperation.cs b/src/MongoDB.Driver/Core/Operations/ListIndexesOperation.cs index 30e5ce69012..f23c75df1c9 100644 --- a/src/MongoDB.Driver/Core/Operations/ListIndexesOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ListIndexesOperation.cs @@ -74,7 +74,7 @@ public IAsyncCursor Execute(OperationContext operationContext, IRe Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { var operation = CreateOperation(); return operation.Execute(operationContext, context); @@ -86,7 +86,7 @@ public async Task> ExecuteAsync(OperationContext oper Ensure.IsNotNull(binding, nameof(binding)); using (BeginOperation()) - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { var operation = CreateOperation(); return await operation.ExecuteAsync(operationContext, context).ConfigureAwait(false); diff --git a/src/MongoDB.Driver/Core/Operations/ListIndexesUsingCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/ListIndexesUsingCommandOperation.cs index 8dd7868284a..9cb1a86e1cb 100644 --- a/src/MongoDB.Driver/Core/Operations/ListIndexesUsingCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ListIndexesUsingCommandOperation.cs @@ -75,7 +75,7 @@ public IAsyncCursor Execute(OperationContext operationContext, IRe { Ensure.IsNotNull(binding, nameof(binding)); - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -104,7 +104,7 @@ public async Task> ExecuteAsync(OperationContext oper { Ensure.IsNotNull(binding, nameof(binding)); - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } diff --git a/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs index 504a7e91f18..3e8bc65db44 100644 --- a/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs @@ -26,6 +26,7 @@ namespace MongoDB.Driver.Core.Operations internal sealed class ReadCommandOperation : CommandOperationBase, IReadOperation, IRetryableReadOperation { private readonly string _operationName; + private readonly ICommandCreator _commandCreator; private bool _retryRequested; public ReadCommandOperation( @@ -41,6 +42,18 @@ public ReadCommandOperation( public string OperationName => _operationName; + public ReadCommandOperation( + DatabaseNamespace databaseNamespace, + ICommandCreator commandCreator, + IBsonSerializer resultSerializer, + MessageEncoderSettings messageEncoderSettings, + string operationName = null) + : base(databaseNamespace, null, resultSerializer, messageEncoderSettings) + { + _commandCreator = Ensure.IsNotNull(commandCreator, nameof(commandCreator)); + _operationName = operationName; + } + public bool RetryRequested { get => _retryRequested; @@ -51,7 +64,7 @@ public TCommandResult Execute(OperationContext operationContext, IReadBinding bi { Ensure.IsNotNull(binding, nameof(binding)); - using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -71,7 +84,7 @@ public async Task ExecuteAsync(OperationContext operationContext { Ensure.IsNotNull(binding, nameof(binding)); - using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableReadContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } @@ -89,11 +102,23 @@ public async Task ExecuteAsync(OperationContext operationContext public TCommandResult ExecuteAttempt(OperationContext operationContext, RetryableReadContext context, int attempt, long? transactionNumber) { + if (_commandCreator != null) + { + var command = _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); + SetCommand(command); + } + return ExecuteProtocol(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference); } public Task ExecuteAttemptAsync(OperationContext operationContext, RetryableReadContext context, int attempt, long? transactionNumber) { + if (_commandCreator != null) + { + var command = _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); + SetCommand(command); + } + return ExecuteProtocolAsync(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference); } } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs index 128a4c6997e..9e64f79ebe6 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs @@ -94,7 +94,7 @@ protected override IEnumerable CreateCommandPayl } else { - deletes = new BatchableSource(_deletes.Items, _deletes.Offset, _deletes.ProcessedCount, canBeSplit: false); + deletes = new BatchableSource(_deletes.Items, _deletes.Offset, _deletes.Count, _deletes.ProcessedCount, canBeSplit: false); } var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, channel.ConnectionDescription.MaxBatchCount); var maxDocumentSize = channel.ConnectionDescription.MaxWireDocumentSize; diff --git a/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs index 45eba3eebe2..f7df89bfd44 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs @@ -91,7 +91,7 @@ protected override IEnumerable CreateCommandPayl } else { - documents = new BatchableSource(_documents.Items, _documents.Offset, _documents.ProcessedCount, canBeSplit: false); + documents = new BatchableSource(_documents.Items, _documents.Offset, _documents.Count, _documents.ProcessedCount, canBeSplit: false); } var elementNameValidator = NoOpElementNameValidator.Instance; diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs index d1f8f36c15f..ad9c1aa9924 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs @@ -1,4 +1,4 @@ -/* Copyright 2010-present MongoDB Inc. +/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -24,43 +24,6 @@ namespace MongoDB.Driver.Core.Operations { internal sealed class RetryableReadContext : IDisposable { - #region static - - public static RetryableReadContext Create(OperationContext operationContext, IReadBinding binding, bool retryRequested) - { - var context = new RetryableReadContext(binding, retryRequested); - try - { - context.AcquireOrReplaceChannel(operationContext, null); - } - catch - { - context.Dispose(); - throw; - } - - ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session); - return context; - } - - public static async Task CreateAsync(OperationContext operationContext, IReadBinding binding, bool retryRequested) - { - var context = new RetryableReadContext(binding, retryRequested); - try - { - await context.AcquireOrReplaceChannelAsync(operationContext, null).ConfigureAwait(false); - } - catch - { - context.Dispose(); - throw; - } - - ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session); - return context; - } - #endregion - #pragma warning disable CA2213 // Disposable fields should be disposed private readonly IReadBinding _binding; #pragma warning restore CA2213 // Disposable fields should be disposed @@ -84,50 +47,95 @@ public void Dispose() { if (!_disposed) { - _channelSource?.Dispose(); - _channel?.Dispose(); + DisposeChannelAndSource(); _disposed = true; } } - public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public ServerDescription DoServerSelection(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { - var attempt = 1; - while (true) + try { operationContext.ThrowIfTimedOutOrCanceled(); - ReplaceChannelSource(Binding.GetReadChannelSource(operationContext, deprioritizedServers)); - try - { - ReplaceChannel(ChannelSource.GetChannel(operationContext)); - return; - } - catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(operationContext, this, ex, attempt)) - { - attempt++; - } + var readChannelSource = Binding.GetReadChannelSource(operationContext, deprioritizedServers); + ReplaceChannelSource(readChannelSource); + return ChannelSource.ServerDescription; + } + catch + { + DisposeChannelAndSource(); + throw; } } - public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public async Task DoServerSelectionAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { - var attempt = 1; - while (true) + try { operationContext.ThrowIfTimedOutOrCanceled(); - ReplaceChannelSource(await Binding.GetReadChannelSourceAsync(operationContext, deprioritizedServers).ConfigureAwait(false)); - try + var readChannelSource = await Binding + .GetReadChannelSourceAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + ReplaceChannelSource(readChannelSource); + return ChannelSource.ServerDescription; + } + catch + { + DisposeChannelAndSource(); + throw; + } + } + + public void DoChannelAcquisition(OperationContext operationContext) + { + try + { + if (_channelSource is null) { - ReplaceChannel(await ChannelSource.GetChannelAsync(operationContext).ConfigureAwait(false)); - return; + throw new InvalidOperationException("Channel source is not initialized. Server selection must be performed before channel acquisition."); } - catch (Exception ex) when (RetryableReadOperationExecutor.ShouldConnectionAcquireBeRetried(operationContext, this, ex, attempt)) + operationContext.ThrowIfTimedOutOrCanceled(); + ReplaceChannel(ChannelSource.GetChannel(operationContext)); + ChannelPinningHelper.PinChannellIfRequired(ChannelSource, Channel, Binding.Session); + } + catch + { + DisposeChannelAndSource(); + throw; + } + } + + public async Task DoChannelAcquisitionAsync(OperationContext operationContext) + { + try + { + if (_channelSource is null) { - attempt++; + throw new InvalidOperationException("Channel source is not initialized. Server selection must be performed before channel acquisition."); } + operationContext.ThrowIfTimedOutOrCanceled(); + ReplaceChannel(await ChannelSource.GetChannelAsync(operationContext).ConfigureAwait(false)); + ChannelPinningHelper.PinChannellIfRequired(ChannelSource, Channel, Binding.Session); + } + catch + { + DisposeChannelAndSource(); + throw; } } + //TODO We can proably remove those, are used only by tests now + public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + { + DoServerSelection(operationContext, deprioritizedServers); + DoChannelAcquisition(operationContext); + } + + public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + { + await DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + await DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); + } + private void ReplaceChannel(IChannelHandle channel) { Ensure.IsNotNull(channel, nameof(channel)); @@ -143,5 +151,11 @@ private void ReplaceChannelSource(IChannelSourceHandle channelSource) _channelSource = channelSource; _channel = null; } + + private void DisposeChannelAndSource() + { + _channelSource?.Dispose(); + _channel?.Dispose(); + } } } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs index 666470e1412..987c32b2138 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs @@ -26,98 +26,72 @@ internal static class RetryableReadOperationExecutor public static TResult Execute(OperationContext operationContext, IRetryableReadOperation operation, RetryableReadContext context) { HashSet deprioritizedServers = null; - var attempt = 1; + var totalAttempts = 0; Exception originalException = null; while (true) // Circle breaking logic based on ShouldRetryOperation method, see the catch block below. { + totalAttempts++; operationContext.ThrowIfTimedOutOrCanceled(); - var server = context.ChannelSource.ServerDescription; + ServerDescription server = null; + try { - return operation.ExecuteAttempt(operationContext, context, attempt, transactionNumber: null); + server = context.DoServerSelection(operationContext, deprioritizedServers); + context.DoChannelAcquisition(operationContext); + + return operation.ExecuteAttempt(operationContext, context, totalAttempts, transactionNumber: null); } catch (Exception ex) { - if (!ShouldRetryOperation(operationContext, context, ex, attempt)) + if (!ShouldRetryOperation(operationContext, context, ex, totalAttempts)) { throw originalException ?? ex; } originalException ??= ex; - if (server.Type == ServerType.ShardRouter || - (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError"))) - { - deprioritizedServers ??= new HashSet(); - deprioritizedServers.Add(server); - } - } - - try - { - context.AcquireOrReplaceChannel(operationContext, deprioritizedServers); + deprioritizedServers = UpdateServerList(server, deprioritizedServers, ex); } - catch - { - throw originalException; - } - - attempt++; } } public static async Task ExecuteAsync(OperationContext operationContext, IRetryableReadOperation operation, RetryableReadContext context) { HashSet deprioritizedServers = null; - var attempt = 1; + var totalAttempts = 0; Exception originalException = null; while (true) // Circle breaking logic based on ShouldRetryOperation method, see the catch block below. { + totalAttempts++; operationContext.ThrowIfTimedOutOrCanceled(); - var server = context.ChannelSource.ServerDescription; + ServerDescription server = null; + try { - return await operation.ExecuteAttemptAsync(operationContext, context, attempt, transactionNumber: null).ConfigureAwait(false); + server = await context.DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + await context.DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); + + return await operation.ExecuteAttemptAsync(operationContext, context, totalAttempts, transactionNumber: null).ConfigureAwait(false); } catch (Exception ex) { - if (!ShouldRetryOperation(operationContext, context, ex, attempt)) + if (!ShouldRetryOperation(operationContext, context, ex, totalAttempts)) { throw originalException ?? ex; } originalException ??= ex; - if (server.Type == ServerType.ShardRouter || - (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError"))) - { - deprioritizedServers ??= new HashSet(); - deprioritizedServers.Add(server); - } - } - - try - { - await context.AcquireOrReplaceChannelAsync(operationContext, deprioritizedServers).ConfigureAwait(false); - } - catch - { - throw originalException; + deprioritizedServers = UpdateServerList(server, deprioritizedServers, ex); } - - attempt++; } } - public static bool ShouldConnectionAcquireBeRetried(OperationContext operationContext, RetryableReadContext context, Exception exception, int attempt) - { - var innerException = exception is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : exception; - return ShouldRetryOperation(operationContext, context, innerException, attempt); - } - // private static methods - private static bool ShouldRetryOperation(OperationContext operationContext, RetryableReadContext context, Exception exception, int attempt) + private static bool ShouldRetryOperation(OperationContext operationContext, RetryableReadContext context, Exception exception, int totalAttempts) { + exception = exception is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : exception; + if (!context.RetryRequested || context.Binding.Session.IsInTransaction) { return false; @@ -128,7 +102,20 @@ private static bool ShouldRetryOperation(OperationContext operationContext, Retr return false; } - return operationContext.IsRootContextTimeoutConfigured() || attempt < 2; + return operationContext.IsRootContextTimeoutConfigured() || totalAttempts < 2; + } + + private static HashSet UpdateServerList(ServerDescription server, HashSet deprioritizedServers, Exception ex) + { + if (server != null && + (server.Type == ServerType.ShardRouter || + (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError")))) + { + deprioritizedServers ??= []; + deprioritizedServers.Add(server); + } + + return deprioritizedServers; } } } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs index d469076aee4..bf7c6f8760f 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs @@ -101,11 +101,11 @@ protected override IEnumerable CreateCommandPayl } else { - updates = new BatchableSource(_updates.Items, _updates.Offset, _updates.ProcessedCount, canBeSplit: false); + updates = new BatchableSource(_updates.Items, _updates.Offset, _updates.Count, _updates.ProcessedCount, canBeSplit: false); } var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, channel.ConnectionDescription.MaxBatchCount); var maxDocumentSize = channel.ConnectionDescription.MaxWireDocumentSize; - var payload = new Type1CommandMessageSection("updates", _updates, UpdateRequestSerializer.Instance, NoOpElementNameValidator.Instance, maxBatchCount, maxDocumentSize); + var payload = new Type1CommandMessageSection("updates", updates, UpdateRequestSerializer.Instance, NoOpElementNameValidator.Instance, maxBatchCount, maxDocumentSize); return new Type1CommandMessageSection[] { payload }; } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteCommandOperationBase.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteCommandOperationBase.cs index 44d0e294169..e37529e1252 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteCommandOperationBase.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteCommandOperationBase.cs @@ -90,7 +90,7 @@ public WriteConcern WriteConcern public virtual BsonDocument Execute(OperationContext operationContext, IWriteBinding binding) { - using (var context = RetryableWriteContext.Create(operationContext, binding, _retryRequested)) + using (var context = new RetryableWriteContext(binding, _retryRequested)) { return Execute(operationContext, context); } @@ -103,7 +103,7 @@ public virtual BsonDocument Execute(OperationContext operationContext, Retryable public virtual async Task ExecuteAsync(OperationContext operationContext, IWriteBinding binding) { - using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false)) + using (var context = new RetryableWriteContext(binding, _retryRequested)) { return await ExecuteAsync(operationContext, context).ConfigureAwait(false); } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs index cbf64188d99..9ca4b80d444 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs @@ -24,43 +24,6 @@ namespace MongoDB.Driver.Core.Operations { internal sealed class RetryableWriteContext : IDisposable { - #region static - - public static RetryableWriteContext Create(OperationContext operationContext, IWriteBinding binding, bool retryRequested) - { - var context = new RetryableWriteContext(binding, retryRequested); - try - { - context.AcquireOrReplaceChannel(operationContext, null); - } - catch - { - context.Dispose(); - throw; - } - - ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session); - return context; - } - - public static async Task CreateAsync(OperationContext operationContext, IWriteBinding binding, bool retryRequested) - { - var context = new RetryableWriteContext(binding, retryRequested); - try - { - await context.AcquireOrReplaceChannelAsync(operationContext, null).ConfigureAwait(false); - } - catch - { - context.Dispose(); - throw; - } - - ChannelPinningHelper.PinChannellIfRequired(context.ChannelSource, context.Channel, context.Binding.Session); - return context; - } - #endregion - #pragma warning disable CA2213 // Disposable fields should be disposed private readonly IWriteBinding _binding; #pragma warning restore CA2213 // Disposable fields should be disposed @@ -84,52 +47,94 @@ public void Dispose() { if (!_disposed) { - _channelSource?.Dispose(); - _channel?.Dispose(); + DisposeChannelAndSource(); _disposed = true; } } - public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public ServerDescription DoServerSelection(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { - var attempt = 1; - while (true) + try { operationContext.ThrowIfTimedOutOrCanceled(); - ReplaceChannelSource(Binding.GetWriteChannelSource(operationContext, deprioritizedServers)); - var server = ChannelSource.ServerDescription; - try - { - ReplaceChannel(ChannelSource.GetChannel(operationContext)); - return; - } - catch (Exception ex) when (RetryableWriteOperationExecutor.ShouldConnectionAcquireBeRetried(operationContext, this, server, ex, attempt)) - { - attempt++; - } + var writeChannelSource = Binding.GetWriteChannelSource(operationContext, deprioritizedServers); + ReplaceChannelSource(writeChannelSource); + return ChannelSource.ServerDescription; + } + catch + { + DisposeChannelAndSource(); + throw; } } - public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public async Task DoServerSelectionAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { - var attempt = 1; - while (true) + try { operationContext.ThrowIfTimedOutOrCanceled(); - ReplaceChannelSource(await Binding.GetWriteChannelSourceAsync(operationContext, deprioritizedServers).ConfigureAwait(false)); - var server = ChannelSource.ServerDescription; - try + var writeChannelSource = await Binding + .GetWriteChannelSourceAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + ReplaceChannelSource(writeChannelSource); + return ChannelSource.ServerDescription; + } + catch + { + DisposeChannelAndSource(); + throw; + } + } + + public void DoChannelAcquisition(OperationContext operationContext) + { + try + { + if (_channelSource is null) { - ReplaceChannel(await ChannelSource.GetChannelAsync(operationContext).ConfigureAwait(false)); - return; + throw new InvalidOperationException("Channel source is not initialized. Server selection must be performed before channel acquisition."); } - catch (Exception ex) when (RetryableWriteOperationExecutor.ShouldConnectionAcquireBeRetried(operationContext, this, server, ex, attempt)) + operationContext.ThrowIfTimedOutOrCanceled(); + ReplaceChannel(ChannelSource.GetChannel(operationContext)); + ChannelPinningHelper.PinChannellIfRequired(ChannelSource, Channel, Binding.Session); + } + catch + { + DisposeChannelAndSource(); + throw; + } + } + + public async Task DoChannelAcquisitionAsync(OperationContext operationContext) + { + try + { + if (_channelSource is null) { - attempt++; + throw new InvalidOperationException("Channel source is not initialized. Server selection must be performed before channel acquisition."); } + operationContext.ThrowIfTimedOutOrCanceled(); + ReplaceChannel(await ChannelSource.GetChannelAsync(operationContext).ConfigureAwait(false)); + ChannelPinningHelper.PinChannellIfRequired(ChannelSource, Channel, Binding.Session); + } + catch + { + DisposeChannelAndSource(); + throw; } } + public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + { + DoServerSelection(operationContext, deprioritizedServers); + DoChannelAcquisition(operationContext); + } + + public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + { + await DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + await DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); + } + private void ReplaceChannel(IChannelHandle channel) { Ensure.IsNotNull(channel, nameof(channel)); @@ -145,5 +150,11 @@ private void ReplaceChannelSource(IChannelSourceHandle channelSource) _channelSource = channelSource; _channel = null; } + + private void DisposeChannelAndSource() + { + _channelSource?.Dispose(); + _channel?.Dispose(); + } } } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs index 3ead26897c9..dd248bb3fcb 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs @@ -26,151 +26,131 @@ internal static class RetryableWriteOperationExecutor // public static methods public static TResult Execute(OperationContext operationContext, IRetryableWriteOperation operation, IWriteBinding binding, bool retryRequested) { - using (var context = RetryableWriteContext.Create(operationContext, binding, retryRequested)) - { - return Execute(operationContext, operation, context); - } + using var context = new RetryableWriteContext(binding, retryRequested); + return Execute(operationContext, operation, context); } public static TResult Execute(OperationContext operationContext, IRetryableWriteOperation operation, RetryableWriteContext context) { HashSet deprioritizedServers = null; - var attempt = 1; + var totalAttempts = 0; + var operationExecutionAttempts = 0; Exception originalException = null; - long? transactionNumber = AreRetriesAllowed(operation.WriteConcern, context, context.ChannelSource.ServerDescription) ? context.Binding.Session.AdvanceTransactionNumber() : null; + long? transactionNumber = null; while (true) // Circle breaking logic based on ShouldRetryOperation method, see the catch block below. { + totalAttempts++; operationContext.ThrowIfTimedOutOrCanceled(); - var server = context.ChannelSource.ServerDescription; + ServerDescription server = null; + bool channelAcquisitionSuccessful = false; + try { - return operation.ExecuteAttempt(operationContext, context, attempt, transactionNumber); + server = context.DoServerSelection(operationContext, deprioritizedServers); + context.DoChannelAcquisition(operationContext); + channelAcquisitionSuccessful = true; + + transactionNumber ??= AreRetriesAllowed(operation.WriteConcern, context, server) ? context.Binding.Session.AdvanceTransactionNumber() : null; + + operationExecutionAttempts++; + return operation.ExecuteAttempt(operationContext, context, operationExecutionAttempts, transactionNumber); } catch (Exception ex) { - if (!ShouldRetryOperation(operationContext, operation.WriteConcern, context, server, ex, attempt)) + if (!ShouldRetryOperation(operationContext, !channelAcquisitionSuccessful, operation.WriteConcern, context, server, ex, totalAttempts)) { throw originalException ?? ex; } originalException ??= ex; - if (server.Type == ServerType.ShardRouter || - (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError"))) - { - deprioritizedServers ??= new HashSet(); - deprioritizedServers.Add(server); - } - } - - try - { - context.AcquireOrReplaceChannel(operationContext, deprioritizedServers); - } - catch - { - throw originalException; - } - - if (!AreRetryableWritesSupported(context.ChannelSource.ServerDescription)) - { - throw originalException; + deprioritizedServers = UpdateServerList(server, deprioritizedServers, ex); } - - attempt++; } } - public async static Task ExecuteAsync(OperationContext operationContext, IRetryableWriteOperation operation, IWriteBinding binding, bool retryRequested) + public static async Task ExecuteAsync(OperationContext operationContext, IRetryableWriteOperation operation, IWriteBinding binding, bool retryRequested) { - using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, retryRequested).ConfigureAwait(false)) - { - return await ExecuteAsync(operationContext, operation, context).ConfigureAwait(false); - } + using var context = new RetryableWriteContext(binding, retryRequested); + return await ExecuteAsync(operationContext, operation, context).ConfigureAwait(false); } public static async Task ExecuteAsync(OperationContext operationContext, IRetryableWriteOperation operation, RetryableWriteContext context) { HashSet deprioritizedServers = null; - var attempt = 1; + var totalAttempts = 0; + var operationExecutionAttempts = 0; Exception originalException = null; - long? transactionNumber = AreRetriesAllowed(operation.WriteConcern, context, context.ChannelSource.ServerDescription) ? context.Binding.Session.AdvanceTransactionNumber() : null; + long? transactionNumber = null; while (true) // Circle breaking logic based on ShouldRetryOperation method, see the catch block below. { + totalAttempts++; operationContext.ThrowIfTimedOutOrCanceled(); - var server = context.ChannelSource.ServerDescription; + ServerDescription server = null; + bool channelAcquisitionSuccessful = false; + try { - return await operation.ExecuteAttemptAsync(operationContext, context, attempt, transactionNumber).ConfigureAwait(false); + server = await context.DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + await context.DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); + channelAcquisitionSuccessful = true; + + transactionNumber ??= AreRetriesAllowed(operation.WriteConcern, context, server) ? context.Binding.Session.AdvanceTransactionNumber() : null; + + operationExecutionAttempts++; + return await operation.ExecuteAttemptAsync(operationContext, context, operationExecutionAttempts, transactionNumber).ConfigureAwait(false); } catch (Exception ex) { - if (!ShouldRetryOperation(operationContext, operation.WriteConcern, context, server, ex, attempt)) + if (!ShouldRetryOperation(operationContext, !channelAcquisitionSuccessful, operation.WriteConcern, context, server, ex, totalAttempts)) { throw originalException ?? ex; } originalException ??= ex; - if (server.Type == ServerType.ShardRouter || - (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError"))) - { - deprioritizedServers ??= new HashSet(); - deprioritizedServers.Add(server); - } + deprioritizedServers = UpdateServerList(server, deprioritizedServers, ex); } - - try - { - await context.AcquireOrReplaceChannelAsync(operationContext, deprioritizedServers).ConfigureAwait(false); - } - catch - { - throw originalException; - } - - if (!AreRetryableWritesSupported(context.ChannelSource.ServerDescription)) - { - throw originalException; - } - - attempt++; } } - public static bool ShouldConnectionAcquireBeRetried(OperationContext operationContext, RetryableWriteContext context, ServerDescription server, Exception exception, int attempt) + // private static methods + private static bool ShouldRetryOperation(OperationContext operationContext, bool errorDuringChannelAcquisition, WriteConcern writeConcern, RetryableWriteContext context, ServerDescription server, Exception exception, int totalAttempts) { - if (!DoesContextAllowRetries(context, server)) - { + if (server is null) return false; - } - var innerException = exception is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : exception; - // According the spec error during handshake should be handle according to RetryableReads logic - if (!RetryabilityHelper.IsRetryableReadException(innerException)) + if (errorDuringChannelAcquisition) { - return false; - } + // According to the spec, errors during handshake should be handled according to RetryableReads logic + exception = exception is MongoAuthenticationException mongoAuthenticationException ? mongoAuthenticationException.InnerException : exception; - return operationContext.IsRootContextTimeoutConfigured() || attempt < 2; - } + if (!DoesContextAllowRetries(context, server)) + { + return false; + } - // private static methods - private static bool ShouldRetryOperation(OperationContext operationContext, WriteConcern writeConcern, RetryableWriteContext context, ServerDescription server, Exception exception, int attempt) - { - if (!AreRetriesAllowed(writeConcern, context, server)) - { - return false; + if (!RetryabilityHelper.IsRetryableReadException(exception)) + { + return false; + } } - - if (!RetryabilityHelper.IsRetryableWriteException(exception)) + else { - return false; + if (!AreRetriesAllowed(writeConcern, context, server)) + { + return false; + } + + if (!RetryabilityHelper.IsRetryableWriteException(exception)) + { + return false; + } } - return operationContext.IsRootContextTimeoutConfigured() || attempt < 2; + return operationContext.IsRootContextTimeoutConfigured() || totalAttempts < 2; } private static bool AreRetriesAllowed(WriteConcern writeConcern, RetryableWriteContext context, ServerDescription server) @@ -191,5 +171,17 @@ private static bool DoesContextAllowRetries(RetryableWriteContext context, Serve private static bool IsOperationAcknowledged(WriteConcern writeConcern) => writeConcern == null || // null means use server default write concern which implies acknowledged writeConcern.IsAcknowledged; + + private static HashSet UpdateServerList(ServerDescription server, HashSet deprioritizedServers, Exception ex) + { + if (server != null && (server.Type == ServerType.ShardRouter || + (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError")))) + { + deprioritizedServers ??= []; + deprioritizedServers.Add(server); + } + + return deprioritizedServers; + } } } diff --git a/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs b/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs index a433e87fa5e..2fbc155ba1b 100644 --- a/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs @@ -644,9 +644,16 @@ private IAsyncCursor CreateAndRunFindOperation(RetryableReadContex private RetryableReadContext CreateRetryableReadContext(IReadBindingHandle readBindingHandle, bool async) { - return async - ? RetryableReadContext.CreateAsync(OperationContext.NoTimeout, readBindingHandle, retryRequested: false).GetAwaiter().GetResult() - : RetryableReadContext.Create(OperationContext.NoTimeout, readBindingHandle, retryRequested: false); + var retryableContext = new RetryableReadContext(readBindingHandle, retryRequested: false); + if (async) + { + retryableContext.AcquireOrReplaceChannelAsync(OperationContext.NoTimeout, null).GetAwaiter().GetResult(); + } + else + { + retryableContext.AcquireOrReplaceChannel(OperationContext.NoTimeout, null); + } + return retryableContext; } private DisposableBindingBundle CreateReadBindingsAndRetryableReadContext(IClusterInternal cluster, ICoreSessionHandle sessionHandle, bool async) @@ -661,9 +668,16 @@ private DisposableBindingBundle Create private RetryableWriteContext CreateRetryableWriteContext(IReadWriteBindingHandle readWriteBindingHandle, bool async) { - return async - ? RetryableWriteContext.CreateAsync(OperationContext.NoTimeout, readWriteBindingHandle, retryRequested: false).GetAwaiter().GetResult() - : RetryableWriteContext.Create(OperationContext.NoTimeout, readWriteBindingHandle, retryRequested: false); + var retryableContext = new RetryableWriteContext(readWriteBindingHandle, retryRequested: false); + if (async) + { + retryableContext.AcquireOrReplaceChannelAsync(OperationContext.NoTimeout, null).GetAwaiter().GetResult(); + } + else + { + retryableContext.AcquireOrReplaceChannel(OperationContext.NoTimeout, null); + } + return retryableContext; } private DisposableBindingBundle CreateReadWriteBindingsAndRetryableWriteContext(IClusterInternal cluster, ICoreSessionHandle sessionHandle, bool async) diff --git a/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs b/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs index 9a0faaff797..c786b0598eb 100644 --- a/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs @@ -124,19 +124,6 @@ public void constructor_should_initialize_instance_when_messageEncoderSettings_i result.MessageEncoderSettings.Should().BeNull(); } - [Fact] - public void constructor_should_throw_when_command_is_null() - { - var databaseNamespace = new DatabaseNamespace("databaseName"); - BsonDocument command = null; - var resultSerializer = new BsonDocumentSerializer(); - var messageEncoderSettings = new MessageEncoderSettings(); - - Action action = () => new FakeCommandOperation(databaseNamespace, command, resultSerializer, messageEncoderSettings); - - action.ShouldThrow().And.ParamName.Should().Be("command"); - } - [Fact] public void constructor_should_throw_when_databaseNamespace_is_null() { diff --git a/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs b/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs index d01706d6128..4c80ca0df58 100644 --- a/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs @@ -118,7 +118,8 @@ private ServerDescription CreateServerDescription(bool withLogicalSessionTimeout private RetryableWriteContext CreateContext(bool retryRequested, bool areRetryableWritesSupported, bool hasSessionId, bool isInTransaction) { var binding = CreateBinding(areRetryableWritesSupported, hasSessionId, isInTransaction); - var context = RetryableWriteContext.Create(OperationContext.NoTimeout, binding, retryRequested); + var context = new RetryableWriteContext(binding, retryRequested); + context.AcquireOrReplaceChannel(OperationContext.NoTimeout, null); return context; } From 0fc39ef23d04da52701457d77813f20d13ec703a Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Thu, 26 Feb 2026 17:43:26 +0100 Subject: [PATCH 02/10] Small fix --- .../Core/Operations/RetryableReadOperationExecutor.cs | 5 ++--- .../Core/Operations/RetryableWriteOperationExecutor.cs | 2 +- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs index 987c32b2138..60737a55a36 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs @@ -107,9 +107,8 @@ private static bool ShouldRetryOperation(OperationContext operationContext, Retr private static HashSet UpdateServerList(ServerDescription server, HashSet deprioritizedServers, Exception ex) { - if (server != null && - (server.Type == ServerType.ShardRouter || - (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError")))) + if (server != null && (server.Type == ServerType.ShardRouter || + (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError")))) { deprioritizedServers ??= []; deprioritizedServers.Add(server); diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs index dd248bb3fcb..1fe0d131127 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs @@ -175,7 +175,7 @@ private static bool IsOperationAcknowledged(WriteConcern writeConcern) private static HashSet UpdateServerList(ServerDescription server, HashSet deprioritizedServers, Exception ex) { if (server != null && (server.Type == ServerType.ShardRouter || - (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError")))) + (ex is MongoException mongoException && mongoException.HasErrorLabel("SystemOverloadedError")))) { deprioritizedServers ??= []; deprioritizedServers.Add(server); From aeceafc3308d316c2bd4656cbaa3034c2d37a8dd Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Thu, 26 Feb 2026 18:29:02 +0100 Subject: [PATCH 03/10] Small fix --- src/MongoDB.Driver/Core/Operations/DistinctOperation.cs | 2 +- .../Core/Operations/EstimatedDocumentCountOperation.cs | 2 +- src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs | 1 - 3 files changed, 2 insertions(+), 3 deletions(-) diff --git a/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs b/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs index 5f1b578cf37..a2ea5c51335 100644 --- a/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/DistinctOperation.cs @@ -153,7 +153,7 @@ public BsonDocument CreateCommand(OperationContext operationContext, ICoreSessio }; } - private EventContext.OperationNameDisposer BeginOperation() => EventContext.BeginOperation(OperationName); + private EventContext.OperationIdDisposer BeginOperation() => EventContext.BeginOperation(null, OperationName); private ReadCommandOperation CreateOperation(OperationContext operationContext, RetryableReadContext context) { diff --git a/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs b/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs index 6ff1642dae0..6d0fab786bd 100644 --- a/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/EstimatedDocumentCountOperation.cs @@ -94,7 +94,7 @@ public async Task ExecuteAsync(OperationContext operationContext, IReadBin } } - private EventContext.OperationNameDisposer BeginOperation() => EventContext.BeginOperation(OperationName); + private EventContext.OperationIdDisposer BeginOperation() => EventContext.BeginOperation(null, OperationName); private IExecutableInRetryableReadContext CreateCountOperation() { diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs index ad9c1aa9924..25dcb1a1dbb 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs @@ -123,7 +123,6 @@ public async Task DoChannelAcquisitionAsync(OperationContext operationContext) } } - //TODO We can proably remove those, are used only by tests now public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { DoServerSelection(operationContext, deprioritizedServers); From 843cfdf88e110948cfe34d1c1f4995d0c338fb5a Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Wed, 4 Mar 2026 11:02:59 +0100 Subject: [PATCH 04/10] Rename --- .../Core/Operations/RetryableReadContext.cs | 20 ++++--------------- .../RetryableReadOperationExecutor.cs | 8 ++++---- .../Core/Operations/RetryableWriteContext.cs | 20 ++++--------------- .../RetryableWriteOperationExecutor.cs | 10 +++++----- .../Core/LoadBalancingIntergationTests.cs | 12 +++++++---- .../RetryableWriteOperationExecutorTests.cs | 3 ++- 6 files changed, 27 insertions(+), 46 deletions(-) diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs index 25dcb1a1dbb..d0c90a8e327 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs @@ -52,7 +52,7 @@ public void Dispose() } } - public ServerDescription DoServerSelection(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public ServerDescription SelectServer(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { try { @@ -68,7 +68,7 @@ public ServerDescription DoServerSelection(OperationContext operationContext, IR } } - public async Task DoServerSelectionAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public async Task SelectServerAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { try { @@ -85,7 +85,7 @@ public async Task DoServerSelectionAsync(OperationContext ope } } - public void DoChannelAcquisition(OperationContext operationContext) + public void AcquireChannel(OperationContext operationContext) { try { @@ -104,7 +104,7 @@ public void DoChannelAcquisition(OperationContext operationContext) } } - public async Task DoChannelAcquisitionAsync(OperationContext operationContext) + public async Task AcquireChannelAsync(OperationContext operationContext) { try { @@ -123,18 +123,6 @@ public async Task DoChannelAcquisitionAsync(OperationContext operationContext) } } - public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) - { - DoServerSelection(operationContext, deprioritizedServers); - DoChannelAcquisition(operationContext); - } - - public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) - { - await DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); - await DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); - } - private void ReplaceChannel(IChannelHandle channel) { Ensure.IsNotNull(channel, nameof(channel)); diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs index 60737a55a36..e16a42f79ef 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadOperationExecutor.cs @@ -37,8 +37,8 @@ public static TResult Execute(OperationContext operationContext, IRetry try { - server = context.DoServerSelection(operationContext, deprioritizedServers); - context.DoChannelAcquisition(operationContext); + server = context.SelectServer(operationContext, deprioritizedServers); + context.AcquireChannel(operationContext); return operation.ExecuteAttempt(operationContext, context, totalAttempts, transactionNumber: null); } @@ -69,8 +69,8 @@ public static async Task ExecuteAsync(OperationContext operati try { - server = await context.DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); - await context.DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); + server = await context.SelectServerAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + await context.AcquireChannelAsync(operationContext).ConfigureAwait(false); return await operation.ExecuteAttemptAsync(operationContext, context, totalAttempts, transactionNumber: null).ConfigureAwait(false); } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs index 9ca4b80d444..8b2cad7affb 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs @@ -52,7 +52,7 @@ public void Dispose() } } - public ServerDescription DoServerSelection(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public ServerDescription SelectServer(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { try { @@ -68,7 +68,7 @@ public ServerDescription DoServerSelection(OperationContext operationContext, IR } } - public async Task DoServerSelectionAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) + public async Task SelectServerAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) { try { @@ -85,7 +85,7 @@ public async Task DoServerSelectionAsync(OperationContext ope } } - public void DoChannelAcquisition(OperationContext operationContext) + public void AcquireChannel(OperationContext operationContext) { try { @@ -104,7 +104,7 @@ public void DoChannelAcquisition(OperationContext operationContext) } } - public async Task DoChannelAcquisitionAsync(OperationContext operationContext) + public async Task AcquireChannelAsync(OperationContext operationContext) { try { @@ -123,18 +123,6 @@ public async Task DoChannelAcquisitionAsync(OperationContext operationContext) } } - public void AcquireOrReplaceChannel(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) - { - DoServerSelection(operationContext, deprioritizedServers); - DoChannelAcquisition(operationContext); - } - - public async Task AcquireOrReplaceChannelAsync(OperationContext operationContext, IReadOnlyCollection deprioritizedServers) - { - await DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); - await DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); - } - private void ReplaceChannel(IChannelHandle channel) { Ensure.IsNotNull(channel, nameof(channel)); diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs index 1fe0d131127..af45545df2c 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs @@ -1,4 +1,4 @@ -/* Copyright 2010-present MongoDB Inc. +/* Copyright 2010-present MongoDB Inc. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -48,8 +48,8 @@ public static TResult Execute(OperationContext operationContext, IRetry try { - server = context.DoServerSelection(operationContext, deprioritizedServers); - context.DoChannelAcquisition(operationContext); + server = context.SelectServer(operationContext, deprioritizedServers); + context.AcquireChannel(operationContext); channelAcquisitionSuccessful = true; transactionNumber ??= AreRetriesAllowed(operation.WriteConcern, context, server) ? context.Binding.Session.AdvanceTransactionNumber() : null; @@ -94,8 +94,8 @@ public static async Task ExecuteAsync(OperationContext operati try { - server = await context.DoServerSelectionAsync(operationContext, deprioritizedServers).ConfigureAwait(false); - await context.DoChannelAcquisitionAsync(operationContext).ConfigureAwait(false); + server = await context.SelectServerAsync(operationContext, deprioritizedServers).ConfigureAwait(false); + await context.AcquireChannelAsync(operationContext).ConfigureAwait(false); channelAcquisitionSuccessful = true; transactionNumber ??= AreRetriesAllowed(operation.WriteConcern, context, server) ? context.Binding.Session.AdvanceTransactionNumber() : null; diff --git a/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs b/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs index 2fbc155ba1b..b9f0c57a7f9 100644 --- a/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs @@ -647,11 +647,13 @@ private RetryableReadContext CreateRetryableReadContext(IReadBindingHandle readB var retryableContext = new RetryableReadContext(readBindingHandle, retryRequested: false); if (async) { - retryableContext.AcquireOrReplaceChannelAsync(OperationContext.NoTimeout, null).GetAwaiter().GetResult(); + retryableContext.SelectServerAsync(OperationContext.NoTimeout, null).GetAwaiter().GetResult(); + retryableContext.AcquireChannelAsync(OperationContext.NoTimeout).GetAwaiter().GetResult(); } else { - retryableContext.AcquireOrReplaceChannel(OperationContext.NoTimeout, null); + retryableContext.SelectServer(OperationContext.NoTimeout, null); + retryableContext.AcquireChannel(OperationContext.NoTimeout); } return retryableContext; } @@ -671,11 +673,13 @@ private RetryableWriteContext CreateRetryableWriteContext(IReadWriteBindingHandl var retryableContext = new RetryableWriteContext(readWriteBindingHandle, retryRequested: false); if (async) { - retryableContext.AcquireOrReplaceChannelAsync(OperationContext.NoTimeout, null).GetAwaiter().GetResult(); + retryableContext.SelectServerAsync(OperationContext.NoTimeout, null).GetAwaiter().GetResult(); + retryableContext.AcquireChannelAsync(OperationContext.NoTimeout).GetAwaiter().GetResult(); } else { - retryableContext.AcquireOrReplaceChannel(OperationContext.NoTimeout, null); + retryableContext.SelectServer(OperationContext.NoTimeout, null); + retryableContext.AcquireChannel(OperationContext.NoTimeout); } return retryableContext; } diff --git a/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs b/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs index 4c80ca0df58..653625c9177 100644 --- a/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Operations/RetryableWriteOperationExecutorTests.cs @@ -119,7 +119,8 @@ private RetryableWriteContext CreateContext(bool retryRequested, bool areRetryab { var binding = CreateBinding(areRetryableWritesSupported, hasSessionId, isInTransaction); var context = new RetryableWriteContext(binding, retryRequested); - context.AcquireOrReplaceChannel(OperationContext.NoTimeout, null); + context.SelectServer(OperationContext.NoTimeout, null); + context.AcquireChannel(OperationContext.NoTimeout); return context; } From 5446592d06e556033499fb5fe4c58bc0d6877577 Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Wed, 4 Mar 2026 11:03:54 +0100 Subject: [PATCH 05/10] Corrected typo --- ...ngIntergationTests.cs => LoadBalancingIntegrationTests.cs} | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) rename tests/MongoDB.Driver.Tests/Core/{LoadBalancingIntergationTests.cs => LoadBalancingIntegrationTests.cs} (99%) diff --git a/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs b/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntegrationTests.cs similarity index 99% rename from tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs rename to tests/MongoDB.Driver.Tests/Core/LoadBalancingIntegrationTests.cs index b9f0c57a7f9..63c269de2b6 100644 --- a/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntergationTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/LoadBalancingIntegrationTests.cs @@ -35,9 +35,9 @@ namespace MongoDB.Driver.Core.Tests /// /// Tests in this file emulate internal steps in operations that can work with cursors or transactions. /// - public class LoadBalancingIntergationTests : OperationTestBase + public class LoadBalancingIntegrationTests : OperationTestBase { - public LoadBalancingIntergationTests() + public LoadBalancingIntegrationTests() { _collectionNamespace = CollectionNamespace.FromFullName("db.coll"); } From 638272f60c81f7f49ae9e72edf26756674ca87cb Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Wed, 18 Mar 2026 11:50:31 +0100 Subject: [PATCH 06/10] Nulling source and channel --- src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs | 2 ++ src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs | 2 ++ 2 files changed, 4 insertions(+) diff --git a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs index d0c90a8e327..fa415f77bbb 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableReadContext.cs @@ -143,6 +143,8 @@ private void DisposeChannelAndSource() { _channelSource?.Dispose(); _channel?.Dispose(); + _channelSource = null; + _channel = null; } } } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs index 8b2cad7affb..5a3f47b1f41 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteContext.cs @@ -143,6 +143,8 @@ private void DisposeChannelAndSource() { _channelSource?.Dispose(); _channel?.Dispose(); + _channelSource = null; + _channel = null; } } } From 926b425a6aeb323683f38600ee506e22cf2a590a Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Wed, 18 Mar 2026 12:28:35 +0100 Subject: [PATCH 07/10] Removed unnecessary variable --- .../Core/Operations/RetryableWriteOperationExecutor.cs | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs index af45545df2c..29e887592a9 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableWriteOperationExecutor.cs @@ -44,13 +44,11 @@ public static TResult Execute(OperationContext operationContext, IRetry totalAttempts++; operationContext.ThrowIfTimedOutOrCanceled(); ServerDescription server = null; - bool channelAcquisitionSuccessful = false; try { server = context.SelectServer(operationContext, deprioritizedServers); context.AcquireChannel(operationContext); - channelAcquisitionSuccessful = true; transactionNumber ??= AreRetriesAllowed(operation.WriteConcern, context, server) ? context.Binding.Session.AdvanceTransactionNumber() : null; @@ -59,7 +57,7 @@ public static TResult Execute(OperationContext operationContext, IRetry } catch (Exception ex) { - if (!ShouldRetryOperation(operationContext, !channelAcquisitionSuccessful, operation.WriteConcern, context, server, ex, totalAttempts)) + if (!ShouldRetryOperation(operationContext, context.Channel is null, operation.WriteConcern, context, server, ex, totalAttempts)) { throw originalException ?? ex; } From 1febd7ea6b36acbfa923ca7c250eba52582b7f2c Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Wed, 18 Mar 2026 13:35:28 +0100 Subject: [PATCH 08/10] Removed SetCommand --- .../Core/Operations/CommandOperationBase.cs | 27 +++++++------------ .../Core/Operations/ReadCommandOperation.cs | 20 ++++++-------- .../Core/Operations/WriteCommandOperation.cs | 4 +-- 3 files changed, 20 insertions(+), 31 deletions(-) diff --git a/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs b/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs index a1294e95624..bb573003dbd 100644 --- a/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs +++ b/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs @@ -84,7 +84,7 @@ public IBsonSerializer ResultSerializer get { return _resultSerializer; } } - protected TCommandResult ExecuteProtocol(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference) + protected TCommandResult ExecuteProtocol(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference, BsonDocument command) { var additionalOptions = GetEffectiveAdditionalOptions(); @@ -93,7 +93,7 @@ protected TCommandResult ExecuteProtocol(OperationContext operationContext, ICha session, readPreference, _databaseNamespace, - _command, + command, null, // commandPayloads _commandValidator, additionalOptions, @@ -107,15 +107,16 @@ protected TCommandResult ExecuteProtocol( OperationContext operationContext, IChannelSource channelSource, ICoreSessionHandle session, - ReadPreference readPreference) + ReadPreference readPreference, + BsonDocument command) { using (var channel = channelSource.GetChannel(operationContext)) { - return ExecuteProtocol(operationContext, channel, session, readPreference); + return ExecuteProtocol(operationContext, channel, session, readPreference, command); } } - protected Task ExecuteProtocolAsync(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference) + protected Task ExecuteProtocolAsync(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference, BsonDocument command) { var additionalOptions = GetEffectiveAdditionalOptions(); @@ -124,7 +125,7 @@ protected Task ExecuteProtocolAsync(OperationContext operationCo session, readPreference, _databaseNamespace, - _command, + command, null, // TODO: support commandPayloads _commandValidator, additionalOptions, @@ -138,23 +139,15 @@ protected async Task ExecuteProtocolAsync( OperationContext operationContext, IChannelSource channelSource, ICoreSessionHandle session, - ReadPreference readPreference) + ReadPreference readPreference, + BsonDocument command) { using (var channel = await channelSource.GetChannelAsync(operationContext).ConfigureAwait(false)) { - return await ExecuteProtocolAsync(operationContext, channel, session, readPreference).ConfigureAwait(false); + return await ExecuteProtocolAsync(operationContext, channel, session, readPreference, command).ConfigureAwait(false); } } - /// - /// Sets the command to be executed. This is used by derived classes that build commands dynamically. - /// - /// The command. - protected void SetCommand(BsonDocument command) - { - _command = Ensure.IsNotNull(command, nameof(command)); - } - private BsonDocument GetEffectiveAdditionalOptions() { if (_additionalOptions == null && _comment == null) diff --git a/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs index 3e8bc65db44..8161eee6cec 100644 --- a/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs @@ -102,24 +102,20 @@ public async Task ExecuteAsync(OperationContext operationContext public TCommandResult ExecuteAttempt(OperationContext operationContext, RetryableReadContext context, int attempt, long? transactionNumber) { - if (_commandCreator != null) - { - var command = _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); - SetCommand(command); - } + var command = _commandCreator != null + ? _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription) + : Command; - return ExecuteProtocol(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference); + return ExecuteProtocol(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference, command); } public Task ExecuteAttemptAsync(OperationContext operationContext, RetryableReadContext context, int attempt, long? transactionNumber) { - if (_commandCreator != null) - { - var command = _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription); - SetCommand(command); - } + var command = _commandCreator != null + ? _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription) + : Command; - return ExecuteProtocolAsync(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference); + return ExecuteProtocolAsync(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference, command); } } } diff --git a/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs index a6ee83d8155..24c94bf8fb2 100644 --- a/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs @@ -49,7 +49,7 @@ public TCommandResult Execute(OperationContext operationContext, IWriteBinding b using (EventContext.BeginOperation()) using (var channelSource = binding.GetWriteChannelSource(operationContext)) { - return ExecuteProtocol(operationContext, channelSource, binding.Session, _readPreference); + return ExecuteProtocol(operationContext, channelSource, binding.Session, _readPreference, Command); } } @@ -60,7 +60,7 @@ public async Task ExecuteAsync(OperationContext operationContext using (EventContext.BeginOperation()) using (var channelSource = await binding.GetWriteChannelSourceAsync(operationContext).ConfigureAwait(false)) { - return await ExecuteProtocolAsync(operationContext, channelSource, binding.Session, _readPreference).ConfigureAwait(false); + return await ExecuteProtocolAsync(operationContext, channelSource, binding.Session, _readPreference, Command).ConfigureAwait(false); } } } From c6fe659ffe3025423230a58693c48eb5e6b9fed4 Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Wed, 18 Mar 2026 15:31:16 +0100 Subject: [PATCH 09/10] Restored batchable source and related. --- src/MongoDB.Driver/Core/Misc/BatchableSource.cs | 8 +------- .../Core/Operations/RetryableDeleteCommandOperation.cs | 2 +- .../Core/Operations/RetryableInsertCommandOperation.cs | 2 +- .../Core/Operations/RetryableUpdateCommandOperation.cs | 2 +- 4 files changed, 4 insertions(+), 10 deletions(-) diff --git a/src/MongoDB.Driver/Core/Misc/BatchableSource.cs b/src/MongoDB.Driver/Core/Misc/BatchableSource.cs index efa725f6868..4452dac3a99 100644 --- a/src/MongoDB.Driver/Core/Misc/BatchableSource.cs +++ b/src/MongoDB.Driver/Core/Misc/BatchableSource.cs @@ -88,18 +88,12 @@ public BatchableSource(IReadOnlyList items, bool canBeSplit = false) /// The count. /// if set to true the batch can be split. public BatchableSource(IReadOnlyList items, int offset, int count, bool canBeSplit) - : this(items, offset, count, 0, canBeSplit) - { - - } - - internal BatchableSource(IReadOnlyList items, int offset, int count, int processedCount, bool canBeSplit) { _items = Ensure.IsNotNull(items, nameof(items)); _offset = Ensure.IsBetween(offset, 0, items.Count, nameof(offset)); _count = Ensure.IsBetween(count, 0, items.Count - offset, nameof(count)); - _processedCount = Ensure.IsBetween(processedCount, 0, count, nameof(processedCount)); _canBeSplit = canBeSplit; + _processedCount = 0; } // public properties diff --git a/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs index 9e64f79ebe6..128a4c6997e 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableDeleteCommandOperation.cs @@ -94,7 +94,7 @@ protected override IEnumerable CreateCommandPayl } else { - deletes = new BatchableSource(_deletes.Items, _deletes.Offset, _deletes.Count, _deletes.ProcessedCount, canBeSplit: false); + deletes = new BatchableSource(_deletes.Items, _deletes.Offset, _deletes.ProcessedCount, canBeSplit: false); } var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, channel.ConnectionDescription.MaxBatchCount); var maxDocumentSize = channel.ConnectionDescription.MaxWireDocumentSize; diff --git a/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs index f7df89bfd44..45eba3eebe2 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableInsertCommandOperation.cs @@ -91,7 +91,7 @@ protected override IEnumerable CreateCommandPayl } else { - documents = new BatchableSource(_documents.Items, _documents.Offset, _documents.Count, _documents.ProcessedCount, canBeSplit: false); + documents = new BatchableSource(_documents.Items, _documents.Offset, _documents.ProcessedCount, canBeSplit: false); } var elementNameValidator = NoOpElementNameValidator.Instance; diff --git a/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs index bf7c6f8760f..ea81d10e333 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs @@ -101,7 +101,7 @@ protected override IEnumerable CreateCommandPayl } else { - updates = new BatchableSource(_updates.Items, _updates.Offset, _updates.Count, _updates.ProcessedCount, canBeSplit: false); + updates = new BatchableSource(_updates.Items, _updates.Offset, _updates.ProcessedCount, canBeSplit: false); } var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, channel.ConnectionDescription.MaxBatchCount); var maxDocumentSize = channel.ConnectionDescription.MaxWireDocumentSize; From 7f9a61af46f185bb5cc9bd82282fff4340534468 Mon Sep 17 00:00:00 2001 From: Ferdinando Papale <4850119+papafe@users.noreply.github.com> Date: Thu, 19 Mar 2026 12:42:55 +0100 Subject: [PATCH 10/10] Removed command --- .../Core/Operations/CommandOperationBase.cs | 11 +------- .../Core/Operations/ReadCommandOperation.cs | 12 ++++++--- .../RetryableUpdateCommandOperation.cs | 2 +- .../Core/Operations/WriteCommandOperation.cs | 10 ++++--- .../Operations/CommandOperationBaseTests.cs | 27 +++++-------------- 5 files changed, 23 insertions(+), 39 deletions(-) diff --git a/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs b/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs index bb573003dbd..39f2afc8606 100644 --- a/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs +++ b/src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs @@ -27,21 +27,17 @@ namespace MongoDB.Driver.Core.Operations internal abstract class CommandOperationBase { private BsonDocument _additionalOptions; - private BsonDocument _command; private IElementNameValidator _commandValidator = NoOpElementNameValidator.Instance; private string _comment; private DatabaseNamespace _databaseNamespace; private MessageEncoderSettings _messageEncoderSettings; private IBsonSerializer _resultSerializer; - protected CommandOperationBase( - DatabaseNamespace databaseNamespace, - BsonDocument command, + protected CommandOperationBase(DatabaseNamespace databaseNamespace, IBsonSerializer resultSerializer, MessageEncoderSettings messageEncoderSettings) { _databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace)); - _command = command; //can be null _resultSerializer = Ensure.IsNotNull(resultSerializer, nameof(resultSerializer)); _messageEncoderSettings = messageEncoderSettings; } @@ -52,11 +48,6 @@ public BsonDocument AdditionalOptions set { _additionalOptions = value; } } - public BsonDocument Command - { - get { return _command; } - } - public IElementNameValidator CommandValidator { get { return _commandValidator; } diff --git a/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs index 8161eee6cec..b17489907c4 100644 --- a/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/ReadCommandOperation.cs @@ -27,6 +27,7 @@ internal sealed class ReadCommandOperation : CommandOperationBas { private readonly string _operationName; private readonly ICommandCreator _commandCreator; + private readonly BsonDocument _command; private bool _retryRequested; public ReadCommandOperation( @@ -35,11 +36,14 @@ public ReadCommandOperation( IBsonSerializer resultSerializer, MessageEncoderSettings messageEncoderSettings, string operationName = null) - : base(databaseNamespace, command, resultSerializer, messageEncoderSettings) + : base(databaseNamespace, resultSerializer, messageEncoderSettings) { + _command = Ensure.IsNotNull(command, nameof(command)); _operationName = operationName; } + public BsonDocument Command => _command; + public string OperationName => _operationName; public ReadCommandOperation( @@ -48,7 +52,7 @@ public ReadCommandOperation( IBsonSerializer resultSerializer, MessageEncoderSettings messageEncoderSettings, string operationName = null) - : base(databaseNamespace, null, resultSerializer, messageEncoderSettings) + : base(databaseNamespace, resultSerializer, messageEncoderSettings) { _commandCreator = Ensure.IsNotNull(commandCreator, nameof(commandCreator)); _operationName = operationName; @@ -104,7 +108,7 @@ public TCommandResult ExecuteAttempt(OperationContext operationContext, Retryabl { var command = _commandCreator != null ? _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription) - : Command; + : _command; return ExecuteProtocol(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference, command); } @@ -113,7 +117,7 @@ public Task ExecuteAttemptAsync(OperationContext operationContex { var command = _commandCreator != null ? _commandCreator.CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription) - : Command; + : _command; return ExecuteProtocolAsync(operationContext, context.Channel, context.Binding.Session, context.Binding.ReadPreference, command); } diff --git a/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs index ea81d10e333..d469076aee4 100644 --- a/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/RetryableUpdateCommandOperation.cs @@ -105,7 +105,7 @@ protected override IEnumerable CreateCommandPayl } var maxBatchCount = Math.Min(MaxBatchCount ?? int.MaxValue, channel.ConnectionDescription.MaxBatchCount); var maxDocumentSize = channel.ConnectionDescription.MaxWireDocumentSize; - var payload = new Type1CommandMessageSection("updates", updates, UpdateRequestSerializer.Instance, NoOpElementNameValidator.Instance, maxBatchCount, maxDocumentSize); + var payload = new Type1CommandMessageSection("updates", _updates, UpdateRequestSerializer.Instance, NoOpElementNameValidator.Instance, maxBatchCount, maxDocumentSize); return new Type1CommandMessageSection[] { payload }; } diff --git a/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs b/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs index 24c94bf8fb2..fc8c2d515fa 100644 --- a/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs +++ b/src/MongoDB.Driver/Core/Operations/WriteCommandOperation.cs @@ -25,15 +25,19 @@ namespace MongoDB.Driver.Core.Operations { internal sealed class WriteCommandOperation : CommandOperationBase, IWriteOperation { + private readonly BsonDocument _command; private readonly string _operationName; private ReadPreference _readPreference = ReadPreference.Primary; public WriteCommandOperation(DatabaseNamespace databaseNamespace, BsonDocument command, IBsonSerializer resultSerializer, MessageEncoderSettings messageEncoderSettings, string operationName = null) - : base(databaseNamespace, command, resultSerializer, messageEncoderSettings) + : base(databaseNamespace, resultSerializer, messageEncoderSettings) { + _command = Ensure.IsNotNull(command, nameof(command)); _operationName = operationName; } + public BsonDocument Command => _command; + public string OperationName => _operationName; public ReadPreference ReadPreference @@ -49,7 +53,7 @@ public TCommandResult Execute(OperationContext operationContext, IWriteBinding b using (EventContext.BeginOperation()) using (var channelSource = binding.GetWriteChannelSource(operationContext)) { - return ExecuteProtocol(operationContext, channelSource, binding.Session, _readPreference, Command); + return ExecuteProtocol(operationContext, channelSource, binding.Session, _readPreference, _command); } } @@ -60,7 +64,7 @@ public async Task ExecuteAsync(OperationContext operationContext using (EventContext.BeginOperation()) using (var channelSource = await binding.GetWriteChannelSourceAsync(operationContext).ConfigureAwait(false)) { - return await ExecuteProtocolAsync(operationContext, channelSource, binding.Session, _readPreference, Command).ConfigureAwait(false); + return await ExecuteProtocolAsync(operationContext, channelSource, binding.Session, _readPreference, _command).ConfigureAwait(false); } } } diff --git a/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs b/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs index c786b0598eb..8a3929757c9 100644 --- a/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs +++ b/tests/MongoDB.Driver.Tests/Core/Operations/CommandOperationBaseTests.cs @@ -46,17 +46,6 @@ public void AdditionalOptions_get_and_set_should_work( result.Should().BeSameAs(additionalOptions); } - [Fact] - public void Command_get_should_return_expected_result() - { - var command = new BsonDocument("command", 1); - var subject = CreateSubject(command: command); - - var result = subject.Command; - - result.Should().BeSameAs(command); - } - [Fact] public void CommandValidator_get_and_set_should_work() { @@ -100,10 +89,9 @@ public void constructor_should_initialize_instance() var resultSerializer = new BsonDocumentSerializer(); var messageEncoderSettings = new MessageEncoderSettings(); - var result = new FakeCommandOperation(databaseNamespace, command, resultSerializer, messageEncoderSettings); + var result = new FakeCommandOperation(databaseNamespace, resultSerializer, messageEncoderSettings); result.AdditionalOptions.Should().BeNull(); - result.Command.Should().BeSameAs(command); result.CommandValidator.Should().BeOfType(); result.Comment.Should().BeNull(); result.DatabaseNamespace.Should().BeSameAs(databaseNamespace); @@ -119,7 +107,7 @@ public void constructor_should_initialize_instance_when_messageEncoderSettings_i var resultSerializer = new BsonDocumentSerializer(); MessageEncoderSettings messageEncoderSettings = null; - var result = new FakeCommandOperation(databaseNamespace, command, resultSerializer, messageEncoderSettings); + var result = new FakeCommandOperation(databaseNamespace, resultSerializer, messageEncoderSettings); result.MessageEncoderSettings.Should().BeNull(); } @@ -132,7 +120,7 @@ public void constructor_should_throw_when_databaseNamespace_is_null() var resultSerializer = new BsonDocumentSerializer(); var messageEncoderSettings = new MessageEncoderSettings(); - Action action = () => new FakeCommandOperation(databaseNamespace, command, resultSerializer, messageEncoderSettings); + Action action = () => new FakeCommandOperation(databaseNamespace, resultSerializer, messageEncoderSettings); action.ShouldThrow().And.ParamName.Should().Be("databaseNamespace"); } @@ -145,7 +133,7 @@ public void constructor_should_throw_when_resultSerializer_is_null() BsonDocumentSerializer resultSerializer = null; var messageEncoderSettings = new MessageEncoderSettings(); - Action action = () => new FakeCommandOperation(databaseNamespace, command, resultSerializer, messageEncoderSettings); + Action action = () => new FakeCommandOperation(databaseNamespace, resultSerializer, messageEncoderSettings); action.ShouldThrow().And.ParamName.Should().Be("resultSerializer"); } @@ -195,14 +183,12 @@ private ServerDescription CreateServerDescription(ServerType serverType) private CommandOperationBase CreateSubject( DatabaseNamespace databaseNamespace = null, - BsonDocument command = null, IBsonSerializer resultSerializer = null, MessageEncoderSettings messageEncoderSettings = null) { databaseNamespace = databaseNamespace ?? new DatabaseNamespace("databaseName"); - command = command ?? new BsonDocument("command", 1); resultSerializer = resultSerializer ?? BsonSerializer.LookupSerializer(); - return new FakeCommandOperation(databaseNamespace, command, resultSerializer, messageEncoderSettings); + return new FakeCommandOperation(databaseNamespace, resultSerializer, messageEncoderSettings); } // nested types @@ -210,10 +196,9 @@ private class FakeCommandOperation : CommandOperationBase resultSerializer, MessageEncoderSettings messageEncoderSettings) - : base(databaseNamespace, command, resultSerializer, messageEncoderSettings) + : base(databaseNamespace, resultSerializer, messageEncoderSettings) { } }