Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions drpcconn/conn_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {
wr := drpcwire.NewWriter(ps, 64)
rd := drpcwire.NewReader(ps)

_, _ = rd.ReadPacket() // Invoke
_, _ = rd.ReadPacket() // Message
pkt, _ := rd.ReadPacket() // CloseSend
_, _ = rd.ReadFrame() // Invoke
_, _ = rd.ReadFrame() // Message
pkt, _ := rd.ReadFrame() // CloseSend

_ = wr.WritePacket(drpcwire.Packet{
Data: []byte("qux"),
Expand All @@ -54,8 +54,8 @@ func TestConn_InvokeFlushesSendClose(t *testing.T) {
})
_ = wr.Flush()

_, _ = rd.ReadPacket() // Close
<-invokeDone // wait for invoke to return
_, _ = rd.ReadFrame() // Close
<-invokeDone // wait for invoke to return

// ensure that any later packets are dropped by writing one
// before closing the transport.
Expand Down Expand Up @@ -98,7 +98,7 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
wr := drpcwire.NewWriter(ps, 64)
rd := drpcwire.NewReader(ps)

md, err := rd.ReadPacket() // Metadata
md, err := rd.ReadFrame() // Metadata
assert.NoError(t, err)
assert.Equal(t, md.Kind, drpcwire.KindInvokeMetadata)
metadata, err := drpcmetadata.Decode(md.Data)
Expand All @@ -110,9 +110,9 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
"common-key": "common-value2",
})

_, _ = rd.ReadPacket() // Invoke
_, _ = rd.ReadPacket() // Message
pkt, _ := rd.ReadPacket() // CloseSend
_, _ = rd.ReadFrame() // Invoke
_, _ = rd.ReadFrame() // Message
pkt, _ := rd.ReadFrame() // CloseSend

_ = wr.WritePacket(drpcwire.Packet{
Data: []byte("qux"),
Expand All @@ -121,7 +121,7 @@ func TestConn_InvokeSendsGrpcAndDrpcMetadata(t *testing.T) {
})
_ = wr.Flush()

_, _ = rd.ReadPacket() // Close
_, _ = rd.ReadFrame() // Close
})

conn := New(pc)
Expand Down Expand Up @@ -154,7 +154,7 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
ctx.Run(func(ctx context.Context) {
rd := drpcwire.NewReader(ps)

md, err := rd.ReadPacket() // Metadata
md, err := rd.ReadFrame() // Metadata
assert.NoError(t, err)
assert.Equal(t, md.Kind, drpcwire.KindInvokeMetadata)
metadata, err := drpcmetadata.Decode(md.Data)
Expand All @@ -164,8 +164,8 @@ func TestConn_NewStreamSendsGrpcAndDrpcMetadata(t *testing.T) {
"drpc-key": "drpc-value",
})

_, _ = rd.ReadPacket() // Invoke
_, _ = rd.ReadPacket() // CloseSend
_, _ = rd.ReadFrame() // Invoke
_, _ = rd.ReadFrame() // CloseSend
})

conn := New(pc)
Expand Down
77 changes: 39 additions & 38 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,9 @@ type Manager struct {
rd *drpcwire.Reader
opts Options

lastFrameID drpcwire.ID
lastFrameKind drpcwire.Kind

sem drpcsignal.Chan // held by the active stream
sbuf streamBuffer // largest stream id created
pkts chan drpcwire.Packet // channel for invoke packets
Expand Down Expand Up @@ -213,27 +216,15 @@ func (m *Manager) terminate(err error) {
// manage reader
//

// manageReader is always reading a packet and dispatching it to the appropriate
// stream or queue. It sets the read signal when it exits so that one can wait
// to ensure that no one is reading on the reader. It sets the term signal if
// there is any error reading packets.
// manageReader reads the frame and dispatches them to the appropriate stream or
// queue. It sets the read signal when it exits so that one can wait to ensure
// that no one is reading on the reader. It sets the term signal if there is any
// error reading frames.
func (m *Manager) manageReader() {
defer m.sigs.read.Set(nil)

var pkt drpcwire.Packet
var err error
var run int

for !m.sigs.term.IsSet() {
// if we have a run of "small" packets, drop the buffer to release
// memory so that a burst of large packets does not cause eternally
// large heap usage.
if run > 10 {
pkt.Data = nil
run = 0
}

pkt, err = m.rd.ReadPacketUsing(pkt.Data[:0])
incomingFrame, err := m.rd.ReadFrame()
if err != nil {
if isConnectionReset(err) {
err = drpc.ClosedError.Wrap(err)
Expand All @@ -242,36 +233,36 @@ func (m *Manager) manageReader() {
return
}

if len(pkt.Data) < cap(pkt.Data)/4 {
run++
} else {
run = 0
}
m.log("READ", incomingFrame.String)

m.log("READ", pkt.String)
if ok := m.checkStreamMonotonicity(incomingFrame); !ok {
m.terminate(managerClosed.Wrap(drpc.ProtocolError.New("id monotonicity violation")))
return
}

switch curr := m.sbuf.Get(); {
// if the packet is for the current stream, deliver it.
case curr != nil && pkt.ID.Stream == curr.ID():
if err := curr.HandlePacket(pkt); err != nil {
// If the frame is for the current stream, deliver it.
case curr != nil && incomingFrame.ID.Stream == curr.ID():
if err := curr.HandleFrame(incomingFrame); err != nil {
m.terminate(managerClosed.Wrap(err))
return
}

// if an old message has been sent, just ignore it.
case curr != nil && pkt.ID.Stream < curr.ID():
// If a frame arrives for an old stream, just ignore it.
case curr != nil && incomingFrame.ID.Stream < curr.ID():
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This case is redundant for the server. A frame that passes the monotonicity check above can never fail this check for the server since incomingFrame.Stream >= lastFrameID.Stream >= curr.ID() will always be true on the server side. However, it is needed for the client. It will help to add a comment here explaining why this is needed.


// if any invoke sequence is being sent, close any old unterminated
// stream and forward it to be handled.
case pkt.Kind == drpcwire.KindInvoke || pkt.Kind == drpcwire.KindInvokeMetadata:
// If an invoke sequence is being sent for a new stream, close any
// old unterminated stream and forward it to be handled.
case incomingFrame.Kind == drpcwire.KindInvoke || incomingFrame.Kind == drpcwire.KindInvokeMetadata:
if curr != nil && !curr.IsTerminated() {
curr.Cancel(context.Canceled)
}

pkt := drpcwire.Packet{ID: incomingFrame.ID, Kind: incomingFrame.Kind, Data: incomingFrame.Data}
select {
case m.pkts <- pkt:
// Wait for NewServerStream to finish stream creation (including
// sbuf.Set) before reading the next packet. This guarantees curr
// sbuf.Set) before reading the next frame. This guarantees curr
// is set for subsequent non-invoke packets.
m.pdone.Recv()

Expand All @@ -280,18 +271,28 @@ func (m *Manager) manageReader() {
}

default:
// A non-invoke packet arrived for a stream that doesn't exist yet
// (curr is nil or pkt.ID.Stream > curr.ID). The first packet of a
// new stream must be KindInvoke or KindInvokeMetadata.
// A non-invoke frame arrived for a stream that doesn't exist yet
// (curr is nil or incomingFrame.ID.Stream > curr.ID). The first
// frame of a new stream must be KindInvoke or KindInvokeMetadata.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This comment and the error is true for the server but doesn't make sense for the client side. Can we make it generic or clarify client vs server explicitly.

m.terminate(managerClosed.Wrap(drpc.ProtocolError.New(
"first packet of a new stream must be Invoke, got %v (ID:%v)",
pkt.Kind,
pkt.ID)))
"first frame of a new stream must be Invoke, got %v (ID:%v)",
incomingFrame.Kind,
incomingFrame.ID)))
return
}
}
}

func (m *Manager) checkStreamMonotonicity(incomingFrame drpcwire.Frame) bool {
ok := incomingFrame.ID.Stream >= m.lastFrameID.Stream
m.lastFrameKind = incomingFrame.Kind
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: This is not being used anywhere - I'm guessing for future use.

m.lastFrameID = incomingFrame.ID
if incomingFrame.Done {
m.lastFrameID.Message += 1
}
return ok
}

//
// manage streams
//
Expand Down
Loading