diff --git a/pkg/sip/media_port.go b/pkg/sip/media_port.go index f9afd99c..974d9d08 100644 --- a/pkg/sip/media_port.go +++ b/pkg/sip/media_port.go @@ -632,7 +632,6 @@ func (p *MediaPort) rtpLoop(tid traceid.ID, sess rtp.Session) { p.stats.Streams.Add(1) p.mediaReceived.Break() log := p.log.WithValues("ssrc", ssrc) - log.Infow("accepting RTP stream") go p.rtpReadLoop(tid, log, r) } } @@ -641,10 +640,13 @@ func (p *MediaPort) rtpReadLoop(tid traceid.ID, log logger.Logger, r rtp.ReadStr const maxErrors = 50 // 1 sec, given 20 ms frames buf := make([]byte, rtp.MTUSize+1) overflow := false + catchup := true var ( - h rtp.Header - pipeline string - errorCnt int + h rtp.Header + pipeline string + errorCnt int + lastPacketTime time.Time + discardedPackets int ) for { h = rtp.Header{} @@ -655,6 +657,19 @@ func (p *MediaPort) rtpReadLoop(tid traceid.ID, log logger.Logger, r rtp.ReadStr log.Errorw("read RTP failed", err) return } + if catchup { + // When a remote writer has been sending data but the local stream didn't start the RTP loop, + // such as in outbound calls that take a sec to answer, we have a ton of junk in the stream. + // We need to discard all of it. + now := time.Now() + if lastPacketTime.IsZero() || now.Sub(lastPacketTime) < time.Millisecond { + lastPacketTime = now + discardedPackets++ + continue + } + catchup = false + log.Infow("accepting RTP stream", "sequenceNumber", h.SequenceNumber, "discarded", discardedPackets) + } p.packetCount.Add(1) p.stats.Packets.Add(1) if n > rtp.MTUSize {