forked from hetiansu5/work
-
Notifications
You must be signed in to change notification settings - Fork 8
Expand file tree
/
Copy pathjob_queue.go
More file actions
133 lines (117 loc) · 3.43 KB
/
job_queue.go
File metadata and controls
133 lines (117 loc) · 3.43 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
package work
import (
"context"
)
// GetQueueByTopic returns the Queue stored in the job's queueMap under topic,
// or nil when the map has no entry for that topic. The map is read while
// holding the qLock read lock.
func (j *Job) GetQueueByTopic(topic string) Queue {
	j.qLock.RLock()
	defer j.qLock.RUnlock()

	if queue, found := j.queueMap[topic]; found {
		return queue
	}
	return nil
}
// AddQueue injects a Queue service into the Job.
//
// With no topics, q becomes the job's defaultQueue. Otherwise q is recorded
// together with the given topics as a new entry appended to queueMangers.
func (j *Job) AddQueue(q Queue, topics ...string) {
	if len(topics) == 0 {
		j.defaultQueue = q
		return
	}

	j.queueMangers = append(j.queueMangers, queueManger{
		queue:  q,
		topics: topics,
	})
}
// Enqueue enqueues a plain message: it wraps message in a Task built by
// GenTask(topic, message) and hands it to EnqueueWithTask, forwarding ctx and
// args unchanged. The results of EnqueueWithTask are returned as-is.
func (j *Job) Enqueue(ctx context.Context, topic string, message string, args ...interface{}) (bool, error) {
	return j.EnqueueWithTask(ctx, topic, GenTask(topic, message), args...)
}
// EnqueueWithTask enqueues a Task on topic, surrounded by the job's
// BeforeProcess / AfterProcess calls.
//
// A hook context is created from ctx, topic and args. If BeforeProcess
// returns an error, nothing is enqueued and (false, err) is returned.
// Otherwise the task is enqueued using hookCtx.Ctx, the enqueue error is
// passed to hookCtx.End, and AfterProcess is invoked. The returned bool is the
// enqueue result; the returned error is the value returned by AfterProcess.
func (j *Job) EnqueueWithTask(ctx context.Context, topic string, task Task, args ...interface{}) (bool, error) {
	hookCtx := NewContextHook(ctx, topic, args)
	if hookErr := j.BeforeProcess(hookCtx); hookErr != nil {
		return false, hookErr
	}

	ok, enqueueErr := j.enqueueWithTask(hookCtx.Ctx, topic, task, args)
	hookCtx.End(enqueueErr)

	// NOTE(review): the enqueue error is only handed to hookCtx.End; the
	// caller sees whatever AfterProcess returns. Whether the enqueue error
	// still reaches the caller depends on AfterProcess (not visible here) —
	// confirm.
	return ok, j.AfterProcess(hookCtx)
}
// enqueueWithTask encodes task with JsonEncode and enqueues the resulting
// string on the queue registered for topic.
//
// initQueueMap is called first when isQueueMapInit is false. If no queue is
// found for topic, it returns (false, ErrQueueNotExist). A task with an empty
// Topic gets topic assigned before encoding. If JsonEncode fails, the error is
// returned and nothing is enqueued; otherwise the results of the queue's
// Enqueue call are returned unchanged.
func (j *Job) enqueueWithTask(ctx context.Context, topic string, task Task, args []interface{}) (bool, error) {
	if !j.isQueueMapInit {
		j.initQueueMap()
	}

	q := j.GetQueueByTopic(topic)
	if q == nil {
		return false, ErrQueueNotExist
	}

	if task.Topic == "" {
		task.Topic = topic
	}

	// Surface encoding failures instead of silently enqueueing whatever
	// string JsonEncode produced alongside the error.
	payload, err := JsonEncode(task)
	if err != nil {
		return false, err
	}
	return q.Enqueue(ctx, topic, payload, args...)
}
// EnqueueRaw enqueues message on topic exactly as given, without wrapping it
// in a Task structure, surrounded by the job's BeforeProcess / AfterProcess
// calls.
//
// A hook context is created from ctx, topic and args. If BeforeProcess
// returns an error, nothing is enqueued and (false, err) is returned.
// Otherwise the message is enqueued using hookCtx.Ctx, the enqueue error is
// passed to hookCtx.End, and AfterProcess is invoked. The returned bool is the
// enqueue result; the returned error is the value returned by AfterProcess.
func (j *Job) EnqueueRaw(ctx context.Context, topic string, message string, args ...interface{}) (bool, error) {
	hookCtx := NewContextHook(ctx, topic, args)
	if hookErr := j.BeforeProcess(hookCtx); hookErr != nil {
		return false, hookErr
	}

	ok, enqueueErr := j.enqueueRaw(hookCtx.Ctx, topic, message, args)
	hookCtx.End(enqueueErr)

	// NOTE(review): the enqueue error is only handed to hookCtx.End; the
	// caller sees whatever AfterProcess returns. Whether the enqueue error
	// still reaches the caller depends on AfterProcess (not visible here) —
	// confirm.
	return ok, j.AfterProcess(hookCtx)
}
// enqueueRaw enqueues message unmodified on the queue registered for topic.
//
// initQueueMap is called first when isQueueMapInit is false. If no queue is
// found for topic, it returns (false, ErrQueueNotExist); otherwise it returns
// the results of the queue's Enqueue call.
func (j *Job) enqueueRaw(ctx context.Context, topic string, message string, args []interface{}) (bool, error) {
	if !j.isQueueMapInit {
		j.initQueueMap()
	}

	if target := j.GetQueueByTopic(topic); target != nil {
		return target.Enqueue(ctx, topic, message, args...)
	}
	return false, ErrQueueNotExist
}
// BatchEnqueue enqueues several plain messages on topic in one call. Each
// message is turned into a Task with GenTask(topic, message), preserving
// order, and the resulting slice is passed to BatchEnqueueWithTask together
// with ctx and args.
func (j *Job) BatchEnqueue(ctx context.Context, topic string, messages []string, args ...interface{}) (bool, error) {
	tasks := make([]Task, 0, len(messages))
	for _, msg := range messages {
		tasks = append(tasks, GenTask(topic, msg))
	}
	return j.BatchEnqueueWithTask(ctx, topic, tasks, args...)
}
// BatchEnqueueWithTask enqueues several Tasks on topic in one call, surrounded
// by the job's BeforeProcess / AfterProcess calls.
//
// Each task is encoded with JsonEncode, in order; a task with an empty Topic
// gets topic assigned before encoding (the loop works on a per-iteration copy
// of the element). If any task fails to encode, the error is returned
// immediately and neither the hooks nor the queue are invoked.
//
// After encoding, a hook context is created from ctx, topic and args. If
// BeforeProcess returns an error, nothing is enqueued and (false, err) is
// returned. Otherwise the encoded messages are enqueued using hookCtx.Ctx, the
// enqueue error is passed to hookCtx.End, and AfterProcess is invoked. The
// returned bool is the enqueue result; the returned error is the value
// returned by AfterProcess.
func (j *Job) BatchEnqueueWithTask(ctx context.Context, topic string, tasks []Task, args ...interface{}) (bool, error) {
	messages := make([]string, len(tasks))
	for i, task := range tasks {
		if task.Topic == "" {
			task.Topic = topic
		}
		// Abort the batch on an encoding failure instead of silently
		// enqueueing whatever string JsonEncode produced alongside the error.
		encoded, err := JsonEncode(task)
		if err != nil {
			return false, err
		}
		messages[i] = encoded
	}

	hookCtx := NewContextHook(ctx, topic, args)
	if err := j.BeforeProcess(hookCtx); err != nil {
		return false, err
	}

	success, enqueueErr := j.batchEnqueueWithTask(hookCtx.Ctx, topic, messages, args)
	hookCtx.End(enqueueErr)

	// NOTE(review): the enqueue error is only handed to hookCtx.End; the
	// caller sees whatever AfterProcess returns. Whether the enqueue error
	// still reaches the caller depends on AfterProcess (not visible here) —
	// confirm.
	return success, j.AfterProcess(hookCtx)
}
// batchEnqueueWithTask enqueues the already-encoded messages on the queue
// registered for topic.
//
// initQueueMap is called first when isQueueMapInit is false. If no queue is
// found for topic, it returns (false, ErrQueueNotExist); otherwise it returns
// the results of the queue's BatchEnqueue call.
func (j *Job) batchEnqueueWithTask(ctx context.Context, topic string, messages []string, args []interface{}) (bool, error) {
	if !j.isQueueMapInit {
		j.initQueueMap()
	}

	if target := j.GetQueueByTopic(topic); target != nil {
		return target.BatchEnqueue(ctx, topic, messages, args...)
	}
	return false, ErrQueueNotExist
}