diff --git a/.changeset/six-dryers-fry.md b/.changeset/six-dryers-fry.md new file mode 100644 index 000000000..8fc9e10a2 --- /dev/null +++ b/.changeset/six-dryers-fry.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": patch +--- + +Fix playback flush and speech interruption races diff --git a/agents/src/voice/agent_activity.test.ts b/agents/src/voice/agent_activity.test.ts index 03ddd2dd4..4baa662ff 100644 --- a/agents/src/voice/agent_activity.test.ts +++ b/agents/src/voice/agent_activity.test.ts @@ -16,9 +16,10 @@ */ import { Heap } from 'heap-js'; import { describe, expect, it, vi } from 'vitest'; -import type { ChatContext } from '../llm/chat_context.js'; +import { ChatContext } from '../llm/chat_context.js'; import { LLM, type LLMStream } from '../llm/llm.js'; import { Future } from '../utils.js'; +import { type VADEvent, VADEventType } from '../vad.js'; import { AgentActivity } from './agent_activity.js'; import type { PreemptiveGenerationInfo } from './audio_recognition.js'; import { SpeechHandle } from './speech_handle.js'; @@ -97,6 +98,69 @@ function buildMainTaskRunner() { }; } +function buildVadInterruptionRunner() { + const audioOutput = { + canPause: true, + pause: vi.fn(), + resume: vi.fn(), + }; + const audioRecognition = { + onEndOfAgentSpeech: vi.fn(), + cancelBackchannelBoundary: vi.fn(), + }; + const speechHandle = SpeechHandle.create({ allowInterruptions: true }); + + const fakeActivity: Record = { + turnDetection: 'vad', + isInterruptionByAudioActivityEnabled: true, + isDefaultInterruptionByAudioActivityEnabled: true, + isInterruptionDetectionEnabled: false, + _currentSpeech: speechHandle, + stt: undefined, + llm: undefined, + realtimeSession: undefined, + audioRecognition, + falseInterruptionTimer: undefined, + pausedSpeech: undefined, + agentSession: { + agentState: 'speaking', + _aecWarmupRemaining: 0, + _userSpeakingSpan: undefined, + output: { audio: audioOutput }, + sessionOptions: { + turnHandling: { + interruption: { + minDuration: 0, + minWords: 0, + resumeFalseInterruption: true, + falseInterruptionTimeout: 500, + }, + }, + }, + _updateAgentState: vi.fn((state: string) => { + (fakeActivity.agentSession as { agentState: string }).agentState = state; + }), + }, + }; + Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); + + const vadEvent: VADEvent = { + type: VADEventType.INFERENCE_DONE, + samplesIndex: 0, + timestamp: Date.now(), + speechDuration: 1, + silenceDuration: 0, + frames: [], + probability: 1, + inferenceDuration: 0, + speaking: true, + rawAccumulatedSilence: 0, + rawAccumulatedSpeech: 1, + }; + + return { fakeActivity, audioOutput, audioRecognition, vadEvent }; +} + describe('AgentActivity - mainTask', () => { it('should recover when speech handle is interrupted after authorization', async () => { const { fakeActivity, mainTask, speechQueue, q_updated } = buildMainTaskRunner(); @@ -230,6 +294,83 @@ describe('AgentActivity - mainTask', () => { }); }); +describe('AgentActivity - VAD interruption', () => { + it('ends agent speech once when repeated VAD inference pauses the same speech', () => { + const { fakeActivity, audioOutput, audioRecognition, vadEvent } = buildVadInterruptionRunner(); + + fakeActivity.onVADInferenceDone(vadEvent); + fakeActivity.onVADInferenceDone(vadEvent); + + expect(audioOutput.pause).toHaveBeenCalledTimes(1); + expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1); + expect((fakeActivity.agentSession as { agentState: string }).agentState).toBe('listening'); + }); +}); + +describe('AgentActivity - speech completion', () => { + it('ends audio recognition speech when pipeline completion moves session out of speaking', () => { + const audioRecognition = { + onEndOfAgentSpeech: vi.fn(), + }; + const fakeActivity = { + speechQueue: { + peek: () => undefined, + }, + _currentSpeech: { + done: () => true, + }, + audioRecognition, + agentSession: { + agentState: 'speaking', + _updateAgentState: vi.fn((state: string) => { + fakeActivity.agentSession.agentState = state; + }), + }, + }; + + const onPipelineReplyDone = (AgentActivity.prototype as Record) + .onPipelineReplyDone as (this: typeof fakeActivity) => void; + + onPipelineReplyDone.call(fakeActivity); + + expect(fakeActivity.agentSession._updateAgentState).toHaveBeenCalledWith('listening'); + expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1); + }); +}); + +describe('AgentActivity - confirmed interruptions', () => { + it('ends audio recognition when confirmed interruption only has paused speech', () => { + const pausedSpeech = SpeechHandle.create({ allowInterruptions: true }); + const audioRecognition = { onEndOfAgentSpeech: vi.fn() }; + + const fakeActivity: Record = { + restoreInterruptionByAudioActivity: vi.fn(), + interruptByAudioActivity: vi.fn(() => false), + _currentSpeech: undefined, + pausedSpeech: { handle: pausedSpeech }, + falseInterruptionTimer: undefined, + audioRecognition, + agentSession: { agentState: 'listening' }, + }; + Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); + + const onInterruption = (AgentActivity.prototype as Record).onInterruption as ( + this: typeof fakeActivity, + ev: Parameters[0], + ) => void; + + onInterruption.call(fakeActivity, { + type: 'overlapping_speech', + detectedAt: Date.now(), + isInterruption: true, + }); + + expect(pausedSpeech.interrupted).toBe(true); + expect(fakeActivity.pausedSpeech).toBeUndefined(); + expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1); + }); +}); + /** * Unit tests for the preemptive-generation guards in AgentActivity. * @@ -268,7 +409,7 @@ function buildPreemptiveRunner(opts: Partial = {}) { ); const cancelPreemptiveGeneration = vi.fn(); - const fakeChatCtx = { copy: () => fakeChatCtx } as unknown as ChatContext; + const fakeChatCtx = new ChatContext(); const fakeActivity = { _preemptiveGenerationCount: 0, @@ -293,6 +434,7 @@ function buildPreemptiveRunner(opts: Partial = {}) { generateReply, cancelPreemptiveGeneration, }; + Object.setPrototypeOf(fakeActivity, AgentActivity.prototype); const onPreemptiveGeneration = (AgentActivity.prototype as Record) .onPreemptiveGeneration as (this: unknown, info: PreemptiveGenerationInfo) => void; @@ -374,4 +516,16 @@ describe('AgentActivity - onPreemptiveGeneration guards', () => { expect(generateReply).not.toHaveBeenCalled(); expect(cancelPreemptiveGeneration).not.toHaveBeenCalled(); }); + + it('skips preemption while a paused speech is still active', () => { + const { fakeActivity, generateReply, call } = buildPreemptiveRunner(); + const pausedSpeech = SpeechHandle.create({ allowInterruptions: true }); + + fakeActivity.pausedSpeech = { handle: pausedSpeech }; + + call(); + + expect(fakeActivity._preemptiveGenerationCount).toBe(0); + expect(generateReply).not.toHaveBeenCalled(); + }); }); diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index e53f39b14..0decb4575 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1191,19 +1191,19 @@ export class AgentActivity implements RecognitionHooks { } } - private interruptByAudioActivity(options?: { ignoreUserTranscriptUntil?: number }): void { + private interruptByAudioActivity(options?: { ignoreUserTranscriptUntil?: number }): boolean { if (!this.isInterruptionByAudioActivityEnabled) { - return; + return false; } if (this.agentSession._aecWarmupRemaining > 0) { // Disable interruption from audio activity while AEC warmup is active. - return; + return false; } if (this.llm instanceof RealtimeModel && this.llm.capabilities.turnDetection) { // skip speech handle interruption if server side turn detection is enabled - return; + return false; } // Refactored interruption word count check: @@ -1225,7 +1225,7 @@ export class AgentActivity implements RecognitionHooks { // Only allow interruption if word count meets or exceeds minInterruptionWords // This applies to all cases: empty strings, partial speech, and full speech if (wordCount < this.agentSession.sessionOptions.turnHandling.interruption?.minWords) { - return; + return false; } } @@ -1246,12 +1246,13 @@ export class AgentActivity implements RecognitionHooks { const timeout = this.agentSession.sessionOptions.turnHandling.interruption.falseInterruptionTimeout; const audioOutput = this.agentSession.output.audio; + if (this.pausedSpeech?.handle === this._currentSpeech) { + this.updatePausedSpeech(this._currentSpeech, timeout); + return false; + } + const wasAgentSpeaking = this.agentSession.agentState === 'speaking'; - if ( - this.isInterruptionDetectionEnabled && - this.audioRecognition && - this.agentSession.agentState === 'speaking' - ) { + if (this.isInterruptionDetectionEnabled && this.audioRecognition && wasAgentSpeaking) { this.audioRecognition.onStartOfOverlapSpeech( 0, Date.now(), @@ -1261,14 +1262,17 @@ export class AgentActivity implements RecognitionHooks { this.updatePausedSpeech(this._currentSpeech, timeout); audioOutput!.pause(); - this.agentSession._updateAgentState('listening'); - if (this.audioRecognition) { - this.audioRecognition.onEndOfAgentSpeech( - options?.ignoreUserTranscriptUntil ?? Date.now(), - ); - } - if (this.isInterruptionDetectionEnabled) { - this.restoreInterruptionByAudioActivity(); + if (wasAgentSpeaking) { + this.agentSession._updateAgentState('listening'); + if (this.audioRecognition) { + this.audioRecognition.onEndOfAgentSpeech( + options?.ignoreUserTranscriptUntil ?? Date.now(), + ); + } + if (this.isInterruptionDetectionEnabled) { + this.restoreInterruptionByAudioActivity(); + } + return true; } } else { this.logger.info( @@ -1279,14 +1283,22 @@ export class AgentActivity implements RecognitionHooks { this._currentSpeech.interrupt(); } } + return false; } onInterruption(ev: OverlappingSpeechEvent) { this.restoreInterruptionByAudioActivity(); - this.interruptByAudioActivity({ + + const endedAgentSpeech = this.interruptByAudioActivity({ ignoreUserTranscriptUntil: ev.overlapStartedAt || ev.detectedAt, }); - if (this.audioRecognition) { + const interruptedPausedSpeech = endedAgentSpeech ? undefined : this.interruptPausedSpeech(); + + if ( + !endedAgentSpeech && + this.audioRecognition && + (this.agentSession.agentState === 'speaking' || interruptedPausedSpeech !== undefined) + ) { this.audioRecognition.onEndOfAgentSpeech(ev.overlapStartedAt || ev.detectedAt); } } @@ -1374,6 +1386,7 @@ export class AgentActivity implements RecognitionHooks { if ( !preemptiveOpts.enabled || this.schedulingPaused || + (this.pausedSpeech !== undefined && !this.pausedSpeech.handle.interrupted) || (this._currentSpeech !== undefined && !this._currentSpeech.interrupted) || !(this.llm instanceof LLM) ) { @@ -1807,7 +1820,11 @@ export class AgentActivity implements RecognitionHooks { private onPipelineReplyDone(): void { if (!this.speechQueue.peek() && (!this._currentSpeech || this._currentSpeech.done())) { + const wasSpeaking = this.agentSession.agentState === 'speaking'; this.agentSession._updateAgentState('listening'); + if (wasSpeaking && this.audioRecognition) { + this.audioRecognition.onEndOfAgentSpeech(Date.now()); + } } } @@ -2562,6 +2579,10 @@ export class AgentActivity implements RecognitionHooks { this._backgroundSpeeches.delete(speechHandle); } + if (speechHandle.interrupted) { + return; + } + if (toolOutput.output.length === 0) return; // important: no agent output should be used after this point @@ -3586,6 +3607,25 @@ export class AgentActivity implements RecognitionHooks { }, timeout); } + private interruptPausedSpeech(): SpeechHandle | undefined { + if (this.falseInterruptionTimer !== undefined) { + clearTimeout(this.falseInterruptionTimer); + this.falseInterruptionTimer = undefined; + } + + if (!this.pausedSpeech) { + return undefined; + } + + const speechHandle = this.pausedSpeech.handle; + if (!speechHandle.interrupted && speechHandle.allowInterruptions) { + speechHandle.interrupt(); + } + this.pausedSpeech = undefined; + + return speechHandle; + } + private async cancelSpeechPause(options?: { interrupt?: boolean }): Promise { const { interrupt = true } = options ?? {}; diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 894ca9d9f..093ae0b2e 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -12,9 +12,8 @@ import { context as otelContext, trace, } from '@opentelemetry/api'; +import type { ReadableStream, WritableStreamDefaultWriter } from 'node:stream/web'; import { TransformStream } from 'node:stream/web'; -import type { WritableStreamDefaultWriter } from 'node:stream/web'; -import type { ReadableStream } from 'node:stream/web'; import { isAPIError } from '../_exceptions.js'; import { apiConnectDefaults, intervalForRetry } from '../inference/interruption/defaults.js'; import { InterruptionDetectionError } from '../inference/interruption/errors.js'; @@ -461,8 +460,13 @@ export class AudioRecognition { async onEndOfAgentSpeech(ignoreUserTranscriptUntil: number) { this.cancelBackchannelBoundary(); - if (this.isAgentSpeaking) { - this.endpointing.onEndOfAgentSpeech(Date.now()); + const now = Date.now(); + const wasAgentSpeaking = this.isAgentSpeaking; + // Capture before the assignment below; the overlap-end notification only fires when no + // overlap had been registered during this agent speech. + const priorIgnoreUserTranscriptUntil = this.ignoreUserTranscriptUntil; + if (wasAgentSpeaking) { + this.endpointing.onEndOfAgentSpeech(now); } if (!this.isInterruptionEnabled) { @@ -470,32 +474,37 @@ export class AudioRecognition { return; } + let endCooldown = 0; + if (wasAgentSpeaking) { + endCooldown = this.backchannelBoundary ? this.backchannelBoundary[1] : 0; + const ignoreUntil = this.ignoreUserTranscriptUntil + ? Math.min(ignoreUserTranscriptUntil, this.ignoreUserTranscriptUntil) + : ignoreUserTranscriptUntil; + this.logger.trace({ ignoreUntil, endCooldown }, 'flushing held transcripts'); + // Subtracting `endCooldown` widens the release window so transcripts that ended just + // before the agent finished speaking (premature corrections) are surfaced. + this.ignoreUserTranscriptUntil = ignoreUntil - endCooldown; + } + // Clear before awaiting the sentinel so STT events arriving while the sentinel is in + // flight are not buffered. + this.isAgentSpeaking = false; + const inputOpen = await this.trySendInterruptionSentinel( InterruptionStreamSentinel.agentSpeechEnded(), ); if (!inputOpen) { - this.isAgentSpeaking = false; return; } - if (this.isAgentSpeaking) { - if (this.ignoreUserTranscriptUntil === undefined) { + if (wasAgentSpeaking) { + // Notify overlap end after the agent-speech-ended sentinel resets the inference stream + // so it does not emit a synthetic `isInterruption: false` event following a real + // interruption. + if (priorIgnoreUserTranscriptUntil === undefined) { this.onEndOfOverlapSpeech(Date.now()); } - - const endCooldown = this.backchannelBoundary ? this.backchannelBoundary[1] : 0; - const ignoreUntil = this.ignoreUserTranscriptUntil - ? Math.min(ignoreUserTranscriptUntil, this.ignoreUserTranscriptUntil) - : ignoreUserTranscriptUntil; - this.logger.trace({ ignoreUntil, endCooldown }, 'flushing held transcripts'); - // Subtracting `endCooldown` widens the release window so transcripts that ended just - // before the agent finished speaking (premature corrections) are surfaced. - this.ignoreUserTranscriptUntil = ignoreUntil - endCooldown; - - // flush held transcripts if possible await this.flushHeldTranscripts(endCooldown); } - this.isAgentSpeaking = false; } /** Start interruption inference when agent is speaking and overlap speech starts. */ diff --git a/agents/src/voice/audio_recognition_interruption.test.ts b/agents/src/voice/audio_recognition_interruption.test.ts new file mode 100644 index 000000000..3011f661a --- /dev/null +++ b/agents/src/voice/audio_recognition_interruption.test.ts @@ -0,0 +1,57 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 +import { describe, expect, it, vi } from 'vitest'; +import { ChatContext } from '../llm/chat_context.js'; +import { initializeLogger } from '../log.js'; +import { type SpeechEvent, SpeechEventType } from '../stt/stt.js'; +import { AudioRecognition, type RecognitionHooks } from './audio_recognition.js'; + +function createHooks(): RecognitionHooks { + return { + onInterruption: vi.fn(), + onStartOfSpeech: vi.fn(), + onVADInferenceDone: vi.fn(), + onEndOfSpeech: vi.fn(), + onInterimTranscript: vi.fn(), + onFinalTranscript: vi.fn(), + onPreemptiveGeneration: vi.fn(), + retrieveChatCtx: () => ChatContext.empty(), + onEndOfTurn: vi.fn(async () => true), + }; +} + +describe('AudioRecognition interruption buffering', () => { + initializeLogger({ pretty: false, level: 'silent' }); + + it('does not keep buffering final transcripts after agent speech end begins', async () => { + const hooks = createHooks(); + hooks.onEndOfTurn = vi.fn(async () => false); + const recognition = new AudioRecognition({ + recognitionHooks: hooks, + minEndpointingDelay: 0, + maxEndpointingDelay: 0, + }); + + await recognition.onStartOfAgentSpeech(Date.now()); + const internals = recognition as unknown as { + isInterruptionEnabled: boolean; + trySendInterruptionSentinel: () => Promise; + onSTTEvent: (ev: SpeechEvent) => Promise; + transcriptBuffer: unknown[]; + }; + internals.isInterruptionEnabled = true; + internals.trySendInterruptionSentinel = vi.fn(() => new Promise(() => {})); + + void recognition.onEndOfAgentSpeech(Date.now()); + const finalTranscript: SpeechEvent = { + type: SpeechEventType.FINAL_TRANSCRIPT, + alternatives: [{ text: 'still listening', confidence: 0.9 }], + }; + await internals.onSTTEvent(finalTranscript); + + expect(hooks.onFinalTranscript).toHaveBeenCalledTimes(1); + expect(recognition.currentTranscript).toBe('still listening'); + expect(internals.transcriptBuffer).toHaveLength(0); + }); +}); diff --git a/agents/src/voice/room_io/_output.test.ts b/agents/src/voice/room_io/_output.test.ts index 16e07d505..cd758fba0 100644 --- a/agents/src/voice/room_io/_output.test.ts +++ b/agents/src/voice/room_io/_output.test.ts @@ -5,6 +5,8 @@ import { describe, expect, it, vi } from 'vitest'; import { Future } from '../../utils.js'; import { ParticipantAudioOutput } from './_output.js'; +const nextTick = () => new Promise((resolve) => setImmediate(resolve)); + describe('ParticipantAudioOutput waitForPlayoutTask', () => { it('resets tracked duration after non-interrupted playout', async () => { let resolvePlayout!: () => void; @@ -95,4 +97,58 @@ describe('ParticipantAudioOutput waitForPlayoutTask', () => { interrupted: true, }); }); + + it('does not finish one segment twice when flush is called again before playout drains', async () => { + let resolvePlayout!: () => void; + const waitForPlayout = new Promise((resolve) => { + resolvePlayout = resolve; + }); + + const output = Object.create(ParticipantAudioOutput.prototype) as ParticipantAudioOutput & { + pushedDuration: number; + flushTask?: { done: boolean }; + flushPushedDuration?: number; + interruptedFuture: Future; + firstFrameEmitted: boolean; + audioSource: { + waitForPlayout: () => Promise; + queuedDuration: number; + clearQueue: () => void; + }; + onPlaybackFinished: (event: { playbackPosition: number; interrupted: boolean }) => void; + logger: { + error: () => void; + }; + }; + + const onPlaybackFinished = vi.fn(); + output.pushedDuration = 1.0; + output.interruptedFuture = new Future(); + output.firstFrameEmitted = true; + output.onPlaybackFinished = onPlaybackFinished; + output.logger = { + error: vi.fn(), + }; + output.audioSource = { + waitForPlayout: () => waitForPlayout, + queuedDuration: 0, + clearQueue: vi.fn(), + }; + + output.flush(); + await nextTick(); + + output.flush(); + await nextTick(); + + resolvePlayout(); + await nextTick(); + + expect(onPlaybackFinished).toHaveBeenCalledTimes(1); + expect(onPlaybackFinished).toHaveBeenCalledWith({ + playbackPosition: 1.0, + interrupted: false, + }); + expect(output.logger.error).not.toHaveBeenCalled(); + }); }); diff --git a/agents/src/voice/room_io/_output.ts b/agents/src/voice/room_io/_output.ts index c90c7e5a3..63772f142 100644 --- a/agents/src/voice/room_io/_output.ts +++ b/agents/src/voice/room_io/_output.ts @@ -375,6 +375,7 @@ export class ParticipantAudioOutput extends AudioOutput { private audioSource: AudioSource; private publication?: LocalTrackPublication; private flushTask?: Task; + private flushPushedDuration?: number; /** Duration of audio pushed to the source, in seconds */ private pushedDuration: number = 0; @@ -465,11 +466,24 @@ export class ParticipantAudioOutput extends AudioOutput { } if (this.flushTask && !this.flushTask.done) { + if (this.flushPushedDuration === this.pushedDuration) { + return; + } + this.logger.error('flush called while playback is in progress'); this.flushTask.cancel(); } - this.flushTask = Task.from((controller) => this.waitForPlayoutTask(controller)); + this.flushPushedDuration = this.pushedDuration; + const flushTask = Task.from((controller) => this.waitForPlayoutTask(controller)); + this.flushTask = flushTask; + void flushTask.result + .finally(() => { + if (this.flushTask === flushTask) { + this.flushPushedDuration = undefined; + } + }) + .catch(() => {}); } clearBuffer(): void {