Skip to content

Commit 7fc123f

Browse files
committed
*: add drpc client and server metrics
Add metrics collection for drpc client connections, server connections, and the stream pool.

- Introduce Counter, Observer, and Gauge metric interfaces in the drpc package, allowing us to plug in the metrics implementation from cockroach
- Client metrics: bytes sent/received
- Server metrics: bytes sent/received, TLS handshake errors, inactivity timeouts, and per-RPC stream lifecycle metrics (total, active, duration, errors) with labels for service/method/type
- Pool metrics: pool size, connection hits/misses, connection age
1 parent 4b35ab8 commit 7fc123f

8 files changed

Lines changed: 918 additions & 11 deletions

File tree

drpcconn/conn.go

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -19,14 +19,25 @@ import (
1919
"storj.io/drpc/internal/drpcopts"
2020
)
2121

22+
// ClientMetrics holds optional metrics that the client connection will
23+
// populate during operation.
24+
type ClientMetrics struct {
25+
BytesSent drpc.Counter
26+
BytesRecv drpc.Counter
27+
}
28+
2229
// Options controls configuration settings for a conn.
2330
type Options struct {
2431
// Manager controls the options we pass to the manager of this conn.
2532
Manager drpcmanager.Options
2633

27-
// CollectStats controls whether the server should collect stats on the
34+
// CollectStats controls whether the client should collect stats on the
2835
// rpcs it creates.
2936
CollectStats bool
37+
38+
// Metrics holds optional metrics the conn will populate. If nil, no
39+
// metrics are recorded.
40+
Metrics *ClientMetrics
3041
}
3142

3243
// Conn is a drpc client connection.
@@ -51,6 +62,15 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
5162
tr: tr,
5263
}
5364

65+
if opts.Metrics != nil && (opts.Metrics.BytesSent != nil || opts.Metrics.BytesRecv != nil) {
66+
tr = &drpc.MeteredTransport{
67+
Transport: tr,
68+
BytesSent: opts.Metrics.BytesSent,
69+
BytesRecv: opts.Metrics.BytesRecv,
70+
}
71+
c.tr = tr
72+
}
73+
5474
if opts.CollectStats {
5575
drpcopts.SetManagerStatsCB(&opts.Manager.Internal, c.getStats)
5676
c.stats = make(map[string]*drpcstats.Stats)

drpcmux/mux.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,13 @@ func (m *Mux) Register(srv interface{}, desc drpc.Description) error {
6767
return nil
6868
}
6969

70+
// IsUnitary reports whether the named RPC has unary input and output.
71+
// It returns false if the RPC is not registered or is a streaming RPC.
72+
func (m *Mux) IsUnitary(rpc string) bool {
73+
data, ok := m.rpcs[rpc]
74+
return ok && data.unitary
75+
}
76+
7077
// registerOne does the work to register a single rpc.
7178
func (m *Mux) registerOne(
7279
srv interface{}, rpc string, enc drpc.Encoding, receiver drpc.Receiver, method interface{},

drpcpool/entry.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,12 @@ import (
99
)
1010

1111
type entry[K comparable, V Conn] struct {
12-
key K
13-
val V
14-
exp *time.Timer
15-
global node[K, V]
16-
local node[K, V]
12+
key K
13+
val V
14+
exp *time.Timer
15+
created time.Time
16+
global node[K, V]
17+
local node[K, V]
1718
}
1819

1920
func (e *entry[K, V]) String() string {

drpcpool/pool.go

Lines changed: 48 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,18 @@ import (
1111

1212
"github.com/zeebo/errs"
1313

14+
"storj.io/drpc"
1415
"storj.io/drpc/drpcdebug"
1516
)
1617

18+
// PoolMetrics holds optional metrics for connection pool monitoring.
19+
type PoolMetrics struct {
20+
PoolSize drpc.Gauge
21+
ConnectionHitsTotal drpc.Counter
22+
ConnectionMissesTotal drpc.Counter
23+
ConnectionAgeSeconds drpc.Observer
24+
}
25+
1726
// Options contains the options to configure a pool.
1827
type Options struct {
1928
// Expiration will remove any values from the Pool after the
@@ -28,6 +37,10 @@ type Options struct {
2837
// the Pool holds unlimited for any single key. Negative means
2938
// no values for any single key.
3039
KeyCapacity int
40+
41+
// Metrics holds optional metrics the pool will populate. If nil,
42+
// no metrics are recorded.
43+
Metrics *PoolMetrics
3144
}
3245

3346
// Pool is a connection pool with key type K. It maintains a cache of connections
@@ -49,6 +62,30 @@ func New[K comparable, V Conn](opts Options) *Pool[K, V] {
4962
}
5063
}
5164

65+
func (p *Pool[K, V]) recordHit(created time.Time) {
66+
if p.opts.Metrics == nil {
67+
return
68+
}
69+
if p.opts.Metrics.ConnectionHitsTotal != nil {
70+
p.opts.Metrics.ConnectionHitsTotal.Add(nil, 1)
71+
}
72+
if p.opts.Metrics.ConnectionAgeSeconds != nil {
73+
p.opts.Metrics.ConnectionAgeSeconds.Observe(nil, time.Since(created).Seconds())
74+
}
75+
}
76+
77+
func (p *Pool[K, V]) recordMiss() {
78+
if p.opts.Metrics != nil && p.opts.Metrics.ConnectionMissesTotal != nil {
79+
p.opts.Metrics.ConnectionMissesTotal.Add(nil, 1)
80+
}
81+
}
82+
83+
func (p *Pool[K, V]) recordPoolSizeChange(delta float64) {
84+
if p.opts.Metrics != nil && p.opts.Metrics.PoolSize != nil {
85+
p.opts.Metrics.PoolSize.Add(nil, delta)
86+
}
87+
}
88+
5289
func (p *Pool[K, V]) log(what string, cb func() string) {
5390
if drpcdebug.Enabled {
5491
drpcdebug.Log(func() (_, _, _ string) { return fmt.Sprintf("<pül %p>", p), what, cb() })
@@ -62,12 +99,14 @@ func (p *Pool[K, V]) Close() (err error) {
6299
defer p.mu.Unlock()
63100

64101
var eg errs.Group
102+
count := p.order.count
65103
for ent := p.order.head; ent != nil; ent = ent.global.next {
66104
eg.Add(p.closeEntry(ent))
67105
}
68106

69107
p.entries = make(map[K]*list[K, V])
70108
p.order = list[K, V]{}
109+
p.recordPoolSizeChange(-float64(count))
71110

72111
return eg.Err()
73112
}
@@ -99,6 +138,7 @@ func (p *Pool[K, V]) removeEntry(ent *entry[K, V]) {
99138

100139
local.removeEntry(ent, (*entry[K, V]).localList)
101140
p.order.removeEntry(ent, (*entry[K, V]).globalList)
141+
p.recordPoolSizeChange(-1)
102142

103143
if local.count == 0 {
104144
delete(p.entries, ent.key)
@@ -123,6 +163,7 @@ func (p *Pool[K, V]) Take(key K) (V, bool) {
123163

124164
local := p.entries[key]
125165
if local == nil {
166+
p.recordMiss()
126167
return *new(V), false
127168
}
128169

@@ -137,6 +178,7 @@ func (p *Pool[K, V]) Take(key K) (V, bool) {
137178

138179
local.removeEntry(ent, (*entry[K, V]).localList)
139180
p.order.removeEntry(ent, (*entry[K, V]).globalList)
181+
p.recordPoolSizeChange(-1)
140182

141183
if ent.exp != nil && !ent.exp.Stop() {
142184
continue
@@ -145,9 +187,11 @@ func (p *Pool[K, V]) Take(key K) (V, bool) {
145187
}
146188

147189
p.log("TAKEN", ent.String)
190+
p.recordHit(ent.created)
148191
return ent.val, true
149192
}
150193

194+
p.recordMiss()
151195
return *new(V), false
152196
}
153197

@@ -177,6 +221,7 @@ func (p *Pool[K, V]) Put(key K, val V) {
177221

178222
local.removeEntry(ent, (*entry[K, V]).localList)
179223
p.order.removeEntry(ent, (*entry[K, V]).globalList)
224+
p.recordPoolSizeChange(-1)
180225
}
181226

182227
for p.opts.Capacity != 0 && p.order.count >= p.opts.Capacity {
@@ -187,15 +232,17 @@ func (p *Pool[K, V]) Put(key K, val V) {
187232

188233
local.removeEntry(ent, (*entry[K, V]).localList)
189234
p.order.removeEntry(ent, (*entry[K, V]).globalList)
235+
p.recordPoolSizeChange(-1)
190236

191237
if local.count == 0 {
192238
delete(p.entries, ent.key)
193239
}
194240
}
195241

196-
ent := &entry[K, V]{key: key, val: val}
242+
ent := &entry[K, V]{key: key, val: val, created: time.Now()}
197243
local.appendEntry(ent, (*entry[K, V]).localList)
198244
p.order.appendEntry(ent, (*entry[K, V]).globalList)
245+
p.recordPoolSizeChange(1)
199246
p.log("PUT", ent.String)
200247

201248
if p.opts.Expiration > 0 {

drpcpool/pool_test.go

Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -419,6 +419,144 @@ func TestPool_StreamContext(t *testing.T) {
419419
}
420420
}
421421

422+
//
423+
// pool metrics tests
424+
//
425+
426+
type testPoolCounter struct {
427+
total float64
428+
}
429+
430+
func (c *testPoolCounter) Add(_ map[string]string, value float64) {
431+
c.total += value
432+
}
433+
434+
type testPoolGauge struct {
435+
total float64
436+
}
437+
438+
func (g *testPoolGauge) Add(_ map[string]string, value float64) {
439+
g.total += value
440+
}
441+
442+
type testPoolObserver struct {
443+
calls int
444+
last float64
445+
}
446+
447+
func (o *testPoolObserver) Observe(_ map[string]string, value float64) {
448+
o.calls++
449+
o.last = value
450+
}
451+
452+
func TestPoolMetrics_PutTakeClose(t *testing.T) {
453+
ctx := drpctest.NewTracker(t)
454+
defer ctx.Close()
455+
456+
poolSize := &testPoolGauge{}
457+
hits := &testPoolCounter{}
458+
misses := &testPoolCounter{}
459+
age := &testPoolObserver{}
460+
461+
pool := New[string, Conn](Options{
462+
Capacity: 10,
463+
Metrics: &PoolMetrics{
464+
PoolSize: poolSize,
465+
ConnectionHitsTotal: hits,
466+
ConnectionMissesTotal: misses,
467+
ConnectionAgeSeconds: age,
468+
},
469+
})
470+
471+
// Miss on empty pool.
472+
_, ok := pool.Take("key")
473+
assert.That(t, !ok)
474+
assert.Equal(t, misses.total, 1.0)
475+
assert.Equal(t, hits.total, 0.0)
476+
477+
// Put a connection.
478+
conn := &callbackConn{
479+
ClosedFn: func() <-chan struct{} { return nil },
480+
UnblockedFn: func() <-chan struct{} { return closedCh },
481+
}
482+
pool.Put("key", conn)
483+
assert.Equal(t, poolSize.total, 1.0)
484+
485+
// Take it back — should be a hit.
486+
val, ok := pool.Take("key")
487+
assert.That(t, ok)
488+
assert.Equal(t, val, Conn(conn))
489+
assert.Equal(t, hits.total, 1.0)
490+
assert.Equal(t, poolSize.total, 0.0) // decremented on take
491+
assert.Equal(t, age.calls, 1)
492+
assert.That(t, age.last >= 0)
493+
494+
// Miss again now that pool is empty.
495+
_, ok = pool.Take("key")
496+
assert.That(t, !ok)
497+
assert.Equal(t, misses.total, 2.0)
498+
499+
// Put two, then Close.
500+
conn2 := &callbackConn{
501+
ClosedFn: func() <-chan struct{} { return nil },
502+
UnblockedFn: func() <-chan struct{} { return closedCh },
503+
}
504+
pool.Put("key", conn)
505+
pool.Put("key", conn2)
506+
assert.Equal(t, poolSize.total, 2.0)
507+
508+
assert.NoError(t, pool.Close())
509+
assert.Equal(t, poolSize.total, 0.0)
510+
}
511+
512+
func TestPoolMetrics_Eviction(t *testing.T) {
513+
poolSize := &testPoolGauge{}
514+
misses := &testPoolCounter{}
515+
516+
pool := New[string, Conn](Options{
517+
Capacity: 1,
518+
KeyCapacity: 1,
519+
Metrics: &PoolMetrics{
520+
PoolSize: poolSize,
521+
ConnectionMissesTotal: misses,
522+
},
523+
})
524+
defer func() { _ = pool.Close() }()
525+
526+
conn1 := &callbackConn{
527+
ClosedFn: func() <-chan struct{} { return nil },
528+
UnblockedFn: func() <-chan struct{} { return closedCh },
529+
}
530+
conn2 := &callbackConn{
531+
ClosedFn: func() <-chan struct{} { return nil },
532+
UnblockedFn: func() <-chan struct{} { return closedCh },
533+
}
534+
535+
pool.Put("key", conn1)
536+
assert.Equal(t, poolSize.total, 1.0)
537+
538+
// Putting conn2 with KeyCapacity=1 should evict conn1.
539+
pool.Put("key", conn2)
540+
// One eviction (-1) + one put (+1) = net change of 0, so the pool size stays at 1.
541+
assert.Equal(t, poolSize.total, 1.0)
542+
}
543+
544+
func TestPoolMetrics_NilFields(t *testing.T) {
545+
// All PoolMetrics fields are nil — should not panic.
546+
pool := New[string, Conn](Options{
547+
Metrics: &PoolMetrics{},
548+
})
549+
550+
conn := &callbackConn{
551+
ClosedFn: func() <-chan struct{} { return nil },
552+
UnblockedFn: func() <-chan struct{} { return closedCh },
553+
}
554+
pool.Put("key", conn)
555+
pool.Take("key")
556+
pool.Take("miss")
557+
assert.NoError(t, pool.Close())
558+
}
559+
422560
func BenchmarkPool(b *testing.B) {
423561
ctx := drpctest.NewTracker(b)
424562
defer ctx.Close()

0 commit comments

Comments
 (0)