From 1f6b74effbd6d01663eae623bb2245e896558436 Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 21 Jan 2026 11:18:18 -0800
Subject: [PATCH 1/2] adding termination

---
Review notes (below the three-dash line, so not part of the commit message):

 * The watchdog is a time.AfterFunc timer that is stopped with defer, instead
   of a goroutine that always sleeps for 5 minutes. A close() call no longer
   parks a goroutine after it has returned, and a repeated close() that bails
   out at the done check can no longer report a failure of its own.
 * Because the timer is tied to close() returning, the separate terminated
   flag is not needed and has been dropped from inboundCall.
 * NOTE(review): every return path out of close() now counts as "terminated";
   only part of close() is visible in this series, so confirm that matches
   the intent.
 * Line breaks and indentation of this series were reconstructed, and the
   index hashes are carried over from the original submission; regenerate the
   series from a working tree before applying.

 pkg/sip/inbound.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go
index ddc3903a..596ee8d2 100644
--- a/pkg/sip/inbound.go
+++ b/pkg/sip/inbound.go
@@ -1129,6 +1129,13 @@ func (c *inboundCall) printStats(log logger.Logger) {
 
 // close should only be called from handleInvite.
 func (c *inboundCall) close(ctx context.Context, error bool, status CallStatus, reason string) {
+	// Watchdog: log if close is still running after 5 minutes. The timer is
+	// stopped when close returns, so nothing lingers once the call is torn down.
+	watchdog := time.AfterFunc(5*time.Minute, func() {
+		c.log().Errorw("call failed to terminate after 5 minutes", nil)
+	})
+	defer watchdog.Stop()
+
 	ctx = context.WithoutCancel(ctx)
 	if !c.done.CompareAndSwap(false, true) {
 		return

From b1b2ee33a9b9a5d23c7688a515f492448de1c1d5 Mon Sep 17 00:00:00 2001
From: Alex
Date: Wed, 21 Jan 2026 13:51:30 -0800
Subject: [PATCH 2/2] Adding prom metric, too

---
Review notes (below the three-dash line, so not part of the commit message):

 * calls_termination_failures is declared with the "dir" and "to" labels.
   CallTerminate passes c.labels(...) with only "reason" added to a vector
   declared with "dir", "to", "reason", so c.labels presumably supplies "dir"
   and "to". With only "dir" declared, With() would panic inside the watchdog.
   Confirm against CallMonitor.labels.
 * stopSIP uses the same timer-based watchdog as inboundCall.close. The
   sync/atomic import and the terminated fields on outboundCall and
   sipOutbound are dropped; the sipOutbound field was never read or written.
 * Line breaks and indentation of this series were reconstructed, and the
   index hashes are carried over from the original submission; regenerate the
   series from a working tree before applying.

 pkg/sip/inbound.go   |  3 ++-
 pkg/sip/outbound.go  |  7 +++++++
 pkg/stats/monitor.go | 49 ++++++++++++++++++++++++++++----------------
 3 files changed, 40 insertions(+), 19 deletions(-)

diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go
index 596ee8d2..6cdbea86 100644
--- a/pkg/sip/inbound.go
+++ b/pkg/sip/inbound.go
@@ -1132,7 +1132,8 @@ func (c *inboundCall) close(ctx context.Context, error bool, status CallStatus,
 	// Watchdog: log if close is still running after 5 minutes. The timer is
 	// stopped when close returns, so nothing lingers once the call is torn down.
 	watchdog := time.AfterFunc(5*time.Minute, func() {
-		c.log().Errorw("call failed to terminate after 5 minutes", nil)
+		c.mon.CallTerminationFailure()
+		c.log().Errorw("call failed to terminate after 5 minutes", nil) // To be able to get call IDs
 	})
 	defer watchdog.Stop()
 
diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go
index 9c68a673..c52e7de1 100644
--- a/pkg/sip/outbound.go
+++ b/pkg/sip/outbound.go
@@ -509,8 +509,15 @@ func sipResponse(ctx context.Context, tx sip.ClientTransaction, stop <-chan stru
 }
 
 func (c *outboundCall) stopSIP(ctx context.Context, reason string) {
+	// Watchdog: report if stopSIP is still running after 5 minutes. The timer is
+	// stopped when stopSIP returns, so nothing lingers once the call is closed.
+	watchdog := time.AfterFunc(5*time.Minute, func() {
+		c.mon.CallTerminationFailure()
+		c.log.Errorw("call failed to terminate after 5 minutes", nil) // To be able to get call IDs
+	})
+	defer watchdog.Stop()
 	c.mon.CallTerminate(reason)
 	c.cc.Close(ctx)
 }
 
 func (c *outboundCall) setStatus(v CallStatus) {
diff --git a/pkg/stats/monitor.go b/pkg/stats/monitor.go
index 8f14e209..cb9d9ef3 100644
--- a/pkg/stats/monitor.go
+++ b/pkg/stats/monitor.go
@@ -60,24 +60,25 @@ const (
 type Monitor struct {
 	nodeID string
 
-	inviteReqRaw       prometheus.Counter
-	inviteReq          *prometheus.CounterVec
-	inviteAccept       *prometheus.CounterVec
-	inviteErr          *prometheus.CounterVec
-	callsActive        *prometheus.GaugeVec
-	callsTerminated    *prometheus.CounterVec
-	packetsRTP         *prometheus.CounterVec
-	durSession         *prometheus.HistogramVec
-	durCall            *prometheus.HistogramVec
-	durJoin            *prometheus.HistogramVec
-	durCheck           *prometheus.HistogramVec
-	cpuLoad            prometheus.Gauge
-	sdpSize            *prometheus.HistogramVec
-	nodeAvailable      prometheus.GaugeFunc
-	transfersTotal     *prometheus.CounterVec
-	transfersSucceeded *prometheus.CounterVec
-	transfersFailed    *prometheus.CounterVec
-	transfersActive    *prometheus.GaugeVec
+	inviteReqRaw             prometheus.Counter
+	inviteReq                *prometheus.CounterVec
+	inviteAccept             *prometheus.CounterVec
+	inviteErr                *prometheus.CounterVec
+	callsActive              *prometheus.GaugeVec
+	callsTerminated          *prometheus.CounterVec
+	callsTerminationFailures *prometheus.CounterVec
+	packetsRTP               *prometheus.CounterVec
+	durSession               *prometheus.HistogramVec
+	durCall                  *prometheus.HistogramVec
+	durJoin                  *prometheus.HistogramVec
+	durCheck                 *prometheus.HistogramVec
+	cpuLoad                  prometheus.Gauge
+	sdpSize                  *prometheus.HistogramVec
+	nodeAvailable            prometheus.GaugeFunc
+	transfersTotal           *prometheus.CounterVec
+	transfersSucceeded       *prometheus.CounterVec
+	transfersFailed          *prometheus.CounterVec
+	transfersActive          *prometheus.GaugeVec
 
 	cpu            *hwstats.CPUStats
 	maxUtilization float64
@@ -170,6 +171,14 @@ func (m *Monitor) Start(conf *config.Config) error {
 		ConstLabels: prometheus.Labels{"node_id": conf.NodeID},
 	}, []string{"dir", "to", "reason"}))
 
+	m.callsTerminationFailures = mustRegister(m, prometheus.NewCounterVec(prometheus.CounterOpts{
+		Namespace:   "livekit",
+		Subsystem:   "sip",
+		Name:        "calls_termination_failures",
+		Help:        "Number of calls that failed to terminate after 5 minutes",
+		ConstLabels: prometheus.Labels{"node_id": conf.NodeID},
+	}, []string{"dir", "to"})) // label names must match the keys CallTerminationFailure passes to With
+
 	m.packetsRTP = mustRegister(m, prometheus.NewCounterVec(prometheus.CounterOpts{
 		Namespace:   "livekit",
 		Subsystem:   "sip",
@@ -395,6 +404,10 @@ func (c *CallMonitor) CallTerminate(reason string) {
 	c.m.callsTerminated.With(c.labels(prometheus.Labels{"reason": reason})).Inc()
 }
 
+func (c *CallMonitor) CallTerminationFailure() {
+	c.m.callsTerminationFailures.With(c.labels(prometheus.Labels{})).Inc()
+}
+
 func (c *CallMonitor) RTPPacketSend(payloadType string) {
 	c.m.packetsRTP.With(c.labels(prometheus.Labels{"op": "send", "payload": payloadType})).Inc()
 }