Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions pkg/sip/inbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -599,6 +599,7 @@ type inboundCall struct {
joinDur func() time.Duration
forwardDTMF atomic.Bool
done atomic.Bool
terminated atomic.Bool
started core.Fuse
stats Stats
jitterBuf bool
Expand Down Expand Up @@ -1129,6 +1130,14 @@ func (c *inboundCall) printStats(log logger.Logger) {

// close should only be called from handleInvite.
func (c *inboundCall) close(ctx context.Context, error bool, status CallStatus, reason string) {
go func() {
time.Sleep(5 * time.Minute)
if !c.terminated.Load() {
c.mon.CallTerminationFailure()
c.log().Errorw("call failed to terminate after 5 minutes", nil) // To be able to get call IDs
}
}()

ctx = context.WithoutCancel(ctx)
Comment on lines +1133 to 1141
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
go func() {
time.Sleep(5 * time.Minute)
if !c.terminated.Load() {
c.mon.CallTerminationFailure()
c.log().Errorw("call failed to terminate after 5 minutes", nil) // To be able to get call IDs
}
}()
ctx = context.WithoutCancel(ctx)
ctx = context.WithoutCancel(ctx) // do not timeout
ctx, cancel := context.WithCancel(ctx)
defer cancel()
go func() {
select {
case <-ctx.Done():
return
case <-time.After(5 * time.Minute):
if !c.terminated.Load() {
c.mon.CallTerminationFailure()
c.log().Errorw("call failed to terminate after 5 minutes", nil) // To be able to get call IDs
}
}
}()

We shouldn't block a goroutine unconditionally, it should also select on the context of the parent call.

Also, this should be after the CAS on c.done, so that we don't run an extra goroutine if Close is called twice.

if !c.done.CompareAndSwap(false, true) {
return
Expand Down Expand Up @@ -1184,6 +1193,7 @@ func (c *inboundCall) close(ctx context.Context, error bool, status CallStatus,
}

c.cancel()
c.terminated.Store(true)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe defer it?

}

func (c *inboundCall) closeWithTimeout(ctx context.Context, isError bool) {
Expand Down
16 changes: 14 additions & 2 deletions pkg/sip/outbound.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"net"
"sort"
"sync"
"sync/atomic"
"time"

"github.com/frostbyte73/core"
Expand Down Expand Up @@ -86,6 +87,8 @@ type outboundCall struct {
lkRoom RoomInterface
lkRoomIn msdk.PCM16Writer // output to room; OPUS at 48k
sipConf sipOutboundConfig

terminated atomic.Bool
}

func (c *Client) newCall(ctx context.Context, tid traceid.ID, conf *config.Config, log logger.Logger, id LocalTag, room RoomConfig, sipConf sipOutboundConfig, state *CallState, projectID string) (*outboundCall, error) {
Expand Down Expand Up @@ -509,8 +512,16 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru
}

func (c *outboundCall) stopSIP(ctx context.Context, reason string) {
go func() {
time.Sleep(5 * time.Minute)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same here, we shouldn't block unconditionally.

if !c.terminated.Load() {
c.mon.CallTerminationFailure()
c.log.Errorw("call failed to terminate after 5 minutes", nil) // To be able to get call IDs
}
}()
c.mon.CallTerminate(reason)
c.cc.Close(ctx)
c.terminated.Store(true)
}

func (c *outboundCall) setStatus(v CallStatus) {
Expand Down Expand Up @@ -761,8 +772,9 @@ type sipOutbound struct {
nextCSeq uint32
getHeaders setHeadersFunc

referCseq uint32
referDone chan error
referCseq uint32
referDone chan error
terminated atomic.Bool
}

func (c *sipOutbound) From() sip.Uri {
Expand Down
49 changes: 31 additions & 18 deletions pkg/stats/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,24 +60,25 @@ const (
type Monitor struct {
nodeID string

inviteReqRaw prometheus.Counter
inviteReq *prometheus.CounterVec
inviteAccept *prometheus.CounterVec
inviteErr *prometheus.CounterVec
callsActive *prometheus.GaugeVec
callsTerminated *prometheus.CounterVec
packetsRTP *prometheus.CounterVec
durSession *prometheus.HistogramVec
durCall *prometheus.HistogramVec
durJoin *prometheus.HistogramVec
durCheck *prometheus.HistogramVec
cpuLoad prometheus.Gauge
sdpSize *prometheus.HistogramVec
nodeAvailable prometheus.GaugeFunc
transfersTotal *prometheus.CounterVec
transfersSucceeded *prometheus.CounterVec
transfersFailed *prometheus.CounterVec
transfersActive *prometheus.GaugeVec
inviteReqRaw prometheus.Counter
inviteReq *prometheus.CounterVec
inviteAccept *prometheus.CounterVec
inviteErr *prometheus.CounterVec
callsActive *prometheus.GaugeVec
callsTerminated *prometheus.CounterVec
callsTerminationFailures *prometheus.CounterVec
packetsRTP *prometheus.CounterVec
durSession *prometheus.HistogramVec
durCall *prometheus.HistogramVec
durJoin *prometheus.HistogramVec
durCheck *prometheus.HistogramVec
cpuLoad prometheus.Gauge
sdpSize *prometheus.HistogramVec
nodeAvailable prometheus.GaugeFunc
transfersTotal *prometheus.CounterVec
transfersSucceeded *prometheus.CounterVec
transfersFailed *prometheus.CounterVec
transfersActive *prometheus.GaugeVec

cpu *hwstats.CPUStats
maxUtilization float64
Expand Down Expand Up @@ -170,6 +171,14 @@ func (m *Monitor) Start(conf *config.Config) error {
ConstLabels: prometheus.Labels{"node_id": conf.NodeID},
}, []string{"dir", "to", "reason"}))

m.callsTerminationFailures = mustRegister(m, prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "livekit",
Subsystem: "sip",
Name: "calls_termination_failures",
Help: "Number of calls that failed to terminate after 5 minutes",
ConstLabels: prometheus.Labels{"node_id": conf.NodeID},
}, []string{"dir"}))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Calling c.labels will add to as well, which is not defined here. This will likely panic.


m.packetsRTP = mustRegister(m, prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "livekit",
Subsystem: "sip",
Expand Down Expand Up @@ -395,6 +404,10 @@ func (c *CallMonitor) CallTerminate(reason string) {
c.m.callsTerminated.With(c.labels(prometheus.Labels{"reason": reason})).Inc()
}

func (c *CallMonitor) CallTerminationFailure() {
c.m.callsTerminationFailures.With(c.labels(prometheus.Labels{})).Inc()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
c.m.callsTerminationFailures.With(c.labels(prometheus.Labels{})).Inc()
c.m.callsTerminationFailures.With(c.labels(nil)).Inc()

}

func (c *CallMonitor) RTPPacketSend(payloadType string) {
c.m.packetsRTP.With(c.labels(prometheus.Labels{"op": "send", "payload": payloadType})).Inc()
}
Expand Down