Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 15 additions & 13 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -250,7 +250,6 @@ func (m *Manager) manageReader() {

m.log("READ", pkt.String)

again:
switch curr := m.sbuf.Get(); {
// if the packet is for the current stream, deliver it.
case curr != nil && pkt.ID.Stream == curr.ID():
Expand All @@ -271,24 +270,24 @@ func (m *Manager) manageReader() {

select {
case m.pkts <- pkt:
// Wait for NewServerStream to finish stream creation (including
// sbuf.Set) before reading the next packet. This guarantees curr
// is set for subsequent non-invoke packets.
m.pdone.Recv()

case <-m.sigs.term.Signal():
return
}

// a non-invoke packet should be delivered to some stream so we wait for
// a new stream to be created and try again. like an invoke, we
// implicitly close any previous stream.
default:
if curr != nil && !curr.IsTerminated() {
curr.Cancel(context.Canceled)
}

if !m.sbuf.Wait(curr.ID()) {
return
}
goto again
// A non-invoke packet arrived for a stream that doesn't exist yet
// (curr is nil or pkt.ID.Stream > curr.ID). The first packet of a
// new stream must be KindInvoke or KindInvokeMetadata.
m.terminate(managerClosed.Wrap(drpc.ProtocolError.New(
"first packet of a new stream must be Invoke, got %v (ID:%v)",
pkt.Kind,
pkt.ID)))
return
}
}
}
Expand Down Expand Up @@ -483,7 +482,6 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea

case drpcwire.KindInvoke:
rpc = string(pkt.Data)
m.pdone.Send()

if metaID == pkt.ID.Stream {
if m.opts.GRPCMetadataCompatMode {
Expand All @@ -502,6 +500,10 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
}
}
stream, err := m.newStream(ctx, pkt.ID.Stream, drpc.StreamKindServer, rpc)
// Signal pdone only after stream registration so that
// manageReader sees the new stream via sbuf.Get() when it reads
// the next frame.
m.pdone.Send()
return stream, rpc, err

default:
Expand Down
85 changes: 85 additions & 0 deletions drpcmanager/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (

"github.com/zeebo/assert"
grpcmetadata "google.golang.org/grpc/metadata"
"storj.io/drpc"
"storj.io/drpc/drpcmetadata"

"storj.io/drpc/drpctest"
Expand Down Expand Up @@ -161,6 +162,90 @@ func TestDrpcMetadataWithGRPCMetadataCompatMode(t *testing.T) {
ctx.Wait()
}

// writeFrames serializes the given frames and writes them to w.
func writeFrames(t *testing.T, w io.Writer, frames ...drpcwire.Frame) {
t.Helper()
var buf []byte
for _, fr := range frames {
buf = drpcwire.AppendFrame(buf, fr)
}
_, err := w.Write(buf)
assert.NoError(t, err)
}

// createFrame is a shorthand for constructing a Frame.
func createFrame(kind drpcwire.Kind, sid, mid uint64, data string, done bool) drpcwire.Frame {
return drpcwire.Frame{
ID: drpcwire.ID{Stream: sid, Message: mid},
Kind: kind,
Data: []byte(data),
Done: done,
}
}

// waitForClosed blocks until the manager terminates or the timeout expires.
func waitForClosed(t *testing.T, man *Manager) {
t.Helper()
select {
case <-man.Closed():
case <-time.After(5 * time.Second):
t.Fatal("manager did not terminate in time")
}
}

// Invoke replay: after [s1,m1,invoke,done=true], lastFrameID is bumped to
// {1,2}. A replayed [s1,m1,invoke] is caught by the monotonicity check.
func TestManageReader_InvokeReplayBlocked(t *testing.T) {
ctx := drpctest.NewTracker(t)
defer ctx.Close()

cconn, sconn := net.Pipe()
defer func() { _ = cconn.Close() }()
defer func() { _ = sconn.Close() }()

man := New(sconn)
defer func() { _ = man.Close() }()

ctx.Run(func(ctx context.Context) {
_, _, _ = man.NewServerStream(ctx)
})

writeFrames(t, cconn,
createFrame(drpcwire.KindInvoke, 1, 1, "rpc", true),
createFrame(drpcwire.KindInvoke, 1, 1, "rpc", true),
)

waitForClosed(t, man)
}

// A second invoke for the same stream ID is rejected — the stream treats
// it as a protocol error, terminating the manager.
func TestManageReader_InvokeOnExistingStream(t *testing.T) {
ctx := drpctest.NewTracker(t)
defer ctx.Close()

cconn, sconn := net.Pipe()
defer func() { _ = cconn.Close() }()
defer func() { _ = sconn.Close() }()

man := New(sconn)
defer func() { _ = man.Close() }()

ctx.Run(func(ctx context.Context) {
stream, _, err := man.NewServerStream(ctx)
assert.NoError(t, err)
_ = stream
})

writeFrames(t, cconn,
createFrame(drpcwire.KindInvoke, 1, 1, "rpc1", true),
createFrame(drpcwire.KindInvoke, 1, 2, "rpc2", true),
)

waitForClosed(t, man)
assert.That(t, drpc.ProtocolError.Has(man.sigs.term.Err()))
}

type blockingTransport chan struct{}

func (b blockingTransport) Read(p []byte) (n int, err error) { <-b; return 0, io.EOF }
Expand Down
2 changes: 2 additions & 0 deletions drpcmanager/random_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,12 @@ import (
)

func TestRandomized_Client(t *testing.T) {
t.Skip("disabled as the generated random workload violates the wire protocol")
runRandomized(t, randomBytes(time.Now().UnixNano(), 1024), new(randClient))
}

func TestRandomized_Server(t *testing.T) {
t.Skip("disabled as the generated random workload violates the wire protocol")
runRandomized(t, randomBytes(time.Now().UnixNano(), 1024), new(randServer))
}

Expand Down