diff --git a/pkg/sip/inbound.go b/pkg/sip/inbound.go index ddc3903a..5bf39945 100644 --- a/pkg/sip/inbound.go +++ b/pkg/sip/inbound.go @@ -95,6 +95,7 @@ func (c *inboundCallInfo) countInvite(log logger.Logger, req *sip.Request) { hasAuth := inviteHasAuth(req) cseq := req.CSeq() if cseq == nil { + log.Debugw("countInvite: no CSeq header found", "method", req.Method) return } c.Lock() @@ -107,15 +108,38 @@ func (c *inboundCallInfo) countInvite(log logger.Logger, req *sip.Request) { countPtr = &c.invitesAuth name = "invite with auth" } + prevCSeq := *cseqPtr + prevCount := *countPtr if *cseqPtr == 0 { *cseqPtr = cseq.SeqNo + log.Debugw("countInvite: initial "+name, + "cseq", cseq.SeqNo, + "hasAuth", hasAuth, + "callID", req.CallID()) } if cseq.SeqNo > *cseqPtr { + log.Debugw("countInvite: detected reinvite", + "name", name, + "prevCSeq", prevCSeq, + "newCSeq", cseq.SeqNo, + "hasAuth", hasAuth, + "callID", req.CallID(), + "from", req.From(), + "to", req.To()) return // reinvite } *countPtr++ if *countPtr > 1 { - log.Warnw("remote appears to be retrying an "+name, nil, "invites", *countPtr, "cseq", *cseqPtr) + log.Warnw("remote appears to be retrying an "+name, nil, + "invites", *countPtr, + "cseq", *cseqPtr, + "callID", req.CallID(), + "prevCount", prevCount) + } else { + log.Debugw("countInvite: first "+name, + "cseq", cseq.SeqNo, + "hasAuth", hasAuth, + "callID", req.CallID()) } } @@ -333,7 +357,23 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE log = LoggerWithParams(log, cc) log = LoggerWithHeaders(log, cc) cc.log = log - log.Infow("processing invite") + cseq := req.CSeq() + cseqValue := uint32(0) + if cseq != nil { + cseqValue = cseq.SeqNo + } + log.Infow("processing invite", + "sipCallID", cc.SIPCallID(), + "cseq", cseqValue, + "method", req.Method, + "from", req.From(), + "to", req.To(), + "callID", req.CallID(), + "content-type", req.ContentType(), + "content-length", req.ContentLength(), + "hasAuth", inviteHasAuth(req), + "source", src.Addr().String(), + "destination", req.Destination()) if err := cc.ValidateInvite(); err != nil { if s.conf.HideInboundPort { @@ -347,11 +387,46 @@ func (s *Server) processInvite(req *sip.Request, tx sip.ServerTransaction) (retE s.cmu.RLock() existing := s.byCallID[cc.SIPCallID()] s.cmu.RUnlock() - if existing != nil && existing.cc.InviteCSeq() < cc.InviteCSeq() { - log.Infow("accepting reinvite", "sipCallID", existing.cc.ID(), "content-type", req.ContentType(), "content-length", req.ContentLength()) - existing.log().Infow("reinvite", "content-type", req.ContentType(), "content-length", req.ContentLength(), "cseq", cc.InviteCSeq()) - cc.AcceptAsKeepAlive(existing.cc.OwnSDP()) - return nil + if existing != nil { + existingCSeq := existing.cc.InviteCSeq() + newCSeq := cc.InviteCSeq() + log.Debugw("checking for reinvite", + "sipCallID", cc.SIPCallID(), + "existingCSeq", existingCSeq, + "newCSeq", newCSeq, + "existingCallID", existing.cc.ID(), + "isReinvite", existingCSeq < newCSeq, + "content-type", req.ContentType(), + "content-length", req.ContentLength()) + if existingCSeq < newCSeq { + log.Infow("accepting reinvite", + "sipCallID", existing.cc.ID(), + "existingCSeq", existingCSeq, + "newCSeq", newCSeq, + "content-type", req.ContentType(), + "content-length", req.ContentLength(), + "from", req.From(), + "to", req.To(), + "callID", req.CallID()) + existing.log().Infow("reinvite", + "content-type", req.ContentType(), + "content-length", req.ContentLength(), + "existingCSeq", existingCSeq, + "newCSeq", newCSeq, + "cseq", newCSeq) + cc.AcceptAsKeepAlive(existing.cc.OwnSDP()) + return nil + } else { + log.Debugw("existing call found but not a reinvite", + "sipCallID", cc.SIPCallID(), + "existingCSeq", existingCSeq, + "newCSeq", newCSeq, + "reason", "newCSeq not greater than existingCSeq") + } + } else { + log.Debugw("no existing call found for reinvite check", + "sipCallID", cc.SIPCallID(), + "cseq", cc.InviteCSeq()) } from, to := cc.From(), cc.To() @@ -1430,6 +1505,12 @@ func (s *Server) newInbound(log logger.Logger, id LocalTag, contact URI, invite if h := invite.CSeq(); h != nil { c.inviteCSeq = h.SeqNo c.nextRequestCSeq = h.SeqNo + 1 + c.log.Debugw("newInbound: initialized CSeq", + "sipCallID", c.sipCallID, + "inviteCSeq", c.inviteCSeq, + "nextRequestCSeq", c.nextRequestCSeq, + "from", invite.From(), + "to", invite.To()) } if callID := invite.CallID(); callID != nil { c.sipCallID = callID.Value() @@ -1656,6 +1737,13 @@ func (c *sipInbound) accepted(inviteOK *sip.Response) { } func (c *sipInbound) AcceptAsKeepAlive(sdp []byte) { + c.log.Infow("AcceptAsKeepAlive: accepting reinvite with keep-alive response", + "sipCallID", c.sipCallID, + "inviteCSeq", c.inviteCSeq, + "sdpLength", len(sdp), + "hasSDP", len(sdp) > 0, + "from", c.from, + "to", c.to) c.respondWithData(sip.StatusOK, "OK", "application/sdp", sdp) } @@ -1791,9 +1879,26 @@ func (c *sipInbound) generateViaHeader(req *sip.Request) *sip.ViaHeader { } func (c *sipInbound) setCSeq(req *sip.Request) { + oldCSeq := c.nextRequestCSeq setCSeq(req, c.nextRequestCSeq) - + c.log.Debugw("setCSeq: setting CSeq for outgoing request", + "method", req.Method, + "cseq", c.nextRequestCSeq, + "callID", c.sipCallID, + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source()) c.nextRequestCSeq++ + if oldCSeq != 0 && req.Method == sip.INVITE { + c.log.Infow("setCSeq: sending INVITE request (potential reinvite)", + "method", req.Method, + "cseq", oldCSeq, + "nextCSeq", c.nextRequestCSeq, + "callID", c.sipCallID, + "from", req.From(), + "to", req.To()) + } } func (c *sipInbound) sendBye(ctx context.Context) { @@ -1845,10 +1950,76 @@ func (c *sipInbound) sendStatus(ctx context.Context, code sip.StatusCode, status } func (c *sipInbound) WriteRequest(req *sip.Request) error { + c.log.Debugw("WriteRequest: sending SIP request", + "method", req.Method, + "callID", c.sipCallID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + if req.Method == sip.INVITE { + c.log.Infow("WriteRequest: sending INVITE request", + "method", req.Method, + "callID", c.sipCallID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "inviteCSeq", c.inviteCSeq, + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + } return c.s.sipSrv.TransportLayer().WriteMsg(req) } func (c *sipInbound) Transaction(req *sip.Request) (sip.ClientTransaction, error) { + c.log.Debugw("Transaction: creating client transaction for SIP request", + "method", req.Method, + "callID", c.sipCallID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + if req.Method == sip.INVITE { + c.log.Infow("Transaction: creating client transaction for INVITE request", + "method", req.Method, + "callID", c.sipCallID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "inviteCSeq", c.inviteCSeq, + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + } return c.s.sipSrv.TransactionLayer().Request(req) } diff --git a/pkg/sip/outbound.go b/pkg/sip/outbound.go index b0c00f9d..a733a710 100644 --- a/pkg/sip/outbound.go +++ b/pkg/sip/outbound.go @@ -581,6 +581,15 @@ func (c *outboundCall) sipSignal(ctx context.Context, tid traceid.ID) error { toUri := CreateURIFromUserAndAddress(c.sipConf.to, c.sipConf.address, TransportFrom(c.sipConf.transport)) + c.log.Infow("sipSignal: initiating outbound call", + "to", toUri.String(), + "from", c.sipConf.from, + "address", c.sipConf.address, + "transport", c.sipConf.transport, + "sdpOfferLength", len(sdpOfferData), + "hasAuth", c.sipConf.user != "" || c.sipConf.pass != "", + "headersCount", len(c.sipConf.headers)) + ringing := false sdpResp, err := c.cc.Invite(ctx, toUri, c.sipConf.user, c.sipConf.pass, c.sipConf.headers, sdpOfferData, func(code sip.StatusCode, hdrs Headers) { if code == sip.StatusOK { @@ -733,7 +742,7 @@ func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI, Address: *contact.GetContactURI(), } fromHeader.Params.Add("tag", string(id)) - return &sipOutbound{ + so := &sipOutbound{ log: log, c: c, id: id, @@ -743,6 +752,12 @@ func (c *Client) newOutbound(log logger.Logger, id LocalTag, from, contact URI, nextCSeq: 1, getHeaders: getHeaders, } + so.log.Debugw("newSipOutbound: initialized outbound SIP call", + "id", id, + "from", fromHeader.Address.String(), + "contact", contactHeader.Address.String(), + "nextCSeq", so.nextCSeq) + return so } type sipOutbound struct { @@ -821,6 +836,14 @@ func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, hea c.callID = guid.HashedID(fmt.Sprintf("%s-%s", string(c.id), toHeader.Address.String())) c.log = c.log.WithValues("sipCallID", c.callID) + c.log.Infow("Invite: starting outbound INVITE request", + "sipCallID", c.callID, + "to", toHeader.Address.String(), + "from", c.from, + "sdpOfferLength", len(sdpOffer), + "hasAuth", user != "" || pass != "", + "headersCount", len(headers), + "nextCSeq", c.nextCSeq) var ( sipHeaders Headers @@ -839,15 +862,49 @@ func (c *sipOutbound) Invite(ctx context.Context, to URI, user, pass string, hea authLoop: for try := 0; ; try++ { if try >= 5 { + c.log.Errorw("Invite: max auth retry attempts reached", + "sipCallID", c.callID, + "attempts", try) return nil, fmt.Errorf("max auth retry attemps reached") } + if try > 0 { + c.log.Debugw("Invite: retrying with authentication", + "sipCallID", c.callID, + "attempt", try, + "authHeaderName", authHeaderRespName, + "hasAuthHeader", authHeader != "") + } req, resp, err = c.attemptInvite(ctx, sip.CallIDHeader(c.callID), toHeader, sdpOffer, authHeaderRespName, authHeader, sipHeaders, setState) if err != nil { + c.log.Debugw("Invite: attemptInvite failed", + "sipCallID", c.callID, + "attempt", try, + "error", err) return nil, err } + c.log.Debugw("Invite: received response", + "sipCallID", c.callID, + "attempt", try, + "statusCode", resp.StatusCode, + "statusReason", resp.Reason, + "hasBody", len(resp.Body()) > 0, + "bodyLength", len(resp.Body())) var authHeaderName string switch resp.StatusCode { case sip.StatusOK: + c.log.Infow("Invite: received 200 OK response", + "sipCallID", c.callID, + "attempt", try, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "to", resp.To(), + "from", resp.From(), + "hasBody", len(resp.Body()) > 0, + "bodyLength", len(resp.Body())) break authLoop default: return nil, fmt.Errorf("unexpected status from INVITE response: %w", &livekit.SIPStatus{ @@ -918,6 +975,17 @@ authLoop: if !ok { return nil, errors.New("no tag in To header in INVITE response") } + inviteCSeq := uint32(0) + if cseq := req.CSeq(); cseq != nil { + inviteCSeq = cseq.SeqNo + } + c.log.Infow("Invite: call established successfully", + "sipCallID", c.callID, + "tag", c.tag, + "to", toHeader.Address.String(), + "from", c.from, + "inviteCSeq", inviteCSeq, + "nextCSeq", c.nextCSeq) if cont := resp.Contact(); cont != nil { req.Recipient = cont.Address @@ -978,10 +1046,35 @@ func (c *sipOutbound) attemptInvite(ctx context.Context, callID sip.CallIDHeader req.AppendHeader(h) } + cseqValue := uint32(0) + if cseq := req.CSeq(); cseq != nil { + cseqValue = cseq.SeqNo + } + c.log.Infow("attemptInvite: sending INVITE request", + "sipCallID", callID.Value(), + "cseq", cseqValue, + "to", to.Address.String(), + "from", c.from, + "destination", req.Destination(), + "source", req.Source(), + "transport", req.Transport(), + "hasAuth", authHeader != "", + "authHeaderName", authHeaderName, + "offerLength", len(offer), + "headersCount", len(headers), + "nextCSeq", c.nextCSeq) + tx, err := c.c.sipCli.TransactionRequest(req) if err != nil { + c.log.Errorw("attemptInvite: TransactionRequest failed", + "sipCallID", callID.Value(), + "cseq", cseqValue, + "error", err) return nil, nil, err } + c.log.Debugw("attemptInvite: transaction created, waiting for response", + "sipCallID", callID.Value(), + "cseq", cseqValue) defer tx.Terminate() // Log the actual local port used for TCP connections from the DialPort range @@ -1003,21 +1096,118 @@ func (c *sipOutbound) attemptInvite(ctx context.Context, callID sip.CallIDHeader } resp, err := sipResponse(ctx, tx, c.c.closing.Watch(), setState) + if err != nil { + c.log.Debugw("attemptInvite: sipResponse error", + "sipCallID", callID.Value(), + "cseq", cseqValue, + "error", err) + } else if resp != nil { + c.log.Debugw("attemptInvite: received response", + "sipCallID", callID.Value(), + "cseq", cseqValue, + "statusCode", resp.StatusCode, + "statusReason", resp.Reason, + "hasBody", len(resp.Body()) > 0, + "bodyLength", len(resp.Body())) + } return req, resp, err } func (c *sipOutbound) WriteRequest(req *sip.Request) error { + c.log.Debugw("WriteRequest: sending SIP request", + "method", req.Method, + "sipCallID", c.callID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + if req.Method == sip.INVITE { + c.log.Infow("WriteRequest: sending INVITE request (potential reinvite)", + "method", req.Method, + "sipCallID", c.callID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "nextCSeq", c.nextCSeq, + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + } return c.c.sipCli.WriteRequest(req) } func (c *sipOutbound) Transaction(req *sip.Request) (sip.ClientTransaction, error) { + c.log.Debugw("Transaction: creating client transaction for SIP request", + "method", req.Method, + "sipCallID", c.callID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + if req.Method == sip.INVITE { + c.log.Infow("Transaction: creating client transaction for INVITE request", + "method", req.Method, + "sipCallID", c.callID, + "cseq", func() uint32 { + if cseq := req.CSeq(); cseq != nil { + return cseq.SeqNo + } + return 0 + }(), + "nextCSeq", c.nextCSeq, + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source(), + "hasBody", len(req.Body()) > 0, + "bodyLength", len(req.Body())) + } return c.c.sipCli.TransactionRequest(req) } func (c *sipOutbound) setCSeq(req *sip.Request) { + oldCSeq := c.nextCSeq setCSeq(req, c.nextCSeq) - + c.log.Debugw("setCSeq: setting CSeq for outgoing request", + "method", req.Method, + "cseq", c.nextCSeq, + "callID", c.callID, + "from", req.From(), + "to", req.To(), + "destination", req.Destination(), + "source", req.Source()) c.nextCSeq++ + if oldCSeq != 0 && req.Method == sip.INVITE { + c.log.Infow("setCSeq: sending INVITE request (potential reinvite)", + "method", req.Method, + "cseq", oldCSeq, + "nextCSeq", c.nextCSeq, + "callID", c.callID, + "from", req.From(), + "to", req.To()) + } } func (c *sipOutbound) sendBye(ctx context.Context) {