Skip to content

Commit 162c5ab

Browse files
committed
fixup! *: extract PacketAssembler for frame-to-packet assembly
1 parent 0f1fbbe commit 162c5ab

2 files changed

Lines changed: 27 additions & 32 deletions

File tree

drpcmanager/manager.go

Lines changed: 22 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -80,8 +80,12 @@ type Manager struct {
8080
sfin chan struct{} // shared signal for stream finished
8181
streams chan streamInfo // channel to signal that a stream should start
8282

83-
newServerStreamInfo chan newStreamInfo // completed invoke info from manageReader to NewServerStream
84-
pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
83+
pdone drpcsignal.Chan // signals when NewServerStream has registered the new stream
84+
invokes chan invokeInfo // completed invoke info from manageReader to NewServerStream
85+
86+
// Below fields are owned by the manageReader goroutine, used in handleInvokeFrame.
87+
metadata map[string]string // accumulated invoke metadata
88+
pa drpcwire.PacketAssembler // assembles invoke/metadata frames into packets
8589

8690
sigs struct {
8791
term drpcsignal.Signal // set when the manager should start terminating
@@ -91,20 +95,14 @@ type Manager struct {
9195
}
9296
}
9397

94-
// newStreamInfo carries the assembled invoke data from manageReader to
98+
// invokeInfo carries the assembled invoke data from manageReader to
9599
// NewServerStream. It is reused across invocations; call Reset between uses.
96-
type newStreamInfo struct {
100+
type invokeInfo struct {
97101
sid uint64
98102
metadata map[string]string
99103
data []byte // RPC name bytes from the KindInvoke packet
100104
}
101105

102-
// Reset clears all fields for reuse on the next invoke sequence.
103-
func (ns *newStreamInfo) Reset() {
104-
ns.sid = 0
105-
ns.metadata = nil
106-
}
107-
108106
type streamInfo struct {
109107
ctx context.Context
110108
stream *drpcstream.Stream
@@ -124,7 +122,7 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
124122
rd: drpcwire.NewReaderWithOptions(tr, opts.Reader),
125123
opts: opts,
126124

127-
newServerStreamInfo: make(chan newStreamInfo),
125+
invokes: make(chan invokeInfo),
128126

129127
sfin: make(chan struct{}, 1),
130128
streams: make(chan streamInfo),
@@ -140,6 +138,8 @@ func NewWithOptions(tr drpc.Transport, opts Options) *Manager {
140138
// without having to coordinate with manageReader.
141139
m.pdone.Make(1)
142140

141+
m.pa = drpcwire.NewPacketAssembler()
142+
143143
// set the internal stream options
144144
drpcopts.SetStreamTransport(&m.opts.Stream.Internal, m.tr)
145145
drpcopts.SetStreamFin(&m.opts.Stream.Internal, m.sfin)
@@ -239,9 +239,6 @@ func (m *Manager) terminate(err error) {
239239
func (m *Manager) manageReader() {
240240
defer m.sigs.read.Set(nil)
241241

242-
invokePktAssembler := drpcwire.NewPacketAssembler()
243-
createStreamInfo := newStreamInfo{}
244-
245242
for !m.sigs.term.IsSet() {
246243
incomingFrame, err := m.rd.ReadFrame()
247244
if err != nil {
@@ -276,7 +273,7 @@ func (m *Manager) manageReader() {
276273
if curr != nil && !curr.IsTerminated() {
277274
curr.Cancel(context.Canceled)
278275
}
279-
if err := m.handleInvokeFrame(&invokePktAssembler, &createStreamInfo, incomingFrame); err != nil {
276+
if err := m.handleInvokeFrame(incomingFrame); err != nil {
280277
m.terminate(managerClosed.Wrap(err))
281278
return
282279
}
@@ -305,10 +302,10 @@ func (m *Manager) checkStreamMonotonicity(incomingFrame drpcwire.Frame) bool {
305302
}
306303

307304
// handleInvokeFrame assembles invoke/metadata frames into complete packets and
308-
// forwards the finished invoke info to NewServerStream via m.pkts. Metadata
309-
// packets are accumulated into info; the invoke packet triggers the send.
310-
func (m *Manager) handleInvokeFrame(pa *drpcwire.PacketAssembler, info *newStreamInfo, fr drpcwire.Frame) error {
311-
pkt, packetReady, err := pa.AppendFrame(fr)
305+
// forwards the finished invoke info to NewServerStream via m.newServerStreamInfo.
306+
// Metadata packets are accumulated; the invoke packet triggers the send.
307+
func (m *Manager) handleInvokeFrame(fr drpcwire.Frame) error {
308+
pkt, packetReady, err := m.pa.AppendFrame(fr)
312309
if err != nil {
313310
return err
314311
}
@@ -322,22 +319,20 @@ func (m *Manager) handleInvokeFrame(pa *drpcwire.PacketAssembler, info *newStrea
322319
if err != nil {
323320
return err
324321
}
325-
info.metadata = meta
322+
m.metadata = meta
326323
return nil
327324
}
328325

329326
// Invoke packet completes the sequence. Send to NewServerStream.
330-
info.sid = pkt.ID.Stream
331-
info.data = pkt.Data
332-
333327
select {
334-
case m.newServerStreamInfo <- *info:
328+
case m.invokes <- invokeInfo{sid: pkt.ID.Stream, data: pkt.Data, metadata: m.metadata}:
335329
// Wait for NewServerStream to finish stream creation (including
336330
// sbuf.Set) before reading the next frame. This guarantees curr
337331
// is set for subsequent non-invoke packets.
338332
m.pdone.Recv()
339-
pa.Reset()
340-
info.Reset()
333+
334+
m.pa.Reset()
335+
m.metadata = nil
341336
case <-m.sigs.term.Signal():
342337
}
343338
return nil
@@ -515,7 +510,7 @@ func (m *Manager) NewServerStream(ctx context.Context) (stream *drpcstream.Strea
515510
case <-m.sigs.term.Signal():
516511
return nil, "", m.sigs.term.Err()
517512

518-
case pkt := <-m.newServerStreamInfo:
513+
case pkt := <-m.invokes:
519514
rpc = string(pkt.data)
520515
if pkt.metadata != nil {
521516
if m.opts.GRPCMetadataCompatMode {

drpcstream/stream.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type Stream struct {
5252
read inspectMutex
5353
flush sync.Once
5454

55-
pb drpcwire.PacketAssembler
55+
pa drpcwire.PacketAssembler
5656

5757
id drpcwire.ID
5858
wr *drpcwire.Writer
@@ -91,8 +91,8 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
9191
}
9292
}
9393

94-
pb := drpcwire.NewPacketAssembler()
95-
pb.SetStreamID(sid)
94+
pa := drpcwire.NewPacketAssembler()
95+
pa.SetStreamID(sid)
9696

9797
s := &Stream{
9898
ctx: streamCtx{
@@ -103,7 +103,7 @@ func NewWithOptions(ctx context.Context, sid uint64, wr *drpcwire.Writer, opts O
103103
fin: drpcopts.GetStreamFin(&opts.Internal),
104104
task: task,
105105

106-
pb: pb,
106+
pa: pa,
107107

108108
id: drpcwire.ID{Stream: sid},
109109
wr: wr.Reset(),
@@ -228,7 +228,7 @@ func (s *Stream) HandleFrame(fr drpcwire.Frame) (err error) {
228228
return nil
229229
}
230230

231-
packet, packetReady, err := s.pb.AppendFrame(fr)
231+
packet, packetReady, err := s.pa.AppendFrame(fr)
232232
if err != nil {
233233
return err
234234
}

0 commit comments

Comments
 (0)