Skip to content

Commit 8484e6b

Browse files
authored
Merge pull request #21 from koykov/metrics-1
metrics: remove dependency of queue repo avoiding using internal types
2 parents 37ba537 + 8d4768e commit 8484e6b

10 files changed

Lines changed: 98 additions & 49 deletions

File tree

dummy.go

Lines changed: 15 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -6,21 +6,21 @@ import "time"
66
// Need just to reduce checks in code.
77
type DummyMetrics struct{}
88

9-
func (DummyMetrics) WorkerSetup(_, _, _ uint) {}
10-
func (DummyMetrics) WorkerInit(_ uint32) {}
11-
func (DummyMetrics) WorkerSleep(_ uint32) {}
12-
func (DummyMetrics) WorkerWakeup(_ uint32) {}
13-
func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration) {}
14-
func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ WorkerStatus) {}
15-
func (DummyMetrics) QueuePut() {}
16-
func (DummyMetrics) QueuePull() {}
17-
func (DummyMetrics) QueueRetry() {}
18-
func (DummyMetrics) QueueLeak(_ LeakDirection) {}
19-
func (DummyMetrics) QueueDeadline() {}
20-
func (DummyMetrics) QueueLost() {}
21-
func (DummyMetrics) SubqPut(_ string) {}
22-
func (DummyMetrics) SubqPull(_ string) {}
23-
func (DummyMetrics) SubqLeak(_ string) {}
9+
func (DummyMetrics) WorkerSetup(_, _, _ uint) {}
10+
func (DummyMetrics) WorkerInit(_ uint32) {}
11+
func (DummyMetrics) WorkerSleep(_ uint32) {}
12+
func (DummyMetrics) WorkerWakeup(_ uint32) {}
13+
func (DummyMetrics) WorkerWait(_ uint32, _ time.Duration) {}
14+
func (DummyMetrics) WorkerStop(_ uint32, _ bool, _ string) {}
15+
func (DummyMetrics) QueuePut() {}
16+
func (DummyMetrics) QueuePull() {}
17+
func (DummyMetrics) QueueRetry() {}
18+
func (DummyMetrics) QueueLeak(_ string) {}
19+
func (DummyMetrics) QueueDeadline() {}
20+
func (DummyMetrics) QueueLost() {}
21+
func (DummyMetrics) SubqPut(_ string) {}
22+
func (DummyMetrics) SubqPull(_ string) {}
23+
func (DummyMetrics) SubqLeak(_ string) {}
2424

2525
// DummyDLQ is a stub DLQ implementation. It does nothing and need for queues with leak tolerance.
2626
// It just leaks data to the trash.

leak.go

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,16 @@ const (
99
// LeakDirectionFront takes old item from queue front and redirects it to DLQ. Thus releases space for the new
1010
// incoming item in the queue.
1111
LeakDirectionFront
12-
13-
defaultFrontLeakAttempts = 5
1412
)
13+
14+
const defaultFrontLeakAttempts = 5
15+
16+
func (ld LeakDirection) String() string {
17+
switch ld {
18+
case LeakDirectionRear:
19+
return "rear"
20+
case LeakDirectionFront:
21+
return "front"
22+
}
23+
return "unknown"
24+
}

metrics.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ type MetricsWriter interface {
1717
// WorkerWait registers how many worker waits due to delayed execution.
1818
WorkerWait(idx uint32, dur time.Duration)
1919
// WorkerStop registers when sleeping worker stops.
20-
WorkerStop(idx uint32, force bool, status WorkerStatus)
20+
WorkerStop(idx uint32, force bool, status string)
2121
// QueuePut registers income of new item to the queue.
2222
QueuePut()
2323
// QueuePull registers outgoing of item from the queue.
@@ -26,7 +26,7 @@ type MetricsWriter interface {
2626
QueueRetry()
2727
// QueueLeak registers item's leak from the full queue.
2828
// Param dir indicates leak direction and may be "rear" or "front".
29-
QueueLeak(dir LeakDirection)
29+
QueueLeak(direction string)
3030
// QueueDeadline registers amount of skipped processing of items due to deadline.
3131
QueueDeadline()
3232
// QueueLost registers lost items missed queue and DLQ.

metrics/log/go.mod

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
module github.com/koykov/queue/metrics/log
2+
3+
go 1.18

metrics/log/writer.go

Lines changed: 8 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,6 @@ package log
33
import (
44
"log"
55
"time"
6-
7-
q "github.com/koykov/queue"
86
)
97

108
// MetricsWriter is Log implementation of queue.MetricsWriter.
@@ -41,9 +39,9 @@ func (w MetricsWriter) WorkerWait(idx uint32, delay time.Duration) {
4139
log.Printf("queue %s: worker %d waits %s\n", w.name, idx, delay)
4240
}
4341

44-
func (w MetricsWriter) WorkerStop(idx uint32, force bool, status q.WorkerStatus) {
42+
func (w MetricsWriter) WorkerStop(idx uint32, force bool, status string) {
4543
if force {
46-
log.Printf("queue %s: worker %d caught force stop signal (current status %d)\n", w.name, idx, status)
44+
log.Printf("queue %s: worker %d caught force stop signal (current status %s)\n", w.name, idx, status)
4745
} else {
4846
log.Printf("queue %s: worker %d caught stop signal\n", w.name, idx)
4947
}
@@ -61,12 +59,8 @@ func (w MetricsWriter) QueueRetry() {
6159
log.Printf("queue %s: retry item processing due to fail\n", w.name)
6260
}
6361

64-
func (w MetricsWriter) QueueLeak(dir q.LeakDirection) {
65-
dirs := "rear"
66-
if dir == q.LeakDirectionFront {
67-
dirs = "front"
68-
}
69-
log.Printf("queue %s: queue leak from %s\n", w.name, dirs)
62+
func (w MetricsWriter) QueueLeak(direction string) {
63+
log.Printf("queue %s: queue leak from %s\n", w.name, direction)
7064
}
7165

7266
func (w MetricsWriter) QueueDeadline() {
@@ -77,14 +71,14 @@ func (w MetricsWriter) QueueLost() {
7771
log.Printf("queue %s: queue lost\n", w.name)
7872
}
7973

80-
func (w MetricsWriter) SubQueuePut(subq string) {
74+
func (w MetricsWriter) SubqPut(subq string) {
8175
log.Printf("queue %s/%s: new item come to the queue\n", w.name, subq)
8276
}
8377

84-
func (w MetricsWriter) SubQueuePull(subq string) {
78+
func (w MetricsWriter) SubqPull(subq string) {
8579
log.Printf("queue %s/%s: item leave the queue\n", w.name, subq)
8680
}
8781

88-
func (w MetricsWriter) SubQueueDrop(subq string) {
89-
log.Printf("queue %s/%s: queue drop item\n", w.name, subq)
82+
func (w MetricsWriter) SubqLeak(subq string) {
83+
log.Printf("queue %s/%s: queue leak item\n", w.name, subq)
9084
}

metrics/prometheus/go.mod

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
module github.com/koykov/queue/metrics/prometheus
2+
3+
go 1.18
4+
5+
require github.com/prometheus/client_golang v1.19.1
6+
7+
require (
8+
github.com/beorn7/perks v1.0.1 // indirect
9+
github.com/cespare/xxhash/v2 v2.2.0 // indirect
10+
github.com/prometheus/client_model v0.5.0 // indirect
11+
github.com/prometheus/common v0.48.0 // indirect
12+
github.com/prometheus/procfs v0.12.0 // indirect
13+
golang.org/x/sys v0.17.0 // indirect
14+
google.golang.org/protobuf v1.33.0 // indirect
15+
)

metrics/prometheus/go.sum

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM=
2+
github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw=
3+
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
4+
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
5+
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
6+
github.com/prometheus/client_golang v1.19.1 h1:wZWJDwK+NameRJuPGDhlnFgx8e8HN3XHQeLaYJFJBOE=
7+
github.com/prometheus/client_golang v1.19.1/go.mod h1:mP78NwGzrVks5S2H6ab8+ZZGJLZUq1hoULYBAYBw1Ho=
8+
github.com/prometheus/client_model v0.5.0 h1:VQw1hfvPvk3Uv6Qf29VrPF32JB6rtbgI6cYPYQjL0Qw=
9+
github.com/prometheus/client_model v0.5.0/go.mod h1:dTiFglRmd66nLR9Pv9f0mZi7B7fk5Pm3gvsjB5tr+kI=
10+
github.com/prometheus/common v0.48.0 h1:QO8U2CdOzSn1BBsmXJXduaaW+dY/5QLjfB8svtSzKKE=
11+
github.com/prometheus/common v0.48.0/go.mod h1:0/KsvlIEfPQCQ5I2iNSAWKPZziNCvRs5EC6ILDTlAPc=
12+
github.com/prometheus/procfs v0.12.0 h1:jluTpSng7V9hY0O2R9DzzJHYb2xULk9VTR1V1R/k6Bo=
13+
github.com/prometheus/procfs v0.12.0/go.mod h1:pcuDEFsWDnvcgNzo4EEweacyhjeA9Zk3cnaOZAZEfOo=
14+
golang.org/x/sys v0.17.0 h1:25cE3gD+tdBA7lp7QfhuV+rJiE9YXTcS3VG1SqssI/Y=
15+
golang.org/x/sys v0.17.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
16+
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
17+
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=

metrics/prometheus/writer.go

Lines changed: 5 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,6 @@ package prometheus
33
import (
44
"time"
55

6-
q "github.com/koykov/queue"
76
"github.com/prometheus/client_golang/prometheus"
87
)
98

@@ -141,13 +140,13 @@ func (w MetricsWriter) WorkerWait(_ uint32, delay time.Duration) {
141140
promWorkerWait.WithLabelValues(w.name).Observe(float64(delay.Nanoseconds() / int64(w.prec)))
142141
}
143142

144-
func (w MetricsWriter) WorkerStop(_ uint32, force bool, status q.WorkerStatus) {
143+
func (w MetricsWriter) WorkerStop(_ uint32, force bool, status string) {
145144
promWorkerIdle.WithLabelValues(w.name).Inc()
146145
if force {
147146
switch status {
148-
case q.WorkerStatusActive:
147+
case "active":
149148
promWorkerActive.WithLabelValues(w.name).Add(-1)
150-
case q.WorkerStatusSleep:
149+
case "sleep":
151150
promWorkerSleep.WithLabelValues(w.name).Add(-1)
152151
}
153152
} else {
@@ -169,12 +168,8 @@ func (w MetricsWriter) QueueRetry() {
169168
promQueueRetry.WithLabelValues(w.name).Inc()
170169
}
171170

172-
func (w MetricsWriter) QueueLeak(dir q.LeakDirection) {
173-
dirs := "rear"
174-
if dir == q.LeakDirectionFront {
175-
dirs = "front"
176-
}
177-
promQueueLeak.WithLabelValues(w.name, dirs).Inc()
171+
func (w MetricsWriter) QueueLeak(direction string) {
172+
promQueueLeak.WithLabelValues(w.name, direction).Inc()
178173
promQueueSize.WithLabelValues(w.name).Dec()
179174
}
180175

queue.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -298,7 +298,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
298298
q.mw().QueueLost()
299299
return
300300
}
301-
q.mw().QueueLeak(LeakDirectionFront)
301+
q.mw().QueueLeak(LeakDirectionFront.String())
302302
if q.engine.enqueue(itm, false) {
303303
return
304304
} else {
@@ -309,7 +309,7 @@ func (q *Queue) renqueue(itm *item) (err error) {
309309
}
310310
// Rear direction, just leak item.
311311
err = q.c().DLQ.Enqueue(itm.payload)
312-
q.mw().QueueLeak(LeakDirectionRear)
312+
q.mw().QueueLeak(LeakDirectionRear.String())
313313
}
314314
} else {
315315
// Regular put (blocking mode).
@@ -383,7 +383,7 @@ func (q *Queue) close(force bool) error {
383383
itm, _ := q.engine.dequeue()
384384
if q.CheckBit(flagLeaky) {
385385
_ = q.c().DLQ.Enqueue(itm.payload)
386-
q.mw().QueueLeak(LeakDirectionFront)
386+
q.mw().QueueLeak(LeakDirectionFront.String())
387387
} else {
388388
q.mw().QueueLost()
389389
}

worker.go

Lines changed: 18 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,28 @@ import (
66
)
77

88
type WorkerStatus uint32
9-
type signal uint32
109

1110
const (
1211
WorkerStatusIdle WorkerStatus = iota
1312
WorkerStatusActive
1413
WorkerStatusSleep
14+
)
15+
16+
func (s WorkerStatus) String() string {
17+
switch s {
18+
case WorkerStatusIdle:
19+
return "idle"
20+
case WorkerStatusActive:
21+
return "active"
22+
case WorkerStatusSleep:
23+
return "sleep"
24+
}
25+
return "unknown"
26+
}
27+
28+
type signal uint32
1529

30+
const (
1631
sigInit signal = iota
1732
sigSleep
1833
sigWakeup
@@ -133,7 +148,7 @@ func (w *worker) await(queue *Queue) {
133148
_ = queue.renqueue(&itm)
134149
} else if queue.CheckBit(flagLeaky) && w.c().FailToDLQ {
135150
_ = w.c().DLQ.Enqueue(itm.payload)
136-
w.mw().QueueLeak(LeakDirectionFront)
151+
w.mw().QueueLeak(LeakDirectionFront.String())
137152
}
138153
}
139154
case WorkerStatusIdle:
@@ -180,7 +195,7 @@ func (w *worker) stop(force bool) {
180195
}
181196
w.l().Printf(msg, w.idx)
182197
}
183-
w.mw().WorkerStop(w.idx, force, w.getStatus())
198+
w.mw().WorkerStop(w.idx, force, w.getStatus().String())
184199
w.setStatus(WorkerStatusIdle)
185200
w.notifyCtl()
186201
}

0 commit comments

Comments
 (0)