Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/MongoDB.Driver/Core/Connections/BinaryConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,11 @@ private void ConnectionFailed(Exception exception)
{
_failedEventHasBeenRaised = true;
_eventLogger.LogAndPublish(new ConnectionFailedEvent(_connectionId, exception));
_commandEventHelper.ConnectionFailed(_connectionId, _description?.ServiceId, exception, IsInitializing);

if (_commandEventHelper.ShouldCallConnectionFailed)
{
_commandEventHelper.ConnectionFailed(_connectionId, _description?.ServiceId, exception, IsInitializing);
}
}
}

Expand Down
111 changes: 53 additions & 58 deletions src/MongoDB.Driver/Core/Connections/CommandEventHelper.cs
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ namespace MongoDB.Driver.Core.Connections
internal class CommandEventHelper
{
private readonly EventLogger<LogCategories.Command> _eventLogger;
private readonly ConcurrentDictionary<int, CommandState> _state;
private ConcurrentDictionary<int, CommandState> _state;

private readonly bool _shouldProcessRequestMessages;
private readonly bool _eventsNeedBeforeSending;
private readonly bool _shouldTrackStarted;
private readonly bool _shouldTrackState;
private readonly bool _eventsNeedState;
private readonly bool _shouldTrackFailed;
private readonly bool _shouldTrackSucceeded;
private readonly bool _shouldTrace;
private readonly bool _tracingDisabled;
private readonly int _queryTextMaxLength;

private Activity _currentCommandActivity;
Expand All @@ -54,44 +54,34 @@ public CommandEventHelper(EventLogger<LogCategories.Command> eventLogger, Tracin
_shouldTrackFailed = _eventLogger.IsEventTracked<CommandFailedEvent>();
_shouldTrackStarted = _eventLogger.IsEventTracked<CommandStartedEvent>();

_shouldTrace = tracingOptions?.Disabled != true && MongoTelemetry.ActivitySource.HasListeners();
_tracingDisabled = tracingOptions?.Disabled == true;
_queryTextMaxLength = tracingOptions?.QueryTextMaxLength ?? 0;

_shouldTrackState = _shouldTrackSucceeded || _shouldTrackFailed || _shouldTrace;
_shouldProcessRequestMessages = _shouldTrackStarted || _shouldTrackState;
_eventsNeedState = _shouldTrackSucceeded || _shouldTrackFailed;
_eventsNeedBeforeSending = _shouldTrackStarted || _eventsNeedState;

if (_shouldTrackState)
if (_eventsNeedState)
{
// we only need to track state if we have to raise
// a succeeded or failed event or for tracing
_state = new ConcurrentDictionary<int, CommandState>();
}
}

public bool ShouldCallBeforeSending
{
get { return _shouldProcessRequestMessages; }
}
public bool ShouldCallBeforeSending => _eventsNeedBeforeSending || ShouldTraceWithActivityListener();

public bool ShouldCallAfterSending
{
get { return _shouldTrackState; }
}
public bool ShouldCallAfterSending => _eventsNeedState || ShouldTraceWithActivityListener();

public bool ShouldCallErrorSending
{
get { return _shouldTrackState; }
}
public bool ShouldCallErrorSending => _eventsNeedState || ShouldTraceWithActivityListener();

public bool ShouldCallAfterReceiving
{
get { return _shouldTrackState; }
}
public bool ShouldCallAfterReceiving => _eventsNeedState || ShouldTraceWithActivityListener();

public bool ShouldCallErrorReceiving
{
get { return _shouldTrackState; }
}
public bool ShouldCallErrorReceiving => _eventsNeedState || ShouldTraceWithActivityListener();

public bool ShouldCallConnectionFailed => (_shouldTrackFailed || ShouldTraceWithActivityListener()) && _state != null;

private bool ShouldTraceWithActivityListener()
=> !_tracingDisabled && MongoTelemetry.ActivitySource.HasListeners();

public void CompleteFailedCommandActivity(Exception exception)
{
Expand Down Expand Up @@ -129,8 +119,12 @@ public void BeforeSending(

public void AfterSending(RequestMessage message, ConnectionId connectionId, ObjectId? serviceId, bool skipLogging)
{
CommandState state;
if (_state.TryGetValue(message.RequestId, out state) &&
if (_state == null)
{
return;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this code path expected, under assumption that it's guarded by ShouldCallAfterSending() ?
If it's not, should we throw instead of return?

Copy link
Copy Markdown
Contributor Author

@ajcvickers ajcvickers Apr 7, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, because there is always a race condition when the listener is not set before we start, but is set later. This is fine, it just means there needs to be a null check.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh I see, in a situation where ShouldCallBeforeSending == false and ShouldCallAfterSending == true.
Thanks!

}

if (_state.TryGetValue(message.RequestId, out var state) &&
state.ExpectedResponseType == ExpectedResponseType.None)
{
state.Stopwatch.Stop();
Expand All @@ -157,12 +151,16 @@ public void AfterSending(RequestMessage message, ConnectionId connectionId, Obje

public void ErrorSending(RequestMessage message, ConnectionId connectionId, ObjectId? serviceId, Exception exception, bool skipLogging)
{
CommandState state;
if (_state.TryRemove(message.RequestId, out state))
if (_state == null)
{
state.Stopwatch.Stop();
return;
}

CompleteFailedCommandActivity(exception);

CompleteCommandActivityWithException(exception);
if (_state.TryRemove(message.RequestId, out var state))
{
state.Stopwatch.Stop();

_eventLogger.LogAndPublish(new CommandFailedEvent(
state.CommandName,
Expand All @@ -179,8 +177,12 @@ public void ErrorSending(RequestMessage message, ConnectionId connectionId, Obje

public void AfterReceiving(ResponseMessage message, IByteBuffer buffer, ConnectionId connectionId, ObjectId? serviceId, MessageEncoderSettings encoderSettings, bool skipLogging)
{
CommandState state;
if (!_state.TryRemove(message.ResponseTo, out state))
if (_state == null)
{
return;
}

if (!_state.TryRemove(message.ResponseTo, out var state))
{
// this indicates a bug in the sending portion...
return;
Expand All @@ -198,17 +200,21 @@ public void AfterReceiving(ResponseMessage message, IByteBuffer buffer, Connecti

public void ErrorReceiving(int responseTo, ConnectionId connectionId, ObjectId? serviceId, Exception exception, bool skipLogging)
{
CommandState state;
if (!_state.TryRemove(responseTo, out state))
if (_state == null)
{
return;
}

CompleteFailedCommandActivity(exception);

if (!_state.TryRemove(responseTo, out var state))
{
// this indicates a bug in the sending portion...
return;
}

state.Stopwatch.Stop();

CompleteCommandActivityWithException(exception);

_eventLogger.LogAndPublish(new CommandFailedEvent(
state.CommandName,
state.QueryNamespace.DatabaseNamespace,
Expand All @@ -223,12 +229,7 @@ public void ErrorReceiving(int responseTo, ConnectionId connectionId, ObjectId?

public void ConnectionFailed(ConnectionId connectionId, ObjectId? serviceId, Exception exception, bool skipLogging)
{
if (!_shouldTrackFailed && !_shouldTrace)
{
return;
}

CompleteCommandActivityWithException(exception);
CompleteFailedCommandActivity(exception);

var requestIds = _state.Keys;
foreach (var requestId in requestIds)
Expand Down Expand Up @@ -731,11 +732,15 @@ private void TrackCommandState(
BsonDocument sessionId,
long? transactionNumber)
{
if (!_shouldTrackState)
var shouldTraceCommand = ShouldTraceWithActivityListener() && !shouldRedactCommand && !skipLogging;

if (!_eventsNeedState && !shouldTraceCommand)
{
return;
}

_state ??= new ConcurrentDictionary<int, CommandState>();

var commandState = new CommandState
{
CommandName = commandName,
Expand All @@ -746,7 +751,7 @@ private void TrackCommandState(
ShouldRedactReply = shouldRedactCommand
};

if (_shouldTrace && !shouldRedactCommand && !skipLogging)
if (shouldTraceCommand)
{
_currentCommandActivity = MongoTelemetry.StartCommandActivity(
commandName,
Expand All @@ -770,16 +775,6 @@ private void CompleteCommandActivityWithSuccess(BsonDocument reply = null)
}
}

private void CompleteCommandActivityWithException(Exception exception)
{
if (_currentCommandActivity is not null)
{
MongoTelemetry.RecordException(_currentCommandActivity, exception);
_currentCommandActivity.Dispose();
_currentCommandActivity = null;
}
}

private void HandleCommandFailure(
CommandState state,
BsonDocument reply,
Expand Down
Comment thread
ajcvickers marked this conversation as resolved.
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,7 @@ public void ShouldRedactCommand_should_return_expected_result(string commandJson
public void ShouldTrackState_should_be_correct(
[Values(false, true)] bool logCommands,
[Values(false, true)] bool captureCommandSucceeded,
[Values(false, true)] bool captureCommandFailed,
[Values(false, true)] bool traceCommands)
[Values(false, true)] bool captureCommandFailed)
{
var mockLogger = new Mock<ILogger<LogCategories.Command>>();
mockLogger.Setup(m => m.IsEnabled(LogLevel.Debug)).Returns(logCommands);
Expand All @@ -76,16 +75,16 @@ public void ShouldTrackState_should_be_correct(
}

var eventLogger = new EventLogger<LogCategories.Command>(eventCapturer, mockLogger.Object);
var tracingOptions = traceCommands ? new TracingOptions() : new TracingOptions { Disabled = true };
var tracingOptions = new TracingOptions { Disabled = true };
var commandHelper = new CommandEventHelper(eventLogger, tracingOptions);

// No ActivityListener, so tracing doesn't contribute to _shouldTrackState
commandHelper._shouldTrackState().Should().Be(logCommands || captureCommandSucceeded || captureCommandFailed);
// No ActivityListener, so tracing doesn't contribute to _eventsNeedState
commandHelper._eventsNeedState().Should().Be(logCommands || captureCommandSucceeded || captureCommandFailed);
}

[Theory]
[ParameterAttributeData]
public void ShouldTrackState_should_be_correct_with_activity_listener(
public void Callbacks_turn_on_when_listener_is_added_even_if_no_events(
[Values(false, true)] bool logCommands,
[Values(false, true)] bool captureCommandSucceeded,
[Values(false, true)] bool captureCommandFailed,
Expand All @@ -94,13 +93,6 @@ public void ShouldTrackState_should_be_correct_with_activity_listener(
ActivityListener listener = null;
try
{
listener = new ActivityListener
{
ShouldListenTo = source => source.Name == "MongoDB.Driver",
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
};
ActivitySource.AddActivityListener(listener);

var mockLogger = new Mock<ILogger<LogCategories.Command>>();
mockLogger.Setup(m => m.IsEnabled(LogLevel.Debug)).Returns(logCommands);

Expand All @@ -119,7 +111,27 @@ public void ShouldTrackState_should_be_correct_with_activity_listener(
var tracingOptions = traceCommands ? new TracingOptions() : new TracingOptions { Disabled = true };
var commandHelper = new CommandEventHelper(eventLogger, tracingOptions);

commandHelper._shouldTrackState().Should().Be(logCommands || captureCommandSucceeded || captureCommandFailed || traceCommands);
// When there are no listeners, these only return true if logging is enabled or an event is registered,
// regardless of whether tracing is enabled.
commandHelper.ShouldCallBeforeSending.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands);
commandHelper.ShouldCallAfterSending.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands);
commandHelper.ShouldCallErrorSending.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands);
commandHelper.ShouldCallAfterReceiving.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands);
commandHelper.ShouldCallErrorReceiving.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands);

listener = new ActivityListener
{
ShouldListenTo = source => source.Name == "MongoDB.Driver",
Sample = (ref ActivityCreationOptions<ActivityContext> _) => ActivitySamplingResult.AllData
};
ActivitySource.AddActivityListener(listener);

// With listeners registered, these always return true unless everything is disabled.
commandHelper.ShouldCallBeforeSending.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands || traceCommands);
commandHelper.ShouldCallAfterSending.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands || traceCommands);
commandHelper.ShouldCallErrorSending.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands || traceCommands);
commandHelper.ShouldCallAfterReceiving.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands || traceCommands);
commandHelper.ShouldCallErrorReceiving.Should().Be(captureCommandSucceeded || captureCommandFailed || logCommands || traceCommands);
}
finally
{
Expand All @@ -130,8 +142,8 @@ public void ShouldTrackState_should_be_correct_with_activity_listener(

internal static class CommandEventHelperReflector
{
public static bool _shouldTrackState(this CommandEventHelper commandEventHelper) =>
(bool)Reflector.GetFieldValue(commandEventHelper, nameof(_shouldTrackState));
public static bool _eventsNeedState(this CommandEventHelper commandEventHelper) =>
(bool)Reflector.GetFieldValue(commandEventHelper, nameof(_eventsNeedState));


public static bool ShouldRedactCommand(BsonDocument command) =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ namespace MongoDB.Driver.SmokeTests.Sdk
[Trait("Category", "Integration")]
public sealed class OpenTelemetryTests
{

[Fact]
public void MongoClient_should_create_activities_when_tracing_enabled()
{
Expand Down Expand Up @@ -84,7 +83,7 @@ public void MongoClient_should_not_create_activities_when_tracing_disabled()
capturedActivities.Should().BeEmpty();
}

private static ActivityListener CreateActivityListener(out List<Activity> capturedActivities)
private static ActivityListener CreateActivityListener(out IReadOnlyCollection<Activity> capturedActivities)
{
var activities = new List<Activity>();
var listener = new ActivityListener
Expand Down