perf: eliminate per-message diagnostics overhead in consumer #570
Review: perf: eliminate per-message diagnostics overhead in consumer
This is a well-motivated performance improvement. The change correctly applies the standard .NET zero-cost diagnostics pattern to the consumer hot path. The overall approach is sound, but there are a few issues worth addressing before merge.
Correctness Issue: BytesReceived is guarded by MessagesReceived.Enabled
The metrics guard uses MessagesReceived.Enabled to protect both counters:
if (Diagnostics.DekafMetrics.MessagesReceived.Enabled)
{
var metricTags = new System.Diagnostics.TagList { ... };
Diagnostics.DekafMetrics.MessagesReceived.Add(1, metricTags);
Diagnostics.DekafMetrics.BytesReceived.Add(messageBytes, metricTags);
}
While MessagesReceived and BytesReceived are created from the same Meter instance and will almost always have the same Enabled state, the .Enabled property is per-instrument: a MeterListener can call EnableMeasurementEvents on individual instruments. A listener subscribing only to messaging.client.consumed.bytes (e.g., for bandwidth billing) would silently get no data because MessagesReceived.Enabled is false.
The correct approach is to check each instrument independently, or use the logical OR if you want to share the TagList allocation:
if (Diagnostics.DekafMetrics.MessagesReceived.Enabled || Diagnostics.DekafMetrics.BytesReceived.Enabled)
{
var metricTags = new System.Diagnostics.TagList
{ { Diagnostics.DekafDiagnostics.MessagingDestinationName, pending.Topic } };
if (Diagnostics.DekafMetrics.MessagesReceived.Enabled)
Diagnostics.DekafMetrics.MessagesReceived.Add(1, metricTags);
if (Diagnostics.DekafMetrics.BytesReceived.Enabled)
Diagnostics.DekafMetrics.BytesReceived.Add(messageBytes, metricTags);
}
This matches what the runtime itself does internally and avoids any surprise for users who selectively subscribe to metrics. Note that the same pattern applies to the producer's Retries counter in BrokerSender.cs (it is not guarded at all), but that is a pre-existing issue and not in scope here.
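To make the per-instrument point concrete, here is a minimal sketch of a listener that subscribes to only one of two counters. The meter name, instrument names, and the PerInstrumentEnabledDemo class are hypothetical stand-ins, not Dekaf's actual identifiers; only the System.Diagnostics.Metrics calls are real APIs.

```csharp
using System;
using System.Diagnostics;
using System.Diagnostics.Metrics;

public static class PerInstrumentEnabledDemo
{
    // Returns the Enabled state of each counter and the bytes the listener observed.
    public static (bool MessagesEnabled, bool BytesEnabled, long BytesSeen) Run()
    {
        // Hypothetical meter and instrument names, for illustration only.
        using var meter = new Meter("Demo.Consumer");
        Counter<long> messages = meter.CreateCounter<long>("demo.consumed.messages");
        Counter<long> bytes = meter.CreateCounter<long>("demo.consumed.bytes");

        long bytesSeen = 0;
        using var listener = new MeterListener();
        listener.InstrumentPublished = (instrument, l) =>
        {
            // Subscribe to the bytes counter only, e.g. a bandwidth-billing listener.
            if (instrument.Meter.Name == "Demo.Consumer" && instrument.Name == "demo.consumed.bytes")
            {
                l.EnableMeasurementEvents(instrument);
            }
        };
        listener.SetMeasurementEventCallback<long>(
            (instrument, value, tags, state) => bytesSeen += value);
        listener.Start();

        // OR-guard: the TagList is built when either counter has a subscriber.
        if (messages.Enabled || bytes.Enabled)
        {
            var tags = new TagList { { "messaging.destination.name", "orders" } };
            messages.Add(1, tags);   // no subscriber, so nothing is delivered
            bytes.Add(128, tags);    // delivered to the listener above
        }

        return (messages.Enabled, bytes.Enabled, bytesSeen);
    }
}
```

Under these assumptions the messages counter should report Enabled as false while the bytes counter reports true, which is the situation a guard on MessagesReceived.Enabled alone would miss.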
Behaviour Change: Consumer spans are no longer created without a producer context
In the original code, StartActivity was always called (guarded only by HasListeners() when producerContext is absent):
// Before: always calls StartActivity, even without a traceparent header
else
{
activity = DekafDiagnostics.Source.StartActivity(
pending.ActivityName,
ActivityKind.Consumer);
}
In the new code, the entire tracing block (including the else branch) is inside if (HasListeners()). This is functionally correct (StartActivity returns null anyway when HasListeners() is false), so there is no change in observable behaviour. This is a good simplification and the comment on line 690 is now accurate.
However, there is a subtle asymmetry worth noting: in the no-traceparent path, the code now creates an unlinked consumer span (StartActivity with no links) inside the HasListeners guard. This is correct per OTel messaging semantics (a standalone consumer span without a producer link is still valid). Just verifying the intent is preserved.
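The guard discussed above can be sketched in isolation as follows. This is an illustrative reduction, not the PR's code: the source name, the TracingGuardDemo class, and the AttachListener helper are hypothetical, and the producer-link branch is omitted so only the standalone consumer span path is shown.

```csharp
using System;
using System.Diagnostics;

public static class TracingGuardDemo
{
    // Hypothetical source name; the real code uses DekafDiagnostics.Source.
    private static readonly ActivitySource Source = new("Demo.Consumer.Tracing");

    public static Activity? StartConsumeActivity(string activityName)
    {
        // Nothing below runs when no listener is attached.
        if (!Source.HasListeners())
        {
            return null;
        }

        // Clear the ambient activity so the consumer span does not inherit it as parent.
        Activity? saved = Activity.Current;
        Activity.Current = null;
        try
        {
            // Standalone consumer span: no producer context, so no links.
            return Source.StartActivity(activityName, ActivityKind.Consumer);
        }
        finally
        {
            Activity.Current = saved;
        }
    }

    // Test helper (an assumption of this sketch): attach a listener that samples everything.
    public static ActivityListener AttachListener()
    {
        var listener = new ActivityListener
        {
            ShouldListenTo = source => source.Name == Source.Name,
            Sample = (ref ActivityCreationOptions<ActivityContext> options) =>
                ActivitySamplingResult.AllData,
        };
        ActivitySource.AddActivityListener(listener);
        return listener;
    }
}
```

A consumer integration test along these lines would call StartConsumeActivity once without a listener and once after AttachListener, checking for null and non-null respectively.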
Missing Test Coverage for the Guard Paths
The existing DekafDiagnosticsTests in tests/Dekaf.Tests.Unit/Diagnostics/ tests the tracing logic at the ActivitySource/TraceContextPropagator level but does not exercise the consumer hot path directly. There are no integration tests verifying:
- That consumer spans are not created when no ActivityListener is attached (verifying the HasListeners() guard fires correctly end-to-end via ConsumeAsync).
- That MessagesReceived/BytesReceived counters are not incremented when no MeterListener is subscribed.
- That tracing and metrics do work correctly after this refactor when listeners are attached. The PR test plan marks the last two bullets as unchecked.
Given that the existing ProducerDiagnosticTests integration test suite exists precisely for this kind of regression check, adding a ConsumerDiagnosticTests counterpart (or extending the existing consumer integration tests) would give confidence that the refactored guard logic doesn't accidentally suppress diagnostics when they should be active.
Minor: The HasListeners() comment was updated but is slightly imprecise
The original comment said ~2ns no-op when no listener (which described the old always-executed StartActivity return path). The new comment says "fully guarded so zero work when no listener" which is accurate. But HasListeners() itself is a volatile field read (~1ns), so the comment could be even more specific. This is very minor and not a blocker.
Summary
| # | Severity | Issue |
|---|---|---|
| 1 | Medium | BytesReceived silently suppressed if only BytesReceived has a listener (instrument-level Enabled check asymmetry) |
| 2 | Low | No integration test coverage for the guard paths — regression risk for "tracing works when listener is attached" |
| 3 | Nitpick | Comment precision |
The core approach — HasListeners() guard for tracing and Enabled guard for metrics — is the correct .NET pattern and the performance improvement is real. Addressing issue #1 is the only change needed before merging.
Review: perf: eliminate per-message diagnostics overhead in consumer
The approach is correct and follows standard .NET diagnostics patterns. The HasListeners() and Enabled guards are the right tools for zero-cost diagnostics in hot paths. A few observations:
Minor: Redundant double-check on metrics guard
The outer if already gates entry on either counter being enabled, but the inner if checks each counter independently:
if (Diagnostics.DekafMetrics.MessagesReceived.Enabled || Diagnostics.DekafMetrics.BytesReceived.Enabled)
{
var metricTags = new System.Diagnostics.TagList { ... };
if (Diagnostics.DekafMetrics.MessagesReceived.Enabled)
Diagnostics.DekafMetrics.MessagesReceived.Add(1, metricTags);
if (Diagnostics.DekafMetrics.BytesReceived.Enabled)
Diagnostics.DekafMetrics.BytesReceived.Add(messageBytes, metricTags);
}
The inner per-instrument guards are unnecessary in practice: if either counter is enabled the TagList must be allocated anyway, so checking again before each Add() call adds two redundant boolean reads with no allocation saving. The TagList allocation is already gated; just call both Add() unconditionally inside the block. The Counter<T>.Add() method internally checks Enabled before recording, so the extra outer guard is sufficient.
Inconsistency: Producer hot path is not guarded
The consumer side now correctly gates TagList allocation behind Enabled, but the producer path at lines ~427–431 and ~439–440 of KafkaProducer.cs still unconditionally creates TagList structs and calls Add() regardless of whether any meter listener is attached:
// KafkaProducer.cs — no Enabled guard
var tagList = new TagList { { Diagnostics.DekafDiagnostics.MessagingDestinationName, topic } };
Diagnostics.DekafMetrics.MessagesSent.Add(1, tagList);
Diagnostics.DekafMetrics.BytesSent.Add(metadata.KeySize + metadata.ValueSize, tagList);
Diagnostics.DekafMetrics.OperationDuration.Record(..., tagList);
The producer path occurs once per batch delivery acknowledgement (not per message), so the impact per message is amortized, but it is still an allocation at the critical delivery-confirmation path and the optimization is asymmetric with the consumer change. Applying the same Enabled guard there would be the consistent follow-up.
Behaviour change: activity started without producer context when no traceparent header
The original code only started a StartActivity when producerContext.HasValue. The new code also starts an activity when there is no traceparent header (the else branch). This is technically a pre-existing behaviour that was already there before this PR (the else branch existed in the original), but it is now reachable more clearly — this means every consumed message unconditionally starts a consumer span when any ActivityListener is attached, even for messages that were never produced with tracing. This is consistent with OTel messaging conventions (consumer spans exist independently of whether the producer was instrumented) but it is worth confirming this is intentional and that the Activity.Current = savedActivity restore in the finally block is tested for the no-header case.
Overall
The core change is sound, well-motivated, and uses the correct .NET patterns. The HasListeners() guard on the tracing block and the Enabled guard on TagList allocation are both well-established idioms. The main actionable item is making the producer metric path consistent with this change.
Review: Follow-up after author's iterations
The author has now addressed both substantive issues raised in the previous two reviews. Here is the current state:
Changes since prior reviews are correct
Issue 1 (Medium — BytesReceived could be silently suppressed): Fixed. The outer guard now uses ||:
if (Diagnostics.DekafMetrics.MessagesReceived.Enabled || Diagnostics.DekafMetrics.BytesReceived.Enabled)
A MeterListener subscribing only to messaging.client.consumed.bytes will now correctly receive measurements.
Issue 2 (Minor — redundant inner per-instrument checks): Fixed in commit 6f1ef64e. The inner if (MessagesReceived.Enabled) / if (BytesReceived.Enabled) guards were removed; both Add() calls are now unconditional inside the outer block. This is correct because Counter<T>.Add() internally checks Enabled, and the TagList allocation is already gated by the outer if.
One remaining observation: Activity.Current mutation is not thread-safe when HasListeners() guard is added
This is a subtle issue that exists in the new structure. The code temporarily sets Activity.Current = null to prevent StartActivity from inheriting an ambient parent. Activity.Current is backed by AsyncLocal<Activity> (it is not a [ThreadStatic] field), so the value is scoped to the current async execution flow and the mutation is safe within a single await-free synchronous block.
However, ConsumeAsync is an async IAsyncEnumerable method. The block:
var savedActivity = System.Diagnostics.Activity.Current;
System.Diagnostics.Activity.Current = null;
try
{
// StartActivity calls
}
finally
{
System.Diagnostics.Activity.Current = savedActivity;
}
...does not cross any await point, so the save and restore run back to back in the same execution context and the ambient Activity.Current is not at risk. This was true both before and after this PR. The refactor is safe.
Still open: missing test coverage for guard paths
Both previous reviews flagged the absence of integration tests verifying that:
- Consumer spans are not created when no ActivityListener is attached.
- Consumer spans are created when an ActivityListener is attached (regression guard for the refactored path).
- MessagesReceived/BytesReceived counters are not incremented when no MeterListener is subscribed.
The PR test plan still marks the last two items as unchecked. The HasListeners() guard is a correct .NET pattern, but without an integration test, a future refactor could accidentally remove or invert the guard and no CI gate would catch it. Given that ProducerDiagnosticTests already exists as a model, adding a ConsumerDiagnosticTests counterpart remains the recommended follow-up before merge.
Summary
| # | Status | Issue |
|---|---|---|
| 1 | Resolved | BytesReceived now correctly guarded with `\|\|` |
| 2 | Resolved | Redundant inner Enabled checks removed |
| 3 | Still open | No integration test coverage for the HasListeners() / Enabled guard paths |
The correctness issue is fully resolved. The code is clean and the approach is the right pattern for zero-cost .NET diagnostics. The remaining gap is test coverage. Whether to block on that or track it as a follow-up is the author's call, but adding at least one integration test that attaches an ActivityListener and verifies a consumer span is produced would significantly reduce regression risk.
Review: Follow-up on the ChannelClosedException commits
The previous three rounds of review resolved all issues with the diagnostics guards — those are in good shape. This review focuses on the final commit (e11b5d10), which was not covered in the prior reviews.
Concern: Unfiltered ChannelClosedException catch silently swallows error-propagation exceptions
The commit adds two bare catch blocks in ConsumeOneAsync:
catch (ChannelClosedException)
{
// Prefetch channel closed during timeout - treat as no messages available
}
Compare this with the existing ChannelClosedException handler at line 649 of the main ConsumeAsync loop:
catch (ChannelClosedException ex) when (ex.InnerException is KafkaException kafkaEx)
{
// Rethrow the original KafkaException from the prefetch task
throw kafkaEx;
}
That existing handler is carefully filtered: it only catches ChannelClosedException when the inner exception is a KafkaException, meaning the prefetch background task faulted and propagated the error through the channel. When that filter does not match (i.e., InnerException is something other than a KafkaException), the exception propagates up as-is, signalling a real problem.
The new unfiltered catches in ConsumeOneAsync break this contract. If the prefetch channel closes because the prefetch task faulted with a KafkaException (or any other exception), ChannelClosedException will be thrown and silently swallowed, causing ConsumeOneAsync to return null — indistinguishable from a clean timeout. The caller has no way to know that message delivery stopped due to a broker error.
Why this is the wrong fix: The race condition described in the commit message ("cancellation of the prefetch loop can cause the prefetch channel to close, racing with OperationCanceledException") is real, but the right guard is to mirror the existing pattern:
// Fast path
catch (ChannelClosedException ex) when (ex.InnerException is null && timeoutCts.IsCancellationRequested)
{
// Channel closed cleanly due to timeout-triggered cancellation — no messages available
}
// Slow path
catch (ChannelClosedException ex) when (ex.InnerException is null && timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested)
{
// Channel closed cleanly due to our timeout — not user cancellation
}
Alternatively, adding an InnerException check (ex.InnerException is null) is sufficient to distinguish a clean channel close (no exception stored) from an error-propagating close (inner exception present). Without this guard, a broker disconnect that faults the prefetch task will be silently lost when the consumer is using the ConsumeOneAsync API.
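The difference between a clean close and an error-propagating close can be reproduced with a plain System.Threading.Channels channel. The following is a minimal sketch, assuming a hypothetical ReadOnceAsync helper in place of ConsumeOneAsync and an int payload in place of a consume result; it shows only the filter behaviour, not Dekaf's prefetch logic.

```csharp
using System;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;

public static class ChannelCloseDemo
{
    // Returns the next item, or null when the timeout fired before one arrived.
    public static async Task<int?> ReadOnceAsync(Channel<int> channel, CancellationTokenSource timeoutCts)
    {
        try
        {
            return await channel.Reader.ReadAsync(timeoutCts.Token).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
        {
            // Our own timeout fired: no message available.
            return null;
        }
        catch (ChannelClosedException ex) when (ex.InnerException is null && timeoutCts.IsCancellationRequested)
        {
            // Clean close racing with the timeout: treat it the same way.
            return null;
        }
        // A ChannelClosedException carrying an InnerException is not caught here,
        // so a fault stored by the writer still reaches the caller.
    }
}
```

When the writer calls Complete(exception), the reader's ChannelClosedException carries that exception as InnerException, so the filter lets it propagate instead of turning it into a null result.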
Minor: The fast-path ChannelClosedException catch drops the when cancellation guard
The existing OperationCanceledException fast-path catch is:
catch (OperationCanceledException) when (timeoutCts.IsCancellationRequested)
The new ChannelClosedException catch has no equivalent when guard. If a ChannelClosedException fires while the user's cancellationToken (not the timeout) is the cause, the new bare catch returns null rather than letting the exception propagate to signal user-requested cancellation. This is a subtle but real change in observable behaviour for callers who pass a CancellationToken to ConsumeOneAsync.
Summary
| # | Severity | Issue |
|---|---|---|
| 1 | Medium | Unfiltered ChannelClosedException catch silently swallows error-propagation exceptions from the prefetch task — Kafka errors become invisible null returns |
| 2 | Low | Missing when guard means user-cancellation ChannelClosedException is incorrectly treated as a clean timeout |
The diagnostics changes from the first three commits are correct and ready to merge. The ChannelClosedException fix needs the guard conditions above before it is safe.
Code Review
This PR makes two independent improvements to the consumer hot path: guarding diagnostics behind HasListeners()/Enabled checks, and catching ChannelClosedException in ConsumeOneAsync. Both changes are well-motivated and mostly correct. A few notes below.
Diagnostics Guard — Tracing (HasListeners())
The restructuring is correct and the performance intent is sound. The HasListeners() check gates the entire block — header scan, Activity.Current save/restore, and StartActivity — at the cost of a single boolean read.
One subtle behavioral change to be aware of: The old code only started an activity when producerContext.HasValue && Source.HasListeners(). The new code starts a tracing activity whenever HasListeners() is true, even when there is no producer trace context (i.e., for messages that were not produced with tracing enabled). This is intentional per the added else branch, but it is a new behavior: previously, consumer spans were only created when a producer span could be linked. Now, any message consumed while a listener is attached will get a standalone consumer span. Worth confirming this is the desired OTel behavior — it is legitimate per spec, but it is a semantic change, not a pure optimization.
The Activity.Current save/restore is now inside the HasListeners() guard, which is correct: if there are no listeners, StartActivity returns null without modifying Activity.Current, so the save/restore is never needed in that branch. Good.
Diagnostics Guard — Metrics (Enabled)
Minor concern: The guard checks MessagesReceived.Enabled || BytesReceived.Enabled, which means if only one counter has a listener, both are recorded. This is correct behavior — you don't want to silently drop one of the two metrics — but it would be worth a brief comment explaining the deliberate use of || rather than checking each counter individually before calling Add. As written it might look like a minor bug to a future reader who wonders "why not check BytesReceived.Enabled before BytesReceived.Add?"
A clearer alternative would be to always record both together when either is enabled (as the code does), but add a comment like: // Both share the same TagList; record together when either counter is subscribed.
ChannelClosedException Handling
This is the most interesting part of the PR and deserves some scrutiny.
The two new catch clauses catch ChannelClosedException when ex.InnerException is null && timeoutCts.IsCancellationRequested. The intent is clear: a channel closed due to our own timeout-triggered cancellation should be swallowed (same as OperationCanceledException in the sibling catch).
Concern — exception filter correctness: ChannelClosedException is thrown by Channel<T> when a reader attempts to read from a completed/closed channel. This can happen for multiple reasons:
- The channel was closed as part of internal shutdown/disposal of the consumer.
- The channel was closed for another reason unrelated to the timeout.
The filter ex.InnerException is null && timeoutCts.IsCancellationRequested is a reasonable heuristic, but it is not watertight. If the consumer is being disposed concurrently while a timeout also fires, both conditions could be true and a legitimate disposal-triggered ChannelClosedException would be silently swallowed, causing ConsumeOneAsync to return null instead of propagating the closure to the caller.
It would be worth documenting why this specific combination of conditions is considered safe, or alternatively, adding a check that the consumer is not also disposed/shutting down before swallowing the exception.
The slow-path filter (timeoutCts.IsCancellationRequested && !cancellationToken.IsCancellationRequested) correctly distinguishes between timeout-triggered and user-triggered cancellation, which mirrors the sibling OperationCanceledException catch. This is correct.
Zero-Allocation Compliance
- The TagList struct allocation is now gated, which is the main allocation concern in the hot path. Good.
- The ActivityLink array allocation (links: [new ActivityLink(...)]) inside the HasListeners() branch is acceptable: it only occurs when a listener is attached, which is never the high-throughput production case.
- No new heap allocations are introduced in the non-diagnostic code path. Compliant.
ConfigureAwait(false) Compliance
No new await calls are introduced. Existing calls already use ConfigureAwait(false) (visible at lines 1175, 1196 in the file). No issues.
Thread Safety
No shared state changes. The diagnostics fields (Source, MessagesReceived, BytesReceived) are static readonly — thread-safe. No concerns.
Summary
| Area | Status |
|---|---|
| Zero-allocation hot path | Good — TagList gated correctly |
| ConfigureAwait(false) | N/A — no new awaits |
| Thread safety | No concerns |
| Behavioral change (standalone consumer spans) | Worth confirming intent |
| Metrics guard comment | Minor: deliberate use of `\|\|` is worth a brief comment |
| ChannelClosedException filter safety | Worth a comment or additional guard for disposal race |
Overall this is a solid performance improvement with correct use of the standard .NET zero-cost diagnostics patterns. The main thing to verify is whether the new standalone-consumer-span behavior (when there is no producer trace context) is intentional.
Review: Final state after five iterations
All the substantive issues raised in the previous review rounds have been resolved. The code is in good shape. Here is a consolidated final assessment.
What is correct and ready
Diagnostics guard, tracing: The HasListeners() guard correctly gates the entire block: header scan, Activity.Current save/restore, and StartActivity. The finally block restoring Activity.Current is inside the HasListeners() branch, which is correct because the restore is only needed when the save happened. No await points cross this block in the async state machine, so the temporary mutation of Activity.Current (an AsyncLocal-backed value, not a [ThreadStatic] field) is safe.
Diagnostics guard — metrics: The outer || guard correctly gates the TagList allocation when either counter has a listener. Both Add() calls are unconditional inside the block, which is correct — Counter<T>.Add() already checks Enabled internally, and the TagList is shared.
ChannelClosedException guards: Both new catch blocks now have proper when guards (ex.InnerException is null && timeoutCts.IsCancellationRequested). This correctly distinguishes a clean timeout-triggered channel close from an error-propagating close (where InnerException is a KafkaException), matching the existing pattern in ConsumeAsync at line 649. The slow-path variant additionally checks !cancellationToken.IsCancellationRequested to avoid swallowing user-initiated cancellation. Both are correct.
Zero-allocation compliance: No new heap allocations in the no-listener path. The ActivityLink array allocation in the HasListeners() branch is acceptable — it only occurs when a tracing listener is actively attached, which is never the high-throughput production case.
ConfigureAwait(false) compliance: No new await calls are introduced.
Thread safety: All touched state is either static readonly instruments or the AsyncLocal-backed Activity.Current, so there are no shared mutable state concerns.
One remaining open item: no consumer diagnostic integration test
This was raised in reviews 3 and 4 and is still unresolved. ProducerDiagnosticTests.cs exists and serves as a regression guard for the producer tracing path. There is no equivalent ConsumerDiagnosticTests.cs. The two most important cases to cover:
- A consumer span is created when an ActivityListener is attached: verifies the HasListeners() guard does not accidentally suppress diagnostics when they should fire.
- MessagesReceived/BytesReceived counters are incremented when a MeterListener is subscribed: same regression guard for the Enabled gate.
Without these, a future refactor could invert or remove the guards and no CI gate would catch the regression. Whether to block merge on this or track it as a follow-up issue is the author's call. Given that the ProducerDiagnosticTests pattern already exists as a model, adding the consumer counterpart is low-effort and high-value.
Minor: behavioral change worth documenting
The original code only started a consumer activity when producerContext.HasValue. The new code starts a standalone consumer span (no producer link) whenever HasListeners() is true and there is no traceparent header. This is valid per OTel messaging conventions, and the existing else branch makes the intent clear. A brief comment on the else branch — something like "no producer trace context available; start a standalone consumer span" — would make the intent explicit for future readers who wonder why an unlinked span is created unconditionally.
Summary
| Item | Status | Notes |
|---|---|---|
| Tracing guard correctness | Ready | HasListeners() gates entire block correctly |
| Metrics guard correctness | Ready | `\|\|` guard gates the TagList allocation |
| ChannelClosedException guards | Ready | when (ex.InnerException is null && ...) mirrors existing pattern |
| Zero-allocation, ConfigureAwait, thread safety | Ready | No issues |
| Consumer diagnostic integration tests | Still open | No ConsumerDiagnosticTests counterpart to ProducerDiagnosticTests |
| Standalone-span else branch comment | Minor | Worth a one-line clarifying comment |
The code is correct and the performance improvement is real. The only gap before merge is the missing integration test coverage.
Follow-up Review
No new commits since the last review (last commit 688c1868, 2026-03-23). The diff is unchanged.
The code is correct — all substantive issues from previous rounds are resolved. Two items remain open from the prior review:
Still open: consumer diagnostic integration tests
ProducerDiagnosticTests.cs exists as a regression guard for the producer tracing path. There is no ConsumerDiagnosticTests.cs equivalent. Two cases are worth covering:
- A consumer span is created when an ActivityListener is attached: verifies the HasListeners() guard does not suppress diagnostics when they should fire.
- MessagesReceived/BytesReceived counters are incremented when a MeterListener is subscribed: regression guard for the Enabled gate.
Without these, a future refactor could silently break the diagnostics path with no CI signal.
Still open (minor): standalone-span else branch
The else branch in the tracing block starts a consumer span when no producer trace context is present. A one-line comment (e.g. "no producer context — start a standalone consumer span") would make the intent clear for future readers.
The performance improvement is real and the implementation is correct. Whether to add the integration tests before merge or track them as a follow-up is the author's call — but given the ProducerDiagnosticTests pattern already exists as a model, the cost is low.
Review: perf: eliminate per-message diagnostics overhead in consumer
This PR makes two correct and well-motivated changes to the consumer hot path. Previous rounds of review resolved most issues. One new finding and two carry-over items remain.
New finding: _metricTagsCache field is now dead code
The PR removes all uses of _metricTagsCache from the hot path, but the field declaration at line 366 was not removed:
// line 366 in KafkaConsumer.cs
private readonly Dictionary<string, System.Diagnostics.TagList> _metricTagsCache = [];
The base branch had three references (declaration, TryGetValue, [topic] =). The PR removes the two usage sites but leaves the declaration. This is an unused field with a stale comment ("Cached metric tags per topic to avoid per-message TagList allocation") that now contradicts the new approach. It should be deleted along with its comment block at lines 364–366.
Carry-over: consumer diagnostic integration tests missing
ProducerDiagnosticTests.cs exists as a regression guard for the producer tracing path. No equivalent ConsumerDiagnosticTests.cs exists. Without integration test coverage for the guarded paths, a future refactor could silently break consumer diagnostics with no CI signal. Two cases are worth covering:
- A consumer span is created (with correct tags) when an ActivityListener is attached: verifies HasListeners() does not suppress diagnostics when they should fire.
- MessagesReceived and BytesReceived counters are incremented when a MeterListener is subscribed: regression guard for the Enabled gate.
The ProducerDiagnosticTests pattern already exists as a model so the cost is low.
Carry-over (minor): standalone-span else branch lacks a comment
In StartConsumeActivity, the else branch creates a consumer span when no producer trace context is present. A one-line comment (e.g. // no producer context — start a standalone consumer span) would clarify the intent for future readers.
What is correct and should not change
- HasListeners() guard on tracing: correct. This is the standard .NET pattern. HasListeners() is a single volatile read and the guard prevents ExtractTraceContext, Activity.Current save/restore, and StartActivity from running per-message when no listener is attached.
- Enabled guard on metrics: correct. MessagesReceived.Enabled || BytesReceived.Enabled properly gates the TagList allocation. Checking both instruments independently is the right approach since Enabled is per-instrument.
- ChannelClosedException catches in ConsumeOneAsync: correct. The when (ex.InnerException is null && timeoutCts.IsCancellationRequested) guard on the fast path and the additional && !cancellationToken.IsCancellationRequested guard on the slow path correctly distinguish timeout-driven channel closes from user cancellation and from unexpected failures. Bare ChannelClosedException without a guard would have silently swallowed real errors.
Summary
One actionable fix is required before merge: remove the dead _metricTagsCache field and its comment. The integration test gap and the minor comment are worth tracking but can reasonably be done as follow-up given the ProducerDiagnosticTests pattern already exists as a model.
Guard Activity creation and trace context extraction behind DekafDiagnostics.Source.HasListeners() so ExtractTraceContext header scanning, Activity.Current save/restore, and StartActivity are completely skipped when no tracing listener is attached. Guard per-message TagList allocation for metrics behind Counter<long>.Enabled check so the struct allocation is avoided when no meter listener is attached. Both changes make the consume loop zero-cost for diagnostics when no OpenTelemetry or diagnostics listener is configured, which is the common case for high-throughput consumers.
…metrics Counter<T>.Add() already checks Enabled internally, so the outer guard is sufficient to gate the TagList allocation.
…n timeout When the timeout CTS fires during ConsumeOneAsync, cancellation of the prefetch loop can cause the prefetch channel to close. This races with the OperationCanceledException — sometimes a ChannelClosedException propagates instead. Handle it the same way: return null to indicate no messages were available within the timeout.
…sync Bare catch (ChannelClosedException) blocks would silently swallow real Kafka errors propagated through the prefetch channel. Add guards to only catch clean channel closes (InnerException is null) triggered by our timeout CTS, matching the existing pattern in ConsumeAsync.
Review: Follow-up after latest commits
The ChannelClosedException fixes and diagnostics guards are all correct. However, the one blocking item from the previous review has not been addressed.
Still open (blocking): _metricTagsCache is dead code
The previous review flagged this as a new finding. The field declaration at lines 368–370 of KafkaConsumer.cs was not removed:
// Cached metric tags per topic to avoid per-message TagList allocation
private readonly Dictionary<string, System.Diagnostics.TagList> _metricTagsCache = [];
This PR removes the two usage sites (TryGetValue and [topic] =) but leaves the field declaration with a comment that now contradicts the new approach. The comment says "avoid per-message TagList allocation", but the new approach allocates a local TagList per message (gated by Enabled). The field should be deleted along with its comment.
Still open (carry-over, minor): standalone-span else branch lacks a comment
StartConsumeActivity lines 2348–2352: the else branch starts a consumer span when no producer trace context is present. A one-liner (e.g. // no producer context — start a standalone consumer span per OTel messaging spec) would make the intent clear for future readers.
Still open (carry-over): no consumer diagnostic integration tests
ProducerDiagnosticTests.cs exists as a regression guard for the producer tracing path. No ConsumerDiagnosticTests.cs equivalent was added. The two minimal cases to cover:
- A consumer span is created when an ActivityListener is attached: verifies HasListeners() doesn't suppress diagnostics.
- MessagesReceived/BytesReceived counters are incremented when a MeterListener is subscribed: regression guard for the Enabled gate.
This is a carry-over from multiple previous rounds. The ProducerDiagnosticTests pattern already exists as a template. Tracking this as a follow-up issue is acceptable if the author prefers not to block the PR on it, but the _metricTagsCache cleanup above must be done before merge.
Summary
The consumer hot path performed expensive per-message diagnostics work even when no tracing or metrics listeners were attached:
- ExtractTraceContext(headers) scanned all message headers looking for traceparent/tracestate on every single message, even with no ActivitySource listener
- Activity.Current save/restore was executed per message regardless of whether any Activity would be created
- new TagList { ... } was allocated per message for metrics counters, even when no MeterListener was subscribed
Changes
- Guard tracing with HasListeners(): Move ExtractTraceContext, Activity.Current save/restore, and StartActivity inside a DekafDiagnostics.Source.HasListeners() check. When no listener is attached (the common high-throughput case), the entire block is skipped: zero header scanning, zero Activity allocations.
- Guard metrics with Counter.Enabled: Wrap the per-message TagList allocation and Counter.Add() calls behind MessagesReceived.Enabled, avoiding the struct allocation when no meter listener is subscribed.
Both guards are simple boolean field reads (~1ns) and are the standard .NET patterns for zero-cost diagnostics.
Test plan
- … ActivityListener is attached
- … MeterListener is attached