diff --git a/drpcconn/conn.go b/drpcconn/conn.go
index 77f6ce2..0b8785e 100644
--- a/drpcconn/conn.go
+++ b/drpcconn/conn.go
@@ -5,8 +5,11 @@ package drpcconn
 
 import (
 	"context"
+	"fmt"
+	"runtime/pprof"
 	"sync"
 
+	"github.com/google/uuid"
 	"github.com/zeebo/errs"
 
 	"storj.io/drpc"
@@ -32,6 +35,7 @@ type Options struct {
 // Conn is a drpc client connection.
 type Conn struct {
 	tr   drpc.Transport
+	id   string
 	man  *drpcmanager.Manager
 	mu   sync.Mutex
 	wbuf []byte
@@ -49,6 +53,7 @@ func New(tr drpc.Transport) *Conn { return NewWithOptions(tr, Options{}) }
 func NewWithOptions(tr drpc.Transport, opts Options) *Conn {
 	c := &Conn{
 		tr: tr,
+		id: uuid.New().String(),
 	}
 
 	if opts.CollectStats {
@@ -102,12 +107,25 @@ func (c *Conn) Close() (err error) { return c.man.Close() }
 
 // Invoke issues the rpc on the transport serializing in, waits for a response, and
 // deserializes it into out. Only one Invoke or Stream may be open at a time.
-func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (err error) {
+func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) error {
+	connID := "conn-id: " + c.id
+	var managerErr error
+
+	pprof.Do(ctx, pprof.Labels("drpc-client", connID), func(ctx context.Context) {
+		managerErr = c.invoke(ctx, rpc, enc, in, out)
+	})
+
+	return managerErr
+}
+
+// invoke is the body of Invoke: it encodes any metadata carried by ctx, marshals
+// in into c.wbuf, and runs doInvoke under pprof labels for the stream and rpc.
+func (c *Conn) invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, out drpc.Message) (invokeErr error) {
 	var metadata []byte
 	if md, ok := drpcmetadata.Get(ctx); ok {
-		metadata, err = drpcmetadata.Encode(metadata, md)
-		if err != nil {
-			return err
+		metadata, invokeErr = drpcmetadata.Encode(metadata, md)
+		if invokeErr != nil {
+			return invokeErr
 		}
 	}
 
@@ -115,7 +133,7 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
 	if err != nil {
 		return err
 	}
-	defer func() { err = errs.Combine(err, stream.Close()) }()
+	defer func() { invokeErr = errs.Combine(invokeErr, stream.Close()) }()
 
 	// we have to protect c.wbuf here even though the manager only allows one
 	// stream at a time because the stream may async close allowing another
@@ -123,15 +141,19 @@ func (c *Conn) Invoke(ctx context.Context, rpc string, enc drpc.Encoding, in, ou
 	c.mu.Lock()
 	defer c.mu.Unlock()
 
-	c.wbuf, err = drpcenc.MarshalAppend(in, enc, c.wbuf[:0])
-	if err != nil {
-		return err
+	c.wbuf, invokeErr = drpcenc.MarshalAppend(in, enc, c.wbuf[:0])
+	if invokeErr != nil {
+		return invokeErr
 	}
 
-	if err := c.doInvoke(stream, enc, rpc, c.wbuf, metadata, out); err != nil {
-		return err
-	}
-	return nil
+	// fixed label keys (matching the server side) keep profiles filterable by
+	// key; the stream id and rpc name are carried as the label values.
+	streamID := fmt.Sprintf("stream-id: %d", stream.ID())
+	pprof.Do(ctx, pprof.Labels("drpc-client-stream", streamID, "rpc", rpc), func(ctx context.Context) {
+		invokeErr = c.doInvoke(stream, enc, rpc, c.wbuf, metadata, out)
+	})
+
+	return invokeErr
 }
 
 func (c *Conn) doInvoke(stream *drpcstream.Stream, enc drpc.Encoding, rpc string, data []byte, metadata []byte, out drpc.Message) (err error) {
diff --git a/drpcserver/server.go b/drpcserver/server.go
index 0bc4b36..fe4e2a5 100644
--- a/drpcserver/server.go
+++ b/drpcserver/server.go
@@ -6,10 +6,13 @@ package drpcserver
 import (
 	"context"
 	"crypto/tls"
+	"fmt"
 	"net"
+	"runtime/pprof"
 	"sync"
 	"time"
 
+	"github.com/google/uuid"
 	"github.com/zeebo/errs"
 	"storj.io/drpc"
 	"storj.io/drpc/drpccache"
@@ -91,7 +94,7 @@ func (s *Server) getStats(rpc string) *drpcstats.Stats {
 }
 
 // ServeOne serves a single set of rpcs on the provided transport.
-func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
+func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) error {
 	// Check if the transport is a TLS connection
 	if tlsConn, ok := tr.(*tls.Conn); ok {
 		// Manually perform the TLS handshake to access peer certificate
@@ -118,23 +121,48 @@ func (s *Server) ServeOne(ctx context.Context, tr drpc.Transport) (err error) {
 		}
 	}
 
-	man := drpcmanager.NewWithOptions(tr, s.opts.Manager)
-	defer func() { err = errs.Combine(err, man.Close()) }()
+	var connErr error
+	connID := "conn-id: " + uuid.New().String()
 
-	cache := drpccache.New()
-	defer cache.Clear()
+	pprof.Do(ctx, pprof.Labels("drpc-server", connID), func(ctx context.Context) {
+		man := drpcmanager.NewWithOptions(tr, s.opts.Manager)
+		defer func() { connErr = errs.Combine(connErr, man.Close()) }()
 
-	ctx = drpccache.WithContext(ctx, cache)
+		cache := drpccache.New()
+		defer cache.Clear()
 
-	for {
-		stream, rpc, err := man.NewServerStream(ctx)
-		if err != nil {
-			return errs.Wrap(err)
+		ctx = drpccache.WithContext(ctx, cache)
+
+		for {
+			stream, rpc, err := man.NewServerStream(ctx)
+			if err != nil {
+				connErr = errs.Wrap(err)
+				return
+			}
+
+			if err := s.doHandleRPC(ctx, stream, rpc); err != nil {
+				connErr = err
+				return
+			}
 		}
+	})
+
+	return connErr
+}
+
+// doHandleRPC runs handleRPC for the stream under a pprof label carrying the
+// stream id, returning any handler error wrapped with errs.Wrap.
+func (s *Server) doHandleRPC(ctx context.Context, stream *drpcstream.Stream, rpc string) error {
+	var rpcErr error
+
+	streamID := fmt.Sprintf("stream-id: %d", stream.ID())
+	pprof.Do(ctx, pprof.Labels("drpc-server-stream", streamID), func(ctx context.Context) {
 		if err := s.handleRPC(stream, rpc); err != nil {
-			return errs.Wrap(err)
+			rpcErr = errs.Wrap(err)
 		}
-	}
+	})
+
+	return rpcErr
 }
 
 var temporarySleep = 500 * time.Millisecond
@@ -177,6 +205,13 @@ func (s *Server) Serve(ctx context.Context, lis net.Listener) (err error) {
 			return errs.Wrap(err)
 		}
 
+		// best-effort: turn on TCP keepalive for TCP connections; the setter
+		// errors are deliberately ignored so a failure does not reject the conn.
+		if cc, ok := conn.(*net.TCPConn); ok {
+			_ = cc.SetKeepAlive(true)
+			_ = cc.SetKeepAlivePeriod(2 * time.Second)
+		}
+
 		// TODO(jeff): connection limits?
 		tracker.Run(func(ctx context.Context) {
 			err := s.ServeOne(ctx, conn)
diff --git a/go.mod b/go.mod
index bbed14e..84f3b1d 100644
--- a/go.mod
+++ b/go.mod
@@ -1,8 +1,9 @@
 module storj.io/drpc
 
-go 1.19
+go 1.23.0
 
 require (
+	github.com/google/uuid v1.3.0
 	github.com/stretchr/testify v1.10.0
 	github.com/zeebo/assert v1.3.0
 	github.com/zeebo/errs v1.2.2
diff --git a/go.sum b/go.sum
index 4655a0d..293f882 100644
--- a/go.sum
+++ b/go.sum
@@ -6,6 +6,8 @@ github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiu
 github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE=
 github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38=
 github.com/google/go-cmp v0.5.9/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
+github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
+github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
 github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
 github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
 github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA=