Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -209,7 +209,6 @@ DRPC is proud to get as much done in as few lines of code as possible. It's the
| storj.io/drpc/drpcerr | 42 |
| storj.io/drpc/drpcctx | 41 |
| storj.io/drpc/internal/drpcopts | 30 |
| storj.io/drpc/drpcstats | 25 |
| storj.io/drpc/drpcdebug | 22 |
| storj.io/drpc/drpcenc | 15 |
| **Total** | **3611** |
11 changes: 0 additions & 11 deletions drpcconn/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,13 +60,6 @@ func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_
NewStream begins a streaming rpc on the connection. Only one Invoke or Stream
may be open at a time.

#### func (*Conn) Stats

```go
func (c *Conn) Stats() map[string]drpcstats.Stats
```
Stats returns the collected stats grouped by rpc.

#### func (*Conn) Transport

```go
Expand All @@ -89,10 +82,6 @@ called concurrently with Invoke or NewStream.
type Options struct {
// Manager controls the options we pass to the manager of this conn.
Manager drpcmanager.Options

// CollectStats controls whether the server should collect stats on the
// rpcs it creates.
CollectStats bool
}
```

Expand Down
82 changes: 37 additions & 45 deletions drpcconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,26 @@ import (
"storj.io/drpc/drpcenc"
"storj.io/drpc/drpcmanager"
"storj.io/drpc/drpcmetadata"
"storj.io/drpc/drpcstats"
"storj.io/drpc/drpcmetrics"
"storj.io/drpc/drpcstream"
"storj.io/drpc/drpcwire"
"storj.io/drpc/internal/drpcopts"
)

// ClientMetrics holds optional metrics that the client connection will
// populate during operation.
type ClientMetrics struct {
BytesSent drpcmetrics.Counter
BytesRecv drpcmetrics.Counter
}

// Options controls configuration settings for a conn.
type Options struct {
// Manager controls the options we pass to the manager of this conn.
Manager drpcmanager.Options

// CollectStats controls whether the server should collect stats on the
// rpcs it creates.
CollectStats bool
// Metrics holds optional metrics the conn will populate. If nil, no
// metrics are recorded.
Metrics *ClientMetrics
}

// Conn is a drpc client connection.
Expand All @@ -35,8 +41,6 @@ type Conn struct {
man *drpcmanager.Manager
mu sync.Mutex
wbuf []byte

stats map[string]*drpcstats.Stats
}

var _ drpc.Conn = (*Conn)(nil)
Expand All @@ -47,43 +51,20 @@ func New(tr drpc.Transport) *Conn { return NewWithOptions(tr, Options{}) }
// NewWithOptions returns a conn that uses the transport for reads and writes.
// The Options control details of how the conn operates.
func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
c := &Conn{
tr: tr,
}

if opts.CollectStats {
drpcopts.SetManagerStatsCB(&opts.Manager.Internal, c.getStats)
c.stats = make(map[string]*drpcstats.Stats)
}

c.man = drpcmanager.NewWithOptions(tr, opts.Manager)

return c
}

// Stats returns the collected stats grouped by rpc.
func (c *Conn) Stats() map[string]drpcstats.Stats {
c.mu.Lock()
defer c.mu.Unlock()

stats := make(map[string]drpcstats.Stats, len(c.stats))
for k, v := range c.stats {
stats[k] = v.AtomicClone()
if opts.Metrics != nil {
if opts.Metrics.BytesSent == nil {
opts.Metrics.BytesSent = drpcmetrics.NoOpCounter{}
}
if opts.Metrics.BytesRecv == nil {
opts.Metrics.BytesRecv = drpcmetrics.NoOpCounter{}
}
mt := &drpcmetrics.MeteredTransport{Transport: tr, BytesSent: opts.Metrics.BytesSent, BytesRecv: opts.Metrics.BytesRecv}
tr = mt
}
return stats
}

// getStats returns the drpcopts.Stats struct for the given rpc.
func (c *Conn) getStats(rpc string) *drpcstats.Stats {
c.mu.Lock()
defer c.mu.Unlock()

stats := c.stats[rpc]
if stats == nil {
stats = new(drpcstats.Stats)
c.stats[rpc] = stats
return &Conn{
tr: tr,
man: drpcmanager.NewWithOptions(tr, opts.Manager),
}
return stats
}

// Transport returns the transport the conn is using.
Expand All @@ -102,7 +83,9 @@ func (c *Conn) Close() (err error) { return c.man.Close() }

// Invoke issues the rpc on the transport serializing in, waits for a response, and
// deserializes it into out. Only one Invoke or Stream may be open at a time.
func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
func (c *Conn) Invoke(
ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message,
) (err error) {
defer func() { err = drpc.ToRPCErr(err) }()

var metadata []byte
Expand Down Expand Up @@ -134,7 +117,14 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
return nil
}

func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) {
func (c *Conn) doInvoke(
stream *drpcstream.Stream,
enc drpc.Encoding,
rpc string,
data []byte,
metadata []byte,
out drpc.Message,
) (err error) {
if len(metadata) > 0 {
if err := stream.RawWrite(drpcwire.KindInvokeMetadata, metadata); err != nil {
return err
Expand All @@ -157,7 +147,9 @@ func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string

// NewStream begins a streaming rpc on the connection. Only one Invoke or Stream may
// be open at a time.
func (c *Conn) NewStream(ctx context.Context, rpc string, enc drpc.Encoding) (_ drpc.Stream, err error) {
func (c *Conn) NewStream(
ctx context.Context, rpc string, enc drpc.Encoding,
) (_ drpc.Stream, err error) {
defer func() { err = drpc.ToRPCErr(err) }()

var metadata []byte
Expand Down
17 changes: 9 additions & 8 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (

"github.com/zeebo/errs"
grpcmetadata "google.golang.org/grpc/metadata"

"storj.io/drpc"
"storj.io/drpc/drpcdebug"
"storj.io/drpc/drpcmetadata"
Expand Down Expand Up @@ -298,14 +297,12 @@ func (m *Manager) manageReader() {
//

// newStream creates a stream value with the appropriate configuration for this manager.
func (m *Manager) newStream(ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string) (*drpcstream.Stream, error) {
func (m *Manager) newStream(
ctx context.Context, sid uint64, kind drpc.StreamKind, rpc string,
) (*drpcstream.Stream, error) {
opts := m.opts.Stream
drpcopts.SetStreamKind(&opts.Internal, kind)
drpcopts.SetStreamRPC(&opts.Internal, rpc)
if cb := drpcopts.GetManagerStatsCB(&m.opts.Internal); cb != nil {
drpcopts.SetStreamStats(&opts.Internal, cb(rpc))
}

stream := drpcstream.NewWithOptions(ctx, sid, m.wr, opts)
select {
case m.streams <- streamInfo{ctx: ctx, stream: stream}:
Expand Down Expand Up @@ -425,7 +422,9 @@ func (m *Manager) Close() error {
}

// NewClientStream starts a stream on the managed transport for use by a client.
func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpcstream.Stream, err error) {
func (m *Manager) NewClientStream(
ctx context.Context, rpc string,
) (stream *drpcstream.Stream, err error) {
if err := m.acquireSemaphore(ctx); err != nil {
return nil, err
}
Expand All @@ -436,7 +435,9 @@ func (m *Manager) NewClientStream(ctx context.Context, rpc string) (stream *drpc
// NewServerStream starts a stream on the managed transport for use by a server.
// It does this by waiting for the client to issue an invoke message and
// returning the details.
func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Stream, rpc string, err error) {
func (m *Manager) NewServerStream(
ctx context.Context,
) (stream *drpcstream.Stream, rpc string, err error) {
if err := m.acquireSemaphore(ctx); err != nil {
return nil, "", err
}
Expand Down
1 change: 0 additions & 1 deletion drpcmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"github.com/zeebo/assert"
grpcmetadata "google.golang.org/grpc/metadata"
"storj.io/drpc/drpcmetadata"

"storj.io/drpc/drpctest"
"storj.io/drpc/drpcwire"
)
Expand Down
120 changes: 120 additions & 0 deletions drpcmetrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,120 @@
package drpcmetrics

Check failure on line 1 in drpcmetrics/metrics.go

View workflow job for this annotation

GitHub Actions / lint

at least one file in a package should have a package comment (ST1000)

import (
"google.golang.org/grpc/status"
"storj.io/drpc"
)

// Counter is a metric that can only be incremented (monotonically increasing).
// The labels parameter contains key-value pairs for metric dimensions
// (e.g. rpcService, rpcMethod). It may be nil when no
// dimensional context is available.
type Counter interface {
Add(labels map[string]string, value float64)
}

// NoOpCounter is a Counter implementation that does nothing.
type NoOpCounter struct{}

// Add implements Counter.
func (NoOpCounter) Add(map[string]string, float64) {}

// Observer is a metric that records observed values (e.g. durations,
// sizes). The labels parameter contains key-value pairs for metric
// dimensions. It may be nil when no dimensional context is available.
type Observer interface {
Observe(labels map[string]string, value float64)
}

// Gauge is a metric that can increase and decrease (e.g. pool size,
// active count). The labels parameter contains key-value pairs for
// metric dimensions. It may be nil when no dimensional context is
// available.
//
// Note: Gauge and Counter have identical method signatures but different
// semantics. Gauge values may go up or down; Counter values must only
// increase.
type Gauge interface {
Add(labels map[string]string, value float64)
}

// NoOpGauge is a Gauge implementation that does nothing.
type NoOpGauge struct{}

// Add implements Gauge.
func (NoOpGauge) Add(map[string]string, float64) {}

// NoOpObserver is an Observer implementation that does nothing.
type NoOpObserver struct{}

// Observe implements Observer.
func (NoOpObserver) Observe(map[string]string, float64) {}

// Label constants for RPC metrics. These are the label keys used in the
// labels map passed to Counter.Add and Observer.Observe.
const (
LabelRPCMethodName = "methodName"
LabelStatusCode = "statusCode"
LabelState = "state"

StateStarted = "started"
StateCompleted = "completed"
)

// StreamMetrics holds metrics for stream lifecycle tracking.
type StreamMetrics struct {
StreamsTotal Counter
StreamDuration Observer
}

// MeteredTransport wraps a Transport and increments byte counters on each
// Read and Write call.
type MeteredTransport struct {
drpc.Transport
BytesSent Counter
BytesRecv Counter
}

// Read reads from the underlying transport and increments BytesRecv.
func (t *MeteredTransport) Read(p []byte) (n int, err error) {
n, err = t.Transport.Read(p)
if n > 0 {
t.BytesRecv.Add(nil, float64(n))
}
return n, err
}

// Write writes to the underlying transport and increments BytesSent.
func (t *MeteredTransport) Write(p []byte) (n int, err error) {
n, err = t.Transport.Write(p)
if n > 0 {
t.BytesSent.Add(nil, float64(n))
}
return n, err
}

// StatusCodeLabel returns a label value for the given error's code.
func StatusCodeLabel(err error) string {
return status.Code(err).String()
}

// WithErrorCode returns a copy of labels with the error_code label added.
func WithErrorCode(labels map[string]string, err error) map[string]string {
out := make(map[string]string, len(labels)+1)
for k, v := range labels {
out[k] = v
}
statusCode := drpc.ToRPCErr(err)
out[LabelStatusCode] = StatusCodeLabel(statusCode)
return out
}

// WithState returns a copy of labels with the state label added.
func WithState(labels map[string]string, state string) map[string]string {
out := make(map[string]string, len(labels)+1)
for k, v := range labels {
out[k] = v
}
out[LabelState] = state
return out
}
7 changes: 7 additions & 0 deletions drpcmux/mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,13 @@ func (m *Mux) Register(srv interface{}, desc drpc.Description) error {
return nil
}

// IsUnitary reports whether the named RPC has unary input and output.
// It returns false if the RPC is not registered or is a streaming RPC.
func (m *Mux) IsUnitary(rpc string) bool {
data, ok := m.rpcs[rpc]
return ok && data.unitary
}

// registerOne does the work to register a single rpc.
func (m *Mux) registerOne(
srv interface{}, rpc string, enc drpc.Encoding, receiver drpc.Receiver, method interface{},
Expand Down
11 changes: 6 additions & 5 deletions drpcpool/entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,12 @@ import (
)

type entry[K comparable, V Conn] struct {
key K
val V
exp *time.Timer
global node[K, V]
local node[K, V]
key K
val V
exp *time.Timer
created time.Time
global node[K, V]
local node[K, V]
}

func (e *entry[K, V]) String() string {
Expand Down
Loading
Loading