Skip to content

Commit 6fc3dfa

Browse files
authored
Merge pull request #5 from koykov/leak-direction-1
Constants in leak metrics.
2 parents 62ee180 + ef53e65 commit 6fc3dfa

5 files changed

Lines changed: 6 additions & 17 deletions

File tree

dummy.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ WorkerStatus) {}
1515
func (DummyMetrics) QueuePut() {}
1616
func (DummyMetrics) QueuePull() {}
1717
func (DummyMetrics) QueueRetry() {}
18-
func (DummyMetrics) QueueLeak(_ string) {}
18+
func (DummyMetrics) QueueLeak(_ LeakDirection) {}
1919
func (DummyMetrics) QueueLost() {}
2020

2121
// DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance.

leak.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -12,14 +12,3 @@ const (
1212

1313
defaultFrontLeakAttempts = 5
1414
)
15-
16-
func (ld LeakDirection) String() string {
17-
switch ld {
18-
case LeakDirectionRear:
19-
return "rear"
20-
case LeakDirectionFront:
21-
return "front"
22-
default:
23-
return "unknown"
24-
}
25-
}

metrics.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ type MetricsWriter interface {
2626
QueueRetry()
2727
// QueueLeak registers item's leak from the full queue.
2828
// Param dir indicates leak direction and may be "rear" or "front".
29-
QueueLeak(dir string)
29+
QueueLeak(dir LeakDirection)
3030
// QueueLost registers lost items missed queue and DLQ.
3131
QueueLost()
3232
}

queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -255,7 +255,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
255255
if err = q.c().DLQ.Enqueue(itmf.payload); err != nil {
256256
return
257257
}
258-
q.mw().QueueLeak(LeakDirectionFront.String())
258+
q.mw().QueueLeak(LeakDirectionFront)
259259
select {
260260
case q.stream <- *itm:
261261
return
@@ -267,7 +267,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
267267
}
268268
// Rear direction, just leak item.
269269
err = q.c().DLQ.Enqueue(itm.payload)
270-
q.mw().QueueLeak(LeakDirectionRear.String())
270+
q.mw().QueueLeak(LeakDirectionRear)
271271
}
272272
} else {
273273
// Regular put (blocking mode).
@@ -341,7 +341,7 @@ func (q *Queue) close(force bool) error {
341341
itm := <-q.stream
342342
if q.CheckBit(flagLeaky) {
343343
_ = q.c().DLQ.Enqueue(itm.payload)
344-
q.mw().QueueLeak(LeakDirectionFront.String())
344+
q.mw().QueueLeak(LeakDirectionFront)
345345
} else {
346346
q.mw().QueueLost()
347347
}

worker.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ func (w *worker) await(queue *Queue) {
120120
_ = queue.renqueue(&itm)
121121
} else if queue.CheckBit(flagLeaky) && w.c().FailToDLQ {
122122
_ = w.c().DLQ.Enqueue(itm.payload)
123-
w.mw().QueueLeak(w.c().LeakDirection.String())
123+
w.mw().QueueLeak(w.c().LeakDirection)
124124
}
125125
}
126126
case WorkerStatusIdle:

0 commit comments

Comments
 (0)