From 51860c01552517cdd9043851e146359935581599 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 9 Apr 2026 09:46:24 +0200 Subject: [PATCH 01/13] ensure strict ordering of resolving waitForBufferStatus calls --- src/room/RTCEngine.ts | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 69f9023552..81b1a4a23d 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -44,6 +44,7 @@ import { type UserPacket, } from '@livekit/protocol'; import { EventEmitter } from 'events'; +import type { Throws } from '@livekit/throws-transformer/throws'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; import type { SignalOptions } from '../api/SignalClient'; @@ -247,6 +248,8 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to indicate whether the browser is currently waiting to reconnect */ private isWaitingForNetworkReconnect: boolean = false; + private lossyBytesMutexByKind = new Map(); + constructor(private options: InternalRoomOptions) { super(); this.log = getLogger(options.loggerName ?? LoggerNames.Engine); @@ -1557,8 +1560,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } }; - waitForBufferStatusLow(kind: DataChannelKind): TypedPromise { - return new TypedPromise(async (resolve, reject) => { + async waitForBufferStatusLow( + kind: DataChannelKind, + ): Promise> { + let mutex = this.lossyBytesMutexByKind.get(kind); + if (!mutex) { + mutex = new Mutex(); + this.lossyBytesMutexByKind.set(kind, mutex); + } + const unlock = await mutex.lock(); + return new TypedPromise(async (resolve, reject) => { if (this.isBufferStatusLow(kind)) { resolve(); } else { @@ -1570,7 +1581,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit this.off(EngineEvent.Closing, onClosing); resolve(); } - }); + }).finally(() => unlock()); } /** From ab90528ba9b35beb2df5e286c47d5ada2da2aacc Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 9 Apr 2026 09:53:38 +0200 Subject: [PATCH 02/13] reduce changes --- src/room/RTCEngine.ts | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 81b1a4a23d..8a22b400b6 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -1560,9 +1560,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } }; - async waitForBufferStatusLow( - kind: DataChannelKind, - ): Promise> { + async waitForBufferStatusLow(kind: DataChannelKind) { let mutex = this.lossyBytesMutexByKind.get(kind); if (!mutex) { mutex = new Mutex(); From fb0131202d7afe1a955117afcb939de62c73eb39 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 9 Apr 2026 09:54:36 +0200 Subject: [PATCH 03/13] remove unused import --- src/room/RTCEngine.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 8a22b400b6..8a384fdb86 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -44,7 +44,6 @@ import { type UserPacket, } from '@livekit/protocol'; import { EventEmitter } from 'events'; -import type { Throws } from '@livekit/throws-transformer/throws'; import type { MediaAttributes } from 'sdp-transform'; import type TypedEventEmitter from 'typed-emitter'; import type { SignalOptions } from '../api/SignalClient'; From 63dd2524fe44a7e747f39bd1fde0cce69b19c678 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 9 Apr 2026 16:29:33 +0200 Subject: [PATCH 04/13] check for engine close --- src/room/RTCEngine.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 8a384fdb86..934b465189 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -1567,6 +1567,9 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } const unlock = await mutex.lock(); return new TypedPromise(async (resolve, reject) => { + if (this._isClosed) { + reject(new UnexpectedConnectionState('engine closed')); + } if (this.isBufferStatusLow(kind)) { resolve(); } else { From 3494d89afb3f59c1e8608e6cfce572963623b352 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Thu, 9 Apr 2026 16:37:32 +0200 Subject: [PATCH 05/13] use event instead of sleep loop --- src/room/RTCEngine.ts | 17 +++++++++++------ 1 file changed, 11 insertions(+), 6 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 934b465189..62557cd574 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -1567,7 +1567,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } const unlock = await mutex.lock(); return new TypedPromise(async (resolve, reject) => { - if (this._isClosed) { + if (this.isClosed) { reject(new UnexpectedConnectionState('engine closed')); } if (this.isBufferStatusLow(kind)) { @@ -1575,11 +1575,16 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } else { const onClosing = () => reject(new UnexpectedConnectionState('engine closed')); this.once(EngineEvent.Closing, onClosing); - while (!this.dcBufferStatus.get(kind)) { - await sleep(10); - } - this.off(EngineEvent.Closing, onClosing); - resolve(); + this.dataChannelForKind(kind)?.addEventListener( + 'bufferedamountlow', + () => { + this.off(EngineEvent.Closing, onClosing); + resolve(); + }, + { + once: true, + }, + ); } }).finally(() => unlock()); } From f965a5525112610d29aa4880e49acc5d5970988d Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 10 Apr 2026 14:06:34 +0200 Subject: [PATCH 06/13] Close streamcontrollers when datatrack gets unpublished --- src/room/data-track/incoming/IncomingDataTrackManager.ts | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/room/data-track/incoming/IncomingDataTrackManager.ts b/src/room/data-track/incoming/IncomingDataTrackManager.ts index 3ca3c7674e..0358bc8c4c 100644 --- a/src/room/data-track/incoming/IncomingDataTrackManager.ts +++ b/src/room/data-track/incoming/IncomingDataTrackManager.ts @@ -436,6 +436,9 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () => this.descriptors.delete(sid); if (descriptor.subscription.type === 'active') { + descriptor.subscription.streamControllers.forEach((controller) => { + controller.close(); + }); this.subscriptionHandles.delete(descriptor.subscription.subcriptionHandle); } From 628e7616461800b26d51618f829e509b45fdc82a Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 10 Apr 2026 14:07:48 +0200 Subject: [PATCH 07/13] Create quiet-suits-matter.md --- .changeset/quiet-suits-matter.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/quiet-suits-matter.md diff --git a/.changeset/quiet-suits-matter.md b/.changeset/quiet-suits-matter.md new file mode 100644 index 0000000000..c9da500097 --- /dev/null +++ b/.changeset/quiet-suits-matter.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +Close streamcontrollers when datatrack gets unpublished From f61766d261fd71268ea586ecfb0ea9d2daf9d75b Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 10 Apr 2026 14:10:15 +0200 Subject: [PATCH 08/13] use record instead of map --- src/room/RTCEngine.ts | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 62557cd574..0f70c6085c 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -247,7 +247,11 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to indicate whether the browser is currently waiting to reconnect */ private isWaitingForNetworkReconnect: boolean = false; - private lossyBytesMutexByKind = new Map(); + private lossyBytesMutexByKind = { + [DataChannelKind.DATA_TRACK_LOSSY]: new Mutex(), + [DataChannelKind.LOSSY]: new Mutex(), + [DataChannelKind.RELIABLE]: new Mutex(), + } as const; constructor(private options: InternalRoomOptions) { super(); @@ -1560,11 +1564,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; async waitForBufferStatusLow(kind: DataChannelKind) { - let mutex = this.lossyBytesMutexByKind.get(kind); - if (!mutex) { - mutex = new Mutex(); - this.lossyBytesMutexByKind.set(kind, mutex); - } + let mutex = this.lossyBytesMutexByKind[kind]; const unlock = await mutex.lock(); return new TypedPromise(async (resolve, reject) => { if (this.isClosed) { From 67acb1200f800be16ac07e09faa49100f0935c9f Mon Sep 17 00:00:00 2001 From: lukasIO Date: Fri, 10 Apr 2026 14:16:59 +0200 Subject: [PATCH 09/13] also in shutdown --- src/room/data-track/incoming/IncomingDataTrackManager.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/room/data-track/incoming/IncomingDataTrackManager.ts b/src/room/data-track/incoming/IncomingDataTrackManager.ts index 0358bc8c4c..deef81dcfa 100644 --- a/src/room/data-track/incoming/IncomingDataTrackManager.ts +++ b/src/room/data-track/incoming/IncomingDataTrackManager.ts @@ -586,6 +586,10 @@ export default class IncomingDataTrackManager extends (EventEmitter as new () => if (descriptor.subscription.type === 'pending') { descriptor.subscription.completionFuture.reject?.(DataTrackSubscribeError.disconnected()); } + + if (descriptor.subscription.type === 'active') { + descriptor.subscription.streamControllers.forEach((controller) => controller.close()); + } } this.descriptors.clear(); } From c87cff06d19e531e6d25288d14cc8bdd9ef0225c Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 15:51:51 +0200 Subject: [PATCH 10/13] remove mutex --- src/room/RTCEngine.ts | 10 +--------- 1 file changed, 1 insertion(+), 9 deletions(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 0f70c6085c..1b276e9621 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -247,12 +247,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit /** used to indicate whether the browser is currently waiting to reconnect */ private isWaitingForNetworkReconnect: boolean = false; - private lossyBytesMutexByKind = { - [DataChannelKind.DATA_TRACK_LOSSY]: new Mutex(), - [DataChannelKind.LOSSY]: new Mutex(), - [DataChannelKind.RELIABLE]: new Mutex(), - } as const; - constructor(private options: InternalRoomOptions) { super(); this.log = getLogger(options.loggerName ?? LoggerNames.Engine); @@ -1564,8 +1558,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }; async waitForBufferStatusLow(kind: DataChannelKind) { - let mutex = this.lossyBytesMutexByKind[kind]; - const unlock = await mutex.lock(); return new TypedPromise(async (resolve, reject) => { if (this.isClosed) { reject(new UnexpectedConnectionState('engine closed')); @@ -1586,7 +1578,7 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }, ); } - }).finally(() => unlock()); + }); } /** From e32391bd37298e7525eb9a69fbef38994d73bcd5 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 15:53:25 +0200 Subject: [PATCH 11/13] check for dc presence --- src/room/RTCEngine.ts | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 1b276e9621..aca0209cef 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -1567,7 +1567,12 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit } else { const onClosing = () => reject(new UnexpectedConnectionState('engine closed')); this.once(EngineEvent.Closing, onClosing); - this.dataChannelForKind(kind)?.addEventListener( + const dc = this.dataChannelForKind(kind); + if (!dc) { + reject(new UnexpectedConnectionState(`DataChannel not found, kind: ${kind}`)); + return; + } + dc.addEventListener( 'bufferedamountlow', () => { this.off(EngineEvent.Closing, onClosing); From 88a3cf09ab089a2d8afeee21d8be20c1fcf7b392 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 15:54:46 +0200 Subject: [PATCH 12/13] Update quiet-suits-matter.md --- .changeset/quiet-suits-matter.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.changeset/quiet-suits-matter.md b/.changeset/quiet-suits-matter.md index c9da500097..ed36c208ff 100644 --- a/.changeset/quiet-suits-matter.md +++ b/.changeset/quiet-suits-matter.md @@ -2,4 +2,4 @@ "livekit-client": patch --- -Close streamcontrollers when datatrack gets unpublished +chore: listen to dc buffer events instead of sleep-looping From 423cd2ee63cbc07289ecf99f4763faa7058d8ef2 Mon Sep 17 00:00:00 2001 From: lukasIO Date: Tue, 14 Apr 2026 16:04:43 +0200 Subject: [PATCH 13/13] Update changeset for waitForBufferStatus ordering fix --- .changeset/fast-pots-fold.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/fast-pots-fold.md diff --git a/.changeset/fast-pots-fold.md b/.changeset/fast-pots-fold.md new file mode 100644 index 0000000000..e81bd10df4 --- /dev/null +++ b/.changeset/fast-pots-fold.md @@ -0,0 +1,5 @@ +--- +"livekit-client": patch +--- + +fix: improve ordering of resolving waitForBufferStatus calls