add cancellation watermark to close at-least-once gap on cancelled tasks#21
Merged
add cancellation watermark to close at-least-once gap on cancelled tasks#21
Conversation
Today _extract_ready_prefixes drops a cancelled task and everything after
from per-partition pending state, and _map_offsets_per_partition stops the
offset advance at the cancellation. That keeps cancelled-and-after offsets
redeliverable on restart — but only because cancellation never actually
occurs mid-stream today (it's gated by shutdown, after which no new tasks
are absorbed).
If a future change ever allowed mid-stream cancellation, a new task arriving
for the same partition with a higher offset would slip past the boundary:
the cancelled task is gone from pending, _map_offsets_per_partition has no
memory of the cancellation, and the new task's offset would be committed,
silently skipping the cancelled-and-after window.
This change adds a per-partition cancellation watermark on the committer.
When _map_offsets_per_partition sees a cancelled task at offset N for
partition P, it records watermarks[P] = N (keeping the earliest if multiple
batches see cancellations). On every subsequent batch the watermark blocks
that partition from advancing — the partition's pending still drains for
task_done() balance, but no commit is issued for it until the watermark is
cleared. The rebalance listener clears the watermark for revoked partitions
after commit_all() runs, so the next assignment starts fresh.
Trace: tasks 9 (✓), 10 (✗), 11 (✓), 12 (✓) all in pending → first commit
produces {tp: 10}, sets wm[tp] = 10. Second batch sees task 13 (✓), but
13+1 > 10, so {tp} is dropped. On restart, fetch from 10 — re-process 10,
11, 12, 13. At-least-once preserved.
Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Today _extract_ready_prefixes drops a cancelled task and everything after from per-partition pending state, and _map_offsets_per_partition stops the offset advance at the cancellation. That keeps cancelled-and-after offsets redeliverable on restart — but only because cancellation never actually occurs mid-stream today (it's gated by shutdown, after which no new tasks are absorbed).
If a future change ever allowed mid-stream cancellation, a new task arriving for the same partition with a higher offset would slip past the boundary: the cancelled task is gone from pending, _map_offsets_per_partition has no memory of the cancellation, and the new task's offset would be committed, silently skipping the cancelled-and-after window.
This change adds a per-partition cancellation watermark on the committer. When _map_offsets_per_partition sees a cancelled task at offset N for partition P, it records watermarks[P] = N (keeping the earliest if multiple batches see cancellations). On every subsequent batch the watermark blocks that partition from advancing — the partition's pending still drains for task_done() balance, but no commit is issued for it until the watermark is cleared. The rebalance listener clears the watermark for revoked partitions after commit_all() runs, so the next assignment starts fresh.
Trace: tasks 9 (✓), 10 (✗), 11 (✓), 12 (✓) all in pending → first commit produces {tp: 10}, sets wm[tp] = 10. Second batch sees task 13 (✓), but 13+1 > 10, so {tp} is dropped. On restart, fetch from 10 — re-process 10, 11, 12, 13. At-least-once preserved.