Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/heavy-ants-mix.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Add RTCEngine sendLossyBytes explicit queue to better handle sending large data track packets
62 changes: 53 additions & 9 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,7 @@ import type { TrackPublishOptions, VideoCodec } from './track/options';
import { getTrackPublicationInfo } from './track/utils';
import type { LoggerOptions } from './types';
import {
Future,
isCompressionStreamSupported,
isVideoCodec,
isVideoTrack,
Expand Down Expand Up @@ -246,6 +247,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
/** used to indicate whether the browser is currently waiting to reconnect */
private isWaitingForNetworkReconnect: boolean = false;

/** used to buffer lossy data track packets which arrive quickly so they don't overwhelm the data
* channel buffer */
private lossyBytesWaitQueue = new Map<DataChannelKind, Array<Future<void, never>>>();

private lossyBytesMutexByKind = new Map<DataChannelKind, Mutex>();

constructor(private options: InternalRoomOptions) {
super();
this.log = getLogger(options.loggerName ?? LoggerNames.Engine);
Expand Down Expand Up @@ -1467,19 +1474,41 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
kind: Exclude<DataChannelKind, DataChannelKind.RELIABLE>,
bufferStatusLowBehavior: 'drop' | 'wait' = 'drop',
) {
let unlock: (() => void) | undefined;

// make sure we do have a data connection
await this.ensurePublisherConnected(kind);

const dc = this.dataChannelForKind(kind);
if (dc) {
if (!this.isBufferStatusLow(kind)) {
// Depending on the exact circumstance that data is being sent, either drop or wait for the
// buffer status to not be low before continuing.
switch (bufferStatusLowBehavior) {
case 'wait':
await this.waitForBufferStatusLow(kind);
break;
case 'drop':
// Depending on the exact circumstance that data is being sent, either drop or wait for the
// buffer status to not be low before continuing.
//
// An example of where this is used: data tracks, so that a large DataTrackFrame's worth of
// packets doesn't have the last half of the packets dropped due to not fitting into the
// data channel buffer all at once.
switch (bufferStatusLowBehavior) {
case 'wait':
// Wait for any past enqueued data to be drained before enqueing the next byte
let mutex = this.lossyBytesMutexByKind.get(kind);
if (!mutex) {
mutex = new Mutex();
this.lossyBytesMutexByKind.set(kind, mutex);
}
unlock = await mutex?.lock();

// If there isn't room in the data channel buffer, then wait for the rtc data channel to
// drain and go into "low status" before continuing.
if (!this.isBufferStatusLow(kind)) {
const future = new Future<void, never>();
const entries = this.lossyBytesWaitQueue.get(kind) ?? [];
entries.push(future);
this.lossyBytesWaitQueue.set(kind, entries);
await future.promise;
}
break;
case 'drop':
if (!this.isBufferStatusLow(kind)) {
// this.log.warn(`dropping lossy data channel message`, this.logContext);
// Drop messages to reduce latency
this.lossyDataDropCount += 1;
Expand All @@ -1490,7 +1519,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
);
}
return;
}
}
}
this.lossyDataStatCurrentBytes += bytes.byteLength;

Expand All @@ -1502,6 +1531,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
}

this.updateAndEmitDCBufferStatus(kind);
unlock?.();
}

private async resendReliableMessagesForResume(lastMessageSeq: number) {
Expand All @@ -1528,6 +1558,20 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
if (typeof status !== 'undefined' && status !== this.dcBufferStatus.get(kind)) {
this.dcBufferStatus.set(kind, status);
this.emit(EngineEvent.DCBufferStatusChanged, status, kind);

// Now that there is space on the data channel buffer, attempt to fill up the remaining space
// with bytes ready to be sent.
const buffer = this.lossyBytesWaitQueue.get(kind) ?? [];
while (this.isBufferStatusLow(kind)) {
let continueSendingFuture = buffer.shift();
if (!continueSendingFuture) {
break;
}
continueSendingFuture.resolve?.();
}
if (buffer.length === 0) {
this.lossyBytesWaitQueue.delete(kind);
}
}
};

Expand Down
Loading