Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 10 additions & 6 deletions src/MongoDB.Driver/Core/Operations/AggregateOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@

namespace MongoDB.Driver.Core.Operations
{
internal sealed class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>, IExecutableInRetryableReadContext<IAsyncCursor<TResult>>
internal sealed class AggregateOperation<TResult> : IReadOperation<IAsyncCursor<TResult>>, IExecutableInRetryableReadContext<IAsyncCursor<TResult>>, ICommandCreator
{
// fields
private bool? _allowDiskUse;
Expand Down Expand Up @@ -279,7 +279,7 @@ public IAsyncCursor<TResult> Execute(OperationContext operationContext, IReadBin
Ensure.IsNotNull(binding, nameof(binding));

using (BeginOperation())
using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested))
using (var context = new RetryableReadContext(binding, _retryRequested))
{
return Execute(operationContext, context);
}
Expand Down Expand Up @@ -308,7 +308,7 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(OperationContext operation
Ensure.IsNotNull(binding, nameof(binding));

using (BeginOperation())
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
using (var context = new RetryableReadContext(binding, _retryRequested))
{
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
}
Expand All @@ -331,7 +331,7 @@ public async Task<IAsyncCursor<TResult>> ExecuteAsync(OperationContext operation
}
}

internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSession session, ConnectionDescription connectionDescription)
public BsonDocument CreateCommand(OperationContext operationContext, ICoreSession session, ConnectionDescription connectionDescription)
{
var readConcern = ReadConcernHelper.GetReadConcernForCommand(session, connectionDescription, _readConcern);
var command = new BsonDocument
Expand Down Expand Up @@ -362,9 +362,13 @@ internal BsonDocument CreateCommand(OperationContext operationContext, ICoreSess
private ReadCommandOperation<AggregateResult> CreateOperation(OperationContext operationContext, RetryableReadContext context)
{
var databaseNamespace = _collectionNamespace == null ? _databaseNamespace : _collectionNamespace.DatabaseNamespace;
var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription);
var serializer = new AggregateResultDeserializer(_resultSerializer);
return new ReadCommandOperation<AggregateResult>(databaseNamespace, command, serializer, MessageEncoderSettings, OperationName)
return new ReadCommandOperation<AggregateResult>(
databaseNamespace,
this,
serializer,
MessageEncoderSettings,
OperationName)
{
RetryRequested = _retryRequested // might be overridden by retryable read context
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ public WriteConcern WriteConcern
public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding)
{
using (BeginOperation())
using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable()))
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
{
EnsureHintIsSupportedIfAnyRequestHasHint();
var helper = new BatchHelper(_requests, _isOrdered, _writeConcern);
Expand All @@ -160,7 +160,7 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, IWrit
public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationContext, IWriteBinding binding)
{
using (BeginOperation())
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false))
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
{
EnsureHintIsSupportedIfAnyRequestHasHint();
var helper = new BatchHelper(_requests, _isOrdered, _writeConcern);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ public BulkWriteOperationResult Execute(OperationContext operationContext, Retry
public BulkWriteOperationResult Execute(OperationContext operationContext, IWriteBinding binding)
{
using (BeginOperation())
using (var context = RetryableWriteContext.Create(operationContext, binding, IsOperationRetryable()))
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
{
return Execute(operationContext, context);
}
Expand All @@ -147,7 +147,7 @@ public Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationCon
public async Task<BulkWriteOperationResult> ExecuteAsync(OperationContext operationContext, IWriteBinding binding)
{
using (BeginOperation())
using (var context = await RetryableWriteContext.CreateAsync(operationContext, binding, IsOperationRetryable()).ConfigureAwait(false))
using (var context = new RetryableWriteContext(binding, IsOperationRetryable()))
{
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
}
Expand Down Expand Up @@ -286,7 +286,7 @@ public BulkWriteOperationResult CreateFinalResultOrThrow(IChannelHandle channel)
{
var combiner = new BulkWriteBatchResultCombiner(_batchResults, _writeConcern.IsAcknowledged);
var remainingRequests = _requests.GetUnprocessedItems();
return combiner.CreateResultOrThrowIfHasErrors(channel.ConnectionDescription.ConnectionId, remainingRequests);
return combiner.CreateResultOrThrowIfHasErrors(channel?.ConnectionDescription.ConnectionId, remainingRequests);
}

// private methods
Expand Down
8 changes: 4 additions & 4 deletions src/MongoDB.Driver/Core/Operations/ChangeStreamOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ public IChangeStreamCursor<TResult> Execute(OperationContext operationContext, I
IAsyncCursor<RawBsonDocument> cursor;
ICursorBatchInfo cursorBatchInfo;
BsonTimestamp initialOperationTime;
using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested))
using (var context = new RetryableReadContext(binding, _retryRequested))
{
cursor = ExecuteAggregateOperation(operationContext, context);
cursorBatchInfo = (ICursorBatchInfo)cursor;
Expand Down Expand Up @@ -301,7 +301,7 @@ public async Task<IChangeStreamCursor<TResult>> ExecuteAsync(OperationContext op
IAsyncCursor<RawBsonDocument> cursor;
ICursorBatchInfo cursorBatchInfo;
BsonTimestamp initialOperationTime;
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
using (var context = new RetryableReadContext(binding, _retryRequested))
{
cursor = await ExecuteAggregateOperationAsync(operationContext, context).ConfigureAwait(false);
cursorBatchInfo = (ICursorBatchInfo)cursor;
Expand All @@ -326,7 +326,7 @@ public async Task<IChangeStreamCursor<TResult>> ExecuteAsync(OperationContext op
/// <inheritdoc />
public IAsyncCursor<RawBsonDocument> Resume(OperationContext operationContext, IReadBinding binding)
{
using (var context = RetryableReadContext.Create(operationContext, binding, retryRequested: false))
using (var context = new RetryableReadContext(binding, retryRequested: false))
{
return ExecuteAggregateOperation(operationContext, context);
}
Expand All @@ -335,7 +335,7 @@ public IAsyncCursor<RawBsonDocument> Resume(OperationContext operationContext, I
/// <inheritdoc />
public async Task<IAsyncCursor<RawBsonDocument>> ResumeAsync(OperationContext operationContext, IReadBinding binding)
{
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, retryRequested: false).ConfigureAwait(false))
using (var context = new RetryableReadContext(binding, retryRequested: false))
{
return await ExecuteAggregateOperationAsync(operationContext, context).ConfigureAwait(false);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
var bulkWriteResults = new BulkWriteRawResult();
while (true)
{
using var context = RetryableWriteContext.Create(operationContext, binding, GetEffectiveRetryRequested());
using var context = new RetryableWriteContext(binding, GetEffectiveRetryRequested());
BsonDocument serverResponse = null;
try
{
Expand All @@ -112,7 +112,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
bulkWriteResults.TopLevelException = commandException;
serverResponse = commandException.Result;
}
catch (Exception exception)
catch (Exception exception) when (context.Channel is not null)
{
bulkWriteResults.TopLevelException = exception;
}
Expand Down Expand Up @@ -154,7 +154,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
var bulkWriteResults = new BulkWriteRawResult();
while (true)
{
using var context = RetryableWriteContext.Create(operationContext, binding, GetEffectiveRetryRequested());
using var context = new RetryableWriteContext(binding, GetEffectiveRetryRequested());
BsonDocument serverResponse = null;
try
{
Expand All @@ -170,7 +170,7 @@ protected override IEnumerable<BatchableCommandMessageSection> CreateCommandPayl
bulkWriteResults.TopLevelException = commandException;
serverResponse = commandException.Result;
}
catch (Exception exception)
catch (Exception exception) when (context.Channel is not null)
{
bulkWriteResults.TopLevelException = exception;
}
Expand Down
29 changes: 11 additions & 18 deletions src/MongoDB.Driver/Core/Operations/CommandOperationBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,21 +27,17 @@ namespace MongoDB.Driver.Core.Operations
internal abstract class CommandOperationBase<TCommandResult>
{
private BsonDocument _additionalOptions;
private BsonDocument _command;
private IElementNameValidator _commandValidator = NoOpElementNameValidator.Instance;
private string _comment;
private DatabaseNamespace _databaseNamespace;
private MessageEncoderSettings _messageEncoderSettings;
private IBsonSerializer<TCommandResult> _resultSerializer;

protected CommandOperationBase(
DatabaseNamespace databaseNamespace,
BsonDocument command,
protected CommandOperationBase(DatabaseNamespace databaseNamespace,
IBsonSerializer<TCommandResult> resultSerializer,
MessageEncoderSettings messageEncoderSettings)
{
_databaseNamespace = Ensure.IsNotNull(databaseNamespace, nameof(databaseNamespace));
_command = Ensure.IsNotNull(command, nameof(command));
_resultSerializer = Ensure.IsNotNull(resultSerializer, nameof(resultSerializer));
_messageEncoderSettings = messageEncoderSettings;
}
Expand All @@ -52,11 +48,6 @@ public BsonDocument AdditionalOptions
set { _additionalOptions = value; }
}

public BsonDocument Command
{
get { return _command; }
}

public IElementNameValidator CommandValidator
{
get { return _commandValidator; }
Expand Down Expand Up @@ -84,7 +75,7 @@ public IBsonSerializer<TCommandResult> ResultSerializer
get { return _resultSerializer; }
}

protected TCommandResult ExecuteProtocol(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference)
protected TCommandResult ExecuteProtocol(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference, BsonDocument command)
{
var additionalOptions = GetEffectiveAdditionalOptions();

Expand All @@ -93,7 +84,7 @@ protected TCommandResult ExecuteProtocol(OperationContext operationContext, ICha
session,
readPreference,
_databaseNamespace,
_command,
command,
null, // commandPayloads
_commandValidator,
additionalOptions,
Expand All @@ -107,15 +98,16 @@ protected TCommandResult ExecuteProtocol(
OperationContext operationContext,
IChannelSource channelSource,
ICoreSessionHandle session,
ReadPreference readPreference)
ReadPreference readPreference,
BsonDocument command)
{
using (var channel = channelSource.GetChannel(operationContext))
{
return ExecuteProtocol(operationContext, channel, session, readPreference);
return ExecuteProtocol(operationContext, channel, session, readPreference, command);
}
}

protected Task<TCommandResult> ExecuteProtocolAsync(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference)
protected Task<TCommandResult> ExecuteProtocolAsync(OperationContext operationContext, IChannelHandle channel, ICoreSessionHandle session, ReadPreference readPreference, BsonDocument command)
{
var additionalOptions = GetEffectiveAdditionalOptions();

Expand All @@ -124,7 +116,7 @@ protected Task<TCommandResult> ExecuteProtocolAsync(OperationContext operationCo
session,
readPreference,
_databaseNamespace,
_command,
command,
null, // TODO: support commandPayloads
_commandValidator,
additionalOptions,
Expand All @@ -138,11 +130,12 @@ protected async Task<TCommandResult> ExecuteProtocolAsync(
OperationContext operationContext,
IChannelSource channelSource,
ICoreSessionHandle session,
ReadPreference readPreference)
ReadPreference readPreference,
BsonDocument command)
{
using (var channel = await channelSource.GetChannelAsync(operationContext).ConfigureAwait(false))
{
return await ExecuteProtocolAsync(operationContext, channel, session, readPreference).ConfigureAwait(false);
return await ExecuteProtocolAsync(operationContext, channel, session, readPreference, command).ConfigureAwait(false);
}
}

Expand Down
14 changes: 9 additions & 5 deletions src/MongoDB.Driver/Core/Operations/CountOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@

namespace MongoDB.Driver.Core.Operations
{
internal sealed class CountOperation : IReadOperation<long>, IExecutableInRetryableReadContext<long>
internal sealed class CountOperation : IReadOperation<long>, IExecutableInRetryableReadContext<long>, ICommandCreator
{
private Collation _collation;
private readonly CollectionNamespace _collectionNamespace;
Expand Down Expand Up @@ -133,7 +133,7 @@ public long Execute(OperationContext operationContext, IReadBinding binding)
Ensure.IsNotNull(binding, nameof(binding));

using (BeginOperation())
using (var context = RetryableReadContext.Create(operationContext, binding, _retryRequested))
using (var context = new RetryableReadContext(binding, _retryRequested))
{
return Execute(operationContext, context);
}
Expand All @@ -151,7 +151,7 @@ public async Task<long> ExecuteAsync(OperationContext operationContext, IReadBin
Ensure.IsNotNull(binding, nameof(binding));

using (BeginOperation())
using (var context = await RetryableReadContext.CreateAsync(operationContext, binding, _retryRequested).ConfigureAwait(false))
using (var context = new RetryableReadContext(binding, _retryRequested))
{
return await ExecuteAsync(operationContext, context).ConfigureAwait(false);
}
Expand All @@ -168,8 +168,12 @@ public async Task<long> ExecuteAsync(OperationContext operationContext, Retryabl

private ReadCommandOperation<BsonDocument> CreateOperation(OperationContext operationContext, RetryableReadContext context)
{
var command = CreateCommand(operationContext, context.Binding.Session, context.Channel.ConnectionDescription);
return new ReadCommandOperation<BsonDocument>(_collectionNamespace.DatabaseNamespace, command, BsonDocumentSerializer.Instance, _messageEncoderSettings, OperationName)
return new ReadCommandOperation<BsonDocument>(
_collectionNamespace.DatabaseNamespace,
this,
BsonDocumentSerializer.Instance,
_messageEncoderSettings,
OperationName)
{
RetryRequested = _retryRequested // might be overridden by retryable read context
};
Expand Down
Loading