-
Notifications
You must be signed in to change notification settings - Fork 15
Expand file tree
/
Copy pathtask.go
More file actions
77 lines (67 loc) · 2.01 KB
/
task.go
File metadata and controls
77 lines (67 loc) · 2.01 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package seqdelay
import "time"
// TaskState is the lifecycle state of a task.
type TaskState int

// Lifecycle states, numbered from zero in declaration order.
const (
	// StateDelayed: the task is waiting in the time wheel.
	StateDelayed TaskState = iota
	// StateReady: the task is sitting in the ready queue.
	StateReady
	// StateActive: the task has been consumed and its TTR countdown is running.
	StateActive
	// StateFinished: the task has completed.
	StateFinished
	// StateCancelled: the task has been cancelled.
	StateCancelled
)

// String returns the lowercase name of the state. Any value that is not one
// of the declared states yields "unknown".
func (s TaskState) String() string {
	// Start from the fallback and overwrite it when a known state matches.
	label := "unknown"
	switch s {
	case StateDelayed:
		label = "delayed"
	case StateReady:
		label = "ready"
	case StateActive:
		label = "active"
	case StateFinished:
		label = "finished"
	case StateCancelled:
		label = "cancelled"
	}
	return label
}
// Task represents a single delayed task. Every field carries both a msgpack
// and a JSON struct tag.
//
// NOTE(review): Delay and TTR are time.Duration values, which encoding/json
// encodes as an integer nanosecond count by default, while their JSON keys are
// "delay_ms" and "ttr_ms" — confirm that a conversion to milliseconds happens
// somewhere outside this file.
type Task struct {
// ID identifies the task; Validate rejects an empty ID.
ID string `msgpack:"id" json:"id"`
// Topic names the topic the task belongs to; Validate rejects an empty Topic.
Topic string `msgpack:"topic" json:"topic"`
// Body is the task payload as raw bytes.
Body []byte `msgpack:"body" json:"body"`
// Delay must be strictly positive to pass Validate.
Delay time.Duration `msgpack:"delay" json:"delay_ms"`
// TTR is replaced with 30 seconds by Validate when it is zero or negative.
TTR time.Duration `msgpack:"ttr" json:"ttr_ms"`
// MaxRetries — presumably the upper bound for Retries; verify against the caller.
MaxRetries int `msgpack:"max_retries" json:"max_retries"`
// State is the current lifecycle state of the task.
State TaskState `msgpack:"state" json:"state"`
// Retries — presumably the number of retries performed so far; verify against the caller.
Retries int `msgpack:"retries" json:"retries"`
// CreatedAt — presumably the creation time of the task; TODO confirm where it is set.
CreatedAt time.Time `msgpack:"created_at" json:"created_at"`
// ActiveAt — presumably the time the task last became active; TODO confirm where it is set.
ActiveAt time.Time `msgpack:"active_at" json:"active_at"`
}
// Validate checks the required fields of the task and fills in a default TTR.
//
// It returns ErrInvalidTask when ID or Topic is empty and ErrInvalidDelay when
// Delay is zero or negative. On success it returns nil; as a side effect, a
// zero or negative TTR on the receiver is overwritten with 30 seconds.
func (t *Task) Validate() error {
	const defaultTTR = 30 * time.Second

	switch {
	case t.ID == "", t.Topic == "":
		return ErrInvalidTask
	case t.Delay <= 0:
		return ErrInvalidDelay
	}

	// Only reached for an otherwise valid task: apply the TTR default in place.
	if t.TTR <= 0 {
		t.TTR = defaultTTR
	}
	return nil
}
// taskKey builds the Redis key of one task. The topic is wrapped in braces as
// a {topic} hash tag for Redis Cluster compatibility.
func taskKey(topic, id string) string {
	// Per-topic namespace shared by all keys of this topic.
	ns := "seqdelay:{" + topic + "}"
	return ns + ":task:" + id
}
// readyKey builds the Redis key of the ready queue for the given topic.
func readyKey(topic string) string {
	const (
		prefix = "seqdelay:{"
		suffix = "}:ready"
	)
	return prefix + topic + suffix
}
// indexKey builds the Redis key of the task index for the given topic.
func indexKey(topic string) string {
	// The braces around the topic form the hash tag portion of the key.
	hashTag := "{" + topic + "}"
	return "seqdelay:" + hashTag + ":index"
}
// lockKey is the key of the distributed lock. It is a single fixed string
// that does not include a topic.
const lockKey = "seqdelay:lock:tick"