From 9eb13d4c4ab8f5e9bd9159ffd38f3353d5e3df8b Mon Sep 17 00:00:00 2001 From: gocronx Date: Fri, 8 May 2026 23:30:01 +0800 Subject: [PATCH] fix(timewheel): use absolute target sequence in addCmd (#5) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The producer used to precompute (pos, rounds) from cursor.Load() and write the result to addCh. If the producer was descheduled between the cursor read and the channel send — common under -race — the handler could drain the cmd long after cursor had advanced past that slot, leaving the entry stranded for a full wheel cycle (65.5s with the default 64K capacity / 1ms tick), which exceeded the stress-test deadline and surfaced as "fired N-1 of N". Carry the absolute targetSeq instead. The handler resolves the slot at drain time against the up-to-date seq via a new placeAdd helper, and fires immediately when targetSeq <= seq (i.e. the deadline has already passed). placeAdd is shared by the pre- and post-slot drain loops, removing the duplicated slot-append logic in Handle. Stress tests now pass 8/8 under -race (previously ~2/3 failure rate on master). Two unit regression tests cover the past-deadline and past-deadline-but-cancelled paths directly. --- timewheel.go | 71 +++++++++++++++++++++++++++-------------------- timewheel_test.go | 47 +++++++++++++++++++++++++++++++ 2 files changed, 88 insertions(+), 30 deletions(-) diff --git a/timewheel.go b/timewheel.go index 57ee7fc..29b2beb 100644 --- a/timewheel.go +++ b/timewheel.go @@ -21,11 +21,19 @@ type entry struct { } // addCmd is the message sent from Add() to the handler goroutine. +// +// Carries the absolute target tick sequence rather than a precomputed +// (pos, rounds) pair. If the producer were to precompute slot coordinates +// using cursor.Load() and then get descheduled before writing to addCh, +// the handler could drain the cmd long after cursor had advanced past +// that slot, leaving the entry stranded for a full wheel cycle. With +// targetSeq, the handler resolves the slot at drain time against the +// up-to-date cursor and fires immediately if the deadline has already +// passed. type addCmd struct { - pos int // absolute slot index (pre-computed) - rounds int // full laps remaining - taskID string - topic string + targetSeq int64 + taskID string + topic string } // cancelCmd is the message sent from Cancel() to the handler goroutine. @@ -135,14 +143,32 @@ func (w *timeWheel) add(delay time.Duration, taskID, topic string) { if ticks <= 0 { ticks = 1 } - pos := int((w.cursor.Load() + ticks) & int64(w.mask)) - rounds := int(ticks / int64(w.capacity)) w.addCh <- addCmd{ - pos: pos, - rounds: rounds, - taskID: taskID, - topic: topic, + targetSeq: w.cursor.Load() + ticks, + taskID: taskID, + topic: topic, + } +} + +// placeAdd resolves an addCmd against the tick currently being processed and +// either fires immediately (deadline already passed) or appends an entry to +// the appropriate slot. Used by both the pre- and post-slot drain loops. +func (w *timeWheel) placeAdd(cmd addCmd, seq int64) { + if _, isCancelled := w.cancelled[cmd.taskID]; isCancelled { + delete(w.cancelled, cmd.taskID) + return + } + if cmd.targetSeq <= seq { + w.onFire(cmd.taskID, cmd.topic) + return } + pos := int(cmd.targetSeq & int64(w.mask)) + rounds := int((cmd.targetSeq - seq - 1) / int64(w.capacity)) + w.slots[pos].entries = append(w.slots[pos].entries, &entry{ + rounds: rounds, + taskID: cmd.taskID, + topic: cmd.topic, + }) } // cancel lazily marks a task so it is skipped when its slot fires. @@ -170,11 +196,7 @@ func (h *wheelHandler) Handle(lower, upper int64) { for { select { case cmd := <-w.addCh: - w.slots[cmd.pos].entries = append(w.slots[cmd.pos].entries, &entry{ - rounds: cmd.rounds, - taskID: cmd.taskID, - topic: cmd.topic, - }) + w.placeAdd(cmd, seq) default: break drainAdds } @@ -213,25 +235,14 @@ func (h *wheelHandler) Handle(lower, upper int64) { } slot.entries = remaining - // --- 4. Post-drain: catch tasks added during slot processing --- - // If a task lands on the current slot with rounds==0, fire immediately. + // --- 4. Post-drain: catch tasks added during slot processing. + // placeAdd handles same-slot/rounds==0 (fire), past-deadline (fire), + // and future targets (slot append) uniformly. postDrain: for { select { case cmd := <-w.addCh: - if cmd.pos == idx && cmd.rounds == 0 { - if _, isCancelled := w.cancelled[cmd.taskID]; isCancelled { - delete(w.cancelled, cmd.taskID) - } else { - w.onFire(cmd.taskID, cmd.topic) - } - } else { - w.slots[cmd.pos].entries = append(w.slots[cmd.pos].entries, &entry{ - rounds: cmd.rounds, - taskID: cmd.taskID, - topic: cmd.topic, - }) - } + w.placeAdd(cmd, seq) default: break postDrain } diff --git a/timewheel_test.go b/timewheel_test.go index b2c7bdf..3591040 100644 --- a/timewheel_test.go +++ b/timewheel_test.go @@ -224,3 +224,50 @@ loop: t.Errorf("fired = %v, want [keep-me]", got) } } + +// TestTimeWheel_PlaceAdd_PastDeadlineFiresImmediately is the regression test +// for the producer-preempted-after-cursor-load race that left tasks stranded +// for a full wheel cycle. Simulates a cmd arriving at the handler after the +// cursor has already advanced past its target sequence, and asserts the +// handler fires it immediately instead of appending to a stale slot. +func TestTimeWheel_PlaceAdd_PastDeadlineFiresImmediately(t *testing.T) { + var fired []string + tw := newTimeWheel(64, testTick, func(id, _ string) { + fired = append(fired, id) + }) + // Note: don't start the wheel — placeAdd is invoked synchronously by the + // handler goroutine; calling it directly here is safe because no other + // goroutine touches the wheel state. + + // targetSeq=15, but the handler has already advanced to seq=20. + tw.placeAdd(addCmd{targetSeq: 15, taskID: "stranded", topic: "t"}, 20) + + if len(fired) != 1 || fired[0] != "stranded" { + t.Errorf("expected immediate fire of stranded task, got %v", fired) + } + for _, slot := range tw.slots { + if len(slot.entries) != 0 { + t.Errorf("stranded cmd should not have been appended to any slot") + } + } +} + +// TestTimeWheel_PlaceAdd_CancelledIsDropped verifies that a cmd whose taskID +// is in the lazy-cancel set is silently dropped, even when its deadline has +// already passed. +func TestTimeWheel_PlaceAdd_CancelledIsDropped(t *testing.T) { + var fired []string + tw := newTimeWheel(64, testTick, func(id, _ string) { + fired = append(fired, id) + }) + tw.cancelled["stranded"] = struct{}{} + + tw.placeAdd(addCmd{targetSeq: 15, taskID: "stranded", topic: "t"}, 20) + + if len(fired) != 0 { + t.Errorf("cancelled task should not fire, got %v", fired) + } + if _, stillCancelled := tw.cancelled["stranded"]; stillCancelled { + t.Errorf("cancelled set entry should be consumed after drop") + } +}