perf: replace exception-based timeout with WaitToReadAsync in ConsumeAsync#595
perf: replace exception-based timeout with WaitToReadAsync in ConsumeAsync#595
Conversation
…Async Replace ReadAsync(timeoutCts.Token) with WaitToReadAsync + TryRead pattern in the consumer's prefetch channel read path. This eliminates per-timeout OperationCanceledException allocations that occurred on every prefetch timeout cycle when the consumer is waiting for data. WaitToReadAsync returns false on timeout instead of throwing, avoiding the exception allocation overhead. The ChannelClosedException catch is also removed since WaitToReadAsync propagates the underlying exception directly.
There was a problem hiding this comment.
Review
The optimization direction is correct and aligns well with Dekaf's zero-allocation goals. Replacing the exception-based timeout path with WaitToReadAsync + TryRead is a legitimate performance improvement. I verified the key behavioral claims experimentally before writing this review.
What was verified
WaitToReadAsync exception propagation (confirmed by test):
ReadAsyncwraps completion errors inChannelClosedException(inner = original exception)WaitToReadAsyncthrows the original exception directly — no wrapping
This confirms the comment in the PR is accurate and the removal of the ChannelClosedException catch is correct. The KafkaException will propagate up through ConsumeAsync to the caller unchanged.
Issues found
1. Unhandled WaitToReadAsync returning false — silent infinite loop risk
When WaitToReadAsync returns false, it means the channel was completed without error. The new code silently ignores this:
if (await _prefetchChannel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
{
// only executed when true
}
// false falls through silently to the next loop iterationThe outer loop is while (!cancellationToken.IsCancellationRequested), so if the channel is ever closed with Complete() (no error), the consumer would spin in an infinite tight loop — fetching nothing, looping forever, consuming 100% CPU.
Today _prefetchChannel.Writer.TryComplete(ex) is the only completion point in the codebase, so false cannot happen in practice. But this is a fragile invariant: a future call to _prefetchChannel.Writer.Complete() (e.g., during disposal or rebalance cleanup) would trigger the infinite loop with no indication why.
The old ReadAsync code would have thrown ChannelClosedException (with null inner exception) in this case — it was also technically unhandled, but at least the exception would surface as an error rather than silent spinning.
Suggested fix:
if (!await _prefetchChannel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
{
// Channel completed normally (no more data will arrive)
// Break or return — depends on intended semantics
break;
}
if (_prefetchChannel.Reader.TryRead(out var fetched))
{
_pendingFetches.Enqueue(fetched);
TrackPrefetchedBytes(fetched, release: true);
}Or at minimum, treat false as a signal to stop waiting (similar to how WaitToReadAsync returning false is handled in BrokerSender.cs where it explicitly breaks the loop).
2. TryRead silent miss is benign but worth a comment
When WaitToReadAsync returns true and TryRead returns false, an item is silently skipped. With SingleReader = true this cannot happen in practice, but the inner if has no else clause. The existing comment explains the optimization motivation — it would be clearer to also note why the inner TryRead miss is safe:
// SingleReader = true guarantees no concurrent drain, so TryRead cannot miss after WaitToReadAsync(true)
if (_prefetchChannel.Reader.TryRead(out var fetched))3. Integration tests not yet run
The test plan explicitly marks integration tests as not run (unchecked). Given this changes control flow in the consumer's hot path (ConsumeAsync), integration test coverage with a real Kafka instance is important to catch timing or ordering regressions.
Summary
The optimization is sound and the exception-propagation behavior is correct. The main concern is item 1 — the false return path creates a latent infinite-loop bug. The BrokerSender.cs pattern (explicitly break or return when WaitToReadAsync returns false) should be applied here for consistency and defensive correctness.
There was a problem hiding this comment.
Review: perf: replace exception-based timeout with WaitToReadAsync in ConsumeAsync
Overall, this is a sound optimization. I verified the core behavioral claim using a quick test: WaitToReadAsync does indeed throw a faulted channel's completion exception directly (e.g., KafkaException), whereas ReadAsync wraps it in ChannelClosedException. The PR description is accurate.
That said, there are a few points worth discussing.
Correctness: Error propagation change is real and needs explicit acknowledgement
Before: KafkaException was caught from ChannelClosedException.InnerException and re-thrown explicitly.
After: WaitToReadAsync throws the KafkaException directly. Since there is no catch (KafkaException) in this scope, it propagates up through ConsumeAsync to the caller — which is the correct behavior. But this is a silent behavioral change in terms of exception stack traces and the call site where the exception surfaces. It's correct, but the comment only explains the "no wrapping" benefit without calling out that unhandled KafkaException from the prefetch path now propagates differently up the async enumerator stack. Worth adding a comment or a test asserting this path.
Potential silent data loss: TryRead returning false after WaitToReadAsync returning true
if (await _prefetchChannel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
{
// TryRead cannot miss: SingleReader = true guarantees no concurrent drain
if (_prefetchChannel.Reader.TryRead(out var fetched))
{
_pendingFetches.Enqueue(fetched);
TrackPrefetchedBytes(fetched, release: true);
}
// ← If TryRead returns false here, the item is silently skipped
}The comment claims TryRead cannot miss because SingleReader = true. This is correct during normal operation — no concurrent readers exist. However, the inner if has no else branch. If for any unexpected reason TryRead returns false, the loop just continues silently. This is a latent correctness risk.
Suggestion: Make the miss visible with a diagnostic assertion or at minimum add an else branch that logs an unexpected condition:
if (_prefetchChannel.Reader.TryRead(out var fetched))
{
_pendingFetches.Enqueue(fetched);
TrackPrefetchedBytes(fetched, release: true);
}
else
{
// Should not happen with SingleReader=true; defensive guard only
System.Diagnostics.Debug.Fail("WaitToReadAsync signalled data available but TryRead returned false");
}The break on normal channel completion changes control flow
else
{
// Channel completed normally — prefetch loop has stopped (e.g., shutdown)
break;
}WaitToReadAsync returning false means the channel was completed without error (normal shutdown via Writer.Complete()). The break exits the while (!cancellationToken.IsCancellationRequested) loop entirely, which ends ConsumeAsync iteration silently — no exception, no final yield. This is a new behavior that did not exist before (previously, normal channel completion would have caused ReadAsync to throw ChannelClosedException without matching the KafkaException filter, which would propagate to the caller).
Is this the intended behavior? If the prefetch channel shuts down due to a normal consumer dispose, the cancellationToken is presumably also cancelled, so the outer loop condition !cancellationToken.IsCancellationRequested would catch it. The break here would only fire in the edge case where the channel is completed normally while the cancellation token is not yet cancelled. It would be worth confirming no messages are lost in this window (i.e., the channel is always drained before Complete() is called).
Minor: No test coverage for the new path
The PR checklist notes integration tests weren't run with Docker, and no unit tests are added. The changed behavior in error propagation (KafkaException surfacing directly vs. via ChannelClosedException unwrap) and the new break on normal channel completion are both testable with mocked channels. Given the project's policy of comprehensive test coverage for behavioral changes, a unit test covering these two paths would strengthen the PR.
Summary
| Issue | Severity |
|---|---|
Silent TryRead miss with no diagnostic |
Low — SingleReader makes it practically impossible, but defensiveness is cheap |
break on normal channel completion — intent unclear |
Low-Medium — needs verification that no messages are lost |
| Error propagation stack trace change undocumented | Informational |
| No new unit tests for changed behavior | Low — the old code also lacked targeted tests for these paths |
The optimization itself is valid and the allocation savings are real (OperationCanceledException allocation on every timeout cycle is eliminated). The code is cleaner. The items above are mostly about correctness hardening and documentation, not blockers.
There was a problem hiding this comment.
Review (follow-up after commits 2 & 3)
Both items raised in the previous reviews have been addressed:
- The
WaitToReadAsyncreturningfalsecase now correctlybreaks out of the loop instead of spinning silently. ✓ - The
TryReadmiss case now has aDebug.Failassertion for defensive diagnostics. ✓
The core optimization (eliminating per-timeout OperationCanceledException allocations) is correct and the implementation is cleaner than the original. The exception propagation behavior is also accurate — WaitToReadAsync surfaces the KafkaException directly without a ChannelClosedException wrapper, and removing that catch block is the right call.
Remaining concern: break exits ConsumeAsync silently on normal channel completion
The break on WaitToReadAsync returning false exits the top-level while (!cancellationToken.IsCancellationRequested) loop in ConsumeAsync. This ends the async enumerable iteration without throwing an exception and without the cancellation token being cancelled.
Looking at line 921, _prefetchChannel.Writer.TryComplete(ex) is only called with a non-null KafkaException, and that path also rethrows the exception which cancels the prefetch task. There is currently no code path that calls Complete() (no-error) on the writer — so in practice, WaitToReadAsync returning false cannot be reached today.
The concern is future-proofing: if someone adds a graceful shutdown path that calls _prefetchChannel.Writer.Complete() (e.g., during rebalance or DisposeAsync), ConsumeAsync will silently stop yielding messages with no signal to the caller. A consumer written as:
await foreach (var msg in consumer.ConsumeAsync(ct))
{
// process...
}
// Code here runs — but why did it stop?...would exit the loop with no indication whether it stopped due to cancellation or channel closure. Whether this is the intended "graceful stop" semantics for Dekaf is a design question worth explicitly documenting in the code. If the intent is "channel completion = consumer is done", a comment to that effect (or a dedicated ConsumerStoppedException) would make the contract clear to future maintainers.
This is low-severity since the unreachable path can't cause harm today, but it's worth a decision: is silent completion the intended contract, or should this be an error/exception?
Integration tests still not run
The PR checklist still has integration tests unchecked. The change touches the hot path inside ConsumeAsync, specifically the branch taken on every timeout cycle when the prefetch channel has no data ready. A timing or ordering regression here would only show up under real Kafka conditions. Running the integration test suite before merging is recommended per the project's testing requirements.
Summary
| Item | Status |
|---|---|
Infinite loop on WaitToReadAsync returning false |
Fixed ✓ |
Missing Debug.Fail on TryRead miss |
Fixed ✓ |
Silent ConsumeAsync exit on normal channel completion |
Needs decision/documentation |
| Integration tests not run | Outstanding |
The optimization is sound and the code is in good shape. The outstanding items are low-risk for the current codebase but worth resolving before merge for long-term maintainability.
There was a problem hiding this comment.
Review (follow-up after commit 4)
Previous outstanding items — status
Silent ConsumeAsync exit on normal channel completion — Addressed ✓
Commit 4 added an explanatory comment to the break on WaitToReadAsync returning false:
// Currently unreachable: TryComplete() is only called with a KafkaException,
// never with null. This guard prevents a silent infinite loop if a future
// code path adds graceful channel completion (e.g., during rebalance).This is the right call. The comment documents the invariant, explains why the code exists despite being unreachable today, and serves as a warning to future maintainers who might add a graceful-shutdown path. The design question I raised (silent vs. exception on normal completion) is now answered: the intent is explicit silent break as a defensive guard, not an error condition. That's a reasonable and documented contract.
Integration tests not run — Still outstanding
The PR checklist still has integration tests unchecked. This is the only remaining item. The change touches the hot path inside ConsumeAsync on every timeout cycle, and a timing or ordering regression would only surface under real Kafka conditions. Per the project's testing requirements, running the integration suite before merging is recommended.
Overall assessment
All code-level concerns from previous reviews have been addressed:
| Item | Status |
|---|---|
Infinite loop on WaitToReadAsync returning false |
Fixed ✓ |
Missing Debug.Fail on TryRead miss |
Fixed ✓ |
Silent ConsumeAsync exit — decision/documentation |
Documented ✓ |
| Integration tests not run | Outstanding |
The implementation is clean, correct, and well-commented. The optimization (eliminating OperationCanceledException allocations on every prefetch timeout) is valid, the exception propagation behavior is accurate, and the defensive guards are in place. The code is ready to merge once integration tests are confirmed to pass.
Summary
ReadAsync(timeoutCts.Token)withWaitToReadAsync+TryReadin the consumer prefetch channel read pathOperationCanceledExceptionallocations that occur on every prefetch timeout cycleChannelClosedExceptioncatch sinceWaitToReadAsyncpropagates the underlying exception directlyMotivation
Consumer profiling showed
SetException/ExceptionDispatchInfo.Throwat 0.02-0.2% CPU from the exception-based timeout pattern. While small, this is an unnecessary allocation on every timeout cycle when the consumer is waiting for data.Test plan