diff --git a/README.md b/README.md index 56702ff..50c5442 100644 --- a/README.md +++ b/README.md @@ -209,7 +209,6 @@ DRPC is proud to get as much done in as few lines of code as possible. It's the | storj.io/drpc/drpcerr | 42 | | storj.io/drpc/drpcctx | 41 | | storj.io/drpc/internal/drpcopts | 30 | -| storj.io/drpc/drpcstats | 25 | | storj.io/drpc/drpcdebug | 22 | | storj.io/drpc/drpcenc | 15 | | **Total** | **3611** | diff --git a/drpcconn/README.md b/drpcconn/README.md index 702a1ad..483a007 100644 --- a/drpcconn/README.md +++ b/drpcconn/README.md @@ -60,13 +60,6 @@ func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ NewStream begins a streaming rpc on the connection. Only one Invoke or Stream may be open at a time. -#### func (*Conn) Stats - -```go -func (c *Conn) Stats() map[string]drpcstats.Stats -``` -Stats returns the collected stats grouped by rpc. - #### func (*Conn) Transport ```go @@ -89,10 +82,6 @@ called concurrently with Invoke or NewStream. type Options struct { // Manager controls the options we pass to the manager of this conn. Manager drpcmanager.Options - - // CollectStats controls whether the server should collect stats on the - // rpcs it creates. - CollectStats bool } ``` diff --git a/drpcconn/conn.go b/drpcconn/conn.go index 636f346..037aab8 100644 --- a/drpcconn/conn.go +++ b/drpcconn/conn.go @@ -13,20 +13,26 @@ import ( "storj.io/drpc/drpcenc" "storj.io/drpc/drpcmanager" "storj.io/drpc/drpcmetadata" - "storj.io/drpc/drpcstats" + "storj.io/drpc/drpcmetrics" "storj.io/drpc/drpcstream" "storj.io/drpc/drpcwire" - "storj.io/drpc/internal/drpcopts" ) +// ClientMetrics holds optional metrics that the client connection will +// populate during operation. +type ClientMetrics struct { + BytesSent drpcmetrics.Counter + BytesRecv drpcmetrics.Counter +} + // Options controls configuration settings for a conn. type Options struct { // Manager controls the options we pass to the manager of this conn. 
Manager drpcmanager.Options - // CollectStats controls whether the server should collect stats on the - // rpcs it creates. - CollectStats bool + // Metrics holds optional metrics the conn will populate. If nil, no + // metrics are recorded. + Metrics *ClientMetrics } // Conn is a drpc client connection. @@ -35,8 +41,6 @@ type Conn struct { man *drpcmanager.Manager mu sync.Mutex wbuf []byte - - stats map[string]*drpcstats.Stats } var _ drpc.Conn = (*Conn)(nil) @@ -47,43 +51,20 @@ func New(tr drpc.Transport) *Conn { return NewWithOptions(tr, Options{}) } // NewWithOptions returns a conn that uses the transport for reads and writes. // The Options control details of how the conn operates. func NewWithOptions(tr drpc.Transport, opts Options) *Conn { - c := &Conn{ - tr: tr, - } - - if opts.CollectStats { - drpcopts.SetManagerStatsCB(&opts.Manager.Internal, c.getStats) - c.stats = make(map[string]*drpcstats.Stats) - } - - c.man = drpcmanager.NewWithOptions(tr, opts.Manager) - - return c -} - -// Stats returns the collected stats grouped by rpc. -func (c *Conn) Stats() map[string]drpcstats.Stats { - c.mu.Lock() - defer c.mu.Unlock() - - stats := make(map[string]drpcstats.Stats, len(c.stats)) - for k, v := range c.stats { - stats[k] = v.AtomicClone() + if opts.Metrics != nil { + if opts.Metrics.BytesSent == nil { + opts.Metrics.BytesSent = drpcmetrics.NoOpCounter{} + } + if opts.Metrics.BytesRecv == nil { + opts.Metrics.BytesRecv = drpcmetrics.NoOpCounter{} + } + mt := &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: opts.Metrics.BytesSent, BytesRecv: opts.Metrics.BytesRecv} + tr = mt } - return stats -} - -// getStats returns the drpcopts.Stats struct for the given rpc. 
-func (c *Conn) getStats(rpc string) *drpcstats.Stats { - c.mu.Lock() - defer c.mu.Unlock() - - stats := c.stats[rpc] - if stats == nil { - stats = new(drpcstats.Stats) - c.stats[rpc] = stats + return &Conn{ + tr: tr, + man: drpcmanager.NewWithOptions(tr, opts.Manager), } - return stats } // Transport returns the transport the conn is using. @@ -102,7 +83,9 @@ func (c *Conn) Close() (err error) { return c.man.Close() } // Invoke issues the rpc on the transport serializing in, waits for a response, and // deserializes it into out. Only one Invoke or Stream may be open at a time. -func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) { +func (c *Conn) Invoke( + ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message, +) (err error) { defer func() { err = drpc.ToRPCErr(err) }() var metadata []byte @@ -134,7 +117,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou return nil } -func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) { +func (c *Conn) doInvoke( + stream *drpcstream.Stream, + enc drpc.Encoding, + rpc string, + data []byte, + metadata []byte, + out drpc.Message, +) (err error) { if len(metadata) > 0 { if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil { return err @@ -157,7 +147,9 @@ func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string // NewStream begins a streaming rpc on the connection. Only one Invoke or Stream may // be open at a time. 
-func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) { +func (c *Conn) NewStream( + ctx context.Context, rpc string, enc drpc.Encoding, +) (_ drpc.Stream, err error) { defer func() { err = drpc.ToRPCErr(err) }() var metadata []byte diff --git a/drpcmanager/manager.go b/drpcmanager/manager.go index d730836..218ee30 100644 --- a/drpcmanager/manager.go +++ b/drpcmanager/manager.go @@ -15,7 +15,6 @@ import ( "github.com/zeebo/errs" grpcmetadata "google.golang.org/grpc/metadata" - "storj.io/drpc" "storj.io/drpc/drpcdebug" "storj.io/drpc/drpcmetadata" @@ -298,14 +297,12 @@ func (m *Manager) manageReader() { // // newStream creates a stream value with the appropriate configuration for this manager. -func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string) (*drpcstream.Stream, error) { +func (m *Manager) newStream( + ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string, +) (*drpcstream.Stream, error) { opts := m.opts.Stream drpcopts.SetStreamKind(&opts.Internal, kind) drpcopts.SetStreamRPC(&opts.Internal, rpc) - if cb := drpcopts.GetManagerStatsCB(&m.opts.Internal); cb != nil { - drpcopts.SetStreamStats(&opts.Internal, cb(rpc)) - } - stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts) select { case m.streams <- streamInfo{ctx: ctx, stream: stream}: @@ -425,7 +422,9 @@ func (m *Manager) Close() error { } // NewClientStream starts a stream on the managed transport for use by a client. -func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpcstream.Stream, err error) { +func (m *Manager) NewClientStream( + ctx context.Context, rpc string, +) (stream *drpcstream.Stream, err error) { if err := m.acquireSemaphore(ctx); err != nil { return nil, err } @@ -436,7 +435,9 @@ func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpc // NewServerStream starts a stream on the managed transport for use by a server. 
// It does this by waiting for the client to issue an invoke message and // returning the details. -func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Stream, rpc string, err error) { +func (m *Manager) NewServerStream( + ctx context.Context, +) (stream *drpcstream.Stream, rpc string, err error) { if err := m.acquireSemaphore(ctx); err != nil { return nil, "", err } diff --git a/drpcmanager/manager_test.go b/drpcmanager/manager_test.go index 5918113..28d5aa4 100644 --- a/drpcmanager/manager_test.go +++ b/drpcmanager/manager_test.go @@ -15,7 +15,6 @@ import ( "github.com/zeebo/assert" grpcmetadata "google.golang.org/grpc/metadata" "storj.io/drpc/drpcmetadata" - "storj.io/drpc/drpctest" "storj.io/drpc/drpcwire" ) diff --git a/drpcmetrics/metrics.go b/drpcmetrics/metrics.go new file mode 100644 index 0000000..73e9426 --- /dev/null +++ b/drpcmetrics/metrics.go @@ -0,0 +1,120 @@ +package drpcmetrics + +import ( + "google.golang.org/grpc/status" + "storj.io/drpc" +) + +// Counter is a metric that can only be incremented (monotonically increasing). +// The labels parameter contains key-value pairs for metric dimensions +// (e.g. methodName, statusCode). It may be nil when no +// dimensional context is available. +type Counter interface { + Add(labels map[string]string, value float64) +} + +// NoOpCounter is a Counter implementation that does nothing. +type NoOpCounter struct{} + +// Add implements Counter. +func (NoOpCounter) Add(map[string]string, float64) {} + +// Observer is a metric that records observed values (e.g. durations, +// sizes). The labels parameter contains key-value pairs for metric +// dimensions. It may be nil when no dimensional context is available. +type Observer interface { + Observe(labels map[string]string, value float64) +} + +// Gauge is a metric that can increase and decrease (e.g. pool size, +// active count). The labels parameter contains key-value pairs for +// metric dimensions. 
It may be nil when no dimensional context is +// available. +// +// Note: Gauge and Counter have identical method signatures but different +// semantics. Gauge values may go up or down; Counter values must only +// increase. +type Gauge interface { + Add(labels map[string]string, value float64) +} + +// NoOpGauge is a Gauge implementation that does nothing. +type NoOpGauge struct{} + +// Add implements Gauge. +func (NoOpGauge) Add(map[string]string, float64) {} + +// NoOpObserver is an Observer implementation that does nothing. +type NoOpObserver struct{} + +// Observe implements Observer. +func (NoOpObserver) Observe(map[string]string, float64) {} + +// Label constants for RPC metrics. These are the label keys used in the +// labels map passed to Counter.Add and Observer.Observe. +const ( + LabelRPCMethodName = "methodName" + LabelStatusCode = "statusCode" + LabelState = "state" + + StateStarted = "started" + StateCompleted = "completed" +) + +// StreamMetrics holds metrics for stream lifecycle tracking. +type StreamMetrics struct { + StreamsTotal Counter + StreamDuration Observer +} + +// MeteredTransport wraps a Transport and increments byte counters on each +// Read and Write call. +type MeteredTransport struct { + drpc.Transport + BytesSent Counter + BytesRecv Counter +} + +// Read reads from the underlying transport and increments BytesRecv. +func (t *MeteredTransport) Read(p []byte) (n int, err error) { + n, err = t.Transport.Read(p) + if n > 0 { + t.BytesRecv.Add(nil, float64(n)) + } + return n, err +} + +// Write writes to the underlying transport and increments BytesSent. +func (t *MeteredTransport) Write(p []byte) (n int, err error) { + n, err = t.Transport.Write(p) + if n > 0 { + t.BytesSent.Add(nil, float64(n)) + } + return n, err +} + +// StatusCodeLabel returns a label value for the given error's code. 
+func StatusCodeLabel(err error) string { + return status.Code(err).String() +} + +// WithErrorCode returns a copy of labels with the statusCode label added. +func WithErrorCode(labels map[string]string, err error) map[string]string { + out := make(map[string]string, len(labels)+1) + for k, v := range labels { + out[k] = v + } + rpcErr := drpc.ToRPCErr(err) + out[LabelStatusCode] = StatusCodeLabel(rpcErr) + return out +} + +// WithState returns a copy of labels with the state label added. +func WithState(labels map[string]string, state string) map[string]string { + out := make(map[string]string, len(labels)+1) + for k, v := range labels { + out[k] = v + } + out[LabelState] = state + return out +} diff --git a/drpcmux/mux.go b/drpcmux/mux.go index bfca769..e31ba7b 100644 --- a/drpcmux/mux.go +++ b/drpcmux/mux.go @@ -67,6 +67,13 @@ func (m *Mux) Register(srv interface{}, desc drpc.Description) error { return nil } +// IsUnitary reports whether the named RPC has unary input and output. +// It returns false if the RPC is not registered or is a streaming RPC. +func (m *Mux) IsUnitary(rpc string) bool { + data, ok := m.rpcs[rpc] + return ok && data.unitary +} + +// registerOne does the work to register a single rpc. 
func (m *Mux) registerOne( srv interface{}, rpc string, enc drpc.Encoding, receiver drpc.Receiver, method interface{}, diff --git a/drpcpool/entry.go b/drpcpool/entry.go index 78cd45a..536aacb 100644 --- a/drpcpool/entry.go +++ b/drpcpool/entry.go @@ -9,11 +9,12 @@ import ( ) type entry[K comparable, V Conn] struct { - key K - val V - exp *time.Timer - global node[K, V] - local node[K, V] + key K + val V + exp *time.Timer + created time.Time + global node[K, V] + local node[K, V] } func (e *entry[K, V]) String() string { diff --git a/drpcpool/pool.go b/drpcpool/pool.go index 173fe92..726347a 100644 --- a/drpcpool/pool.go +++ b/drpcpool/pool.go @@ -10,10 +10,18 @@ import ( "time" "github.com/zeebo/errs" - "storj.io/drpc/drpcdebug" + "storj.io/drpc/drpcmetrics" ) +// PoolMetrics holds optional metrics for connection pool monitoring. +type PoolMetrics struct { + PoolSize drpcmetrics.Gauge + ConnectionHitsTotal drpcmetrics.Counter + ConnectionMissesTotal drpcmetrics.Counter + ConnectionAgeSeconds drpcmetrics.Observer +} + // Options contains the options to configure a pool. type Options struct { // Expiration will remove any values from the Pool after the @@ -28,6 +36,10 @@ type Options struct { // the Pool holds unlimited for any single key. Negative means // no values for any single key. KeyCapacity int + + // Metrics holds optional metrics the pool will populate. If nil, + // no metrics are recorded. + Metrics *PoolMetrics } // Pool is a connection pool with key type K. It maintains a cache of connections @@ -43,10 +55,52 @@ type Pool[K comparable, V Conn] struct { // New constructs a new Pool with the provided Options. 
func New[K comparable, V Conn](opts Options) *Pool[K, V] { - return &Pool[K, V]{ + pool := Pool[K, V]{ opts: opts, entries: make(map[K]*list[K, V]), } + initPoolMetrics(&pool) + return &pool +} + +func initPoolMetrics[K comparable, V Conn](pool *Pool[K, V]) { + if pool.opts.Metrics == nil { + return + } + if pool.opts.Metrics.PoolSize == nil { + pool.opts.Metrics.PoolSize = drpcmetrics.NoOpGauge{} + } + if pool.opts.Metrics.ConnectionAgeSeconds == nil { + pool.opts.Metrics.ConnectionAgeSeconds = drpcmetrics.NoOpObserver{} + } + if pool.opts.Metrics.ConnectionHitsTotal == nil { + pool.opts.Metrics.ConnectionHitsTotal = drpcmetrics.NoOpCounter{} + } + if pool.opts.Metrics.ConnectionMissesTotal == nil { + pool.opts.Metrics.ConnectionMissesTotal = drpcmetrics.NoOpCounter{} + } +} + +func (p *Pool[K, V]) recordHit(created time.Time) { + if p.opts.Metrics == nil { + return + } + p.opts.Metrics.ConnectionHitsTotal.Add(nil, 1) + p.opts.Metrics.ConnectionAgeSeconds.Observe(nil, time.Since(created).Seconds()) +} + +func (p *Pool[K, V]) recordMiss() { + if p.opts.Metrics == nil { + return + } + p.opts.Metrics.ConnectionMissesTotal.Add(nil, 1) +} + +func (p *Pool[K, V]) recordPoolSizeChange(delta float64) { + if p.opts.Metrics == nil { + return + } + p.opts.Metrics.PoolSize.Add(nil, delta) } func (p *Pool[K, V]) log(what string, cb func() string) { @@ -62,12 +116,14 @@ func (p *Pool[K, V]) Close() (err error) { defer p.mu.Unlock() var eg errs.Group + count := p.order.count for ent := p.order.head; ent != nil; ent = ent.global.next { eg.Add(p.closeEntry(ent)) } p.entries = make(map[K]*list[K, V]) p.order = list[K, V]{} + p.recordPoolSizeChange(-float64(count)) return eg.Err() } @@ -75,8 +131,9 @@ func (p *Pool[K, V]) Close() (err error) { // Get returns a new Conn that will use the provided dial function to create an // underlying conn to be cached by the Pool when Conn methods are invoked. It will // share any cached connections with other conns that use the same key. 
-func (p *Pool[K, V]) Get(ctx context.Context, key K, - dial func(ctx context.Context, key K) (V, error)) Conn { +func (p *Pool[K, V]) Get( + ctx context.Context, key K, dial func(ctx context.Context, key K) (V, error), +) Conn { return &poolConn[K, V]{ key: key, pool: p, @@ -99,6 +156,7 @@ func (p *Pool[K, V]) removeEntry(ent *entry[K, V]) { local.removeEntry(ent, (*entry[K, V]).localList) p.order.removeEntry(ent, (*entry[K, V]).globalList) + p.recordPoolSizeChange(-1) if local.count == 0 { delete(p.entries, ent.key) @@ -123,6 +181,7 @@ func (p *Pool[K, V]) Take(key K) (V, bool) { local := p.entries[key] if local == nil { + p.recordMiss() return *new(V), false } @@ -137,6 +196,7 @@ func (p *Pool[K, V]) Take(key K) (V, bool) { local.removeEntry(ent, (*entry[K, V]).localList) p.order.removeEntry(ent, (*entry[K, V]).globalList) + p.recordPoolSizeChange(-1) if ent.exp != nil && !ent.exp.Stop() { continue @@ -145,9 +205,11 @@ func (p *Pool[K, V]) Take(key K) (V, bool) { } p.log("TAKEN", ent.String) + p.recordHit(ent.created) return ent.val, true } + p.recordMiss() return *new(V), false } @@ -177,6 +239,7 @@ func (p *Pool[K, V]) Put(key K, val V) { local.removeEntry(ent, (*entry[K, V]).localList) p.order.removeEntry(ent, (*entry[K, V]).globalList) + p.recordPoolSizeChange(-1) } for p.opts.Capacity != 0 && p.order.count >= p.opts.Capacity { @@ -187,15 +250,17 @@ func (p *Pool[K, V]) Put(key K, val V) { local.removeEntry(ent, (*entry[K, V]).localList) p.order.removeEntry(ent, (*entry[K, V]).globalList) + p.recordPoolSizeChange(-1) if local.count == 0 { delete(p.entries, ent.key) } } - ent := &entry[K, V]{key: key, val: val} + ent := &entry[K, V]{key: key, val: val, created: time.Now()} local.appendEntry(ent, (*entry[K, V]).localList) p.order.appendEntry(ent, (*entry[K, V]).globalList) + p.recordPoolSizeChange(1) p.log("PUT", ent.String) if p.opts.Expiration > 0 { diff --git a/drpcpool/pool_test.go b/drpcpool/pool_test.go index 22677b3..a3b7daa 100644 --- 
a/drpcpool/pool_test.go +++ b/drpcpool/pool_test.go @@ -9,7 +9,6 @@ import ( "time" "github.com/zeebo/assert" - "storj.io/drpc" "storj.io/drpc/drpctest" ) @@ -419,6 +418,144 @@ func TestPool_StreamContext(t *testing.T) { } } +// +// pool metrics tests +// + +type testPoolCounter struct { + total float64 +} + +func (c *testPoolCounter) Add(_ map[string]string, value float64) { + c.total += value +} + +type testPoolGauge struct { + total float64 +} + +func (g *testPoolGauge) Add(_ map[string]string, value float64) { + g.total += value +} + +type testPoolObserver struct { + calls int + last float64 +} + +func (o *testPoolObserver) Observe(_ map[string]string, value float64) { + o.calls++ + o.last = value +} + +func TestPoolMetrics_PutTakeClose(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + poolSize := &testPoolGauge{} + hits := &testPoolCounter{} + misses := &testPoolCounter{} + age := &testPoolObserver{} + + pool := New[string, Conn](Options{ + Capacity: 10, + Metrics: &PoolMetrics{ + PoolSize: poolSize, + ConnectionHitsTotal: hits, + ConnectionMissesTotal: misses, + ConnectionAgeSeconds: age, + }, + }) + + // Miss on empty pool. + _, ok := pool.Take("key") + assert.That(t, !ok) + assert.Equal(t, misses.total, 1.0) + assert.Equal(t, hits.total, 0.0) + + // Put a connection. + conn := &callbackConn{ + ClosedFn: func() <-chan struct{} { return nil }, + UnblockedFn: func() <-chan struct{} { return closedCh }, + } + pool.Put("key", conn) + assert.Equal(t, poolSize.total, 1.0) + + // Take it back — should be a hit. + val, ok := pool.Take("key") + assert.That(t, ok) + assert.Equal(t, val, Conn(conn)) + assert.Equal(t, hits.total, 1.0) + assert.Equal(t, poolSize.total, 0.0) // decremented on take + assert.Equal(t, age.calls, 1) + assert.That(t, age.last >= 0) + + // Miss again now that pool is empty. + _, ok = pool.Take("key") + assert.That(t, !ok) + assert.Equal(t, misses.total, 2.0) + + // Put two, then Close. 
+ conn2 := &callbackConn{ + ClosedFn: func() <-chan struct{} { return nil }, + UnblockedFn: func() <-chan struct{} { return closedCh }, + } + pool.Put("key", conn) + pool.Put("key", conn2) + assert.Equal(t, poolSize.total, 2.0) + + assert.NoError(t, pool.Close()) + assert.Equal(t, poolSize.total, 0.0) +} + +func TestPoolMetrics_Eviction(t *testing.T) { + poolSize := &testPoolGauge{} + misses := &testPoolCounter{} + + pool := New[string, Conn](Options{ + Capacity: 1, + KeyCapacity: 1, + Metrics: &PoolMetrics{ + PoolSize: poolSize, + ConnectionMissesTotal: misses, + }, + }) + defer func() { _ = pool.Close() }() + + conn1 := &callbackConn{ + ClosedFn: func() <-chan struct{} { return nil }, + UnblockedFn: func() <-chan struct{} { return closedCh }, + } + conn2 := &callbackConn{ + ClosedFn: func() <-chan struct{} { return nil }, + UnblockedFn: func() <-chan struct{} { return closedCh }, + } + + pool.Put("key", conn1) + assert.Equal(t, poolSize.total, 1.0) + + // Putting conn2 with KeyCapacity=1 should evict conn1. + pool.Put("key", conn2) + // One eviction (-1) + one put (+1) = net 1. + assert.Equal(t, poolSize.total, 1.0) +} + +func TestPoolMetrics_NilFields(t *testing.T) { + // All PoolMetrics fields are nil — should not panic. + pool := New[string, Conn](Options{ + Metrics: &PoolMetrics{}, + }) + + conn := &callbackConn{ + ClosedFn: func() <-chan struct{} { return nil }, + UnblockedFn: func() <-chan struct{} { return closedCh }, + } + pool.Put("key", conn) + pool.Take("key") + pool.Take("miss") + assert.NoError(t, pool.Close()) +} + func BenchmarkPool(b *testing.B) { ctx := drpctest.NewTracker(b) defer ctx.Close() diff --git a/drpcserver/README.md b/drpcserver/README.md index 595dc6b..2196bba 100644 --- a/drpcserver/README.md +++ b/drpcserver/README.md @@ -18,9 +18,6 @@ type Options struct { // handling individual clients. It is not called if nil. Log func(error) - // CollectStats controls whether the server should collect stats on the - // rpcs it serves. 
- CollectStats bool } ``` @@ -65,9 +62,3 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) ``` ServeOne serves a single set of rpcs on the provided transport. -#### func (*Server) Stats - -```go -func (s *Server) Stats() map[string]drpcstats.Stats -``` -Stats returns the collected stats grouped by rpc. diff --git a/drpcserver/server.go b/drpcserver/server.go index 13c8186..6bd430e 100644 --- a/drpcserver/server.go +++ b/drpcserver/server.go @@ -6,8 +6,8 @@ package drpcserver import ( "context" "crypto/tls" + "errors" "net" - "sync" "time" "github.com/zeebo/errs" @@ -15,9 +15,8 @@ import ( "storj.io/drpc/drpccache" "storj.io/drpc/drpcctx" "storj.io/drpc/drpcmanager" - "storj.io/drpc/drpcstats" + "storj.io/drpc/drpcmetrics" "storj.io/drpc/drpcstream" - "storj.io/drpc/internal/drpcopts" ) // Options controls configuration settings for a server. @@ -30,10 +29,6 @@ type Options struct { // handling individual clients. It is not called if nil. Log func(error) - // CollectStats controls whether the server should collect stats on the - // rpcs it serves. - CollectStats bool - // TLSConfig, if non-nil, is used to wrap the listener with tls.NewListener // in Serve(). The TLS handshake is performed explicitly in ServeOne before // processing requests. @@ -44,15 +39,76 @@ type Options struct { // *tls.Conn) and may inspect ConnectionState to enforce cipher suite // restrictions. If it returns a non-nil error the connection is rejected. TLSCipherRestrict func(conn net.Conn) error + + // Metrics holds optional metrics the server will populate. If nil, no + // metrics are recorded. + Metrics *ServerMetrics +} + +// ServerMetrics holds optional metrics that the server will populate during +// operation. +// Metrics are defined and registered by the caller (e.g. in CockroachDB) and +// passed in; this package never imports a metrics library. 
+type ServerMetrics struct { + BytesSent drpcmetrics.Counter + BytesRecv drpcmetrics.Counter + InactivityTimeouts drpcmetrics.Counter + TLSHandshakeErrors drpcmetrics.Counter + Stream *drpcmetrics.StreamMetrics +} + +// addTLSHandshakeError increments the TLS handshake error counter. +func (m *ServerMetrics) addTLSHandshakeError() { + if m != nil { + m.TLSHandshakeErrors.Add(nil, 1) + } +} + +// addInactivityTimeout increments the inactivity timeout counter. +func (m *ServerMetrics) addInactivityTimeout() { + if m != nil { + m.InactivityTimeouts.Add(nil, 1) + } +} + +// wrapTransport wraps tr with byte counters. +func (m *ServerMetrics) wrapTransport(tr drpc.Transport) drpc.Transport { + if m == nil { + return tr + } + return &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: m.BytesSent, BytesRecv: m.BytesRecv} +} + +// rpcLabels builds a label map for the given RPC string. +func (s *Server) rpcLabels(rpc string) map[string]string { + return map[string]string{ + drpcmetrics.LabelRPCMethodName: rpc, + } +} + +// recordStreamStart records stream start metrics. +func (s *Server) recordStreamStart(labels map[string]string) { + sm := s.opts.Metrics + if sm == nil { + return + } + sm.Stream.StreamsTotal.Add(drpcmetrics.WithState(labels, drpcmetrics.StateStarted), 1) +} + +// recordStreamEnd records stream end metrics. +func (s *Server) recordStreamEnd(labels map[string]string, err error, duration time.Duration) { + sm := s.opts.Metrics + if sm == nil { + return + } + sm.Stream.StreamsTotal.Add(drpcmetrics.WithState(labels, drpcmetrics.StateCompleted), 1) + sm.Stream.StreamDuration.Observe(drpcmetrics.WithErrorCode(labels, err), duration.Seconds()) } // Server is an implementation of drpc.Server to serve drpc connections. type Server struct { opts Options handler drpc.Handler - - mu sync.Mutex - stats map[string]*drpcstats.Stats } // New constructs a new Server. 
@@ -73,38 +129,34 @@ func NewWithOptions(handler drpc.Handler, opts Options) *Server { opts: opts, handler: handler, } - - if s.opts.CollectStats { - drpcopts.SetManagerStatsCB(&s.opts.Manager.Internal, s.getStats) - s.stats = make(map[string]*drpcstats.Stats) + if s.opts.Metrics != nil { + initServerMetrics(s.opts.Metrics) } - return s } -// Stats returns the collected stats grouped by rpc. -func (s *Server) Stats() map[string]drpcstats.Stats { - s.mu.Lock() - defer s.mu.Unlock() - - stats := make(map[string]drpcstats.Stats, len(s.stats)) - for k, v := range s.stats { - stats[k] = v.AtomicClone() +func initServerMetrics(m *ServerMetrics) { + if m.BytesSent == nil { + m.BytesSent = drpcmetrics.NoOpCounter{} } - return stats -} - -// getStats returns the drpcopts.Stats struct for the given rpc. -func (s *Server) getStats(rpc string) *drpcstats.Stats { - s.mu.Lock() - defer s.mu.Unlock() - - stats := s.stats[rpc] - if stats == nil { - stats = new(drpcstats.Stats) - s.stats[rpc] = stats + if m.BytesRecv == nil { + m.BytesRecv = drpcmetrics.NoOpCounter{} + } + if m.InactivityTimeouts == nil { + m.InactivityTimeouts = drpcmetrics.NoOpCounter{} + } + if m.TLSHandshakeErrors == nil { + m.TLSHandshakeErrors = drpcmetrics.NoOpCounter{} + } + if m.Stream == nil { + m.Stream = &drpcmetrics.StreamMetrics{} + } + if m.Stream.StreamsTotal == nil { + m.Stream.StreamsTotal = drpcmetrics.NoOpCounter{} + } + if m.Stream.StreamDuration == nil { + m.Stream.StreamDuration = drpcmetrics.NoOpObserver{} } - return stats } // ServeOne serves a single set of rpcs on the provided transport. @@ -126,10 +178,12 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) { // anyway. 
err := tlsConn.HandshakeContext(ctx) if err != nil { + s.opts.Metrics.addTLSHandshakeError() return drpc.ConnectionError.New("server handshake [%q] failed: %w", tlsConn.RemoteAddr(), err) } if s.opts.TLSCipherRestrict != nil { if err := s.opts.TLSCipherRestrict(tlsConn); err != nil { + s.opts.Metrics.addTLSHandshakeError() return drpc.ConnectionError.New("server handshake [%q] failed: %w", tlsConn.RemoteAddr(), err) } } @@ -140,6 +194,8 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) { } } + tr = s.opts.Metrics.wrapTransport(tr) + man := drpcmanager.NewWithOptions(tr, s.opts.Manager) defer func() { err = errs.Combine(err, man.Close()) }() @@ -151,6 +207,9 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) { for { stream, rpc, err := man.NewServerStream(ctx) if err != nil { + if errors.Is(err, context.DeadlineExceeded) && ctx.Err() == nil { + s.opts.Metrics.addInactivityTimeout() + } return errs.Wrap(err) } if err := s.handleRPC(stream, rpc); err != nil { @@ -215,9 +274,26 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) { // handleRPC handles the rpc that has been requested by the stream. func (s *Server) handleRPC(stream *drpcstream.Stream, rpc string) (err error) { - err = s.handler.HandleRPC(stream, rpc) - if err != nil { - return errs.Wrap(stream.SendError(err)) + var labels map[string]string + var start time.Time + if s.opts.Metrics != nil { + labels = s.rpcLabels(rpc) + s.recordStreamStart(labels) + start = time.Now() } - return errs.Wrap(stream.CloseSend()) + + handlerErr := s.handler.HandleRPC(stream, rpc) + + if handlerErr != nil { + sendErr := errs.Wrap(stream.SendError(handlerErr)) + s.recordStreamEnd(labels, handlerErr, time.Since(start)) + return sendErr + } + + closeErr := errs.Wrap(stream.CloseSend()) + // Record success: the handler returned nil, so the RPC itself + // succeeded. 
A CloseSend failure is a transport-level issue and + // should not be attributed as the RPC's error code. + s.recordStreamEnd(labels, nil, time.Since(start)) + return closeErr } diff --git a/drpcstats/README.md b/drpcstats/README.md deleted file mode 100644 index 0dbea23..0000000 --- a/drpcstats/README.md +++ /dev/null @@ -1,40 +0,0 @@ -# package drpcstats - -`import "storj.io/drpc/drpcstats"` - -Package drpcstats contatins types for stat collection. - -## Usage - -#### type Stats - -```go -type Stats struct { - Read uint64 - Written uint64 -} -``` - -Stats keeps counters of read and written bytes. - -#### func (*Stats) AddRead - -```go -func (s *Stats) AddRead(n uint64) -``` -AddRead atomically adds n bytes to the Read counter. - -#### func (*Stats) AddWritten - -```go -func (s *Stats) AddWritten(n uint64) -``` -AddWritten atomically adds n bytes to the Written counter. - -#### func (*Stats) AtomicClone - -```go -func (s *Stats) AtomicClone() Stats -``` -AtomicClone returns a copy of the stats that is safe to use concurrently with -Add methods. diff --git a/drpcstats/doc.go b/drpcstats/doc.go deleted file mode 100644 index fc6fec7..0000000 --- a/drpcstats/doc.go +++ /dev/null @@ -1,5 +0,0 @@ -// Copyright (C) 2024 Storj Labs, Inc. -// See LICENSE for copying information. - -// Package drpcstats contatins types for stat collection. -package drpcstats diff --git a/drpcstats/stats.go b/drpcstats/stats.go deleted file mode 100644 index b98ed28..0000000 --- a/drpcstats/stats.go +++ /dev/null @@ -1,36 +0,0 @@ -// Copyright (C) 2024 Storj Labs, Inc. -// See LICENSE for copying information. - -package drpcstats - -import ( - "sync/atomic" -) - -// Stats keeps counters of read and written bytes. -type Stats struct { - Read uint64 - Written uint64 -} - -// AddRead atomically adds n bytes to the Read counter. -func (s *Stats) AddRead(n uint64) { - if s != nil { - atomic.AddUint64(&s.Read, n) - } -} - -// AddWritten atomically adds n bytes to the Written counter. 
-func (s *Stats) AddWritten(n uint64) { - if s != nil { - atomic.AddUint64(&s.Written, n) - } -} - -// AtomicClone returns a copy of the stats that is safe to use concurrently with Add methods. -func (s *Stats) AtomicClone() Stats { - return Stats{ - Read: atomic.LoadUint64(&s.Read), - Written: atomic.LoadUint64(&s.Written), - } -} diff --git a/drpcstream/pktbuf.go b/drpcstream/pktbuf.go index db68864..c265159 100644 --- a/drpcstream/pktbuf.go +++ b/drpcstream/pktbuf.go @@ -3,9 +3,7 @@ package drpcstream -import ( - "sync" -) +import "sync" type packetBuffer struct { mu sync.Mutex diff --git a/drpcstream/stream.go b/drpcstream/stream.go index 29ccd63..768cd32 100644 --- a/drpcstream/stream.go +++ b/drpcstream/stream.go @@ -11,7 +11,6 @@ import ( "sync" "github.com/zeebo/errs" - "storj.io/drpc" "storj.io/drpc/drpcctx" "storj.io/drpc/drpcdebug" @@ -223,8 +222,6 @@ func (s *Stream) HandlePacket(pkt drpcwire.Packet) (err error) { return nil } - drpcopts.GetStreamStats(&s.opts.Internal).AddRead(uint64(len(pkt.Data))) - if s.sigs.term.IsSet() { return nil } @@ -330,7 +327,6 @@ func (s *Stream) sendPacketLocked(kind drpcwire.Kind, control bool, data []byte) fr.Control = control fr.Done = true - drpcopts.GetStreamStats(&s.opts.Internal).AddWritten(uint64(len(data))) s.log("SEND", fr.String) if err := s.wr.WriteFrame(fr); err != nil { @@ -390,7 +386,6 @@ func (s *Stream) rawWriteLocked(kind drpcwire.Kind, data []byte) (err error) { fr.Data, data = drpcwire.SplitData(data, n) fr.Done = len(data) == 0 - drpcopts.GetStreamStats(&s.opts.Internal).AddWritten(uint64(len(fr.Data))) s.log("SEND", fr.String) if err := s.wr.WriteFrame(fr); err != nil { diff --git a/internal/drpcopts/README.md b/internal/drpcopts/README.md index f8b7f7d..811cfdd 100644 --- a/internal/drpcopts/README.md +++ b/internal/drpcopts/README.md @@ -9,13 +9,6 @@ users of the library that are not required to be backward compatible. 
## Usage -#### func GetManagerStatsCB - -```go -func GetManagerStatsCB(opts *Manager) func(string) *drpcstats.Stats -``` -GetManagerStatsCB returns the stats callback stored in the options. - #### func GetStreamFin ```go @@ -26,23 +19,23 @@ GetStreamFin returns the chan<- struct{} stored in the options. #### func GetStreamKind ```go -func GetStreamKind(opts *Stream) string +func GetStreamKind(opts *Stream) drpc.StreamKind ``` -GetStreamKind returns the kind debug string stored in the options. +GetStreamKind returns the StreamKind stored in the options. -#### func GetStreamRPC +#### func GetStreamMux ```go -func GetStreamRPC(opts *Stream) string +func GetStreamMux(opts *Stream) bool ``` -GetStreamRPC returns the RPC debug string stored in the options. +GetStreamMux returns whether multiplexing is enabled. -#### func GetStreamStats +#### func GetStreamRPC ```go -func GetStreamStats(opts *Stream) *drpcstats.Stats +func GetStreamRPC(opts *Stream) string ``` -GetStreamStats returns the Stats stored in the options. +GetStreamRPC returns the RPC debug string stored in the options. #### func GetStreamTransport @@ -51,13 +44,6 @@ func GetStreamTransport(opts *Stream) drpc.Transport ``` GetStreamTransport returns the drpc.Transport stored in the options. -#### func SetManagerStatsCB - -```go -func SetManagerStatsCB(opts *Manager, statsCB func(string) *drpcstats.Stats) -``` -SetManagerStatsCB sets the stats callback stored in the options. - #### func SetStreamFin ```go @@ -68,23 +54,23 @@ SetStreamFin sets the chan<- struct{} stored in the options. #### func SetStreamKind ```go -func SetStreamKind(opts *Stream, kind string) +func SetStreamKind(opts *Stream, kind drpc.StreamKind) ``` -SetStreamKind sets the kind debug string stored in the options. +SetStreamKind sets the StreamKind stored in the options. 
-#### func SetStreamRPC +#### func SetStreamMux ```go -func SetStreamRPC(opts *Stream, rpc string) +func SetStreamMux(opts *Stream, mux bool) ``` -SetStreamRPC sets the RPC debug string stored in the options. +SetStreamMux sets whether multiplexing is enabled. -#### func SetStreamStats +#### func SetStreamRPC ```go -func SetStreamStats(opts *Stream, stats *drpcstats.Stats) +func SetStreamRPC(opts *Stream, rpc string) ``` -SetStreamStats sets the Stats stored in the options. +SetStreamRPC sets the RPC debug string stored in the options. #### func SetStreamTransport @@ -109,4 +95,4 @@ type Stream struct { } ``` -Stream contains internal options for the drpcstream package. +Stream contains internal options for the drpcstream package. \ No newline at end of file diff --git a/internal/drpcopts/manager.go b/internal/drpcopts/manager.go index 081b70e..38954e8 100644 --- a/internal/drpcopts/manager.go +++ b/internal/drpcopts/manager.go @@ -3,15 +3,5 @@ package drpcopts -import "storj.io/drpc/drpcstats" - // Manager contains internal options for the drpcmanager package. -type Manager struct { - statsCB func(string) *drpcstats.Stats -} - -// GetManagerStatsCB returns the stats callback stored in the options. -func GetManagerStatsCB(opts *Manager) func(string) *drpcstats.Stats { return opts.statsCB } - -// SetManagerStatsCB sets the stats callback stored in the options. -func SetManagerStatsCB(opts *Manager, statsCB func(string) *drpcstats.Stats) { opts.statsCB = statsCB } +type Manager struct{} diff --git a/internal/drpcopts/stream.go b/internal/drpcopts/stream.go index 6ab0511..e333491 100644 --- a/internal/drpcopts/stream.go +++ b/internal/drpcopts/stream.go @@ -3,10 +3,7 @@ package drpcopts -import ( - "storj.io/drpc" - "storj.io/drpc/drpcstats" -) +import "storj.io/drpc" // Stream contains internal options for the drpcstream package. 
type Stream struct { @@ -14,7 +11,7 @@ type Stream struct { fin chan<- struct{} kind drpc.StreamKind rpc string - stats *drpcstats.Stats + mux bool } // GetStreamTransport returns the drpc.Transport stored in the options. @@ -41,8 +38,8 @@ func GetStreamRPC(opts *Stream) string { return opts.rpc } // SetStreamRPC sets the RPC debug string stored in the options. func SetStreamRPC(opts *Stream, rpc string) { opts.rpc = rpc } -// GetStreamStats returns the Stats stored in the options. -func GetStreamStats(opts *Stream) *drpcstats.Stats { return opts.stats } +// GetStreamMux returns whether multiplexing is enabled. +func GetStreamMux(opts *Stream) bool { return opts.mux } -// SetStreamStats sets the Stats stored in the options. -func SetStreamStats(opts *Stream, stats *drpcstats.Stats) { opts.stats = stats } +// SetStreamMux sets whether multiplexing is enabled. +func SetStreamMux(opts *Stream, mux bool) { opts.mux = mux } diff --git a/internal/integration/common_test.go b/internal/integration/common_test.go index 5acb61a..58c727e 100644 --- a/internal/integration/common_test.go +++ b/internal/integration/common_test.go @@ -14,7 +14,6 @@ import ( "github.com/zeebo/assert" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - "storj.io/drpc/drpcconn" "storj.io/drpc/drpcmanager" "storj.io/drpc/drpcmetadata" @@ -39,7 +38,9 @@ func data(n int64) []byte { func in(n int64) *In { return &In{In: n} } func out(n int64) *Out { return &Out{Out: n} } -func createRawConnection(t testing.TB, server DRPCServiceServer, ctx *drpctest.Tracker) *drpcconn.Conn { +func createRawConnection( + t testing.TB, server DRPCServiceServer, ctx *drpctest.Tracker, +) *drpcconn.Conn { c1, c2 := net.Pipe() mux := drpcmux.New() assert.NoError(t, DRPCRegisterService(mux, server)) diff --git a/internal/integration/metrics_test.go b/internal/integration/metrics_test.go new file mode 100644 index 0000000..005a471 --- /dev/null +++ b/internal/integration/metrics_test.go @@ -0,0 +1,494 @@ +// 
Copyright (C) 2019 Storj Labs, Inc. +// See LICENSE for copying information. + +package integration + +import ( + "context" + "crypto/tls" + "errors" + "io" + "net" + "sync" + "testing" + "time" + + "github.com/zeebo/assert" + "storj.io/drpc/drpcconn" + "storj.io/drpc/drpcmanager" + "storj.io/drpc/drpcmetrics" + "storj.io/drpc/drpcmux" + "storj.io/drpc/drpcserver" + "storj.io/drpc/drpctest" +) + +// +// test metric implementations +// + +type testCounter struct { + mu sync.Mutex + calls []metricCall +} + +type metricCall struct { + labels map[string]string + value float64 +} + +func (c *testCounter) Add(labels map[string]string, value float64) { + c.mu.Lock() + defer c.mu.Unlock() + c.calls = append(c.calls, metricCall{labels: labels, value: value}) +} + +func (c *testCounter) total() float64 { + c.mu.Lock() + defer c.mu.Unlock() + var t float64 + for _, call := range c.calls { + t += call.value + } + return t +} + +func (c *testCounter) count() int { + c.mu.Lock() + defer c.mu.Unlock() + return len(c.calls) +} + +func (c *testCounter) callAt(i int) metricCall { + c.mu.Lock() + defer c.mu.Unlock() + return c.calls[i] +} + +type testObserver struct { + mu sync.Mutex + calls []metricCall +} + +func (o *testObserver) Observe(labels map[string]string, value float64) { + o.mu.Lock() + defer o.mu.Unlock() + o.calls = append(o.calls, metricCall{labels: labels, value: value}) +} + +func (o *testObserver) count() int { + o.mu.Lock() + defer o.mu.Unlock() + return len(o.calls) +} + +func (o *testObserver) callAt(i int) metricCall { + o.mu.Lock() + defer o.mu.Unlock() + return o.calls[i] +} + +type testGauge struct { + mu sync.Mutex + calls []metricCall +} + +func (g *testGauge) Add(labels map[string]string, value float64) { + g.mu.Lock() + defer g.mu.Unlock() + g.calls = append(g.calls, metricCall{labels: labels, value: value}) +} + +func (g *testGauge) total() float64 { + g.mu.Lock() + defer g.mu.Unlock() + var t float64 + for _, call := range g.calls { + t += call.value + 
} + return t +} + +// +// connection helpers +// + +func createMeteredServerConnection( + t testing.TB, server DRPCServiceServer, metrics *drpcserver.ServerMetrics, +) (DRPCServiceClient, func()) { + ctx := drpctest.NewTracker(t) + c1, c2 := net.Pipe() + mux := drpcmux.New() + assert.NoError(t, DRPCRegisterService(mux, server)) + srv := drpcserver.NewWithOptions(mux, drpcserver.Options{ + Metrics: metrics, + }) + ctx.Run(func(ctx context.Context) { _ = srv.ServeOne(ctx, c1) }) + conn := drpcconn.NewWithOptions(c2, drpcconn.Options{ + Manager: drpcmanager.Options{SoftCancel: true}, + }) + return NewDRPCServiceClient(conn), func() { + _ = conn.Close() + ctx.Close() + } +} + +func createMeteredClientConnection( + t testing.TB, server DRPCServiceServer, metrics *drpcconn.ClientMetrics, +) (DRPCServiceClient, func()) { + ctx := drpctest.NewTracker(t) + c1, c2 := net.Pipe() + mux := drpcmux.New() + assert.NoError(t, DRPCRegisterService(mux, server)) + srv := drpcserver.New(mux) + ctx.Run(func(ctx context.Context) { _ = srv.ServeOne(ctx, c1) }) + conn := drpcconn.NewWithOptions(c2, drpcconn.Options{ + Manager: drpcmanager.Options{SoftCancel: true}, + Metrics: metrics, + }) + return NewDRPCServiceClient(conn), func() { + _ = conn.Close() + ctx.Close() + } +} + +// waitForCount waits until counter.count() reaches at least n. +// Server-side stream-end metrics are recorded after the response is sent, +// so the client may observe the response before the server records them. 
+func waitForCount(t testing.TB, c interface{ count() int }, n int) { + t.Helper() + deadline := time.Now().Add(5 * time.Second) + for c.count() < n { + if time.Now().After(deadline) { + t.Fatalf("timed out waiting for count >= %d (got %d)", n, c.count()) + } + time.Sleep(time.Millisecond) + } +} + +// +// tests +// + +func TestClientByteMetrics(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + sent := &testCounter{} + recv := &testCounter{} + cli, close := createMeteredClientConnection(t, standardImpl, &drpcconn.ClientMetrics{ + BytesSent: sent, + BytesRecv: recv, + }) + defer close() + + // Unary RPC. + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) + + sentAfterUnary := sent.total() + recvAfterUnary := recv.total() + assert.That(t, sentAfterUnary > 0) + assert.That(t, recvAfterUnary > 0) + + // Server-streaming RPC: should increase counters further. + stream, err := cli.Method3(ctx, in(3)) + assert.NoError(t, err) + for { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + assert.NoError(t, err) + } + assert.NoError(t, stream.Close()) + + assert.That(t, sent.total() > sentAfterUnary) + assert.That(t, recv.total() > recvAfterUnary) +} + +func TestClientByteMetricsPartialNil(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + sent := &testCounter{} + cli, close := createMeteredClientConnection(t, standardImpl, &drpcconn.ClientMetrics{ + BytesSent: sent, + // BytesRecv intentionally nil. 
+ }) + defer close() + + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) + assert.That(t, sent.total() > 0) +} + +func TestServerByteMetrics(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + sent := &testCounter{} + recv := &testCounter{} + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{ + BytesSent: sent, + BytesRecv: recv, + }) + defer close() + + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) + + // The server's byte counters are incremented inside transport + // Read/Write which may not have returned by the time the client + // observes the response, so poll briefly. + waitForCount(t, sent, 1) + waitForCount(t, recv, 1) + assert.That(t, sent.total() > 0) + assert.That(t, recv.total() > 0) +} + +func TestServerStreamMetrics(t *testing.T) { + t.Run("unary success", func(t *testing.T) { + streamsTotal := &testCounter{} + streamDuration := &testObserver{} + + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{ + Stream: &drpcmetrics.StreamMetrics{ + StreamsTotal: streamsTotal, + StreamDuration: streamDuration, + }, + }) + defer close() + + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) + + // recordStreamStart fires before the handler, so streamsTotal + // is immediately available. recordStreamEnd fires after the + // response is sent, so we poll for the completed count. + assert.That(t, streamsTotal.count() >= 1) + waitForCount(t, streamsTotal, 2) // started + completed + waitForCount(t, streamDuration, 1) + + // First call should be the "started" event. 
+ startLabels := streamsTotal.callAt(0).labels + assert.Equal(t, startLabels[drpcmetrics.LabelRPCMethodName], "/service.Service/Method1") + assert.Equal(t, startLabels[drpcmetrics.LabelState], drpcmetrics.StateStarted) + + // Second call should be the "completed" event. + endLabels := streamsTotal.callAt(1).labels + assert.Equal(t, endLabels[drpcmetrics.LabelState], drpcmetrics.StateCompleted) + + durLabels := streamDuration.callAt(0).labels + assert.Equal(t, durLabels[drpcmetrics.LabelStatusCode], "OK") + }) + + t.Run("unary error", func(t *testing.T) { + streamsTotal := &testCounter{} + streamDuration := &testObserver{} + + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{ + Stream: &drpcmetrics.StreamMetrics{ + StreamsTotal: streamsTotal, + StreamDuration: streamDuration, + }, + }) + defer close() + + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + // Input 5 maps to gRPC code 5 = NotFound. + _, err := cli.Method1(ctx, in(5)) + assert.Error(t, err) + + waitForCount(t, streamsTotal, 2) // started + completed + waitForCount(t, streamDuration, 1) + + durLabels := streamDuration.callAt(0).labels + assert.Equal(t, durLabels[drpcmetrics.LabelStatusCode], "NotFound") + }) + + t.Run("streaming", func(t *testing.T) { + streamsTotal := &testCounter{} + streamDuration := &testObserver{} + + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{ + Stream: &drpcmetrics.StreamMetrics{ + StreamsTotal: streamsTotal, + StreamDuration: streamDuration, + }, + }) + defer close() + + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + stream, err := cli.Method3(ctx, in(3)) + assert.NoError(t, err) + for { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + assert.NoError(t, err) + } + assert.NoError(t, stream.Close()) + + waitForCount(t, streamsTotal, 2) // started + completed + waitForCount(t, streamDuration, 1) + + startLabels := streamsTotal.callAt(0).labels + assert.Equal(t, 
startLabels[drpcmetrics.LabelRPCMethodName], "/service.Service/Method3") + + durLabels := streamDuration.callAt(0).labels + assert.Equal(t, durLabels[drpcmetrics.LabelStatusCode], "OK") + }) +} + +func TestServerStreamMetricsNilFields(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + // All StreamMetrics fields are nil — should not panic. + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{ + Stream: &drpcmetrics.StreamMetrics{}, + }) + defer close() + + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) + + stream, err := cli.Method3(ctx, in(3)) + assert.NoError(t, err) + for { + _, err := stream.Recv() + if errors.Is(err, io.EOF) { + break + } + assert.NoError(t, err) + } + assert.NoError(t, stream.Close()) +} + +func TestServerInactivityTimeoutMetric(t *testing.T) { + timeouts := &testCounter{} + + ctx := drpctest.NewTracker(t) + c1, c2 := net.Pipe() + mux := drpcmux.New() + assert.NoError(t, DRPCRegisterService(mux, standardImpl)) + srv := drpcserver.NewWithOptions(mux, drpcserver.Options{ + Manager: drpcmanager.Options{ + InactivityTimeout: 10 * time.Millisecond, + }, + Metrics: &drpcserver.ServerMetrics{ + InactivityTimeouts: timeouts, + }, + }) + + // Run the server; it should hit the inactivity timeout since + // we never send an RPC after the first one completes. + done := make(chan error, 1) + ctx.Run(func(ctx context.Context) { + done <- srv.ServeOne(ctx, c1) + }) + + conn := drpcconn.NewWithOptions(c2, drpcconn.Options{ + Manager: drpcmanager.Options{SoftCancel: true}, + }) + + // Issue one RPC to get past the initial stream, then let the + // connection sit idle until the inactivity timeout fires. + outMsg, err := NewDRPCServiceClient(conn).Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(outMsg, &Out{Out: 1})) + + // Wait for the server to exit due to inactivity. 
+ select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for server to exit") + } + + assert.Equal(t, timeouts.count(), 1) + + _ = conn.Close() + ctx.Close() +} + +func TestServerMetricsAllNilFields(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + // Non-nil ServerMetrics with all nil fields — should not panic. + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{}) + defer close() + + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) +} + +func TestServerByteMetricsPartialNil(t *testing.T) { + ctx := drpctest.NewTracker(t) + defer ctx.Close() + + sent := &testCounter{} + // BytesRecv intentionally nil. + cli, close := createMeteredServerConnection(t, standardImpl, &drpcserver.ServerMetrics{ + BytesSent: sent, + }) + defer close() + + out, err := cli.Method1(ctx, in(1)) + assert.NoError(t, err) + assert.True(t, Equal(out, &Out{Out: 1})) + + waitForCount(t, sent, 1) + assert.That(t, sent.total() > 0) +} + +func TestServerTLSHandshakeErrorMetric(t *testing.T) { + tlsErrors := &testCounter{} + + mux := drpcmux.New() + assert.NoError(t, DRPCRegisterService(mux, standardImpl)) + srv := drpcserver.NewWithOptions(mux, drpcserver.Options{ + Metrics: &drpcserver.ServerMetrics{ + TLSHandshakeErrors: tlsErrors, + }, + }) + + // Create a TLS-wrapped connection with an invalid handshake: + // the server expects a TLS client hello but receives plain text. + c1, c2 := net.Pipe() + + tlsServerConn := tls.Server(c1, &tls.Config{ + // Intentionally no certificates — handshake will fail. + }) + + done := make(chan error, 1) + go func() { + done <- srv.ServeOne(context.Background(), tlsServerConn) + }() + + // Write garbage to trigger a TLS handshake failure on the server. 
+ _, _ = c2.Write([]byte("not a tls handshake")) + + select { + case <-done: + case <-time.After(5 * time.Second): + t.Fatal("timed out waiting for server to exit") + } + + _ = c2.Close() + assert.Equal(t, tlsErrors.count(), 1) +} diff --git a/internal/integration/simple_test.go b/internal/integration/simple_test.go index 26a2e8c..df83e11 100644 --- a/internal/integration/simple_test.go +++ b/internal/integration/simple_test.go @@ -8,17 +8,11 @@ import ( "errors" "fmt" "io" - "net" "testing" "github.com/zeebo/assert" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" - - "storj.io/drpc/drpcconn" - "storj.io/drpc/drpcmux" - "storj.io/drpc/drpcserver" - "storj.io/drpc/drpcstats" "storj.io/drpc/drpctest" ) @@ -115,101 +109,3 @@ func TestConcurrent(t *testing.T) { assert.NoError(t, <-errs) } } - -func TestServerStats(t *testing.T) { - ctx := drpctest.NewTracker(t) - defer ctx.Close() - - c1, c2 := net.Pipe() - mux := drpcmux.New() - _ = DRPCRegisterService(mux, standardImpl) - - srv := drpcserver.NewWithOptions(mux, drpcserver.Options{ - CollectStats: true, - }) - ctx.Run(func(ctx context.Context) { _ = srv.ServeOne(ctx, c1) }) - - conn := drpcconn.NewWithOptions(c2, drpcconn.Options{}) - defer func() { _ = conn.Close() }() - cli := NewDRPCServiceClient(conn) - - assert.Equal(t, srv.Stats(), map[string]drpcstats.Stats{}) - - _, err := cli.Method1(ctx, in(5)) - assert.Error(t, err) - - assert.Equal(t, srv.Stats(), map[string]drpcstats.Stats{ - "/service.Service/Method1": {Read: 2, Written: 9}, - }) - - _, err = cli.Method1(ctx, in(1)) - assert.NoError(t, err) - - assert.Equal(t, srv.Stats(), map[string]drpcstats.Stats{ - "/service.Service/Method1": {Read: 2 + 2, Written: 9 + 2}, - }) - - stream, err := cli.Method3(ctx, in(3)) - assert.NoError(t, err) - for i := 0; i < 3; i++ { - _, err := stream.Recv() - assert.NoError(t, err) - } - _, err = stream.Recv() - assert.That(t, errors.Is(err, io.EOF)) - assert.NoError(t, stream.Close()) - - assert.Equal(t, 
srv.Stats(), map[string]drpcstats.Stats{ - "/service.Service/Method1": {Read: 2 + 2, Written: 9 + 2}, - "/service.Service/Method3": {Read: 2, Written: 6}, - }) -} - -func TestClientStats(t *testing.T) { - ctx := drpctest.NewTracker(t) - defer ctx.Close() - - c1, c2 := net.Pipe() - mux := drpcmux.New() - _ = DRPCRegisterService(mux, standardImpl) - - srv := drpcserver.New(mux) - ctx.Run(func(ctx context.Context) { _ = srv.ServeOne(ctx, c1) }) - - conn := drpcconn.NewWithOptions(c2, drpcconn.Options{ - CollectStats: true, - }) - defer func() { _ = conn.Close() }() - cli := NewDRPCServiceClient(conn) - - assert.Equal(t, srv.Stats(), map[string]drpcstats.Stats{}) - - _, err := cli.Method1(ctx, in(5)) - assert.Error(t, err) - - assert.Equal(t, conn.Stats(), map[string]drpcstats.Stats{ - "/service.Service/Method1": {Read: 9, Written: 26}, - }) - - _, err = cli.Method1(ctx, in(1)) - assert.NoError(t, err) - - assert.Equal(t, conn.Stats(), map[string]drpcstats.Stats{ - "/service.Service/Method1": {Read: 9 + 2, Written: 26 + 26}, - }) - - stream, err := cli.Method3(ctx, in(3)) - assert.NoError(t, err) - for i := 0; i < 3; i++ { - _, err := stream.Recv() - assert.NoError(t, err) - } - _, err = stream.Recv() - assert.That(t, errors.Is(err, io.EOF)) - assert.NoError(t, stream.Close()) - - assert.Equal(t, conn.Stats(), map[string]drpcstats.Stats{ - "/service.Service/Method1": {Read: 9 + 2, Written: 26 + 26}, - "/service.Service/Method3": {Read: 6, Written: 26}, - }) -}