From 27f70b7121867b3c81d22584d93954b745eda1e4 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 15 May 2026 14:04:26 -0700 Subject: [PATCH 1/6] Fix playback flush and speech interruption races --- agents/src/voice/agent_activity.test.ts | 77 ++++++++++++++++++++++++ agents/src/voice/agent_activity.ts | 45 +++++++++----- agents/src/voice/audio_recognition.ts | 4 +- agents/src/voice/room_io/_output.test.ts | 56 +++++++++++++++++ agents/src/voice/room_io/_output.ts | 16 ++++- 5 files changed, 180 insertions(+), 18 deletions(-) diff --git a/agents/src/voice/agent_activity.test.ts b/agents/src/voice/agent_activity.test.ts index 03ddd2dd4..92ab4f268 100644 --- a/agents/src/voice/agent_activity.test.ts +++ b/agents/src/voice/agent_activity.test.ts @@ -19,6 +19,7 @@ import { describe, expect, it, vi } from 'vitest'; import type { ChatContext } from '../llm/chat_context.js'; import { LLM, type LLMStream } from '../llm/llm.js'; import { Future } from '../utils.js'; +import { type VADEvent, VADEventType } from '../vad.js'; import { AgentActivity } from './agent_activity.js'; import type { PreemptiveGenerationInfo } from './audio_recognition.js'; import { SpeechHandle } from './speech_handle.js'; @@ -97,6 +98,69 @@ function buildMainTaskRunner() { }; } +function buildVadInterruptionRunner() { + const audioOutput = { + canPause: true, + pause: vi.fn(), + resume: vi.fn(), + }; + const audioRecognition = { + onEndOfAgentSpeech: vi.fn(), + cancelBackchannelBoundary: vi.fn(), + }; + const speechHandle = SpeechHandle.create({ allowInterruptions: true }); + + const fakeActivity: Record = { + turnDetection: 'vad', + isInterruptionByAudioActivityEnabled: true, + isDefaultInterruptionByAudioActivityEnabled: true, + isInterruptionDetectionEnabled: false, + _currentSpeech: speechHandle, + stt: undefined, + llm: undefined, + realtimeSession: undefined, + audioRecognition, + falseInterruptionTimer: undefined, + pausedSpeech: undefined, + agentSession: { + agentState: 'speaking', + _aecWarmupRemaining: 0, + _userSpeakingSpan: undefined, + output: { audio: audioOutput }, + sessionOptions: { + turnHandling: { + interruption: { + minDuration: 0, + minWords: 0, + resumeFalseInterruption: true, + falseInterruptionTimeout: 500, + }, + }, + }, + _updateAgentState: vi.fn((state: string) => { + (fakeActivity.agentSession as { agentState: string }).agentState = state; + }), + }, + }; + Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); + + const vadEvent: VADEvent = { + type: VADEventType.INFERENCE_DONE, + samplesIndex: 0, + timestamp: Date.now(), + speechDuration: 1, + silenceDuration: 0, + frames: [], + probability: 1, + inferenceDuration: 0, + speaking: true, + rawAccumulatedSilence: 0, + rawAccumulatedSpeech: 1, + }; + + return { fakeActivity, audioOutput, audioRecognition, vadEvent }; +} + describe('AgentActivity - mainTask', () => { it('should recover when speech handle is interrupted after authorization', async () => { const { fakeActivity, mainTask, speechQueue, q_updated } = buildMainTaskRunner(); @@ -230,6 +294,19 @@ describe('AgentActivity - mainTask', () => { }); }); +describe('AgentActivity - VAD interruption', () => { + it('ends agent speech once when repeated VAD inference pauses the same speech', () => { + const { fakeActivity, audioOutput, audioRecognition, vadEvent } = buildVadInterruptionRunner(); + + fakeActivity.onVADInferenceDone(vadEvent); + fakeActivity.onVADInferenceDone(vadEvent); + + expect(audioOutput.pause).toHaveBeenCalledTimes(1); + expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1); + expect((fakeActivity.agentSession as { agentState: string }).agentState).toBe('listening'); + }); +}); + /** * Unit tests for the preemptive-generation guards in AgentActivity. * diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index e53f39b14..5d5a9a585 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1191,19 +1191,19 @@ export class AgentActivity implements RecognitionHooks { } } - private interruptByAudioActivity(options?: { ignoreUserTranscriptUntil?: number }): void { + private interruptByAudioActivity(options?: { ignoreUserTranscriptUntil?: number }): boolean { if (!this.isInterruptionByAudioActivityEnabled) { - return; + return false; } if (this.agentSession._aecWarmupRemaining > 0) { // Disable interruption from audio activity while AEC warmup is active. - return; + return false; } if (this.llm instanceof RealtimeModel && this.llm.capabilities.turnDetection) { // skip speech handle interruption if server side turn detection is enabled - return; + return false; } // Refactored interruption word count check: @@ -1225,7 +1225,7 @@ export class AgentActivity implements RecognitionHooks { // Only allow interruption if word count meets or exceeds minInterruptionWords // This applies to all cases: empty strings, partial speech, and full speech if (wordCount < this.agentSession.sessionOptions.turnHandling.interruption?.minWords) { - return; + return false; } } @@ -1246,11 +1246,16 @@ export class AgentActivity implements RecognitionHooks { const timeout = this.agentSession.sessionOptions.turnHandling.interruption.falseInterruptionTimeout; const audioOutput = this.agentSession.output.audio; + if (this.pausedSpeech?.handle === this._currentSpeech) { + this.updatePausedSpeech(this._currentSpeech, timeout); + return false; + } + const wasAgentSpeaking = this.agentSession.agentState === 'speaking'; if ( this.isInterruptionDetectionEnabled && this.audioRecognition && - this.agentSession.agentState === 'speaking' + wasAgentSpeaking ) { this.audioRecognition.onStartOfOverlapSpeech( 0, @@ -1261,14 +1266,17 @@ export class AgentActivity implements RecognitionHooks { this.updatePausedSpeech(this._currentSpeech, timeout); audioOutput!.pause(); - this.agentSession._updateAgentState('listening'); - if (this.audioRecognition) { - this.audioRecognition.onEndOfAgentSpeech( - options?.ignoreUserTranscriptUntil ?? Date.now(), - ); - } - if (this.isInterruptionDetectionEnabled) { - this.restoreInterruptionByAudioActivity(); + if (wasAgentSpeaking) { + this.agentSession._updateAgentState('listening'); + if (this.audioRecognition) { + this.audioRecognition.onEndOfAgentSpeech( + options?.ignoreUserTranscriptUntil ?? Date.now(), + ); + } + if (this.isInterruptionDetectionEnabled) { + this.restoreInterruptionByAudioActivity(); + } + return true; } } else { this.logger.info( @@ -1279,14 +1287,15 @@ export class AgentActivity implements RecognitionHooks { this._currentSpeech.interrupt(); } } + return false; } onInterruption(ev: OverlappingSpeechEvent) { this.restoreInterruptionByAudioActivity(); - this.interruptByAudioActivity({ + const endedAgentSpeech = this.interruptByAudioActivity({ ignoreUserTranscriptUntil: ev.overlapStartedAt || ev.detectedAt, }); - if (this.audioRecognition) { + if (!endedAgentSpeech && this.audioRecognition && this.agentSession.agentState === 'speaking') { this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || ev.detectedAt); } } @@ -2562,6 +2571,10 @@ export class AgentActivity implements RecognitionHooks { this._backgroundSpeeches.delete(speechHandle); } + if (speechHandle.interrupted) { + return; + } + if (toolOutput.output.length === 0) return; // important: no agent output should be used after this point diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 894ca9d9f..1850d0d1e 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -461,8 +461,10 @@ export class AudioRecognition { async onEndOfAgentSpeech(ignoreUserTranscriptUntil: number) { this.cancelBackchannelBoundary(); + const now = Date.now(); + if (this.isAgentSpeaking) { - this.endpointing.onEndOfAgentSpeech(Date.now()); + this.endpointing.onEndOfAgentSpeech(now); } if (!this.isInterruptionEnabled) { diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 16e07d505..cd758fba0 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -5,6 +5,8 @@ import { describe, expect, it, vi } from 'vitest'; import { Future } from '../../utils.js'; import { ParticipantAudioOutput } from './_output.js'; +const nextTick = () => new Promise((resolve) => setImmediate(resolve)); + describe('ParticipantAudioOutput waitForPlayoutTask', () => { it('resets tracked duration after non-interrupted playout', async () => { let resolvePlayout!: () => void; @@ -95,4 +97,58 @@ describe('ParticipantAudioOutput waitForPlayoutTask', () => { interrupted: true, }); }); + + it('does not finish one segment twice when flush is called again before playout drains', async () => { + let resolvePlayout!: () => void; + const waitForPlayout = new Promise((resolve) => { + resolvePlayout = resolve; + }); + + const output = Object.create(ParticipantAudioOutput.prototype) as ParticipantAudioOutput & { + pushedDuration: number; + flushTask?: { done: boolean }; + flushPushedDuration?: number; + interruptedFuture: Future; + firstFrameEmitted: boolean; + audioSource: { + waitForPlayout: () => Promise; + queuedDuration: number; + clearQueue: () => void; + }; + onPlaybackFinished: (event: { playbackPosition: number; interrupted: boolean }) => void; + logger: { + error: () => void; + }; + }; + + const onPlaybackFinished = vi.fn(); + output.pushedDuration = 1.0; + output.interruptedFuture = new Future(); + output.firstFrameEmitted = true; + output.onPlaybackFinished = onPlaybackFinished; + output.logger = { + error: vi.fn(), + }; + output.audioSource = { + waitForPlayout: () => waitForPlayout, + queuedDuration: 0, + clearQueue: vi.fn(), + }; + + output.flush(); + await nextTick(); + + output.flush(); + await nextTick(); + + resolvePlayout(); + await nextTick(); + + expect(onPlaybackFinished).toHaveBeenCalledTimes(1); + expect(onPlaybackFinished).toHaveBeenCalledWith({ + playbackPosition: 1.0, + interrupted: false, + }); + expect(output.logger.error).not.toHaveBeenCalled(); + }); }); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index c90c7e5a3..63772f142 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -375,6 +375,7 @@ export class ParticipantAudioOutput extends AudioOutput { private audioSource: AudioSource; private publication?: LocalTrackPublication; private flushTask?: Task; + private flushPushedDuration?: number; /** Duration of audio pushed to the source, in seconds */ private pushedDuration: number = 0; @@ -465,11 +466,24 @@ export class ParticipantAudioOutput extends AudioOutput { } if (this.flushTask && !this.flushTask.done) { + if (this.flushPushedDuration === this.pushedDuration) { + return; + } + this.logger.error('flush called while playback is in progress'); this.flushTask.cancel(); } - this.flushTask = Task.from((controller) => this.waitForPlayoutTask(controller)); + this.flushPushedDuration = this.pushedDuration; + const flushTask = Task.from((controller) => this.waitForPlayoutTask(controller)); + this.flushTask = flushTask; + void flushTask.result + .finally(() => { + if (this.flushTask === flushTask) { + this.flushPushedDuration = undefined; + } + }) + .catch(() => {}); } clearBuffer(): void { From 2e0f067c81b4f4a48aad5f0c8e1961dc3f5b870f Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 15 May 2026 14:21:34 -0700 Subject: [PATCH 2/6] Update agent_activity.ts --- agents/src/voice/agent_activity.ts | 6 +----- 1 file changed, 1 insertion(+), 5 deletions(-) diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 5d5a9a585..ebc268bea 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1252,11 +1252,7 @@ export class AgentActivity implements RecognitionHooks { } const wasAgentSpeaking = this.agentSession.agentState === 'speaking'; - if ( - this.isInterruptionDetectionEnabled && - this.audioRecognition && - wasAgentSpeaking - ) { + if (this.isInterruptionDetectionEnabled && this.audioRecognition && wasAgentSpeaking) { this.audioRecognition.onStartOfOverlapSpeech( 0, Date.now(), From d87ff4f111610cf4796d57e2b083ba62294d6307 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 15 May 2026 18:31:10 -0700 Subject: [PATCH 3/6] Fix voice interruption state recovery Ensure agent speech end state is synchronized across VAD interruption, confirmed interruption, and pipeline completion paths so STT does not remain stuck buffering user speech as overlap. Add focused regressions for repeated VAD interruption, paused-speech interruption recovery, and pipeline completion cleanup. --- agents/src/voice/agent_activity.test.ts | 91 ++++++++++++++++++- agents/src/voice/agent_activity.ts | 33 ++++++- agents/src/voice/audio_recognition.ts | 31 ++++--- .../audio_recognition_interruption.test.ts | 51 +++++++++++ 4 files changed, 188 insertions(+), 18 deletions(-) create mode 100644 agents/src/voice/audio_recognition_interruption.test.ts diff --git a/agents/src/voice/agent_activity.test.ts b/agents/src/voice/agent_activity.test.ts index 92ab4f268..658222ecf 100644 --- a/agents/src/voice/agent_activity.test.ts +++ b/agents/src/voice/agent_activity.test.ts @@ -16,7 +16,7 @@ */ import { Heap } from 'heap-js'; import { describe, expect, it, vi } from 'vitest'; -import type { ChatContext } from '../llm/chat_context.js'; +import { ChatContext } from '../llm/chat_context.js'; import { LLM, type LLMStream } from '../llm/llm.js'; import { Future } from '../utils.js'; import { type VADEvent, VADEventType } from '../vad.js'; @@ -307,6 +307,79 @@ describe('AgentActivity - VAD interruption', () => { }); }); +describe('AgentActivity - speech completion', () => { + it('ends audio recognition speech when pipeline completion moves session out of speaking', () => { + const audioRecognition = { + onEndOfAgentSpeech: vi.fn(), + }; + const fakeActivity = { + speechQueue: { + peek: () => undefined, + }, + _currentSpeech: { + done: () => true, + }, + audioRecognition, + agentSession: { + agentState: 'speaking', + _updateAgentState: vi.fn((state: string) => { + fakeActivity.agentSession.agentState = state; + }), + }, + }; + + const onPipelineReplyDone = (AgentActivity.prototype as Record) + .onPipelineReplyDone as (this: typeof fakeActivity) => void; + + onPipelineReplyDone.call(fakeActivity); + + expect(fakeActivity.agentSession._updateAgentState).toHaveBeenCalledWith('listening'); + expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1); + }); +}); + +describe('AgentActivity - confirmed interruptions', () => { + it('ends audio recognition when confirmed interruption only has paused speech', () => { + const pausedSpeech = SpeechHandle.create({ allowInterruptions: true }); + + const fakeActivity: Record = { + restoreInterruptionByAudioActivity: vi.fn(), + interruptByAudioActivity: vi.fn(() => false), + _currentSpeech: undefined, + pausedSpeech: { handle: pausedSpeech }, + falseInterruptionTimer: undefined, + audioRecognition: { + onEndOfAgentSpeech: vi.fn(), + }, + agentSession: { + agentState: 'listening', + }, + }; + Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); + + const onInterruption = (AgentActivity.prototype as Record).onInterruption as ( + this: typeof fakeActivity, + ev: Parameters[0], + ) => void; + + onInterruption.call(fakeActivity, { + type: 'overlapping_speech', + detectedAt: Date.now(), + isInterruption: true, + }); + + expect(pausedSpeech.interrupted).toBe(true); + expect(fakeActivity.pausedSpeech).toBeUndefined(); + expect( + ( + fakeActivity.audioRecognition as { + onEndOfAgentSpeech: ReturnType; + } + ).onEndOfAgentSpeech, + ).toHaveBeenCalledTimes(1); + }); +}); + /** * Unit tests for the preemptive-generation guards in AgentActivity. * @@ -345,7 +418,7 @@ function buildPreemptiveRunner(opts: Partial = {}) { ); const cancelPreemptiveGeneration = vi.fn(); - const fakeChatCtx = { copy: () => fakeChatCtx } as unknown as ChatContext; + const fakeChatCtx = new ChatContext(); const fakeActivity = { _preemptiveGenerationCount: 0, @@ -370,6 +443,7 @@ function buildPreemptiveRunner(opts: Partial = {}) { generateReply, cancelPreemptiveGeneration, }; + Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); const onPreemptiveGeneration = (AgentActivity.prototype as Record) .onPreemptiveGeneration as (this: unknown, info: PreemptiveGenerationInfo) => void; @@ -451,4 +525,17 @@ describe('AgentActivity - onPreemptiveGeneration guards', () => { expect(generateReply).not.toHaveBeenCalled(); expect(cancelPreemptiveGeneration).not.toHaveBeenCalled(); }); + + it('skips preemption while a paused speech is still active', () => { + const { fakeActivity, generateReply, call } = buildPreemptiveRunner(); + const pausedSpeech = SpeechHandle.create({ allowInterruptions: true }); + + fakeActivity.pausedSpeech = { handle: pausedSpeech }; + + call(); + + expect(fakeActivity._preemptiveGenerationCount).toBe(0); + expect(generateReply).not.toHaveBeenCalled(); + }); + }); diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index ebc268bea..0decb4575 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1288,10 +1288,17 @@ export class AgentActivity implements RecognitionHooks { onInterruption(ev: OverlappingSpeechEvent) { this.restoreInterruptionByAudioActivity(); + const endedAgentSpeech = this.interruptByAudioActivity({ ignoreUserTranscriptUntil: ev.overlapStartedAt || ev.detectedAt, }); - if (!endedAgentSpeech && this.audioRecognition && this.agentSession.agentState === 'speaking') { + const interruptedPausedSpeech = endedAgentSpeech ? undefined : this.interruptPausedSpeech(); + + if ( + !endedAgentSpeech && + this.audioRecognition && + (this.agentSession.agentState === 'speaking' || interruptedPausedSpeech !== undefined) + ) { this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || ev.detectedAt); } } @@ -1379,6 +1386,7 @@ export class AgentActivity implements RecognitionHooks { if ( !preemptiveOpts.enabled || this.schedulingPaused || + (this.pausedSpeech !== undefined && !this.pausedSpeech.handle.interrupted) || (this._currentSpeech !== undefined && !this._currentSpeech.interrupted) || !(this.llm instanceof LLM) ) { @@ -1812,7 +1820,11 @@ export class AgentActivity implements RecognitionHooks { private onPipelineReplyDone(): void { if (!this.speechQueue.peek() && (!this._currentSpeech || this._currentSpeech.done())) { + const wasSpeaking = this.agentSession.agentState === 'speaking'; this.agentSession._updateAgentState('listening'); + if (wasSpeaking && this.audioRecognition) { + this.audioRecognition.onEndOfAgentSpeech(Date.now()); + } } } @@ -3595,6 +3607,25 @@ export class AgentActivity implements RecognitionHooks { }, timeout); } + private interruptPausedSpeech(): SpeechHandle | undefined { + if (this.falseInterruptionTimer !== undefined) { + clearTimeout(this.falseInterruptionTimer); + this.falseInterruptionTimer = undefined; + } + + if (!this.pausedSpeech) { + return undefined; + } + + const speechHandle = this.pausedSpeech.handle; + if (!speechHandle.interrupted && speechHandle.allowInterruptions) { + speechHandle.interrupt(); + } + this.pausedSpeech = undefined; + + return speechHandle; + } + private async cancelSpeechPause(options?: { interrupt?: boolean }): Promise { const { interrupt = true } = options ?? {}; diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 1850d0d1e..eb1e365d7 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -12,9 +12,8 @@ import { context as otelContext, trace, } from '@opentelemetry/api'; +import type { ReadableStream, WritableStreamDefaultWriter } from 'node:stream/web'; import { TransformStream } from 'node:stream/web'; -import type { WritableStreamDefaultWriter } from 'node:stream/web'; -import type { ReadableStream } from 'node:stream/web'; import { isAPIError } from '../_exceptions.js'; import { apiConnectDefaults, intervalForRetry } from '../inference/interruption/defaults.js'; import { InterruptionDetectionError } from '../inference/interruption/errors.js'; @@ -462,8 +461,8 @@ export class AudioRecognition { this.cancelBackchannelBoundary(); const now = Date.now(); - - if (this.isAgentSpeaking) { + const wasAgentSpeaking = this.isAgentSpeaking; + if (wasAgentSpeaking) { this.endpointing.onEndOfAgentSpeech(now); } @@ -472,20 +471,13 @@ export class AudioRecognition { return; } - const inputOpen = await this.trySendInterruptionSentinel( - InterruptionStreamSentinel.agentSpeechEnded(), - ); - if (!inputOpen) { - this.isAgentSpeaking = false; - return; - } - - if (this.isAgentSpeaking) { + let endCooldown = 0; + if (wasAgentSpeaking) { if (this.ignoreUserTranscriptUntil === undefined) { this.onEndOfOverlapSpeech(Date.now()); } - const endCooldown = this.backchannelBoundary ? this.backchannelBoundary[1] : 0; + endCooldown = this.backchannelBoundary ? this.backchannelBoundary[1] : 0; const ignoreUntil = this.ignoreUserTranscriptUntil ? Math.min(ignoreUserTranscriptUntil, this.ignoreUserTranscriptUntil) : ignoreUserTranscriptUntil; @@ -493,11 +485,20 @@ export class AudioRecognition { // Subtracting `endCooldown` widens the release window so transcripts that ended just // before the agent finished speaking (premature corrections) are surfaced. this.ignoreUserTranscriptUntil = ignoreUntil - endCooldown; + } + this.isAgentSpeaking = false; + + const inputOpen = await this.trySendInterruptionSentinel( + InterruptionStreamSentinel.agentSpeechEnded(), + ); + if (!inputOpen) { + return; + } + if (wasAgentSpeaking) { // flush held transcripts if possible await this.flushHeldTranscripts(endCooldown); } - this.isAgentSpeaking = false; } /** Start interruption inference when agent is speaking and overlap speech starts. */ diff --git a/agents/src/voice/audio_recognition_interruption.test.ts b/agents/src/voice/audio_recognition_interruption.test.ts new file mode 100644 index 000000000..60da56cb7 --- /dev/null +++ b/agents/src/voice/audio_recognition_interruption.test.ts @@ -0,0 +1,51 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it, vi } from 'vitest'; +import { ChatContext } from '../llm/chat_context.js'; +import { initializeLogger } from '../log.js'; +import { type SpeechEvent, SpeechEventType } from '../stt/stt.js'; +import { AudioRecognition, type RecognitionHooks } from './audio_recognition.js'; + +function createHooks(): RecognitionHooks { + return { + onInterruption: vi.fn(), + onStartOfSpeech: vi.fn(), + onVADInferenceDone: vi.fn(), + onEndOfSpeech: vi.fn(), + onInterimTranscript: vi.fn(), + onFinalTranscript: vi.fn(), + onPreemptiveGeneration: vi.fn(), + retrieveChatCtx: () => ChatContext.empty(), + onEndOfTurn: vi.fn(async () => true), + }; +} + +describe('AudioRecognition interruption buffering', () => { + initializeLogger({ pretty: false, level: 'silent' }); + + it('does not keep buffering final transcripts after agent speech end begins', async () => { + const hooks = createHooks(); + hooks.onEndOfTurn = vi.fn(async () => false); + const recognition = new AudioRecognition({ + recognitionHooks: hooks, + minEndpointingDelay: 0, + maxEndpointingDelay: 0, + }); + + await recognition.onStartOfAgentSpeech(Date.now()); + (recognition as any).isInterruptionEnabled = true; + (recognition as any).trySendInterruptionSentinel = vi.fn(() => new Promise(() => {})); + + void recognition.onEndOfAgentSpeech(Date.now()); + const finalTranscript: SpeechEvent = { + type: SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [{ text: 'still listening', confidence: 0.9 }], + }; + await (recognition as any).onSTTEvent(finalTranscript); + + expect(hooks.onFinalTranscript).toHaveBeenCalledTimes(1); + expect(recognition.currentTranscript).toBe('still listening'); + expect((recognition as any).transcriptBuffer).toHaveLength(0); + }); +}); From c1466c98d87d890a3dcfdbe2b9a5f0f65aac0a52 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 15 May 2026 18:34:01 -0700 Subject: [PATCH 4/6] Create six-dryers-fry.md --- .changeset/six-dryers-fry.md | 5 +++++ 1 file changed, 5 insertions(+) create mode 100644 .changeset/six-dryers-fry.md diff --git a/.changeset/six-dryers-fry.md b/.changeset/six-dryers-fry.md new file mode 100644 index 000000000..8fc9e10a2 --- /dev/null +++ b/.changeset/six-dryers-fry.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Fix playback flush and speech interruption races From 810fead03b0ded446556746ca7102d4efc6275ec Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Fri, 15 May 2026 23:00:25 -0700 Subject: [PATCH 5/6] avoid sending a overlapped backchannel signal along with interruption --- agents/src/voice/agent_activity.test.ts | 1 - agents/src/voice/audio_recognition.ts | 17 ++++++++++++----- 2 files changed, 12 insertions(+), 6 deletions(-) diff --git a/agents/src/voice/agent_activity.test.ts b/agents/src/voice/agent_activity.test.ts index 658222ecf..a62386a9c 100644 --- a/agents/src/voice/agent_activity.test.ts +++ b/agents/src/voice/agent_activity.test.ts @@ -537,5 +537,4 @@ describe('AgentActivity - onPreemptiveGeneration guards', () => { expect(fakeActivity._preemptiveGenerationCount).toBe(0); expect(generateReply).not.toHaveBeenCalled(); }); - }); diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index eb1e365d7..c17d548f9 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -462,6 +462,10 @@ export class AudioRecognition { const now = Date.now(); const wasAgentSpeaking = this.isAgentSpeaking; + // Snapshot the prior value BEFORE we mutate `this.ignoreUserTranscriptUntil` below. + // The gate around `onEndOfOverlapSpeech` depends on whether overlap had been registered + // during this agent speech (i.e. the field was undefined at entry). + const priorIgnoreUserTranscriptUntil = this.ignoreUserTranscriptUntil; if (wasAgentSpeaking) { this.endpointing.onEndOfAgentSpeech(now); } @@ -473,10 +477,6 @@ export class AudioRecognition { let endCooldown = 0; if (wasAgentSpeaking) { - if (this.ignoreUserTranscriptUntil === undefined) { - this.onEndOfOverlapSpeech(Date.now()); - } - endCooldown = this.backchannelBoundary ? this.backchannelBoundary[1] : 0; const ignoreUntil = this.ignoreUserTranscriptUntil ? Math.min(ignoreUserTranscriptUntil, this.ignoreUserTranscriptUntil) @@ -486,6 +486,8 @@ export class AudioRecognition { // before the agent finished speaking (premature corrections) are surfaced. this.ignoreUserTranscriptUntil = ignoreUntil - endCooldown; } + // Set isAgentSpeaking = false BEFORE awaiting the sentinel so STT events + // arriving while the sentinel is in flight are not buffered (STT-deaf fix). this.isAgentSpeaking = false; const inputOpen = await this.trySendInterruptionSentinel( @@ -496,7 +498,12 @@ export class AudioRecognition { } if (wasAgentSpeaking) { - // flush held transcripts if possible + // Notify overlap end AFTER the agent-speech-ended sentinel resets the inference + // stream, so it does not emit a synthetic isInterruption:false event after the + // confirmed interruption. + if (priorIgnoreUserTranscriptUntil === undefined) { + this.onEndOfOverlapSpeech(Date.now()); + } await this.flushHeldTranscripts(endCooldown); } } From 42261c3014bdff0adabd44dfb00c0cdf4f8a0661 Mon Sep 17 00:00:00 2001 From: Brian Yin Date: Sun, 17 May 2026 18:24:58 -0700 Subject: [PATCH 6/6] improve semantics --- agents/src/voice/agent_activity.test.ts | 17 ++++------------- agents/src/voice/audio_recognition.ts | 15 +++++++-------- .../audio_recognition_interruption.test.ts | 14 ++++++++++---- 3 files changed, 21 insertions(+), 25 deletions(-) diff --git a/agents/src/voice/agent_activity.test.ts b/agents/src/voice/agent_activity.test.ts index a62386a9c..4baa662ff 100644 --- a/agents/src/voice/agent_activity.test.ts +++ b/agents/src/voice/agent_activity.test.ts @@ -341,6 +341,7 @@ describe('AgentActivity - speech completion', () => { describe('AgentActivity - confirmed interruptions', () => { it('ends audio recognition when confirmed interruption only has paused speech', () => { const pausedSpeech = SpeechHandle.create({ allowInterruptions: true }); + const audioRecognition = { onEndOfAgentSpeech: vi.fn() }; const fakeActivity: Record = { restoreInterruptionByAudioActivity: vi.fn(), @@ -348,12 +349,8 @@ describe('AgentActivity - confirmed interruptions', () => { _currentSpeech: undefined, pausedSpeech: { handle: pausedSpeech }, falseInterruptionTimer: undefined, - audioRecognition: { - onEndOfAgentSpeech: vi.fn(), - }, - agentSession: { - agentState: 'listening', - }, + audioRecognition, + agentSession: { agentState: 'listening' }, }; Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); @@ -370,13 +367,7 @@ describe('AgentActivity - confirmed interruptions', () => { expect(pausedSpeech.interrupted).toBe(true); expect(fakeActivity.pausedSpeech).toBeUndefined(); - expect( - ( - fakeActivity.audioRecognition as { - onEndOfAgentSpeech: ReturnType; - } - ).onEndOfAgentSpeech, - ).toHaveBeenCalledTimes(1); + expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1); }); }); diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index c17d548f9..093ae0b2e 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -462,9 +462,8 @@ export class AudioRecognition { const now = Date.now(); const wasAgentSpeaking = this.isAgentSpeaking; - // Snapshot the prior value BEFORE we mutate `this.ignoreUserTranscriptUntil` below. - // The gate around `onEndOfOverlapSpeech` depends on whether overlap had been registered - // during this agent speech (i.e. the field was undefined at entry). + // Capture before the assignment below; the overlap-end notification only fires when no + // overlap had been registered during this agent speech. const priorIgnoreUserTranscriptUntil = this.ignoreUserTranscriptUntil; if (wasAgentSpeaking) { this.endpointing.onEndOfAgentSpeech(now); @@ -486,8 +485,8 @@ export class AudioRecognition { // before the agent finished speaking (premature corrections) are surfaced. this.ignoreUserTranscriptUntil = ignoreUntil - endCooldown; } - // Set isAgentSpeaking = false BEFORE awaiting the sentinel so STT events - // arriving while the sentinel is in flight are not buffered (STT-deaf fix). + // Clear before awaiting the sentinel so STT events arriving while the sentinel is in + // flight are not buffered. this.isAgentSpeaking = false; const inputOpen = await this.trySendInterruptionSentinel( @@ -498,9 +497,9 @@ export class AudioRecognition { } if (wasAgentSpeaking) { - // Notify overlap end AFTER the agent-speech-ended sentinel resets the inference - // stream, so it does not emit a synthetic isInterruption:false event after the - // confirmed interruption. + // Notify overlap end after the agent-speech-ended sentinel resets the inference stream + // so it does not emit a synthetic `isInterruption: false` event following a real + // interruption. if (priorIgnoreUserTranscriptUntil === undefined) { this.onEndOfOverlapSpeech(Date.now()); } diff --git a/agents/src/voice/audio_recognition_interruption.test.ts b/agents/src/voice/audio_recognition_interruption.test.ts index 60da56cb7..3011f661a 100644 --- a/agents/src/voice/audio_recognition_interruption.test.ts +++ b/agents/src/voice/audio_recognition_interruption.test.ts @@ -34,18 +34,24 @@ describe('AudioRecognition interruption buffering', () => { }); await recognition.onStartOfAgentSpeech(Date.now()); - (recognition as any).isInterruptionEnabled = true; - (recognition as any).trySendInterruptionSentinel = vi.fn(() => new Promise(() => {})); + const internals = recognition as unknown as { + isInterruptionEnabled: boolean; + trySendInterruptionSentinel: () => Promise; + onSTTEvent: (ev: SpeechEvent) => Promise; + transcriptBuffer: unknown[]; + }; + internals.isInterruptionEnabled = true; + internals.trySendInterruptionSentinel = vi.fn(() => new Promise(() => {})); void recognition.onEndOfAgentSpeech(Date.now()); const finalTranscript: SpeechEvent = { type: SpeechEventType.FINAL_TRANSCRIPT, alternatives: [{ text: 'still listening', confidence: 0.9 }], }; - await (recognition as any).onSTTEvent(finalTranscript); + await internals.onSTTEvent(finalTranscript); expect(hooks.onFinalTranscript).toHaveBeenCalledTimes(1); expect(recognition.currentTranscript).toBe('still listening'); - expect((recognition as any).transcriptBuffer).toHaveLength(0); + expect(internals.transcriptBuffer).toHaveLength(0); }); });