Skip to content

Commit 59518c6

Browse files
fix(subscriptions): prevent NRE when Ack races with Resubscribe (#540)
* fix(subscriptions): prevent NRE when Ack races with Resubscribe

  During resubscribe, DisposeCommitHandler() sets CheckpointCommitHandler to null. If the AsyncHandlingFilter worker thread is still processing a message, it calls Ack() → CheckpointCommitHandler!.Commit() and hits a NullReferenceException. The NRE cascades through Nack → Dropped → resubscribe in an infinite loop.

  Fix: capture CheckpointCommitHandler into a local variable and return early if null, using the standard pattern for concurrent null races.

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

* fix(test): address PR review feedback

  - Use RunContinuationsAsynchronously on all TaskCompletionSource instances to prevent inline continuations altering the cross-thread race behavior
  - Update test comments to describe current behavior instead of referencing the old buggy code and line numbers

  Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

---------

Co-authored-by: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
1 parent 4ba72de commit 59518c6

2 files changed

Lines changed: 106 additions & 1 deletion

File tree

src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -90,12 +90,18 @@ ValueTask NackOnAsyncWorker(IMessageConsumeContext context, Exception exception)
9090

9191
[MethodImpl(MethodImplOptions.AggressiveInlining)]
9292
ValueTask Ack(IMessageConsumeContext context) {
93+
// Capture locally — CheckpointCommitHandler can be nulled by Resubscribe/DisposeCommitHandler
94+
// on another thread while the async worker is still completing a message.
95+
var handler = CheckpointCommitHandler;
96+
97+
if (handler is null) return default;
98+
9399
var eventPosition = GetPositionFromContext(context);
94100
LastProcessed = eventPosition;
95101

96102
context.LogContext.MessageAcked(context.MessageType, context.GlobalPosition);
97103

98-
return CheckpointCommitHandler!.Commit(
104+
return handler.Commit(
99105
new(eventPosition.Position!.Value, context.Sequence, eventPosition.Created) { LogContext = context.LogContext },
100106
context.CancellationToken
101107
);

src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs

Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,6 +156,99 @@ public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContex
156156
}
157157
}
158158

159+
/// <summary>
/// Validates that Ack does not throw when CheckpointCommitHandler is concurrently
/// nulled by Resubscribe/DisposeCommitHandler on another thread while the
/// AsyncHandlingFilter worker is still completing a message.
/// </summary>
/// <remarks>
/// The test parks the first handled event inside <c>SlowAckHandler</c>, triggers a drop
/// (and therefore a resubscribe) while that event is still in flight, and only then lets
/// the handler finish so the ack path runs after the drop was triggered.
/// Cleanup runs in a <c>finally</c> block so a failed assertion never leaves the
/// subscription running or the handler parked, which matters because of <c>[Retry(3)]</c>.
/// </remarks>
/// <param name="ct">Cancellation token supplied by the test framework.</param>
[Test]
[Retry(3)]
public async Task Should_not_throw_nre_when_ack_races_with_resubscribe(CancellationToken ct) {
    // Arrange
    var loggerFactory = LoggingExtensions.GetLoggerFactory();

    // RunContinuationsAsynchronously keeps continuations off the completing thread so the
    // cross-thread race is not altered by inline continuations.
    var nreTcs       = new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously);
    var ackStarted   = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);
    var proceedToAck = new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously);

    var options = new TestSubscriptionOptions {
        SubscriptionId            = "test-ack-race",
        ThrowOnError              = true,
        CheckpointCommitBatchSize = 1,
        CheckpointCommitDelayMs   = 100
    };

    // A handler that signals when it's about to ack, then waits for the test to
    // trigger resubscribe before the ack path runs.
    var handler = new SlowAckHandler(ackStarted, proceedToAck);
    var pipe    = new ConsumePipe().AddDefaultConsumer(handler);

    var checkpointStore = new NoOpCheckpointStore();

    var subscription = new TestPollingSubscription(
        options,
        checkpointStore,
        pipe,
        loggerFactory,
        eventCount: 20
    );

    // Act
    await subscription.Subscribe(
        _ => { },
        (_, _, ex) => {
            if (ex is NullReferenceException nre) nreTcs.TrySetResult(nre);
        },
        ct
    );

    try {
        // Wait until the handler has processed an event and is about to ack
        var started = await Task.WhenAny(ackStarted.Task, Task.Delay(TimeSpan.FromSeconds(10), ct));
        started.ShouldBe(ackStarted.Task, "Handler should have started processing an event");

        // Now trigger Dropped → Resubscribe, which will null CheckpointCommitHandler
        subscription.TriggerDropped();

        // Give Resubscribe a moment to dispose the commit handler
        await Task.Delay(200, ct);

        // Let the handler complete — the AsyncHandlingFilter worker will now call Acknowledge → Ack.
        // Without the fix, the commit handler is already null at this point, causing an NRE.
        proceedToAck.TrySetResult();

        // Assert — wait for either the NRE or a timeout
        var result = await Task.WhenAny(nreTcs.Task, Task.Delay(TimeSpan.FromSeconds(5), ct));

        if (result == nreTcs.Task) {
            var exception = await nreTcs.Task;
            Assert.Fail(
                $"NullReferenceException in Ack path during resubscribe race: {exception}. " +
                "CheckpointCommitHandler was null when Ack tried to call Commit()."
            );
        }
    } finally {
        // Cleanup — must also run when an assertion above throws.
        // Open the gate first (TrySetResult is a no-op if it is already open) so the handler
        // is never left awaiting proceedToAck, then stop the subscription.
        proceedToAck.TrySetResult();
        await subscription.Unsubscribe(_ => { }, ct);
    }
}
232+
233+
/// <summary>
/// Test handler that parks the very first event it receives: it completes
/// <c>ackStarted</c> to tell the test an event is in flight, then awaits
/// <c>proceedToAck</c> before reporting success. Every later event reports
/// success immediately. The pause is what opens the window for the race
/// between Ack and Resubscribe.
/// </summary>
class SlowAckHandler(TaskCompletionSource ackStarted, TaskCompletionSource proceedToAck) : BaseEventHandler {
    // 0 until the first event arrives, 1 afterwards.
    int _firstEventSeen;

    public override async ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
        // Atomically flip the latch; only the caller that observes the previous 0 is "first".
        var isFirstEvent = Interlocked.Exchange(ref _firstEventSeen, 1) == 0;

        if (!isFirstEvent) {
            return EventHandlingStatus.Success;
        }

        // First event only: notify the test, then hold until it releases us.
        ackStarted.TrySetResult();
        await proceedToAck.Task;

        return EventHandlingStatus.Success;
    }
}
251+
159252
/// <summary>
/// Options record used by the tests in this file; it adds no members of its own
/// on top of <see cref="SubscriptionWithCheckpointOptions"/>.
/// </summary>
record TestSubscriptionOptions : SubscriptionWithCheckpointOptions;
160253

161254
/// <summary>
@@ -182,6 +275,12 @@ class TestPollingSubscription(
182275
) {
183276
TaskRunner? _runner;
184277

278+
/// <summary>
/// Test hook that forwards to <c>Dropped</c> with <see cref="DropReason.SubscriptionError"/>
/// and a simulated <see cref="InvalidOperationException"/>, so a test can trigger a
/// resubscribe on demand.
/// </summary>
public void TriggerDropped() {
    var simulatedFailure = new InvalidOperationException("Simulated drop for race test");

    Dropped(DropReason.SubscriptionError, simulatedFailure);
}
283+
185284
protected override ValueTask Subscribe(CancellationToken cancellationToken) {
186285
_runner = new TaskRunner(token => PollEvents(token)).Start();
187286

0 commit comments

Comments
 (0)