Skip to content

fix(subscriptions): prevent NRE when Ack races with Resubscribe#540

Merged
alexeyzimarev merged 2 commits intodevfrom
fix/ack-resubscribe-race
Mar 30, 2026
Merged

fix(subscriptions): prevent NRE when Ack races with Resubscribe#540
alexeyzimarev merged 2 commits intodevfrom
fix/ack-resubscribe-race

Conversation

@alexeyzimarev
Copy link
Copy Markdown
Contributor

Summary

  • Fix: Ack() now captures CheckpointCommitHandler into a local variable and returns early if null, preventing NRE when DisposeCommitHandler() nulls it concurrently during resubscribe
  • Root cause: AsyncHandlingFilter worker thread calls Ack()CheckpointCommitHandler!.Commit() after Resubscribe() has already set the handler to null via DisposeCommitHandler(), causing a NRE cascade (NRE → Nack → Dropped → resubscribe → repeat)
  • Test: Adds Should_not_throw_nre_when_ack_races_with_resubscribe that deterministically reproduces the race using a slow handler + triggered drop

Test plan

  • New test reproduces the NRE before the fix (confirmed: Assert.Fail with NRE at Nack line 107)
  • New test passes after the fix (31/31 pass)
  • All existing subscription tests still pass

🤖 Generated with Claude Code

During resubscribe, DisposeCommitHandler() sets CheckpointCommitHandler
to null. If the AsyncHandlingFilter worker thread is still processing a
message, it calls Ack() → CheckpointCommitHandler!.Commit() and hits a
NullReferenceException. The NRE cascades through Nack → Dropped →
resubscribe in an infinite loop.

Fix: capture CheckpointCommitHandler into a local variable and return
early if null, using the standard pattern for concurrent null races.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@qodo-free-for-open-source-projects
Copy link
Copy Markdown
Contributor

Review Summary by Qodo

Prevent NRE when Ack races with Resubscribe

🐞 Bug fix 🧪 Tests

Grey Divider

Walkthroughs

Description
• Prevent NullReferenceException when Ack races with Resubscribe
• Capture CheckpointCommitHandler locally before null check
• Add deterministic test reproducing the race condition
• Protect against concurrent handler disposal during message processing
Diagram
flowchart LR
  A["AsyncHandlingFilter worker<br/>processes message"] -->|calls| B["Ack()"]
  C["Resubscribe triggered"] -->|calls| D["DisposeCommitHandler<br/>nulls handler"]
  B -->|reads| E["CheckpointCommitHandler<br/>local variable"]
  D -->|concurrent write| F["CheckpointCommitHandler = null"]
  E -->|null check| G["Return early if null<br/>prevents NRE"]
  F -.->|race condition| B
Loading

Grey Divider

File Changes

1. src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs 🐞 Bug fix +7/-1

Capture handler locally to prevent NRE

• Capture CheckpointCommitHandler into local variable before null check
• Return early if handler is null to prevent NullReferenceException
• Use captured local variable in Commit call instead of null-forgiving operator
• Add explanatory comment about concurrent nulling during resubscribe

src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs


2. src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs 🧪 Tests +101/-0

Add deterministic race condition test

• Add Should_not_throw_nre_when_ack_races_with_resubscribe test with retry logic
• Implement SlowAckHandler that blocks during processing to create race window
• Use TaskCompletionSource synchronization to coordinate test timing
• Add TriggerDropped() method to TestPollingSubscription for test control
• Test verifies NRE does not occur when Ack and Resubscribe race

src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs


Grey Divider

Qodo Logo

@qodo-free-for-open-source-projects
Copy link
Copy Markdown
Contributor

qodo-free-for-open-source-projects bot commented Mar 30, 2026

Code Review by Qodo

🐞 Bugs (0) 📘 Rule violations (0) 📎 Requirement gaps (0)

Grey Divider


Action required

1. Synchronous TCS continuations🐞 Bug ⛯ Reliability
Description
The new race regression test uses TaskCompletionSource without RunContinuationsAsynchronously, so
TrySetResult can run awaited continuations inline on the calling (test) thread, altering the
intended cross-thread race and making the test flaky/non-representative.
Code

src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[R170-172]

+        var nreTcs        = new TaskCompletionSource<Exception>();
+        var ackStarted    = new TaskCompletionSource();
+        var proceedToAck  = new TaskCompletionSource();
Evidence
The test coordinates a race by blocking inside SlowAckHandler and later unblocking from the test
thread. With default TaskCompletionSource behavior, completing proceedToAck can execute the
continuation of await proceedToAck.Task synchronously on the thread that calls TrySetResult, which
changes where/when the rest of the handler (and subsequent ack path) runs—undermining the purpose of
a deterministic race test.

src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[167-173]
src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[215-218]
src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[240-248]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
`TaskCompletionSource` instances in the new race test are created without `TaskCreationOptions.RunContinuationsAsynchronously`. Because the test completes these TCSs from the test thread while the handler is awaiting them, continuations may run inline on the test thread, changing thread scheduling and making the race test flaky/non-representative.
### Issue Context
This test is specifically trying to validate a cross-thread race between the async worker’s Ack path and resubscribe/disposal, so avoiding inline continuations is important to keep the timing/threading behavior stable.
### Fix Focus Areas
- src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[167-173]
- src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[215-218]
- src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[240-248]
### Suggested change
Construct the TCSs with `TaskCreationOptions.RunContinuationsAsynchronously`, e.g.:
- `new TaskCompletionSource(TaskCreationOptions.RunContinuationsAsynchronously)`
- `new TaskCompletionSource<Exception>(TaskCreationOptions.RunContinuationsAsynchronously)`
Apply to `nreTcs`, `ackStarted`, and `proceedToAck`.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools



Advisory comments

2. Misleading race-test comments🐞 Bug ⚙ Maintainability
Description
The new test comments still describe the pre-fix implementation (CheckpointCommitHandler!.Commit and
a specific line number), which is now incorrect and will confuse future maintainers about what the
test is asserting.
Code

src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[R159-164]

+    /// <summary>
+    /// Reproduces a race condition during resubscribe: the AsyncHandlingFilter worker thread
+    /// calls Acknowledge() → Ack() → CheckpointCommitHandler!.Commit() after Resubscribe()
+    /// has already set CheckpointCommitHandler to null via DisposeCommitHandler().
+    /// The null-forgiving operator on line 98 causes a NullReferenceException.
+    /// </summary>
Evidence
The test summary claims the NRE comes from a null-forgiving operator at a specific line and later
repeats the CheckpointCommitHandler!.Commit() call chain. The production code in this PR no longer
uses the null-forgiving operator and instead captures the handler into a local variable before
committing, so the comments are now out of date.

src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[159-164]
src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[215-217]
src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs[91-108]

Agent prompt
The issue below was found during a code review. Follow the provided context and guidance below and implement a solution

## Issue description
The new test’s XML doc and inline comments reference the old buggy implementation (`CheckpointCommitHandler!.Commit()`) and a hard-coded production line number. After this PR’s fix, those statements are no longer accurate and will mislead maintainers.
### Issue Context
Comments should describe the behavior being validated ("Ack must not throw if commit handler is concurrently nulled/disposed") without referencing brittle line numbers or removed syntax.
### Fix Focus Areas
- src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[159-164]
- src/Core/test/Eventuous.Tests.Subscriptions/ResubscribeOnHandlerFailureTests.cs[215-217]
- src/Core/src/Eventuous.Subscriptions/EventSubscriptionWithCheckpoint.cs[91-108]
### Suggested change
Rewrite comments to:
- Remove the explicit line-number reference.
- Describe that the test reproduces the historical race and asserts the current fix prevents an NRE when resubscribe/dispose nulls the handler concurrently.
- Optionally mention the fix strategy (local capture + null-check) without quoting exact code operators.

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools


Grey Divider

ⓘ The new review experience is currently in Beta. Learn more

Grey Divider

Qodo Logo

@github-actions
Copy link
Copy Markdown

github-actions bot commented Mar 30, 2026

Test Results

   66 files  + 44     66 suites  +44   44m 39s ⏱️ + 30m 14s
  360 tests + 11    360 ✅ + 11  0 💤 ±0  0 ❌ ±0 
1 083 runs  +723  1 083 ✅ +723  0 💤 ±0  0 ❌ ±0 

Results for commit aa1455a. ± Comparison against base commit 4ba72de.

This pull request removes 5 and adds 16 tests. Note that renamed tests count towards both.
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/20/2026 5:13:27 PM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/20/2026 5:13:27 PM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(60d3927e-76fe-44ab-b911-52fc502c51b3)
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 4, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-20T17:13:27.8532500+00:00 })
Eventuous.Tests.Subscriptions.SequenceTests ‑ ShouldReturnFirstBefore(CommitPosition { Position: 0, Sequence: 1, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 6, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 8, Timestamp: 2026-03-20T17:13:27.8532500+00:00 }, CommitPosition { Position: 0, Sequence: 2, Timestamp: 2026-03-20T17:13:27.8532500+00:00 })
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(2da7653d-a407-46f7-828c-fc85d9d6c734)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/30/2026 7:53:13 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/30/2026 7:53:13 AM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/30/2026 7:53:14 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/30/2026 7:53:14 AM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/30/2026 7:53:15 AM +00:00)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(3/30/2026 7:53:15 AM)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(d3e0ddaa-e880-4200-b7da-3b49c67ef598)
Eventuous.Tests.Azure.ServiceBus.IsSerialisableByServiceBus ‑ Passes(fbd6f4d3-1778-4d3f-a8f8-7025d065b8d8)
Eventuous.Tests.Subscriptions.ResubscribeOnHandlerFailureTests ‑ Should_not_throw_nre_when_ack_races_with_resubscribe
…

♻️ This comment has been updated with latest results.

- Use RunContinuationsAsynchronously on all TaskCompletionSource instances
  to prevent inline continuations altering the cross-thread race behavior
- Update test comments to describe current behavior instead of referencing
  the old buggy code and line numbers

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>
@alexeyzimarev alexeyzimarev merged commit 59518c6 into dev Mar 30, 2026
6 checks passed
@alexeyzimarev alexeyzimarev deleted the fix/ack-resubscribe-race branch March 30, 2026 08:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant