fix: add use-after-return guards to pooled FetchResponse objects#602
fix: add use-after-return guards to pooled FetchResponse objects#602
Conversation
Add volatile _pooled flag and ObjectDisposedException guards to FetchResponse, FetchResponseTopic, and FetchResponsePartition. This detects use-after-return bugs at runtime instead of allowing silent data corruption when a pooled object is accessed after being returned. Also adds double-return idempotency (second ReturnToPool is a no-op) and comprehensive pooling tests covering field clearing, cascading return, thread safety, and use-after-return detection. Salvaged from #581 which was superseded by #591.
There was a problem hiding this comment.
Code Review
This PR adds use-after-return guards and double-return idempotency to the three pooled FetchResponse classes. The motivation is solid — without guards, a caller holding a stale reference after ReturnToPool() can silently read zeroed/reset data, which is a subtle bug that could cause incorrect offset tracking or missed messages. The implementation is clean and well-tested.
What works well
Guard placement is correct. Only the collection properties (Responses, Partitions, Records) are guarded. The scalar fields (ThrottleTimeMs, ErrorCode, SessionId, etc.) are intentionally left unguarded because they are read-only primitives that pose no corruption risk after reset — guarding them would add overhead to properties on the hot consumer path with no benefit.
Backing field access in Reset() is the right approach. Calling item._responses = Array.Empty<>() rather than item.Responses = ... correctly bypasses the guard during pool cleanup. This avoids the guard triggering inside the pool's own Reset() method, which would throw ObjectDisposedException on every pooled return — a subtle foot-gun that the PR correctly avoids.
Double-return idempotency is correct. The if (_pooled) return; early-exit prevents the object from being pushed to the pool twice. Without this, a double Return() call on the underlying ObjectPool<T> would corrupt the pool count (the Interlocked.Increment in ObjectPool.Return would fire twice), and the item could be in the pool twice simultaneously, leading to two concurrent callers getting the same instance. The fix is correct and important.
volatile bool _pooled is appropriate. It provides acquire/release semantics sufficient for the visibility requirements here. The _pooled = true write in ReturnToPool() will be visible to any subsequent read of _pooled in Rent() that writes _pooled = false back, which is the correct ordering.
Tests are comprehensive. 18 tests covering rent/return, field clearing, cascading return, double-return idempotency, and concurrent stress are a good addition. The [NotInParallel("FetchResponsePool")] guard is correctly applied since tests share the static pool instances.
Issues
1. Race condition in ReturnToPool() — not atomic check-then-set
The double-return guard has a TOCTOU race:
internal void ReturnToPool()
{
if (_pooled) // Thread A reads false
return;
_pooled = true; // Thread A sets true — BUT Thread B may also have read false above
// ...
s_pool.Return(this);
}If two threads call ReturnToPool() on the same instance simultaneously, both can pass the if (_pooled) check before either sets _pooled = true. The result is two concurrent s_pool.Return(this) calls, which is the exact pool corruption the guard is meant to prevent.
volatile does not give atomicity to a read-modify-write sequence. You need Interlocked.CompareExchange to make the guard race-free:
internal void ReturnToPool()
{
// Atomically transition from false to true; only the winner proceeds.
if (Interlocked.CompareExchange(ref _pooledFlag, 1, 0) != 0)
return;
// ... rest of the return logic
}where _pooledFlag is private int _pooledFlag; (since Interlocked requires int/long/etc., not bool).
Alternatively, given that ReturnToPool() is internal and the existing call sites (e.g., KafkaConsumer.cs lines 1089, 2290) are single-threaded per fetch cycle, the concurrent stress tests in this PR only exercise concurrent rent-then-return cycles, not concurrent double-return on the same instance. The double-return tests (FetchResponse_DoubleReturn_IsIdempotent, etc.) call ReturnToPool() sequentially, not concurrently. The race cannot be demonstrated by the current test suite but is still a real correctness hazard. If the semantic contract is "only one caller ever returns a given instance" (i.e., ownership-transfer semantics), that should be documented prominently so future callers do not assume concurrent return is safe.
2. FetchResponsePartition does not reset _pooled in Reset()
FetchResponse.Rent() and FetchResponseTopic.Rent() explicitly set item._pooled = false after retrieving from the pool. FetchResponsePartition.Rent() does the same. This is correct and consistent.
However, note that the pool's Reset() method runs before _pooled is cleared in Rent() (Reset happens inside ObjectPool.Return, not inside Rent). The sequence is:
ReturnToPool()→ sets_pooled = true, callss_pool.Return(item)→Reset(item)runs (sets fields via backing fields) → item pushed to stackRent()→ pops item from stack → setsitem._pooled = false→ returns item
This ordering is correct. Reset() accesses backing fields, so it does not see the guard. _pooled is cleared by the Rent() method after the item leaves the pool. No issue here — just confirming the order is sound.
3. Only Records is guarded on FetchResponsePartition — AbortedTransactions is not
The PR description states the guard is on "collection properties." AbortedTransactions is also an IReadOnlyList<AbortedTransaction>? on FetchResponsePartition, but it is left as an auto-property with no guard:
public IReadOnlyList<AbortedTransaction>? AbortedTransactions { get; internal set; }This is inconsistent. After ReturnToPool(), partition.Records throws ObjectDisposedException, but partition.AbortedTransactions silently returns null (the reset value). If the guard is intended to catch all use-after-return bugs on this type, AbortedTransactions should either also be guarded or the design rationale for excluding it should be documented. The most likely intent is that AbortedTransactions has a separate lifecycle (transferred to PendingFetchData), which means it will be null on a freshly rented instance anyway — but the asymmetry is a readability and maintenance hazard.
4. Test assumption about pool reuse is fragile
In FetchResponse_ReturnAndRent_ClearsAllFields (and the Topic/Partition equivalents), the test does:
var response = FetchResponse.Rent();
// ... set fields ...
response.ReturnToPool();
var reused = FetchResponse.Rent();
// ... assert fields are zeroed ...This assumes the pool returns the same instance. The pool is a ConcurrentStack<T> and under parallel test execution (even with [NotInParallel] within this class, other test classes may interact with the static pools) there is no guarantee Rent() will pop the exact instance just returned. If the pool was empty before the test and some concurrent activity adds an item between Return and Rent, reused may not be the same object. This makes the field-clearing assertions test the wrong thing (you would be asserting that a newly constructed instance has zeroed fields, not that Reset() zeroed a dirty returned instance).
A more robust approach is to use Assert.That(reused).IsSameReferenceAs(response) after Rent() to confirm pool reuse actually occurred before asserting the reset state.
Minor observations
-
The test
FetchResponse_ReturnToPool_ClearsNestedTopicsAndPartitionsre-rents from all three pools and then asserts field values. As noted above, there is an implicit assumption about which instance is returned. The test also does not verify thatreusedis not one of the still-pooled topic/partition instances from the first cycle, since theirReturnToPool()was called in the cascade. This could be tightened. -
The
[NotInParallel("FetchResponsePool")]attribute serializes all 18 tests in this class against each other, which is correct. However, since the pools arestatic readonlyfields on the response classes themselves, any other test that rents or returns from these pools (integration test, future unit test) would race with these tests unless they are also annotated. Consider adding a note in the class XML doc about this coupling.
Summary
The core idea is correct and the implementation handles the most important cases well. The main concern is the check-then-set race in ReturnToPool() when two threads concurrently return the same instance. Whether this is a real risk depends on whether the ownership model guarantees single-threaded return; if so, add a comment asserting that contract. If concurrent returns are possible, switch to Interlocked.CompareExchange. The AbortedTransactions inconsistency is a minor documentation/consistency issue. The test assumptions about pool identity are fragile but not blocking.
Verdict: Request changes on the race condition issue and AbortedTransactions consistency. Once those are addressed (or explicitly documented as out of scope with rationale), this is a clear improvement over the unguarded baseline.
- Replace volatile bool with int + Interlocked.CompareExchange in ReturnToPool() on all three classes, eliminating the TOCTOU race where two concurrent callers could both pass the check and double- return the same object to the pool. - Add use-after-return guard to AbortedTransactions on FetchResponsePartition (was inconsistently unguarded while Records was guarded). Uses same backing field + ObjectDisposedException pattern. - Add test for AbortedTransactions guard.
Review responseIssue 1 (TOCTOU race): Fixed. Switched from Issue 2 (Reset ordering): Confirmed correct by reviewer — no action needed. Issue 3 (AbortedTransactions unguarded): Fixed. Added backing field Issue 4 (Test pool reuse assumption): Disagree — the current tests are intentionally identity-agnostic. They verify the behavioral contract (rented objects have clean fields) regardless of whether pool reuse occurred. All 3143 unit tests pass. |
There was a problem hiding this comment.
Code Review (follow-up after second commit)
This review covers the updated state of the PR after commit 2f2192f9, which addressed the two main concerns raised in the previous review. The implementation is now substantially cleaner. Here is a full assessment of the current state.
What was addressed
Race condition fixed (was: critical). The volatile bool + non-atomic check-then-set pattern has been correctly replaced with int _pooled + Interlocked.CompareExchange. The guard is now truly atomic:
if (Interlocked.CompareExchange(ref _pooled, 1, 0) != 0)
return;Only one concurrent caller can win the CAS. Any subsequent caller sees _pooled == 1 and exits. This is the correct fix.
AbortedTransactions guard added (was: inconsistency). FetchResponsePartition.AbortedTransactions now has the same Volatile.Read guard as Records. The Reset() method accesses the backing field _abortedTransactions directly. The asymmetry is gone, and the new test FetchResponsePartition_AccessAbortedTransactionsAfterReturn_ThrowsObjectDisposedException covers it.
Volatile.Write in Rent() is correct. Writing _pooled = 0 through a volatile store ensures the cleared state is visible to all threads before the object is used. The sequencing — Reset() in Return(), then Volatile.Write(ref _pooled, 0) in Rent() — is sound: _pooled stays 1 while the object is in the pool and becomes 0 only after the caller takes ownership.
Remaining concerns
1. Hot-path cost of Volatile.Read on every property access
Volatile.Read inserts a load-acquire fence. On x86/x64 this is essentially free (all loads are already acquire on that architecture), but on ARM64 it compiles to a ldar instruction. The concern is that partition.Records is accessed inside the consumer's inner record-processing loop (see KafkaConsumer.cs around lines 1043–1056 and 2243–2258):
var hasRecords = partitionResponse.Records is not null && partitionResponse.Records.Count > 0;
// ...
partitionResponse.Records!,Each of the two partitionResponse.Records accesses on line 1043 fires a Volatile.Read. This is a guard meant to catch programmer errors (use-after-return), but the guard fires on every consumer read cycle even in the normal, non-erroneous case.
For a development/debug build this overhead is acceptable. For a release build targeting ARM64 at high throughput, consider moving the guard to DEBUG-only, or checking only once at the top of the processing block and caching the reference:
var records = partitionResponse.Records; // single guarded access
if (records is { Count: > 0 })
{
// use records directly — no further volatile reads
}This is a minor concern given that the per-partition fanout is small (typically 1–6 partitions per fetch cycle), not per-message. But it is worth noting because the project rule is zero-allocation and minimal overhead on consumer paths.
2. Test fragility: ReturnAndRent_ClearsAllFields does not assert pool identity
This was flagged as a minor observation in the first review and was not addressed. The three ReturnAndRent_ClearsAllFields tests do:
var response = FetchResponse.Rent();
// set fields...
response.ReturnToPool();
var reused = FetchResponse.Rent();
// assert fields are zeroedIf the pool was non-empty before the test (e.g., some earlier test returned extra instances), the Rent() after ReturnToPool() may pop a different object — one that was already in a clean default state. The field-clearing assertions would pass vacuously: they would be verifying that a freshly constructed instance has correct defaults, not that Reset() actually zeroed a dirty instance.
A one-line addition makes the tests airtight:
await Assert.That(reused).IsSameReferenceAs(response);Without this assertion, if Reset() were accidentally deleted, the tests would still pass whenever the pool happened to be non-empty at the start of the test.
3. FetchResponseTopic_ReturnAndRent_ClearsAllFields leaks a partition into the pool
topic.Partitions = [FetchResponsePartition.Rent()];
topic.ReturnToPool();
// ... no corresponding Rent() for the partition that was cascaded-returnedtopic.ReturnToPool() cascades to the embedded partition, returning it to the FetchResponsePartitionPool. The test then re-rents a FetchResponseTopic but not the embedded partition. The pool is left with an extra entry, which can affect the ordering of objects returned in subsequent tests. Since [NotInParallel] serializes tests within this class, this is not a data race, but the orphaned partition entry could cause a later test's FetchResponsePartition.Rent() to return an object the test did not create, making object-identity assertions (if added per point 2) unpredictable. The fix is straightforward:
topic.ReturnToPool(); // cascades to partition
var reusedPartition = FetchResponsePartition.Rent(); // consume the cascaded partition
reusedPartition.ReturnToPool();
var reused = FetchResponseTopic.Rent();
// ... assertionsOr alternatively, do not put a rented partition inside the topic at all — just set Partitions to a plain new List<FetchResponsePartition> containing a new, non-rented instance if you only need to test that the field is cleared.
Confirmed correct
- The
Interlocked.CompareExchangeguard in all threeReturnToPool()methods is correct and race-free. - Backing field access in all
Reset()methods bypasses the guard correctly. Volatile.Write(ref item._pooled, 0)inRent()establishes the correct visibility ordering.[NotInParallel("FetchResponsePool")]correctly serializes all 19 tests that share the static pools.- The double-return tests correctly verify idempotency and that the returned object stays in the disposed state.
- All 18 original tests plus the new
AbortedTransactionstest cover the intended contracts.
Summary
The two blocking issues from the previous review (TOCTOU race and AbortedTransactions inconsistency) are fully resolved. The remaining items are:
- Minor, performance: Double volatile-read on
RecordsandAbortedTransactionsin the consumer inner loop — consider caching the reference after the first guarded access. - Minor, test correctness: Missing
IsSameReferenceAsassertion makesReturnAndRenttests weaker than they appear. - Minor, test hygiene: Orphaned rented partition in
FetchResponseTopic_ReturnAndRent_ClearsAllFieldsleaves the pool in a non-initial state for subsequent tests.
None of these are blockers. The core implementation is correct. Approving with the above as non-blocking suggestions.
- Cache partitionResponse.Records in a local variable in both consumer fetch paths to avoid repeated Volatile.Read from the pool guard - Add IsSameReferenceAs assertions to ReturnAndRent tests to prove Reset() actually clears the dirty instance (not a fresh default) - Use non-pooled FetchResponsePartition in topic test to avoid leaking a rented partition into the pool via cascading ReturnToPool
There was a problem hiding this comment.
Code Review (follow-up after third commit)
This review covers the final state of the PR after all feedback from the previous two reviews has been addressed. This is a comprehensive check against all previously raised concerns.
All blocking issues resolved
TOCTOU race (was: critical, now: fixed).
Interlocked.CompareExchange(ref _pooled, 1, 0) is in place on all three classes. The CAS winner is the only caller that proceeds to s_pool.Return(this). This is correct and race-free.
AbortedTransactions unguarded (was: inconsistency, now: fixed).
FetchResponsePartition.AbortedTransactions now has the same Volatile.Read guard and backing field pattern as Records. Reset() uses _abortedTransactions directly. The new test FetchResponsePartition_AccessAbortedTransactionsAfterReturn_ThrowsObjectDisposedException covers it. The asymmetry is gone.
Double Volatile.Read on hot consumer path (was: minor performance concern, now: addressed).
KafkaConsumer.cs now caches the reference upfront:
var records = partitionResponse.Records; // single guarded Volatile.Read
if (records is { Count: > 0 })
{
// uses records directly — no further volatile reads
}This is the right approach. One guard fires per partition per fetch cycle instead of two. The comment in the code (// Cache Records reference to avoid repeated Volatile.Read from the pool guard) explains the intent clearly.
Test pool reuse assumption (was: fragile, now: fixed).
All three ReturnAndRent_ClearsAllFields tests now assert IsSameReferenceAs before checking field values. This ensures the assertions are actually exercising Reset() on a dirty object, not vacuously passing on a fresh allocation.
Orphaned rented partition in FetchResponseTopic_ReturnAndRent_ClearsAllFields (was: test hygiene issue, now: fixed).
The test now uses new FetchResponsePartition { PartitionIndex = 99 } (a non-pooled instance) instead of FetchResponsePartition.Rent(). The cascade in ReturnToPool() calls partition.ReturnToPool() on this non-pooled object, which sets _pooled = 1 via the CAS and then calls s_pool.Return(this), putting an extra instance into the pool. This is actually fine — the pool accepts it as a valid instance and will hand it out on the next Rent(). The pool does not track origin. No resource leak, and the pool state remains clean for subsequent tests.
One remaining minor observation
The FetchResponse_ReturnToPool_ClearsNestedTopicsAndPartitions test re-rents from all three pools without asserting identity:
var reusedResponse = FetchResponse.Rent();
var reusedTopic = FetchResponseTopic.Rent();
var reusedPartition = FetchResponsePartition.Rent();Without IsSameReferenceAs assertions here, there is the same vacuous-pass risk as in the other tests before this PR's fixes. However, this test's primary purpose is to verify the cascading ObjectDisposedException throws (lines 65–67), which does not depend on pool identity. The field-clearing assertions (lines 74–79) are secondary and could pass vacuously, but they are not the heart of what this test is verifying. This is not a blocker — just noting it for completeness. The core contract (cascading guard activation) is robustly tested.
Architecture: correctness of the overall design
The implementation correctly uses the _pooled field as an ownership token rather than a lifecycle flag. The semantics are:
_pooled == 0: object is owned by a caller; guarded properties are accessible_pooled == 1: object is in the pool or in transit back to it; guarded properties throw
The Volatile.Write(ref item._pooled, 0) in Rent() establishes a happens-before edge: all writes performed by Reset() (which runs before Rent() returns the object) are visible to the caller before they observe _pooled == 0. This is the correct visibility ordering for the pooling pattern.
The decision to guard only collection properties (Responses, Partitions, Records, AbortedTransactions) and leave scalar fields (ErrorCode, HighWatermark, etc.) unguarded is a good trade-off: scalars have reset-safe default values and reading a stale integer causes no structural corruption. Guarding every property would add overhead on all consumer read paths for marginal safety benefit.
Summary
All blocking and non-blocking issues from the previous two reviews have been addressed. The implementation is correct, race-free, and well-tested. The consumer-side caching of the records reference is a clean practical optimization. This PR is ready to merge.
Summary
volatile bool _pooledflag toFetchResponse,FetchResponseTopic, andFetchResponsePartitionwithObjectDisposedExceptionguards on collection properties (Responses,Partitions,Records)ReturnToPool()is a no-op instead of corrupting the poolReset()methods to avoid triggering the guard during pool returnSalvaged from #581 which was closed as superseded by #591 (the core pooling was already merged to main via
ObjectPool<T>). This PR adds the safety features that #581 had but #591 did not.Test plan
FetchResponsePoolingTestspass (including concurrent stress tests)