-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathseqdelay.go
More file actions
268 lines (234 loc) · 6.98 KB
/
seqdelay.go
File metadata and controls
268 lines (234 loc) · 6.98 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
package seqdelay
import (
"context"
"fmt"
"os"
"strconv"
"sync/atomic"
"time"
)
// Queue is the public entry point for the seqdelay delay-queue library.
// It wires together the Redis store, timing wheel, ready queue, and
// distributed lock into a single coherent unit.
type Queue struct {
cfg *config
store *store
wheel *timeWheel
ready *readyQueue
lock *distLock
closed atomic.Bool
started atomic.Bool
stopCh chan struct{}
}
// New creates a new Queue with the given options.
// Returns ErrRedisRequired when no Redis client is configured.
func New(opts ...Option) (*Queue, error) {
cfg := defaultConfig()
for _, o := range opts {
o(cfg)
}
if cfg.redisClient == nil {
return nil, ErrRedisRequired
}
// Auto-generate an instanceID when the caller did not provide one.
if cfg.instanceID == "" {
hostname, err := os.Hostname()
if err != nil {
hostname = "unknown"
}
cfg.instanceID = fmt.Sprintf("%s-%d", hostname, os.Getpid())
}
s := newStore(cfg.redisClient)
wheel := newTimeWheel(cfg.wheelCapacity, cfg.tickInterval, nil)
ready := newReadyQueue(s, cfg.maxTopics)
lock := newDistLock(cfg.redisClient, cfg.instanceID, cfg.lockTTL)
q := &Queue{
cfg: cfg,
store: s,
wheel: wheel,
ready: ready,
lock: lock,
stopCh: make(chan struct{}),
}
// Set onFire after Queue creation so it can reference q.lock.
// The wheel doesn't tick until Start() is called.
wheel.onFire = func(taskID, topic string) {
if !q.lock.IsLeader(context.Background()) {
return
}
_ = q.store.ReadyTask(context.Background(), topic, taskID)
}
return q, nil
}
// Start begins background processing: task recovery from Redis, the timing
// wheel, and the leader-election heartbeat loop.
func (q *Queue) Start(ctx context.Context) error {
if err := q.recover(ctx); err != nil {
return err
}
q.started.Store(true)
q.wheel.start()
// Leader-election heartbeat goroutine.
go func() {
ticker := time.NewTicker(q.cfg.lockTTL / 3)
defer ticker.Stop()
for {
select {
case <-q.stopCh:
q.lock.Release(ctx)
return
case <-ticker.C:
if q.lock.IsLeader(ctx) {
q.lock.Renew(ctx)
} else {
q.lock.TryAcquire(ctx)
}
}
}
}()
return nil
}
// recover loads all live tasks from Redis and either re-schedules them in the
// timing wheel or immediately moves them to the ready list.
func (q *Queue) recover(ctx context.Context) error {
topics, err := q.store.ListTopics(ctx)
if err != nil {
// Non-fatal: best-effort recovery.
return nil
}
for _, topic := range topics {
tasks, err := q.store.LoadTopicTasks(ctx, topic)
if err != nil {
continue
}
// Pre-fetch the ready list once so StateReady checks below are O(1).
// Best-effort: on error we treat the set as empty, which means orphans
// get re-enqueued (safe) and live IDs may get duplicated (harmless —
// the duplicate just becomes a stale ID once the original is popped).
readySet, _ := q.store.ReadyListIDs(ctx, topic)
for _, task := range tasks {
switch task.State {
case StateDelayed:
remaining := time.Until(task.CreatedAt.Add(task.Delay))
if remaining <= 0 {
_ = q.store.ReadyTask(ctx, topic, task.ID)
} else {
q.wheel.add(remaining, task.ID, task.Topic)
}
case StateActive:
remaining := time.Until(task.ActiveAt.Add(task.TTR))
if remaining <= 0 {
_ = q.store.ReadyTask(ctx, topic, task.ID)
} else {
q.wheel.add(remaining, task.ID, task.Topic)
}
case StateReady:
// If the ID isn't on the ready list, the previous process
// crashed between BLPOP and the state update — re-enqueue so
// the task isn't lost.
if _, ok := readySet[task.ID]; !ok {
_ = q.store.RequeueReady(ctx, topic, task.ID)
}
case StateFinished, StateCancelled:
_ = q.store.CleanupIndex(ctx, topic, task.ID)
}
}
}
return nil
}
// Add validates and persists a new delayed task, then schedules it in the
// timing wheel.
func (q *Queue) Add(ctx context.Context, task *Task) error {
if q.closed.Load() {
return ErrClosed
}
if err := task.Validate(); err != nil {
return err
}
task.CreatedAt = time.Now()
task.State = StateDelayed
if err := q.store.SaveTask(ctx, task); err != nil {
return err
}
q.wheel.add(task.Delay, task.ID, task.Topic)
return nil
}
// Pop blocks until a task becomes available on the given topic's ready list or
// the configured pop timeout elapses. On success the task is in StateActive and
// a TTR re-delivery timer is armed.
//
// Returns nil, nil when the timeout elapses with no available task.
func (q *Queue) Pop(ctx context.Context, topic string) (*Task, error) {
if q.closed.Load() {
return nil, ErrClosed
}
task, err := q.store.PopTask(ctx, topic, q.cfg.popTimeout)
if err != nil {
return nil, err
}
if task == nil {
return nil, nil
}
// Arm TTR re-delivery: if the consumer does not call Finish/Cancel before
// TTR expires the timing wheel will call ReadyTask again.
q.wheel.add(task.TTR, task.ID, task.Topic)
return task, nil
}
// Finish marks a task as completed and cancels its TTR timer.
func (q *Queue) Finish(ctx context.Context, topic, id string) error {
if q.closed.Load() {
return ErrClosed
}
if err := q.store.FinishTask(ctx, topic, id); err != nil {
return err
}
q.wheel.cancel(id, topic)
return nil
}
// Cancel transitions a task to StateCancelled and removes it from the timing
// wheel.
func (q *Queue) Cancel(ctx context.Context, topic, id string) error {
if q.closed.Load() {
return ErrClosed
}
if err := q.store.CancelTask(ctx, topic, id); err != nil {
return err
}
q.wheel.cancel(id, topic)
return nil
}
// Get retrieves the current state of a task without altering it.
func (q *Queue) Get(ctx context.Context, topic, id string) (*Task, error) {
if q.closed.Load() {
return nil, ErrClosed
}
return q.store.GetTask(ctx, topic, id)
}
// OnExpire registers fn as the callback invoked when a task on topic becomes
// ready. The drain goroutine will pop tasks and call fn automatically, finishing
// the task on success or leaving it for TTR re-delivery on failure.
func (q *Queue) OnExpire(topic string, fn func(context.Context, *Task) error) error {
return q.ready.registerCallback(topic, fn)
}
// Shutdown gracefully stops the queue. It marks the queue closed, drains the
// timing wheel (bounded by ctx), stops all drain goroutines, and releases the
// distributed lock.
func (q *Queue) Shutdown(ctx context.Context) error {
if !q.closed.CompareAndSwap(false, true) {
return nil // already shut down
}
// Signal the leader-election goroutine to stop.
close(q.stopCh)
// Stop all per-topic drain goroutines first (they may trigger wheel adds).
q.ready.stopAll()
// Stop the timing wheel only if Start() was called.
if q.started.Load() {
return q.wheel.stop(ctx)
}
return nil
}
// instanceIDFromParts builds a deterministic instance ID from the hostname and
// PID — kept here for documentation; the actual call is inline in New.
func instanceIDFromParts(hostname string, pid int) string {
return hostname + "-" + strconv.Itoa(pid)
}