From 885cb33d2be150267d42c839fec63dd05a90044f Mon Sep 17 00:00:00 2001 From: Jason Lernerman Date: Tue, 12 May 2026 09:57:39 -0400 Subject: [PATCH] fix(neuphonic,resemble): avoid dropping TTS audio frames between WebSocket listener re-registrations SynthesizeStream.recvTask wrapped each message read in a fresh Promise that called ws.removeAllListeners() and re-attached message/error/close handlers per iteration. Frames arriving between resolve() and the next on('message') registration were silently dropped by the ws library (no buffer in that window), producing audible gaps in the rendered output. Mirror the Cartesia plugin's fix: attach listeners once before the consume loop, pipe incoming frames through stream.createStreamChannel(), and detach via scoped ws.off() in finally. --- .changeset/tts-ws-listener-reregistration.md | 6 + plugins/neuphonic/src/tts.ts | 142 ++++++++------- plugins/resemble/src/tts.ts | 175 ++++++++++--------- 3 files changed, 173 insertions(+), 150 deletions(-) create mode 100644 .changeset/tts-ws-listener-reregistration.md diff --git a/.changeset/tts-ws-listener-reregistration.md b/.changeset/tts-ws-listener-reregistration.md new file mode 100644 index 000000000..e307b62cf --- /dev/null +++ b/.changeset/tts-ws-listener-reregistration.md @@ -0,0 +1,6 @@ +--- +'@livekit/agents-plugin-neuphonic': patch +'@livekit/agents-plugin-resemble': patch +--- + +fix(neuphonic,resemble): avoid dropping TTS audio frames during WebSocket listener re-registration in TTS recvTask diff --git a/plugins/neuphonic/src/tts.ts b/plugins/neuphonic/src/tts.ts index 1a5b262cb..5b9d5af56 100644 --- a/plugins/neuphonic/src/tts.ts +++ b/plugins/neuphonic/src/tts.ts @@ -9,11 +9,12 @@ import { log, normalizeLanguage, shortuuid, + stream, tts, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; -import { WebSocket } from 'ws'; +import { type RawData, WebSocket } from 'ws'; import { type TTSEncodings, type TTSLangCodes, type 
TTSModels } from './models.js'; const AUTHORIZATION_HEADER = 'X-API-KEY'; @@ -198,8 +199,10 @@ export class SynthesizeStream extends tts.SynthesizeStream { } }; + // Use event channel and set up listeners ONCE to avoid missing messages during listener re-registration const recvTask = async (ws: WebSocket) => { const bstream = new AudioByteStream(this.#opts.sampleRate, NUM_CHANNELS); + const eventChannel = stream.createStreamChannel(); let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { @@ -209,80 +212,85 @@ export class SynthesizeStream extends tts.SynthesizeStream { } }; - while (!closing) { - try { - await new Promise((resolve, reject) => { - ws.removeAllListeners(); + const onMessage = (data: RawData) => { + void eventChannel.write(data).catch((error: unknown) => { + this.#logger.debug( + { error }, + 'Failed writing Neuphonic event to channel (likely closed)', + ); + }); + }; - ws.on('message', (data) => { - try { - const json = JSON.parse(data.toString()); + const onClose = (code: number, reason: Buffer) => { + if (!closing) { + this.#logger.error(`WebSocket closed with code ${code}: ${reason.toString()}`); + this.queue.put(SynthesizeStream.END_OF_STREAM); + } + void eventChannel.close(); + }; - if (json?.data?.audio) { - const audio = new Int8Array(Buffer.from(json.data.audio, 'base64')); - for (const frame of bstream.write(audio)) { - sendLastFrame(requestId, false); - lastFrame = frame; - } + const onError = (err: Error) => { + this.#logger.error({ err }, 'Neuphonic WebSocket error'); + if (!closing) { + closing = true; + this.queue.put(SynthesizeStream.END_OF_STREAM); + ws.close(); + } + void eventChannel.close(); + }; - if (json?.data?.stop) { - // This is a bool flag, it is True when audio reaches "" - for (const frame of bstream.flush()) { - sendLastFrame(requestId, false); - lastFrame = frame; - } - sendLastFrame(requestId, true); - this.queue.put(SynthesizeStream.END_OF_STREAM); - - closing = true; - 
ws.close(); - resolve(); - return; - } - } - resolve(); - } catch (error) { - this.#logger.error(`Error parsing WebSocket message: ${error}`); - reject(error); - } - }); + ws.on('message', onMessage); + ws.on('close', onClose); + ws.on('error', onError); - ws.on('error', (error) => { - this.#logger.error(`WebSocket error: ${error}`); - if (!closing) { - closing = true; - this.queue.put(SynthesizeStream.END_OF_STREAM); - ws.close(); - } - reject(error); - }); + try { + const reader = eventChannel.stream().getReader(); - ws.on('close', (code, reason) => { - if (!closing) { - this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); - this.queue.put(SynthesizeStream.END_OF_STREAM); - } - // Only reject if we haven't processed all expected frames - if (!closing) { - reject(new Error(`WebSocket closed prematurely with code ${code}: ${reason}`)); - } else { - resolve(); - } - }); - }); - } catch (err) { - if (err instanceof Error && !err.message.includes('WebSocket closed prematurely')) { - if (err.message.includes('Queue is closed')) { - this.#logger.warn( - { err }, - 'Queue closed during transcript processing (expected during disconnect)', - ); - } else { - this.#logger.error({ err }, 'Error in recvTask from Neuphonic WebSocket'); + while (!closing) { + const result = await reader.read(); + if (result.done) break; + + const json = JSON.parse(result.value.toString()); + if (!json?.data?.audio) continue; + + const audio = new Int8Array(Buffer.from(json.data.audio, 'base64')); + for (const frame of bstream.write(audio)) { + sendLastFrame(requestId, false); + lastFrame = frame; + } + + if (json?.data?.stop) { + // This is a bool flag, it is True when audio reaches "" + for (const frame of bstream.flush()) { + sendLastFrame(requestId, false); + lastFrame = frame; } + sendLastFrame(requestId, true); + this.queue.put(SynthesizeStream.END_OF_STREAM); + + closing = true; + ws.close(); + break; + } + } + } catch (err) { + if (err instanceof Error && 
!err.message.includes('WebSocket closed')) { + if ( + err.message.includes('Queue is closed') || + err.message.includes('Channel is closed') + ) { + this.#logger.warn( + { err }, + 'Channel closed during transcript processing (expected during disconnect)', + ); + } else { + this.#logger.error({ err }, 'Error in recvTask from Neuphonic WebSocket'); } - break; } + } finally { + ws.off('message', onMessage); + ws.off('close', onClose); + ws.off('error', onError); } }; diff --git a/plugins/resemble/src/tts.ts b/plugins/resemble/src/tts.ts index f113a002a..37745083d 100644 --- a/plugins/resemble/src/tts.ts +++ b/plugins/resemble/src/tts.ts @@ -7,12 +7,13 @@ import { Future, log, shortuuid, + stream, tokenize, tts, } from '@livekit/agents'; import type { AudioFrame } from '@livekit/rtc-node'; import { request } from 'node:https'; -import { WebSocket } from 'ws'; +import { type RawData, WebSocket } from 'ws'; import type { OutputFormat, Precision, ResembleModel } from './models.js'; export const TTSDefaultVoiceId = '55592656'; @@ -232,8 +233,10 @@ export class SynthesizeStream extends tts.SynthesizeStream { this.#tokenizer.close(); }; + // Use event channel and set up listeners ONCE to avoid missing messages during listener re-registration const recvTask = async (ws: WebSocket) => { const bstream = new AudioByteStream(this.#opts.sampleRate, NUM_CHANNELS); + const eventChannel = stream.createStreamChannel(); let lastFrame: AudioFrame | undefined; const sendLastFrame = (segmentId: string, final: boolean) => { @@ -243,93 +246,99 @@ export class SynthesizeStream extends tts.SynthesizeStream { } }; - // Use promise-based message handling similar to ElevenLabs - while ((!closing && activeRequests.size > 0) || !this.#tokenizer.closed) { - try { - await new Promise((resolve, reject) => { - ws.removeAllListeners(); - ws.on('message', (data) => { - try { - const json = JSON.parse(data.toString()); - const segmentId = json.request_id; - - if ('audio_content' in json) { - try { - 
const audioData = Buffer.from(json.audio_content, 'base64'); - for (const frame of bstream.write(audioData)) { - sendLastFrame(segmentId, false); - lastFrame = frame; - } - } catch (audioError) { - this.#logger.error(`Error processing audio content: ${audioError}`); - } - } else if ('type' in json && json.type === 'audio_end') { - for (const frame of bstream.flush()) { - sendLastFrame(segmentId, false); - lastFrame = frame; - } - sendLastFrame(segmentId, true); - - activeRequests.delete(Number(segmentId)); - - // Only end the stream when all requests are complete and tokenizer is closed - if (activeRequests.size === 0 && this.#tokenizer.closed) { - this.queue.put(SynthesizeStream.END_OF_STREAM); - closing = true; - ws.close(); - resolve(); - return; - } - } else if ('success' in json && json.success === false) { - const errorName = json.error_name || 'Unknown'; - const explanation = json.error_params?.explanation || 'No details provided'; - this.#logger - .child({ error: errorName }) - .error(`Resemble API error: ${explanation}`); - - closing = true; - this.queue.put(SynthesizeStream.END_OF_STREAM); - ws.close(); - reject(new Error(`Resemble API error: ${errorName}`)); - return; - } - resolve(); - } catch (error) { - this.#logger.error(`Error parsing WebSocket message: ${error}`); - reject(error); - } - }); - - ws.on('error', (error) => { - this.#logger.error(`WebSocket error: ${error}`); - if (!closing) { - closing = true; - this.queue.put(SynthesizeStream.END_OF_STREAM); - ws.close(); - } - reject(error); - }); + const onMessage = (data: RawData) => { + void eventChannel.write(data).catch((error: unknown) => { + this.#logger.debug({ error }, 'Failed writing Resemble event to channel (likely closed)'); + }); + }; - ws.on('close', (code, reason) => { - if (!closing) { - this.#logger.error(`WebSocket closed with code ${code}: ${reason}`); - this.queue.put(SynthesizeStream.END_OF_STREAM); - } - // Only reject if we haven't received all expected frames - if 
(activeRequests.size > 0 || !this.#tokenizer.closed) { - reject(new Error(`WebSocket closed prematurely with code ${code}: ${reason}`)); - } else { - resolve(); + const onClose = (code: number, reason: Buffer) => { + if (!closing) { + this.#logger.error(`WebSocket closed with code ${code}: ${reason.toString()}`); + this.queue.put(SynthesizeStream.END_OF_STREAM); + } + void eventChannel.close(); + }; + + const onError = (err: Error) => { + this.#logger.error({ err }, 'Resemble WebSocket error'); + if (!closing) { + closing = true; + this.queue.put(SynthesizeStream.END_OF_STREAM); + ws.close(); + } + void eventChannel.close(); + }; + + ws.on('message', onMessage); + ws.on('close', onClose); + ws.on('error', onError); + + try { + const reader = eventChannel.stream().getReader(); + + while ((!closing && activeRequests.size > 0) || !this.#tokenizer.closed) { + const result = await reader.read(); + if (result.done) break; + + const json = JSON.parse(result.value.toString()); + const segmentId = json.request_id; + + if ('audio_content' in json) { + try { + const audioData = Buffer.from(json.audio_content, 'base64'); + for (const frame of bstream.write(audioData)) { + sendLastFrame(segmentId, false); + lastFrame = frame; } - }); - }); - } catch (err) { - // Skip log error for normal websocket close - if (err instanceof Error && !err.message.includes('WebSocket closed prematurely')) { + } catch (audioError) { + this.#logger.error(`Error processing audio content: ${audioError}`); + } + } else if ('type' in json && json.type === 'audio_end') { + for (const frame of bstream.flush()) { + sendLastFrame(segmentId, false); + lastFrame = frame; + } + sendLastFrame(segmentId, true); + + activeRequests.delete(Number(segmentId)); + + // Only end the stream when all requests are complete and tokenizer is closed + if (activeRequests.size === 0 && this.#tokenizer.closed) { + this.queue.put(SynthesizeStream.END_OF_STREAM); + closing = true; + ws.close(); + break; + } + } else if 
('success' in json && json.success === false) { + const errorName = json.error_name || 'Unknown'; + const explanation = json.error_params?.explanation || 'No details provided'; + this.#logger.child({ error: errorName }).error(`Resemble API error: ${explanation}`); + + closing = true; + this.queue.put(SynthesizeStream.END_OF_STREAM); + ws.close(); + throw new Error(`Resemble API error: ${errorName}`); + } + } + } catch (err) { + if (err instanceof Error && !err.message.includes('WebSocket closed')) { + if ( + err.message.includes('Queue is closed') || + err.message.includes('Channel is closed') + ) { + this.#logger.warn( + { err }, + 'Channel closed during transcript processing (expected during disconnect)', + ); + } else { this.#logger.error({ err }, 'Error in recvTask from Resemble WebSocket'); } - break; } + } finally { + ws.off('message', onMessage); + ws.off('close', onClose); + ws.off('error', onError); } };