From 1e9626cc4c8cc9bed2e90e27f98bb1fcbfe3aaec Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Tue, 7 Apr 2026 15:01:03 -0400 Subject: [PATCH 1/7] feat: add hacky explicit queuing implementation for packets Previously, I was relying on the promise resolution order of `await this.waitForBufferStatusLow(...)` to always resolve in the same order as the promises were awaited, which in practice seemed to be the case, but I wasn't 100% sure would always be true. So just to verify, switch this over to use an explicit queue and see if that changes any of the e2e test results. --- src/room/RTCEngine.ts | 26 +++++++++++++++++++++++++- 1 file changed, 25 insertions(+), 1 deletion(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index f24f69fb41..485ca93dcf 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -85,6 +85,7 @@ import type { TrackPublishOptions, VideoCodec } from './track/options'; import { getTrackPublicationInfo } from './track/utils'; import type { LoggerOptions } from './types'; import { + Future, isCompressionStreamSupported, isVideoCodec, isVideoTrack, @@ -1461,6 +1462,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } + private lossyBytesWaitBuffer = new Map>>(); + /* @internal */ async sendLossyBytes( bytes: Uint8Array, @@ -1477,7 +1480,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // buffer status to not be low before continuing. switch (bufferStatusLowBehavior) { case 'wait': - await this.waitForBufferStatusLow(kind); + if (this.isBufferStatusLow(kind)) { + const future = new Future(); + (window as any).lossyBytesWaitBuffer = this.lossyBytesWaitBuffer; + const entries = this.lossyBytesWaitBuffer.get(kind) ?? []; + entries.push(future); + this.lossyBytesWaitBuffer.set(kind, entries); + await future.promise; + } break; case 'drop': // this.log.warn(`dropping lossy data channel message`, this.logContext); @@ -1528,6 +1538,20 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (typeof status !== 'undefined' && status !== this.dcBufferStatus.get(kind)) { this.dcBufferStatus.set(kind, status); this.emit(EngineEvent.DCBufferStatusChanged, status, kind); + + // Now that there is space on the data channel buffer, attempt to fill up the remaining space + // with bytes ready to be sent. + const buffer = this.lossyBytesWaitBuffer.get(kind) ?? []; + while (this.isBufferStatusLow(kind)) { + let continueSendingFuture = buffer.shift(); + if (!continueSendingFuture) { + break; + } + continueSendingFuture.resolve?.(); + } + if (buffer.length === 0) { + this.lossyBytesWaitBuffer.delete(kind); + } } }; From ba37f018d4f16b85ef17000512cca15306ec70bf Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 8 Apr 2026 09:10:47 -0400 Subject: [PATCH 2/7] fix: reorganize and better document lossyBytesWaitBuffer --- src/room/RTCEngine.ts | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 485ca93dcf..3c6439f224 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -247,6 +247,10 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to indicate whether the browser is currently waiting to reconnect */ private isWaitingForNetworkReconnect: boolean = false; + /** used to buffer lossy data track packets which arrive quickly so they don't overwhelm the data + * channel buffer */ + private lossyBytesWaitBuffer = new Map>>(); + constructor(private options: InternalRoomOptions) { super(); this.log = getLogger(options.loggerName ?? LoggerNames.Engine); @@ -1462,8 +1466,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } } - private lossyBytesWaitBuffer = new Map>>(); - /* @internal */ async sendLossyBytes( bytes: Uint8Array, @@ -1478,11 +1480,14 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit if (!this.isBufferStatusLow(kind)) { // Depending on the exact circumstance that data is being sent, either drop or wait for the // buffer status to not be low before continuing. + // + // An example of where this is used: data tracks, so that a large DataTrackFrame's worth of + // packets doesn't have the last half of the packets dropped due to not fitting into the + // data channel buffer all at once. switch (bufferStatusLowBehavior) { case 'wait': if (this.isBufferStatusLow(kind)) { const future = new Future(); - (window as any).lossyBytesWaitBuffer = this.lossyBytesWaitBuffer; const entries = this.lossyBytesWaitBuffer.get(kind) ?? []; entries.push(future); this.lossyBytesWaitBuffer.set(kind, entries); From b7153a193546eebcd88ead82b01b13025d2e2662 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 8 Apr 2026 10:52:32 -0400 Subject: [PATCH 3/7] feat: get rid of duplicative isBufferStatusLow check --- src/room/RTCEngine.ts | 12 +++++------- 1 file changed, 5 insertions(+), 7 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 3c6439f224..4b97bcbda1 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -1486,13 +1486,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // data channel buffer all at once. switch (bufferStatusLowBehavior) { case 'wait': - if (this.isBufferStatusLow(kind)) { - const future = new Future(); - const entries = this.lossyBytesWaitBuffer.get(kind) ?? []; - entries.push(future); - this.lossyBytesWaitBuffer.set(kind, entries); - await future.promise; - } + const future = new Future(); + const entries = this.lossyBytesWaitBuffer.get(kind) ?? []; + entries.push(future); + this.lossyBytesWaitBuffer.set(kind, entries); + await future.promise; break; case 'drop': // this.log.warn(`dropping lossy data channel message`, this.logContext); From 2d30ccdb7c661042aa3a58f4687fb26061cca0d8 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 8 Apr 2026 11:10:14 -0400 Subject: [PATCH 4/7] feat: add mutex to sendLossyBytes for use in bufferStatusLowBehavior=wait case Lukas raised this problem: The problem isn't that packets could get enqueued into lossyBytesWaitBuffer out of order, the problem is that packets could be halfway through draining from lossyBytesWaitBuffer and ANOTHER unrelated packet could come in and get interleaved in the middle while that "drain" is occurring So to try to fix it, wrap the "wait" path in a mutex to ensure that unrelated packets can't get interleaved in the midst of draining lossyBytesWaitBuffer. --- src/room/RTCEngine.ts | 40 ++++++++++++++++++++++++++++------------ 1 file changed, 28 insertions(+), 12 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 4b97bcbda1..9d6b16b035 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -250,6 +250,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to buffer lossy data track packets which arrive quickly so they don't overwhelm the data * channel buffer */ private lossyBytesWaitBuffer = new Map>>(); + private lossyBytesMutexByKind = new Map(); constructor(private options: InternalRoomOptions) { super(); @@ -1472,27 +1473,41 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit kind: Exclude, bufferStatusLowBehavior: 'drop' | 'wait' = 'drop', ) { + let unlock: (() => void) | undefined; + // make sure we do have a data connection await this.ensurePublisherConnected(kind); const dc = this.dataChannelForKind(kind); if (dc) { - if (!this.isBufferStatusLow(kind)) { - // Depending on the exact circumstance that data is being sent, either drop or wait for the - // buffer status to not be low before continuing. - // - // An example of where this is used: data tracks, so that a large DataTrackFrame's worth of - // packets doesn't have the last half of the packets dropped due to not fitting into the - // data channel buffer all at once. - switch (bufferStatusLowBehavior) { - case 'wait': + // Depending on the exact circumstance that data is being sent, either drop or wait for the + // buffer status to not be low before continuing. + // + // An example of where this is used: data tracks, so that a large DataTrackFrame's worth of + // packets doesn't have the last half of the packets dropped due to not fitting into the + // data channel buffer all at once. + switch (bufferStatusLowBehavior) { + case 'wait': + // Wait for any past enqueued data to be drained before enqueing the next byte + let mutex = this.lossyBytesMutexByKind.get(kind); + if (!mutex) { + mutex = new Mutex(); + this.lossyBytesMutexByKind.set(kind, mutex); + } + unlock = await mutex?.lock(); + + // If there isn't room in the data channel buffer, then wait for the rtc data channel to + // drain and go into "low status" before continuing. + if (!this.isBufferStatusLow(kind)) { const future = new Future(); const entries = this.lossyBytesWaitBuffer.get(kind) ?? []; entries.push(future); this.lossyBytesWaitBuffer.set(kind, entries); await future.promise; - break; - case 'drop': + } + break; + case 'drop': + if (!this.isBufferStatusLow(kind)) { // this.log.warn(`dropping lossy data channel message`, this.logContext); // Drop messages to reduce latency this.lossyDataDropCount += 1; @@ -1503,7 +1518,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit ); } return; - } + } } this.lossyDataStatCurrentBytes += bytes.byteLength; @@ -1515,6 +1530,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } this.updateAndEmitDCBufferStatus(kind); + unlock?.(); } private async resendReliableMessagesForResume(lastMessageSeq: number) { From a36a6850efbabc10486b07a7c04d89d05fe5b909 Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 8 Apr 2026 11:13:06 -0400 Subject: [PATCH 5/7] refactor: rename lossyBytesWaitBuffer => lossyBytesWaitQueue --- src/room/RTCEngine.ts | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 9d6b16b035..215ac60323 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -249,7 +249,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to buffer lossy data track packets which arrive quickly so they don't overwhelm the data * channel buffer */ - private lossyBytesWaitBuffer = new Map>>(); + private lossyBytesWaitQueue = new Map>>(); private lossyBytesMutexByKind = new Map(); constructor(private options: InternalRoomOptions) { @@ -1500,9 +1500,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // drain and go into "low status" before continuing. if (!this.isBufferStatusLow(kind)) { const future = new Future(); - const entries = this.lossyBytesWaitBuffer.get(kind) ?? []; + const entries = this.lossyBytesWaitQueue.get(kind) ?? []; entries.push(future); - this.lossyBytesWaitBuffer.set(kind, entries); + this.lossyBytesWaitQueue.set(kind, entries); await future.promise; } break; @@ -1560,7 +1560,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit // Now that there is space on the data channel buffer, attempt to fill up the remaining space // with bytes ready to be sent. - const buffer = this.lossyBytesWaitBuffer.get(kind) ?? []; + const buffer = this.lossyBytesWaitQueue.get(kind) ?? []; while (this.isBufferStatusLow(kind)) { let continueSendingFuture = buffer.shift(); if (!continueSendingFuture) { @@ -1569,7 +1569,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit continueSendingFuture.resolve?.(); } if (buffer.length === 0) { - this.lossyBytesWaitBuffer.delete(kind); + this.lossyBytesWaitQueue.delete(kind); } } }; From 1520fbe2b33c5f882245a277be4d26b706b9deba Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 8 Apr 2026 11:19:13 -0400 Subject: [PATCH 6/7] fix: address lint issue --- src/room/RTCEngine.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 215ac60323..da2ed1a9dd 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -250,6 +250,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to buffer lossy data track packets which arrive quickly so they don't overwhelm the data * channel buffer */ private lossyBytesWaitQueue = new Map>>(); + private lossyBytesMutexByKind = new Map(); constructor(private options: InternalRoomOptions) { From d1858ba796cde89dbfa1b57c508d67f4326df2ed Mon Sep 17 00:00:00 2001 From: Ryan Gaus Date: Wed, 8 Apr 2026 12:02:12 -0400 Subject: [PATCH 7/7] fix: add missing changeset --- .changeset/heavy-ants-mix.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/heavy-ants-mix.md diff --git a/.changeset/heavy-ants-mix.md b/.changeset/heavy-ants-mix.md new file mode 100644 index 0000000000..0a3a943cc0 --- /dev/null +++ b/.changeset/heavy-ants-mix.md @@ -0,0 +1,5 @@ +--- +'livekit-client': patch +--- + +Add RTCEngine sendLossyBytes explicit queue to better handle sending large data track packets