-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathworker.go
More file actions
108 lines (87 loc) · 1.75 KB
/
worker.go
File metadata and controls
108 lines (87 loc) · 1.75 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
package smartid
import (
"context"
"sync"
)
// Defaults used by NewWorker, and substituted by WithConcurrency and
// WithQueueSize when they are given a non-positive value.
const (
// DefaultConcurrency is the default number of goroutines launched by Start.
DefaultConcurrency = 10
// DefaultQueueSize is the default buffer capacity of the job queue.
DefaultQueueSize = 100
)
// Result is the outcome of one session lookup, delivered on a job's result
// channel.
type Result struct {
// Person is the value returned by Client.FetchSession for the session.
Person *Person
// Err is the error returned by Client.FetchSession, or the context error
// when Process gave up before the job was queued.
Err error
}
// Job is a single unit of work placed on the worker's queue.
type Job struct {
// SessionId is the session identifier passed to Client.FetchSession.
SessionId string
// ResultCh receives the job's Result; the worker closes it after sending.
ResultCh chan Result
}
// Worker processes session lookups on a pool of background goroutines.
type Worker interface {
// Start launches the background goroutines; ctx is handed to each of them.
Start(ctx context.Context)
// Stop shuts the worker down and waits for its goroutines to finish.
Stop()
// Process submits a session lookup and returns the channel on which its
// Result will be delivered.
Process(ctx context.Context, sessionId string) <-chan Result
// WithConcurrency sets the number of goroutines and returns the Worker
// so calls can be chained.
WithConcurrency(concurrency int) Worker
// WithQueueSize sets the job queue capacity and returns the Worker so
// calls can be chained.
WithQueueSize(size int) Worker
}
// worker is the channel-backed implementation of Worker.
type worker struct {
// client performs the actual session lookup for each job.
client Client
// queue buffers jobs submitted by Process until a goroutine receives them.
queue chan Job
// concurrency is the number of goroutines Start launches.
concurrency int
// wg tracks the goroutines launched by Start so Stop can wait for them.
wg sync.WaitGroup
}
// NewWorker builds a Worker that looks sessions up through client. It is
// configured with DefaultConcurrency goroutines and a job queue buffered to
// DefaultQueueSize; no goroutine runs until Start is called.
func NewWorker(client Client) Worker {
	w := new(worker)
	w.client = client
	w.concurrency = DefaultConcurrency
	w.queue = make(chan Job, DefaultQueueSize)
	return w
}
// WithConcurrency sets how many goroutines Start will launch and returns the
// receiver for chaining. Non-positive values select DefaultConcurrency.
func (w *worker) WithConcurrency(concurrency int) Worker {
	n := DefaultConcurrency
	if concurrency > 0 {
		n = concurrency
	}
	w.concurrency = n
	return w
}
// WithQueueSize replaces the job queue with a fresh channel of the given
// capacity and returns the receiver for chaining. Non-positive sizes select
// DefaultQueueSize. Jobs sitting in the previous channel are not carried over.
func (w *worker) WithQueueSize(size int) Worker {
	capacity := DefaultQueueSize
	if size > 0 {
		capacity = size
	}
	w.queue = make(chan Job, capacity)
	return w
}
// Start launches w.concurrency goroutines, each running perform with ctx.
// Every goroutine is registered on the wait group before it is started, so
// Stop can wait for all of them.
func (w *worker) Start(ctx context.Context) {
	for remaining := w.concurrency; remaining > 0; remaining-- {
		w.wg.Add(1)
		go w.perform(ctx)
	}
}
// Stop closes the job queue and blocks until every goroutine launched by
// Start has returned. Goroutines see the closed queue only after the jobs
// still buffered in it have been received.
//
// Stop must be called at most once: closing the already-closed queue panics.
func (w *worker) Stop() {
	jobs := w.queue
	close(jobs)
	w.wg.Wait()
}
// Process submits a lookup for sessionId and returns a channel that yields
// exactly one Result and is then closed.
//
// If ctx is already done, or becomes done while the queue is full, the job is
// not queued and the returned channel carries ctx.Err(). Otherwise the call
// blocks until the job fits in the queue, and the Result is delivered by the
// goroutine that picks the job up.
//
// Process must not be called after Stop: sending on the closed queue panics.
func (w *worker) Process(ctx context.Context, sessionId string) <-chan Result {
	// Buffered so that neither this method nor the worker goroutine ever
	// blocks on a caller that has stopped listening.
	resultCh := make(chan Result, 1)

	reject := func(err error) <-chan Result {
		resultCh <- Result{Err: err}
		close(resultCh)
		return resultCh
	}

	// select picks pseudo-randomly among ready cases, so without this check
	// a job with an already-cancelled context could still be enqueued
	// whenever the queue has free capacity.
	if err := ctx.Err(); err != nil {
		return reject(err)
	}

	select {
	case w.queue <- Job{SessionId: sessionId, ResultCh: resultCh}:
		return resultCh
	case <-ctx.Done():
		return reject(ctx.Err())
	}
}
// perform is the body of one worker goroutine. It receives jobs from the
// queue, resolves each one through client.FetchSession and delivers the
// outcome on the job's result channel, which it then closes.
//
// The goroutine exits when the queue is closed or when ctx is done. On
// cancellation it first answers every job still waiting in the queue with
// ctx.Err(), so callers blocked on a result channel are released instead of
// waiting forever. Jobs queued after all goroutines have exited are not
// served.
func (w *worker) perform(ctx context.Context) {
	defer w.wg.Done()
	for {
		select {
		case j, ok := <-w.queue:
			if !ok {
				return
			}
			person, err := w.client.FetchSession(ctx, j.SessionId)
			j.ResultCh <- Result{Person: person, Err: err}
			close(j.ResultCh)
		case <-ctx.Done():
			// Fail whatever is still queued; non-blocking so the
			// goroutine leaves as soon as the queue is empty.
			for {
				select {
				case j, ok := <-w.queue:
					if !ok {
						return
					}
					j.ResultCh <- Result{Err: ctx.Err()}
					close(j.ResultCh)
				default:
					return
				}
			}
		}
	}
}