Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
236 changes: 127 additions & 109 deletions src/Dekaf/Consumer/KafkaConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1015,96 +1015,105 @@ private async ValueTask PrefetchFromBrokerAsync(int brokerId, List<TopicPartitio
pendingItems.Clear();

// Write to prefetch channel
foreach (var topicResponse in response.Responses)
try
{
var topic = topicResponse.Topic ?? string.Empty;
var activityName = _activityNameCache.GetOrAdd(topic, static t => $"{t} receive");

foreach (var partitionResponse in topicResponse.Partitions)
foreach (var topicResponse in response.Responses)
{
var tp = new TopicPartition(topic, partitionResponse.PartitionIndex);

// Update watermark cache from fetch response (even on errors, watermarks may be valid)
UpdateWatermarksFromFetchResponse(topic, partitionResponse);
var topic = topicResponse.Topic ?? string.Empty;
var activityName = _activityNameCache.GetOrAdd(topic, static t => $"{t} receive");

if (partitionResponse.ErrorCode != ErrorCode.None)
foreach (var partitionResponse in topicResponse.Partitions)
{
if (partitionResponse.ErrorCode == ErrorCode.OffsetOutOfRange)
var tp = new TopicPartition(topic, partitionResponse.PartitionIndex);

// Update watermark cache from fetch response (even on errors, watermarks may be valid)
UpdateWatermarksFromFetchResponse(topic, partitionResponse);

if (partitionResponse.ErrorCode != ErrorCode.None)
{
// CRITICAL: Reset fetch position based on auto.offset.reset policy
// Without this, we would retry with the same invalid offset forever
var (resetTimestamp, resetName) = _options.AutoOffsetReset switch
if (partitionResponse.ErrorCode == ErrorCode.OffsetOutOfRange)
{
AutoOffsetReset.Latest => (-1L, "latest"),
AutoOffsetReset.Earliest => (-2L, "earliest"),
AutoOffsetReset.None => throw new KafkaException(
$"OffsetOutOfRange for {topic}-{partitionResponse.PartitionIndex} and auto.offset.reset is 'none'"),
_ => throw new InvalidOperationException($"Unknown AutoOffsetReset value: {_options.AutoOffsetReset}")
};
_fetchPositions[tp] = resetTimestamp;
_positions[tp] = resetTimestamp;
LogOffsetOutOfRangeReset(topic, partitionResponse.PartitionIndex, resetName);
}
else if (partitionResponse.ErrorCode == ErrorCode.NotLeaderOrFollower)
{
// Invalidate metadata cache to force re-discovery of leader
InvalidatePartitionCache();
LogNotLeaderOrFollower(topic, partitionResponse.PartitionIndex);
}
else
{
LogPrefetchError(topic, partitionResponse.PartitionIndex, partitionResponse.ErrorCode);
// CRITICAL: Reset fetch position based on auto.offset.reset policy
// Without this, we would retry with the same invalid offset forever
var (resetTimestamp, resetName) = _options.AutoOffsetReset switch
{
AutoOffsetReset.Latest => (-1L, "latest"),
AutoOffsetReset.Earliest => (-2L, "earliest"),
AutoOffsetReset.None => throw new KafkaException(
$"OffsetOutOfRange for {topic}-{partitionResponse.PartitionIndex} and auto.offset.reset is 'none'"),
_ => throw new InvalidOperationException($"Unknown AutoOffsetReset value: {_options.AutoOffsetReset}")
};
_fetchPositions[tp] = resetTimestamp;
_positions[tp] = resetTimestamp;
LogOffsetOutOfRangeReset(topic, partitionResponse.PartitionIndex, resetName);
}
else if (partitionResponse.ErrorCode == ErrorCode.NotLeaderOrFollower)
{
// Invalidate metadata cache to force re-discovery of leader
InvalidatePartitionCache();
LogNotLeaderOrFollower(topic, partitionResponse.PartitionIndex);
}
else
{
LogPrefetchError(topic, partitionResponse.PartitionIndex, partitionResponse.ErrorCode);
}
continue;
}
continue;
}

// Update high watermark from response (thread-safe with ConcurrentDictionary)
_highWatermarks[tp] = partitionResponse.HighWatermark;
// Update high watermark from response (thread-safe with ConcurrentDictionary)
_highWatermarks[tp] = partitionResponse.HighWatermark;

var hasRecords = partitionResponse.Records is not null && partitionResponse.Records.Count > 0;
var hasRecords = partitionResponse.Records is not null && partitionResponse.Records.Count > 0;

if (hasRecords)
{
// We have new records - reset EOF state for this partition
lock (_prefetchLock)
if (hasRecords)
{
_eofEmitted.Remove(tp);
}
// We have new records - reset EOF state for this partition
lock (_prefetchLock)
{
_eofEmitted.Remove(tp);
}

var pending = new PendingFetchData(
topic,
partitionResponse.PartitionIndex,
partitionResponse.Records!,
partitionResponse.AbortedTransactions,
activityName: activityName);
var pending = new PendingFetchData(
topic,
partitionResponse.PartitionIndex,
partitionResponse.Records!,
partitionResponse.AbortedTransactions,
activityName: activityName);

// Track memory before adding to channel
TrackPrefetchedBytes(pending, release: false);
// Track memory before adding to channel
TrackPrefetchedBytes(pending, release: false);

// Update fetch positions for next prefetch
UpdateFetchPositionsFromPrefetch(pending);
// Update fetch positions for next prefetch
UpdateFetchPositionsFromPrefetch(pending);

// Collect for later - we'll assign memory owner to the last one
pendingItems.Add(pending);
}
else if (_options.EnablePartitionEof)
{
// No records returned - check if we're at EOF
lock (_prefetchLock)
// Collect for later - we'll assign memory owner to the last one
pendingItems.Add(pending);
}
else if (_options.EnablePartitionEof)
{
var fetchPosition = _fetchPositions.GetValueOrDefault(tp, 0);

// EOF condition: position >= high watermark and we haven't emitted EOF yet
if (fetchPosition >= partitionResponse.HighWatermark && !_eofEmitted.Contains(tp))
// No records returned - check if we're at EOF
lock (_prefetchLock)
{
// Queue EOF event and mark as emitted
_pendingEofEvents.Enqueue((tp, fetchPosition));
_eofEmitted.Add(tp);
var fetchPosition = _fetchPositions.GetValueOrDefault(tp, 0);

// EOF condition: position >= high watermark and we haven't emitted EOF yet
if (fetchPosition >= partitionResponse.HighWatermark && !_eofEmitted.Contains(tp))
{
// Queue EOF event and mark as emitted
_pendingEofEvents.Enqueue((tp, fetchPosition));
_eofEmitted.Add(tp);
}
}
}
}
}
}
finally
{
// Return the FetchResponse and its nested objects to the pool now that all data has been extracted.
// Records ownership has been transferred to PendingFetchData; the pool clear does NOT dispose them.
response.ReturnToPool();
}

// Write all pending items to the channel, with memory owner attached to the last one
if (pendingItems.Count > 0)
Expand Down Expand Up @@ -2218,64 +2227,73 @@ private async ValueTask<Dictionary<int, List<TopicPartition>>> GroupPartitionsBy
List<PendingFetchData>? pendingItems = null;

// Queue pending fetch data for lazy iteration - don't parse records yet!
foreach (var topicResponse in response.Responses)
try
{
var topic = topicResponse.Topic ?? string.Empty;
var activityName = _activityNameCache.GetOrAdd(topic, static t => $"{t} receive");

foreach (var partitionResponse in topicResponse.Partitions)
foreach (var topicResponse in response.Responses)
{
var tp = new TopicPartition(topic, partitionResponse.PartitionIndex);

// Update watermark cache from fetch response (even on errors, watermarks may be valid)
UpdateWatermarksFromFetchResponse(topic, partitionResponse);
var topic = topicResponse.Topic ?? string.Empty;
var activityName = _activityNameCache.GetOrAdd(topic, static t => $"{t} receive");

if (partitionResponse.ErrorCode != ErrorCode.None)
foreach (var partitionResponse in topicResponse.Partitions)
{
LogFetchError(topic, partitionResponse.PartitionIndex, partitionResponse.ErrorCode);
continue;
}

// Update high watermark from response (thread-safe with ConcurrentDictionary)
_highWatermarks[tp] = partitionResponse.HighWatermark;
var tp = new TopicPartition(topic, partitionResponse.PartitionIndex);

var hasRecords = partitionResponse.Records is not null && partitionResponse.Records.Count > 0;
// Update watermark cache from fetch response (even on errors, watermarks may be valid)
UpdateWatermarksFromFetchResponse(topic, partitionResponse);

if (hasRecords)
{
// We have new records - reset EOF state for this partition
lock (_prefetchLock)
if (partitionResponse.ErrorCode != ErrorCode.None)
{
_eofEmitted.Remove(tp);
LogFetchError(topic, partitionResponse.PartitionIndex, partitionResponse.ErrorCode);
continue;
}

// Collect pending fetch data for lazy record iteration
pendingItems ??= [];
pendingItems.Add(new PendingFetchData(
topic,
partitionResponse.PartitionIndex,
partitionResponse.Records!,
partitionResponse.AbortedTransactions,
activityName: activityName));
}
else if (_options.EnablePartitionEof)
{
// No records returned - check if we're at EOF
lock (_prefetchLock)
// Update high watermark from response (thread-safe with ConcurrentDictionary)
_highWatermarks[tp] = partitionResponse.HighWatermark;

var hasRecords = partitionResponse.Records is not null && partitionResponse.Records.Count > 0;

if (hasRecords)
{
var fetchPosition = _fetchPositions.GetValueOrDefault(tp, 0);
// We have new records - reset EOF state for this partition
lock (_prefetchLock)
{
_eofEmitted.Remove(tp);
}

// EOF condition: position >= high watermark and we haven't emitted EOF yet
if (fetchPosition >= partitionResponse.HighWatermark && !_eofEmitted.Contains(tp))
// Collect pending fetch data for lazy record iteration
pendingItems ??= [];
pendingItems.Add(new PendingFetchData(
topic,
partitionResponse.PartitionIndex,
partitionResponse.Records!,
partitionResponse.AbortedTransactions,
activityName: activityName));
}
else if (_options.EnablePartitionEof)
{
// No records returned - check if we're at EOF
lock (_prefetchLock)
{
// Queue EOF event and mark as emitted
_pendingEofEvents.Enqueue((tp, fetchPosition));
_eofEmitted.Add(tp);
var fetchPosition = _fetchPositions.GetValueOrDefault(tp, 0);

// EOF condition: position >= high watermark and we haven't emitted EOF yet
if (fetchPosition >= partitionResponse.HighWatermark && !_eofEmitted.Contains(tp))
{
// Queue EOF event and mark as emitted
_pendingEofEvents.Enqueue((tp, fetchPosition));
_eofEmitted.Add(tp);
}
}
}
}
}
}
finally
{
// Return the FetchResponse and its nested objects to the pool now that all data has been extracted.
// Records ownership has been transferred to PendingFetchData; the pool clear does NOT dispose them.
response.ReturnToPool();
}

// Attach memory owner to the last item (will be disposed last due to FIFO processing)
if (pendingItems is not null && pendingItems.Count > 0 && memoryOwner is not null)
Expand Down
Loading
Loading