Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
164 changes: 93 additions & 71 deletions drpcmanager/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,17 @@ type Manager struct {
lastFrameID drpcwire.ID
lastFrameKind drpcwire.Kind

sem drpcsignal.Chan // held by the active stream
sbuf streamBuffer // largest stream id created
pkts chan drpcwire.Packet // channel for invoke packets
pdone drpcsignal.Chan // signals when a packets buffers can be reused
sfin chan struct{} // shared signal for stream finished
streams chan streamInfo // channel to signal that a stream should start
sem drpcsignal.Chan // held by the active stream
sbuf streamBuffer // largest stream id created
sfin chan struct{} // shared signal for stream finished
streams chan streamInfo // channel to signal that a stream should start

pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream

// Below fields are owned by the manageReader goroutine, used in handleInvokeFrame.
metadata map[string]string // accumulated invoke metadata
pa drpcwire.PacketAssembler // assembles invoke/metadata frames into packets

sigs struct {
term drpcsignal.Signal // set when the manager should start terminating
Expand All @@ -90,6 +95,14 @@ type Manager struct {
}
}

// invokeInfo carries the assembled invoke data from manageReader to
// NewServerStream: the stream id and RPC name bytes from the KindInvoke
// packet, plus any metadata decoded from a preceding KindInvokeMetadata
// packet (nil when no metadata packet was sent).
//
// NOTE(review): the data slice appears to alias the packet assembler's
// buffer, which manageReader resets only after NewServerStream signals
// pdone — consumers should copy it out (NewServerStream does, via
// string(pkt.data)) before releasing pdone. TODO confirm against
// drpcwire.PacketAssembler's buffer-reuse semantics.
type invokeInfo struct {
	sid      uint64            // stream id from the KindInvoke packet's drpcwire.ID
	metadata map[string]string // decoded invoke metadata; nil if none preceded the invoke
	data     []byte            // RPC name bytes from the KindInvoke packet
}

type streamInfo struct {
ctx context.Context
stream *drpcstream.Stream
Expand All @@ -109,7 +122,8 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
opts: opts,

pkts: make(chan drpcwire.Packet),
invokes: make(chan invokeInfo),

sfin: make(chan struct{}, 1),
streams: make(chan streamInfo),
}
Expand All @@ -120,10 +134,12 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
// this semaphore controls the number of concurrent streams. it MUST be 1.
m.sem.Make(1)

// a buffer of size 1 allows the consumer of the packet to signal it is done
// without having to coordinate with the sender of the packet.
// a buffer of size 1 allows NewServerStream to signal it is done creating a
// new server stream without having to coordinate with manageReader.
m.pdone.Make(1)

m.pa = drpcwire.NewPacketAssembler()

// set the internal stream options
drpcopts.SetStreamTransport(&m.opts.Stream.Internal, m.tr)
drpcopts.SetStreamFin(&m.opts.Stream.Internal, m.sfin)
Expand Down Expand Up @@ -257,16 +273,8 @@ func (m *Manager) manageReader() {
if curr != nil && !curr.IsTerminated() {
curr.Cancel(context.Canceled)
}

pkt := drpcwire.Packet{ID: incomingFrame.ID, Kind: incomingFrame.Kind, Data: incomingFrame.Data}
select {
case m.pkts <- pkt:
// Wait for NewServerStream to finish stream creation (including
// sbuf.Set) before reading the next frame. This guarantees curr
// is set for subsequent non-invoke packets.
m.pdone.Recv()

case <-m.sigs.term.Signal():
if err := m.handleInvokeFrame(incomingFrame); err != nil {
m.terminate(managerClosed.Wrap(err))
return
}

Expand All @@ -293,6 +301,43 @@ func (m *Manager) checkStreamMonotonicity(incomingFrame drpcwire.Frame) bool {
return ok
}

// handleInvokeFrame assembles invoke/metadata frames into complete packets and
// forwards the finished invoke info to NewServerStream via m.invokes.
// Metadata packets are accumulated into m.metadata; the invoke packet triggers
// the send. It is called only from the manageReader goroutine, which owns
// m.pa and m.metadata.
func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {
	// Feed the frame to the assembler; packetReady is false until the final
	// frame of a packet has arrived.
	pkt, packetReady, err := m.pa.AppendFrame(fr)
	if err != nil {
		return err
	}
	if !packetReady {
		return nil
	}

	// Metadata arrives before invoke; accumulate it and wait for the invoke.
	//
	// NOTE(review): nothing here ties the metadata's stream id to the
	// invoke's stream id (the pre-refactor code compared metaID against
	// pkt.ID.Stream before attaching metadata). Presumably stream
	// monotonicity checks elsewhere make a mismatch impossible — verify.
	if pkt.Kind == drpcwire.KindInvokeMetadata {
		meta, err := drpcmetadata.Decode(pkt.Data)
		if err != nil {
			return err
		}
		m.metadata = meta
		return nil
	}

	// Invoke packet completes the sequence. Send to NewServerStream.
	select {
	case m.invokes <- invokeInfo{sid: pkt.ID.Stream, data: pkt.Data, metadata: m.metadata}:
		// Wait for NewServerStream to finish stream creation (including
		// sbuf.Set) before reading the next frame. This guarantees curr
		// is set for subsequent non-invoke packets.
		m.pdone.Recv()

		// Only after NewServerStream is done may the assembler's buffer
		// (aliased by pkt.Data) and the metadata map be released for reuse.
		m.pa.Reset()
		m.metadata = nil
	case <-m.sigs.term.Signal():
		// Manager is terminating; intentionally skip the Reset/nil cleanup
		// since no further frames will be processed.
	}
	return nil
}

//
// manage streams
//
Expand Down Expand Up @@ -446,8 +491,6 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
}
}()

var meta map[string]string
var metaID uint64
var timeoutCh <-chan time.Time

// set up the timeout on the context if necessary.
Expand All @@ -457,61 +500,40 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
timeoutCh = timer.C
}

for {
select {
case <-timeoutCh:
return nil, "", context.DeadlineExceeded
select {
case <-timeoutCh:
return nil, "", context.DeadlineExceeded

case <-ctx.Done():
return nil, "", ctx.Err()
case <-ctx.Done():
return nil, "", ctx.Err()

case <-m.sigs.term.Signal():
return nil, "", m.sigs.term.Err()

case pkt := <-m.pkts:
switch pkt.Kind {
// keep track of any metadata being sent before an invoke so that we
// can include it if the stream id matches the eventual invoke.
case drpcwire.KindInvokeMetadata:
meta, err = drpcmetadata.Decode(pkt.Data)
m.pdone.Send()

if err != nil {
return nil, "", err
}
metaID = pkt.ID.Stream

case drpcwire.KindInvoke:
rpc = string(pkt.Data)

if metaID == pkt.ID.Stream {
if m.opts.GRPCMetadataCompatMode {
// Populate incoming metadata as grpc metadata in the
// context. This is a short-term fix that will enable us
// to send and receive grpc metadata when DRPC is enabled,
// without any changes in the calling code.
grpcMeta := make(map[string][]string, len(meta))
for k, v := range meta {
grpcMeta[k] = []string{v}
}
ctx = grpcmetadata.NewIncomingContext(ctx, grpcMeta)
} else {
// Add metadata to the incoming context.
ctx = drpcmetadata.NewIncomingContext(ctx, meta)
}
case <-m.sigs.term.Signal():
return nil, "", m.sigs.term.Err()

case pkt := <-m.invokes:
rpc = string(pkt.data)
if pkt.metadata != nil {
if m.opts.GRPCMetadataCompatMode {
// Populate incoming metadata as grpc metadata in the
// context. This is a short-term fix that will enable us
// to send and receive grpc metadata when DRPC is enabled,
// without any changes in the calling code.
grpcMeta := make(map[string][]string, len(pkt.metadata))
for k, v := range pkt.metadata {
grpcMeta[k] = []string{v}
}
stream, err := m.newStream(ctx, pkt.ID.Stream, drpc.StreamKindServer, rpc)
// Signal pdone only after stream registration so that
// manageReader sees the new stream via sbuf.Get() when it reads
// the next frame.
m.pdone.Send()
return stream, rpc, err

default:
// this should never happen, but defensive.
m.pdone.Send()
ctx = grpcmetadata.NewIncomingContext(ctx, grpcMeta)
} else {
// Add metadata to the incoming context.
ctx = drpcmetadata.NewIncomingContext(ctx, pkt.metadata)
}
}
stream, err := m.newStream(ctx, pkt.sid, drpc.StreamKindServer, rpc)
// Signal pdone only after stream registration so that
// manageReader sees the new stream via sbuf.Get() when it reads
// the next frame.
m.pdone.Send()
return stream, rpc, err
}
}

Expand Down
53 changes: 10 additions & 43 deletions drpcstream/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,10 +52,7 @@ type Stream struct {
read inspectMutex
flush sync.Once

assembling bool
pktBuf []byte
pktKind drpcwire.Kind
nextMessageID uint64
pa drpcwire.PacketAssembler

id drpcwire.ID
wr *drpcwire.Writer
Expand Down Expand Up @@ -94,6 +91,9 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
}
}

pa := drpcwire.NewPacketAssembler()
pa.SetStreamID(sid)

s := &Stream{
ctx: streamCtx{
Context: ctx,
Expand All @@ -103,7 +103,7 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
fin: drpcopts.GetStreamFin(&opts.Internal),
task: task,

nextMessageID: 1,
pa: pa,

id: drpcwire.ID{Stream: sid},
wr: wr.Reset(),
Expand Down Expand Up @@ -228,47 +228,14 @@ func (s *Stream) HandleFrame(fr drpcwire.Frame) (err error) {
return nil
}

if fr.ID.Stream != s.ID() {
return drpc.ProtocolError.New("frame doesn't belong to this stream (fr: %v)", fr.ID)
}

if fr.ID.Message < s.nextMessageID {
return drpc.ProtocolError.New(
"id monotonicity violation: frame %v has message ID less than expected %v", fr.ID, s.nextMessageID)
} else if fr.ID.Message > s.nextMessageID || !s.assembling {
s.pktBuf = s.pktBuf[:0]
s.assembling = true
s.nextMessageID = fr.ID.Message
} else if fr.Kind != s.pktKind {
return drpc.ProtocolError.New("frame kind change within packet: got %v, expected %v", fr.Kind, s.pktKind)
}

// TODO(shubham): add buf reuse
s.pktBuf = append(s.pktBuf, fr.Data...)

s.pktKind = fr.Kind

if s.opts.MaximumBufferSize > 0 && len(s.pktBuf) > s.opts.MaximumBufferSize {
return drpc.ProtocolError.New("data overflow (len:%d)", len(s.pktBuf))
packet, packetReady, err := s.pa.AppendFrame(fr)
if err != nil {
return err
}

if !fr.Done {
if !packetReady {
return nil
}

s.assembling = false
s.nextMessageID = fr.ID.Message + 1

err = s.handlePacket(drpcwire.Packet{
ID: fr.ID,
Kind: fr.Kind,
Control: fr.Control,
Data: s.pktBuf,
})

// TODO(shubham): add buf reuse
s.pktBuf = nil
return err
return s.handlePacket(packet)
}

// handlePacket advances the stream state machine by inspecting the packet. It
Expand Down
Loading