Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 41 additions & 30 deletions timewheel.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,19 @@ type entry struct {
}

// addCmd is the message sent from Add() to the handler goroutine.
//
// Carries the absolute target tick sequence rather than a precomputed
// (pos, rounds) pair. If the producer were to precompute slot coordinates
// using cursor.Load() and then get descheduled before writing to addCh,
// the handler could drain the cmd long after cursor had advanced past
// that slot, leaving the entry stranded for a full wheel cycle. With
// targetSeq, the handler resolves the slot at drain time against the
// up-to-date cursor and fires immediately if the deadline has already
// passed.
type addCmd struct {
pos int // absolute slot index (pre-computed)
rounds int // full laps remaining
taskID string
topic string
targetSeq int64
taskID string
topic string
}

// cancelCmd is the message sent from Cancel() to the handler goroutine.
Expand Down Expand Up @@ -135,14 +143,32 @@ func (w *timeWheel) add(delay time.Duration, taskID, topic string) {
if ticks <= 0 {
ticks = 1
}
pos := int((w.cursor.Load() + ticks) & int64(w.mask))
rounds := int(ticks / int64(w.capacity))
w.addCh <- addCmd{
pos: pos,
rounds: rounds,
taskID: taskID,
topic: topic,
targetSeq: w.cursor.Load() + ticks,
taskID: taskID,
topic: topic,
}
}

// placeAdd resolves an addCmd against the tick currently being processed and
// either fires immediately (deadline already passed) or appends an entry to
// the appropriate slot. Used by both the pre- and post-slot drain loops.
func (w *timeWheel) placeAdd(cmd addCmd, seq int64) {
if _, isCancelled := w.cancelled[cmd.taskID]; isCancelled {
delete(w.cancelled, cmd.taskID)
return
}
if cmd.targetSeq <= seq {
w.onFire(cmd.taskID, cmd.topic)
return
}
pos := int(cmd.targetSeq & int64(w.mask))
rounds := int((cmd.targetSeq - seq - 1) / int64(w.capacity))
w.slots[pos].entries = append(w.slots[pos].entries, &entry{
rounds: rounds,
taskID: cmd.taskID,
topic: cmd.topic,
})
}

// cancel lazily marks a task so it is skipped when its slot fires.
Expand Down Expand Up @@ -170,11 +196,7 @@ func (h *wheelHandler) Handle(lower, upper int64) {
for {
select {
case cmd := <-w.addCh:
w.slots[cmd.pos].entries = append(w.slots[cmd.pos].entries, &entry{
rounds: cmd.rounds,
taskID: cmd.taskID,
topic: cmd.topic,
})
w.placeAdd(cmd, seq)
default:
break drainAdds
}
Expand Down Expand Up @@ -213,25 +235,14 @@ func (h *wheelHandler) Handle(lower, upper int64) {
}
slot.entries = remaining

// --- 4. Post-drain: catch tasks added during slot processing ---
// If a task lands on the current slot with rounds==0, fire immediately.
// --- 4. Post-drain: catch tasks added during slot processing.
// placeAdd handles same-slot/rounds==0 (fire), past-deadline (fire),
// and future targets (slot append) uniformly.
postDrain:
for {
select {
case cmd := <-w.addCh:
if cmd.pos == idx && cmd.rounds == 0 {
if _, isCancelled := w.cancelled[cmd.taskID]; isCancelled {
delete(w.cancelled, cmd.taskID)
} else {
w.onFire(cmd.taskID, cmd.topic)
}
} else {
w.slots[cmd.pos].entries = append(w.slots[cmd.pos].entries, &entry{
rounds: cmd.rounds,
taskID: cmd.taskID,
topic: cmd.topic,
})
}
w.placeAdd(cmd, seq)
default:
break postDrain
}
Expand Down
47 changes: 47 additions & 0 deletions timewheel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,3 +224,50 @@ loop:
t.Errorf("fired = %v, want [keep-me]", got)
}
}

// TestTimeWheel_PlaceAdd_PastDeadlineFiresImmediately is the regression test
// for the producer-preempted-after-cursor-load race that left tasks stranded
// for a full wheel cycle. Simulates a cmd arriving at the handler after the
// cursor has already advanced past its target sequence, and asserts the
// handler fires it immediately instead of appending to a stale slot.
func TestTimeWheel_PlaceAdd_PastDeadlineFiresImmediately(t *testing.T) {
var fired []string
tw := newTimeWheel(64, testTick, func(id, _ string) {
fired = append(fired, id)
})
// Note: don't start the wheel — placeAdd is invoked synchronously by the
// handler goroutine; calling it directly here is safe because no other
// goroutine touches the wheel state.

// targetSeq=15, but the handler has already advanced to seq=20.
tw.placeAdd(addCmd{targetSeq: 15, taskID: "stranded", topic: "t"}, 20)

if len(fired) != 1 || fired[0] != "stranded" {
t.Errorf("expected immediate fire of stranded task, got %v", fired)
}
for _, slot := range tw.slots {
if len(slot.entries) != 0 {
t.Errorf("stranded cmd should not have been appended to any slot")
}
}
}

// TestTimeWheel_PlaceAdd_CancelledIsDropped verifies that a cmd whose taskID
// is in the lazy-cancel set is silently dropped, even when its deadline has
// already passed.
func TestTimeWheel_PlaceAdd_CancelledIsDropped(t *testing.T) {
var fired []string
tw := newTimeWheel(64, testTick, func(id, _ string) {
fired = append(fired, id)
})
tw.cancelled["stranded"] = struct{}{}

tw.placeAdd(addCmd{targetSeq: 15, taskID: "stranded", topic: "t"}, 20)

if len(fired) != 0 {
t.Errorf("cancelled task should not fire, got %v", fired)
}
if _, stillCancelled := tw.cancelled["stranded"]; stillCancelled {
t.Errorf("cancelled set entry should be consumed after drop")
}
}
Loading