Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
76 changes: 60 additions & 16 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -578,22 +578,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.emit(RoomEvent.DCBufferStatusChanged, status, kind);
})
.on(EngineEvent.LocalTrackSubscribed, (subscribedSid) => {
const trackPublication = this.localParticipant
.getTrackPublications()
.find(({ trackSid }) => trackSid === subscribedSid) as LocalTrackPublication | undefined;
if (!trackPublication) {
this.log.warn(
'could not find local track subscription for subscribed event',
this.logContext,
);
return;
}
this.localParticipant.emit(ParticipantEvent.LocalTrackSubscribed, trackPublication);
this.emitWhenConnected(
RoomEvent.LocalTrackSubscribed,
trackPublication,
this.localParticipant,
);
this.handleLocalTrackSubscribed(subscribedSid);
})
.on(EngineEvent.RoomMoved, (roomMoved) => {
this.log.debug('room moved', roomMoved);
Expand Down Expand Up @@ -1628,6 +1613,65 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private handleLocalTrackSubscribed(subscribedSid: string) {
const findPublication = () =>
this.localParticipant
.getTrackPublications()
.find(({ trackSid }) => trackSid === subscribedSid) as LocalTrackPublication | undefined;

const trackPublication = findPublication();
if (trackPublication) {
this.emitLocalTrackSubscribed(trackPublication);
return;
}

// the track publication may not be registered yet if the server signals
// the subscription before publishTrack has finished adding the publication.
// defer with a timeout until LocalTrackPublished fires for the matching trackSid
this.log.debug('deferring LocalTrackSubscribed, publication not yet available', {
...this.logContext,
subscribedSid,
});

const TIMEOUT_MS = 10_000;
let timer: ReturnType<typeof setTimeout>;

const onPublished = (pub: LocalTrackPublication) => {
if (pub.trackSid === subscribedSid) {
cleanup();
this.emitLocalTrackSubscribed(pub);
}
};

const cleanup = () => {
clearTimeout(timer);
this.localParticipant.off(ParticipantEvent.LocalTrackPublished, onPublished);
this.off(RoomEvent.Disconnected, cleanup);
};

this.localParticipant.on(ParticipantEvent.LocalTrackPublished, onPublished);
this.once(RoomEvent.Disconnected, cleanup);

timer = setTimeout(() => {
cleanup();
// final attempt in case the publication was added without emitting the event
const pub = findPublication();
if (pub) {
this.emitLocalTrackSubscribed(pub);
} else {
this.log.warn(
'could not find local track publication for LocalTrackSubscribed event after timeout',
{ ...this.logContext, subscribedSid },
);
}
}, TIMEOUT_MS);
}

private emitLocalTrackSubscribed(trackPublication: LocalTrackPublication) {
this.localParticipant.emit(ParticipantEvent.LocalTrackSubscribed, trackPublication);
this.emitWhenConnected(RoomEvent.LocalTrackSubscribed, trackPublication, this.localParticipant);
}

private handleRestarting = () => {
this.clearConnectionReconcile();
// in case we went from resuming to full-reconnect, make sure to reflect it on the isResuming flag
Expand Down
Loading