Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion seqdelay.go
Original file line number Diff line number Diff line change
Expand Up @@ -125,6 +125,13 @@ func (q *Queue) recover(ctx context.Context) error {
if err != nil {
continue
}

// Pre-fetch the ready list once so StateReady checks below are O(1).
// Best-effort: on error we treat the set as empty, which means orphans
// get re-enqueued (safe) and live IDs may get duplicated (harmless —
// the duplicate just becomes a stale ID once the original is popped).
readySet, _ := q.store.ReadyListIDs(ctx, topic)

for _, task := range tasks {
switch task.State {
case StateDelayed:
Expand All @@ -142,7 +149,12 @@ func (q *Queue) recover(ctx context.Context) error {
q.wheel.add(remaining, task.ID, task.Topic)
}
case StateReady:
// Already in the ready list — nothing to do.
// If the ID isn't on the ready list, the previous process
// crashed between BLPOP and the state update — re-enqueue so
// the task isn't lost.
if _, ok := readySet[task.ID]; !ok {
_ = q.store.RequeueReady(ctx, topic, task.ID)
}
case StateFinished, StateCancelled:
_ = q.store.CleanupIndex(ctx, topic, task.ID)
}
Expand Down
87 changes: 87 additions & 0 deletions seqdelay_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -356,3 +356,90 @@ func TestQueue_OnExpire_Conflict(t *testing.T) {
t.Errorf("expected ErrTopicConflict, got %v", err)
}
}

// ---------------------------------------------------------------------------
// Recover: orphan StateReady tasks
// ---------------------------------------------------------------------------

// TestQueue_Recover_OrphanReadyTask plants the state a previous process leaves
// behind when it crashes between BLPOP and the state update inside PopTask:
// the task is stored as StateReady, but its ID is no longer on the ready list.
// recover() should re-enqueue the orphan so it isn't lost.
func TestQueue_Recover_OrphanReadyTask(t *testing.T) {
	client := getTestRedis(t)
	defer client.Close()

	ctx := context.Background()
	s := newStore(client)

	// Persist a task in StateReady without pushing the ID onto the ready list.
	orphan := newTestTask("orphan-topic", "orphan-1")
	orphan.State = StateReady
	if err := s.SaveTask(ctx, orphan); err != nil {
		t.Fatalf("SaveTask: %v", err)
	}

	// Sanity: the ready list must start empty. The LLen error is checked
	// explicitly so that a failed Redis call is not mistaken for an empty
	// list (which would let the precondition pass without being verified).
	n, err := client.LLen(ctx, readyKey(orphan.Topic)).Result()
	if err != nil {
		t.Fatalf("LLen: %v", err)
	}
	if n != 0 {
		t.Fatalf("ready list should start empty, got %d", n)
	}

	q, err := New(
		WithRedisClient(client),
		WithTickInterval(10*time.Millisecond),
		WithPopTimeout(2*time.Second),
	)
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	if err := q.Start(ctx); err != nil {
		t.Fatalf("Start: %v", err)
	}
	defer q.Shutdown(ctx) //nolint:errcheck

	// After Start (which calls recover), the orphan ID should be back on the
	// ready list and Pop should return it.
	popped, err := q.Pop(ctx, orphan.Topic)
	if err != nil {
		t.Fatalf("Pop: %v", err)
	}
	if popped == nil {
		t.Fatal("Pop returned nil — orphan was not recovered")
	}
	if popped.ID != orphan.ID {
		t.Errorf("recovered wrong task: got %q, want %q", popped.ID, orphan.ID)
	}
}

// TestQueue_Recover_LiveReadyTaskNotDuplicated guards against the inverse:
// when a StateReady task's ID is already on the ready list, recover() must
// not push a duplicate.
func TestQueue_Recover_LiveReadyTaskNotDuplicated(t *testing.T) {
	client := getTestRedis(t)
	defer client.Close()

	ctx := context.Background()
	s := newStore(client)

	// Persist a StateReady task and put its ID on the ready list, i.e. the
	// healthy (non-orphan) layout.
	live := newTestTask("live-topic", "live-1")
	live.State = StateReady
	if err := s.SaveTask(ctx, live); err != nil {
		t.Fatalf("SaveTask: %v", err)
	}
	if err := client.RPush(ctx, readyKey(live.Topic), live.ID).Err(); err != nil {
		t.Fatalf("RPush: %v", err)
	}

	q, err := New(WithRedisClient(client), WithTickInterval(10*time.Millisecond))
	if err != nil {
		t.Fatalf("New: %v", err)
	}
	if err := q.Start(ctx); err != nil {
		t.Fatalf("Start: %v", err)
	}
	defer q.Shutdown(ctx) //nolint:errcheck

	// Check the LLen error explicitly: if the call fails, report the real
	// error instead of a misleading "got 0" length mismatch.
	n, err := client.LLen(ctx, readyKey(live.Topic)).Result()
	if err != nil {
		t.Fatalf("LLen: %v", err)
	}
	if n != 1 {
		t.Errorf("ready list should still hold exactly 1 entry, got %d", n)
	}
}
25 changes: 25 additions & 0 deletions store.go
Original file line number Diff line number Diff line change
Expand Up @@ -369,6 +369,31 @@ func (s *store) PopTask(ctx context.Context, topic string, timeout time.Duration
return t, nil
}

// ---------------------------------------------------------------------------
// ReadyListIDs / RequeueReady
// ---------------------------------------------------------------------------

// ReadyListIDs fetches the full range (0 to -1) of the topic's ready list and
// returns the task IDs found there as a set. recover() consults this set to
// detect orphan StateReady tasks whose IDs went missing because a previous
// process crashed mid-Pop.
//
// If the list read fails, the set is nil and the error is returned unchanged.
func (s *store) ReadyListIDs(ctx context.Context, topic string) (map[string]struct{}, error) {
	key := readyKey(topic)

	listed, err := s.client.LRange(ctx, key, 0, -1).Result()
	if err != nil {
		return nil, err
	}

	// Sized up front for the number of list entries; repeated IDs on the
	// list collapse into a single set member.
	present := make(map[string]struct{}, len(listed))
	for i := range listed {
		present[listed[i]] = struct{}{}
	}
	return present, nil
}

// RequeueReady pushes id onto the end of the topic's ready list (RPush). It
// performs no membership check of its own, so avoiding duplicates is up to the
// caller (recover checks ReadyListIDs first).
func (s *store) RequeueReady(ctx context.Context, topic, id string) error {
	key := readyKey(topic)
	push := s.client.RPush(ctx, key, id)
	return push.Err()
}

// ---------------------------------------------------------------------------
// LoadTopicTasks
// ---------------------------------------------------------------------------
Expand Down
Loading