Skip to content

Commit cd81965

Browse files
authored
Merge pull request #3 from koykov/force-close-lock
Balancer lock on DE queue.
2 parents 36462a3 + 75d7aa6 commit cd81965

4 files changed

Lines changed: 45 additions & 15 deletions

File tree

go.mod

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2,4 +2,4 @@ module github.com/koykov/queue
22

33
go 1.16
44

5-
require github.com/koykov/bitset v0.0.0-20220630192021-a713ee0d389d
5+
require github.com/koykov/bitset v1.0.0

go.sum

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,2 @@
1-
github.com/koykov/bitset v0.0.0-20220630192021-a713ee0d389d h1:b82iiOxU5g1mcgKfmJPg47WGLumM91t9wPNWN1a3DbM=
2-
github.com/koykov/bitset v0.0.0-20220630192021-a713ee0d389d/go.mod h1:Mg0YlEO6vG0Vd2N5qZhyH1ss2ZuuMZR8iYNKshQcm6I=
1+
github.com/koykov/bitset v1.0.0 h1:2mEbAhKelhpdWqnpa+mR3HRhdMsto5od7ACOi6MIAmk=
2+
github.com/koykov/bitset v1.0.0/go.mod h1:DVR3bH49c1oOcNtD38h+aQq7lp1ZY91cXmjOldlTk8A=

queue.go

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -301,9 +301,6 @@ func (q *Queue) close(force bool) error {
301301
// Wait till all enqueue operations will finish.
302302
for atomic.LoadInt64(&q.enqlock) > 0 {
303303
}
304-
// Close the stream.
305-
// Please note, this is not the end for regular close case. Workers continue works while queue has items.
306-
close(q.stream)
307304

308305
if force {
309306
// Immediately stop all active/sleeping workers.
@@ -329,6 +326,10 @@ func (q *Queue) close(force bool) error {
329326
}
330327
}
331328
}
329+
// Close the stream.
330+
// Please note, this is not the end for regular close case. Workers continue works while queue has items.
331+
close(q.stream)
332+
332333
return nil
333334
}
334335

worker.go

Lines changed: 38 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,11 @@ type worker struct {
2727
idx uint32
2828
// Status of the worker.
2929
status WorkerStatus
30-
// Pause channel between put to sleep and stop.
31-
pause chan struct{}
30+
// Channel to control sleep and stop states.
31+
// This channel delivers two signals:
32+
// * wakeup for slept workers
33+
// * force close for active workers
34+
ctl chan struct{}
3235
// Last signal timestamp.
3336
lastTS int64
3437
// Worker instance.
@@ -42,7 +45,7 @@ func makeWorker(idx uint32, config *Config) *worker {
4245
w := &worker{
4346
idx: idx,
4447
status: WorkerStatusIdle,
45-
pause: make(chan struct{}, 1),
48+
ctl: make(chan struct{}, 1),
4649
proc: config.Worker,
4750
config: config,
4851
}
@@ -70,7 +73,7 @@ func (w *worker) await(queue *Queue) {
7073
switch w.getStatus() {
7174
case WorkerStatusSleep:
7275
// Wait config.SleepInterval.
73-
<-w.pause
76+
<-w.ctl
7477
case WorkerStatusActive:
7578
// Read itm from the stream.
7679
itm, ok := <-queue.stream
@@ -81,15 +84,30 @@ func (w *worker) await(queue *Queue) {
8184
}
8285
w.mw().QueuePull()
8386

87+
var intr bool
8488
// Check delayed execution.
8589
if itm.dexpire > 0 {
8690
now := queue.clk().Now().UnixNano()
8791
if delta := time.Duration(itm.dexpire - now); delta > 0 {
8892
// Processing time has not yet arrived. So wait till delay ends.
89-
time.Sleep(delta)
93+
select {
94+
case <-time.After(delta):
95+
break
96+
case <-w.ctl:
97+
// Waiting interrupted due to force close signal.
98+
intr = true
99+
// Calculate real wait time.
100+
delta = time.Duration(queue.clk().Now().UnixNano() - now)
101+
break
102+
}
90103
w.mw().WorkerWait(w.idx, delta)
91104
}
92105
}
106+
if intr {
107+
// Return item back to the queue due to interrupt signal.
108+
_ = queue.renqueue(&itm)
109+
return
110+
}
93111

94112
// Forward itm to dequeuer.
95113
if err := w.proc.Do(itm.payload); err != nil {
@@ -137,7 +155,7 @@ func (w *worker) wakeup() {
137155
}
138156
w.setStatus(WorkerStatusActive)
139157
w.mw().WorkerWakeup(w.idx)
140-
w.pause <- struct{}{}
158+
w.notifyCtl()
141159
}
142160

143161
// Stop (or force stop) worker.
@@ -151,8 +169,19 @@ func (w *worker) stop(force bool) {
151169
}
152170
w.mw().WorkerStop(w.idx, force, w.getStatus())
153171
w.setStatus(WorkerStatusIdle)
154-
// Notify pause channel about stop.
155-
w.pause <- struct{}{}
172+
w.notifyCtl()
173+
}
174+
175+
// Check if ctl channel is empty and send signal (wakeup or force close).
176+
func (w *worker) notifyCtl() {
177+
// Check ctl channel for previously undelivered signal.
178+
if len(w.ctl) > 0 {
179+
// Clear ctl channel to prevent locking.
180+
_, _ = <-w.ctl
181+
}
182+
183+
// Send stop signal to ctl channel.
184+
w.ctl <- struct{}{}
156185
}
157186

158187
// Set worker status.
@@ -167,7 +196,7 @@ func (w *worker) getStatus() WorkerStatus {
167196

168197
// Check if worker slept enough time.
169198
func (w *worker) sleptEnough() bool {
170-
dur := time.Duration(time.Now().UnixNano() - atomic.LoadInt64(&w.lastTS))
199+
dur := time.Duration(w.c().Clock.Now().UnixNano() - atomic.LoadInt64(&w.lastTS))
171200
return dur >= w.c().SleepInterval
172201
}
173202

0 commit comments

Comments
 (0)