diff --git a/.changeset/heavy-ants-mix.md b/.changeset/heavy-ants-mix.md new file mode 100644 index 0000000000..0a3a943cc0 --- /dev/null +++ b/.changeset/heavy-ants-mix.md @@ -0,0 +1,5 @@ +--- +'livekit-client': patch +--- + +Add RTCEngine sendLossyBytes explicit queue to better handle sending large data track packets diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index f24f69fb41..da2ed1a9dd 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -85,6 +85,7 @@ import type { TrackPublishOptions, VideoCodec } from './track/options'; import { getTrackPublicationInfo } from './track/utils'; import type { LoggerOptions } from './types'; import { + Future, isCompressionStreamSupported, isVideoCodec, isVideoTrack, @@ -246,6 +247,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to indicate whether the browser is currently waiting to reconnect */ private isWaitingForNetworkReconnect: boolean = false; + /** used to buffer lossy data track packets which arrive quickly so they don't overwhelm the data + * channel buffer */ + private lossyBytesWaitQueue = new Map>>(); + + private lossyBytesMutexByKind = new Map(); + constructor(private options: InternalRoomOptions) { super(); this.log = getLogger(options.loggerName ?? LoggerNames.Engine); @@ -1467,19 +1474,41 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: Exclude, bufferStatusLowBehavior: 'drop' | 'wait' = 'drop', ) { + let unlock: (() => void) | undefined; + // make sure we do have a data connection await this.ensurePublisherConnected(kind); const dc = this.dataChannelForKind(kind); if (dc) { - if (!this.isBufferStatusLow(kind)) { - // Depending on the exact circumstance that data is being sent, either drop or wait for the - // buffer status to not be low before continuing. - switch (bufferStatusLowBehavior) { - case 'wait': - await this.waitForBufferStatusLow(kind); - break; - case 'drop': + // Depending on the exact circumstance that data is being sent, either drop or wait for the + // buffer status to not be low before continuing. + // + // An example of where this is used: data tracks, so that a large DataTrackFrame's worth of + // packets doesn't have the last half of the packets dropped due to not fitting into the + // data channel buffer all at once. + switch (bufferStatusLowBehavior) { + case 'wait': + // Wait for any past enqueued data to be drained before enqueing the next byte + let mutex = this.lossyBytesMutexByKind.get(kind); + if (!mutex) { + mutex = new Mutex(); + this.lossyBytesMutexByKind.set(kind, mutex); + } + unlock = await mutex?.lock(); + + // If there isn't room in the data channel buffer, then wait for the rtc data channel to + // drain and go into "low status" before continuing. + if (!this.isBufferStatusLow(kind)) { + const future = new Future(); + const entries = this.lossyBytesWaitQueue.get(kind) ?? []; + entries.push(future); + this.lossyBytesWaitQueue.set(kind, entries); + await future.promise; + } + break; + case 'drop': + if (!this.isBufferStatusLow(kind)) { // this.log.warn(`dropping lossy data channel message`, this.logContext); // Drop messages to reduce latency this.lossyDataDropCount += 1; @@ -1490,7 +1519,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit ); } return; - } + } } this.lossyDataStatCurrentBytes += bytes.byteLength; @@ -1502,6 +1531,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } this.updateAndEmitDCBufferStatus(kind); + unlock?.(); } private async resendReliableMessagesForResume(lastMessageSeq: number) { @@ -1528,6 +1558,20 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (typeof status !== 'undefined' && status !== this.dcBufferStatus.get(kind)) { this.dcBufferStatus.set(kind, status); this.emit(EngineEvent.DCBufferStatusChanged, status, kind); + + // Now that there is space on the data channel buffer, attempt to fill up the remaining space + // with bytes ready to be sent. + const buffer = this.lossyBytesWaitQueue.get(kind) ?? []; + while (this.isBufferStatusLow(kind)) { + let continueSendingFuture = buffer.shift(); + if (!continueSendingFuture) { + break; + } + continueSendingFuture.resolve?.(); + } + if (buffer.length === 0) { + this.lossyBytesWaitQueue.delete(kind); + } } };