Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/six-dryers-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": patch
---

Fix playback flush and speech interruption races
158 changes: 156 additions & 2 deletions agents/src/voice/agent_activity.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,10 @@
*/
import { Heap } from 'heap-js';
import { describe, expect, it, vi } from 'vitest';
import type { ChatContext } from '../llm/chat_context.js';
import { ChatContext } from '../llm/chat_context.js';
import { LLM, type LLMStream } from '../llm/llm.js';
import { Future } from '../utils.js';
import { type VADEvent, VADEventType } from '../vad.js';
import { AgentActivity } from './agent_activity.js';
import type { PreemptiveGenerationInfo } from './audio_recognition.js';
import { SpeechHandle } from './speech_handle.js';
Expand Down Expand Up @@ -97,6 +98,69 @@ function buildMainTaskRunner() {
};
}

/**
 * Builds the fixtures for the VAD interruption tests.
 *
 * The returned `fakeActivity` is a plain object whose prototype is swapped to
 * `AgentActivity.prototype`, so real `AgentActivity` methods can run against it
 * without constructing a full activity. Its session starts in the `'speaking'`
 * state, the audio output reports `canPause: true`, and the `_updateAgentState`
 * mock writes the new state back onto the fake session so tests can read it.
 *
 * @returns the fake activity, the mocked audio output and audio recognition,
 *   and an `INFERENCE_DONE` VAD event with `speaking: true`.
 */
function buildVadInterruptionRunner() {
  const audioOutput = { canPause: true, pause: vi.fn(), resume: vi.fn() };
  const audioRecognition = {
    onEndOfAgentSpeech: vi.fn(),
    cancelBackchannelBoundary: vi.fn(),
  };
  const speechHandle = SpeechHandle.create({ allowInterruptions: true });

  // Interruption options: zero thresholds, false-interruption resume enabled.
  const interruption = {
    minDuration: 0,
    minWords: 0,
    resumeFalseInterruption: true,
    falseInterruptionTimeout: 500,
  };

  const fakeActivity: Record<string, unknown> = {
    turnDetection: 'vad',
    isInterruptionByAudioActivityEnabled: true,
    isDefaultInterruptionByAudioActivityEnabled: true,
    isInterruptionDetectionEnabled: false,
    _currentSpeech: speechHandle,
    stt: undefined,
    llm: undefined,
    realtimeSession: undefined,
    audioRecognition,
    falseInterruptionTimer: undefined,
    pausedSpeech: undefined,
    agentSession: {
      agentState: 'speaking',
      _aecWarmupRemaining: 0,
      _userSpeakingSpan: undefined,
      output: { audio: audioOutput },
      sessionOptions: { turnHandling: { interruption } },
      // Mirror the requested state onto the fake session so assertions can observe it.
      _updateAgentState: vi.fn((state: string) => {
        (fakeActivity.agentSession as { agentState: string }).agentState = state;
      }),
    },
  };
  Object.setPrototypeOf(fakeActivity, AgentActivity.prototype);

  const vadEvent: VADEvent = {
    type: VADEventType.INFERENCE_DONE,
    samplesIndex: 0,
    timestamp: Date.now(),
    speechDuration: 1,
    silenceDuration: 0,
    frames: [],
    probability: 1,
    inferenceDuration: 0,
    speaking: true,
    rawAccumulatedSilence: 0,
    rawAccumulatedSpeech: 1,
  };

  return { fakeActivity, audioOutput, audioRecognition, vadEvent };
}

describe('AgentActivity - mainTask', () => {
it('should recover when speech handle is interrupted after authorization', async () => {
const { fakeActivity, mainTask, speechQueue, q_updated } = buildMainTaskRunner();
Expand Down Expand Up @@ -230,6 +294,83 @@ describe('AgentActivity - mainTask', () => {
});
});

describe('AgentActivity - VAD interruption', () => {
  it('ends agent speech once when repeated VAD inference pauses the same speech', () => {
    const { fakeActivity, audioOutput, audioRecognition, vadEvent } = buildVadInterruptionRunner();

    // `fakeActivity` is a Record<string, unknown>, so its members are `unknown` and cannot be
    // called directly under strict type checking. Resolve the hook from the prototype and bind
    // it with `.call`, the same way the other AgentActivity tests in this file do.
    const onVADInferenceDone = (AgentActivity.prototype as Record<string, unknown>)
      .onVADInferenceDone as (this: typeof fakeActivity, ev: VADEvent) => void;

    // Deliver the same "user is speaking" inference twice while the agent is speaking.
    onVADInferenceDone.call(fakeActivity, vadEvent);
    onVADInferenceDone.call(fakeActivity, vadEvent);

    // The second event must not pause the output or end the agent speech a second time.
    expect(audioOutput.pause).toHaveBeenCalledTimes(1);
    expect(audioRecognition.onEndOfAgentSpeech).toHaveBeenCalledTimes(1);
    expect((fakeActivity.agentSession as { agentState: string }).agentState).toBe('listening');
  });
});

describe('AgentActivity - speech completion', () => {
  it('ends audio recognition speech when pipeline completion moves session out of speaking', () => {
    const onEndOfAgentSpeech = vi.fn();

    // Session starts in 'speaking'; the mock records the transition on the session itself.
    const agentSession = {
      agentState: 'speaking',
      _updateAgentState: vi.fn((state: string) => {
        agentSession.agentState = state;
      }),
    };

    // Empty speech queue plus a finished current speech: the reply pipeline is fully done.
    const fakeActivity = {
      speechQueue: { peek: () => undefined },
      _currentSpeech: { done: () => true },
      audioRecognition: { onEndOfAgentSpeech },
      agentSession,
    };

    // Pull the method off the prototype so it can run against the fake.
    const proto = AgentActivity.prototype as Record<string, unknown>;
    const onPipelineReplyDone = proto.onPipelineReplyDone as (this: typeof fakeActivity) => void;

    onPipelineReplyDone.call(fakeActivity);

    expect(agentSession._updateAgentState).toHaveBeenCalledWith('listening');
    expect(onEndOfAgentSpeech).toHaveBeenCalledTimes(1);
  });
});

describe('AgentActivity - confirmed interruptions', () => {
  it('ends audio recognition when confirmed interruption only has paused speech', () => {
    type InterruptionEvent = Parameters<AgentActivity['onInterruption']>[0];

    const pausedSpeech = SpeechHandle.create({ allowInterruptions: true });
    const onEndOfAgentSpeech = vi.fn();

    // No current speech and a session already in 'listening': only the paused handle remains.
    // `interruptByAudioActivity` is stubbed to report that it did not end any agent speech.
    const fakeActivity: Record<string, unknown> = {
      restoreInterruptionByAudioActivity: vi.fn(),
      interruptByAudioActivity: vi.fn(() => false),
      _currentSpeech: undefined,
      pausedSpeech: { handle: pausedSpeech },
      falseInterruptionTimer: undefined,
      audioRecognition: { onEndOfAgentSpeech },
      agentSession: { agentState: 'listening' },
    };
    Object.setPrototypeOf(fakeActivity, AgentActivity.prototype);

    const proto = AgentActivity.prototype as Record<string, unknown>;
    const onInterruption = proto.onInterruption as (
      this: typeof fakeActivity,
      ev: InterruptionEvent,
    ) => void;

    const confirmedInterruption: InterruptionEvent = {
      type: 'overlapping_speech',
      detectedAt: Date.now(),
      isInterruption: true,
    };
    onInterruption.call(fakeActivity, confirmedInterruption);

    // The paused handle is interrupted and cleared, and recognition is told the speech ended.
    expect(pausedSpeech.interrupted).toBe(true);
    expect(fakeActivity.pausedSpeech).toBeUndefined();
    expect(onEndOfAgentSpeech).toHaveBeenCalledTimes(1);
  });
});

/**
* Unit tests for the preemptive-generation guards in AgentActivity.
*
Expand Down Expand Up @@ -268,7 +409,7 @@ function buildPreemptiveRunner(opts: Partial<PreemptiveOpts> = {}) {
);
const cancelPreemptiveGeneration = vi.fn();

const fakeChatCtx = { copy: () => fakeChatCtx } as unknown as ChatContext;
const fakeChatCtx = new ChatContext();

const fakeActivity = {
_preemptiveGenerationCount: 0,
Expand All @@ -293,6 +434,7 @@ function buildPreemptiveRunner(opts: Partial<PreemptiveOpts> = {}) {
generateReply,
cancelPreemptiveGeneration,
};
Object.setPrototypeOf(fakeActivity, AgentActivity.prototype);

const onPreemptiveGeneration = (AgentActivity.prototype as Record<string, unknown>)
.onPreemptiveGeneration as (this: unknown, info: PreemptiveGenerationInfo) => void;
Expand Down Expand Up @@ -374,4 +516,16 @@ describe('AgentActivity - onPreemptiveGeneration guards', () => {
expect(generateReply).not.toHaveBeenCalled();
expect(cancelPreemptiveGeneration).not.toHaveBeenCalled();
});

// Preemptive generation is skipped while `pausedSpeech` is set and its handle
// has not been interrupted; this test pins that guard.
it('skips preemption while a paused speech is still active', () => {
const { fakeActivity, generateReply, call } = buildPreemptiveRunner();
// NOTE(review): assumes a freshly created handle reports `interrupted === false` — confirm.
const pausedSpeech = SpeechHandle.create({ allowInterruptions: true });

// Simulate a speech that is currently paused.
fakeActivity.pausedSpeech = { handle: pausedSpeech };

call();

// Neither the preemptive counter nor the reply generator may have been touched.
expect(fakeActivity._preemptiveGenerationCount).toBe(0);
expect(generateReply).not.toHaveBeenCalled();
});
});
80 changes: 60 additions & 20 deletions agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1191,19 +1191,19 @@ export class AgentActivity implements RecognitionHooks {
}
}

private interruptByAudioActivity(options?: { ignoreUserTranscriptUntil?: number }): void {
private interruptByAudioActivity(options?: { ignoreUserTranscriptUntil?: number }): boolean {
if (!this.isInterruptionByAudioActivityEnabled) {
return;
return false;
}

if (this.agentSession._aecWarmupRemaining > 0) {
// Disable interruption from audio activity while AEC warmup is active.
return;
return false;
}

if (this.llm instanceof RealtimeModel && this.llm.capabilities.turnDetection) {
// skip speech handle interruption if server side turn detection is enabled
return;
return false;
}

// Refactored interruption word count check:
Expand All @@ -1225,7 +1225,7 @@ export class AgentActivity implements RecognitionHooks {
// Only allow interruption if word count meets or exceeds minInterruptionWords
// This applies to all cases: empty strings, partial speech, and full speech
if (wordCount < this.agentSession.sessionOptions.turnHandling.interruption?.minWords) {
return;
return false;
}
}

Expand All @@ -1246,12 +1246,13 @@ export class AgentActivity implements RecognitionHooks {
const timeout =
this.agentSession.sessionOptions.turnHandling.interruption.falseInterruptionTimeout;
const audioOutput = this.agentSession.output.audio;
if (this.pausedSpeech?.handle === this._currentSpeech) {
this.updatePausedSpeech(this._currentSpeech, timeout);
return false;
}
const wasAgentSpeaking = this.agentSession.agentState === 'speaking';

if (
this.isInterruptionDetectionEnabled &&
this.audioRecognition &&
this.agentSession.agentState === 'speaking'
) {
if (this.isInterruptionDetectionEnabled && this.audioRecognition && wasAgentSpeaking) {
this.audioRecognition.onStartOfOverlapSpeech(
0,
Date.now(),
Expand All @@ -1261,14 +1262,17 @@ export class AgentActivity implements RecognitionHooks {

this.updatePausedSpeech(this._currentSpeech, timeout);
audioOutput!.pause();
this.agentSession._updateAgentState('listening');
if (this.audioRecognition) {
this.audioRecognition.onEndOfAgentSpeech(
options?.ignoreUserTranscriptUntil ?? Date.now(),
);
}
if (this.isInterruptionDetectionEnabled) {
this.restoreInterruptionByAudioActivity();
if (wasAgentSpeaking) {
this.agentSession._updateAgentState('listening');
if (this.audioRecognition) {
this.audioRecognition.onEndOfAgentSpeech(
options?.ignoreUserTranscriptUntil ?? Date.now(),
);
}
if (this.isInterruptionDetectionEnabled) {
this.restoreInterruptionByAudioActivity();
}
return true;
}
} else {
this.logger.info(
Expand All @@ -1279,14 +1283,22 @@ export class AgentActivity implements RecognitionHooks {
this._currentSpeech.interrupt();
}
}
return false;
}

/**
 * Handles a confirmed overlapping-speech event.
 *
 * Restores interruption-by-audio-activity, then tries to interrupt the current
 * speech. If that call reports it did not end the agent speech, any paused
 * speech is interrupted instead. Audio recognition is notified here only when
 * `interruptByAudioActivity` has not already done so, and only if the agent is
 * still `'speaking'` or a paused speech was just interrupted.
 *
 * @param ev - the overlap event; `overlapStartedAt` (falling back to
 *   `detectedAt`) is passed on as the point until which user transcripts are ignored.
 */
onInterruption(ev: OverlappingSpeechEvent) {
  this.restoreInterruptionByAudioActivity();

  const ignoreUserTranscriptUntil = ev.overlapStartedAt || ev.detectedAt;

  // `true` means interruptByAudioActivity already moved the session to 'listening'
  // and called onEndOfAgentSpeech, so neither step is repeated below.
  const endedAgentSpeech = this.interruptByAudioActivity({ ignoreUserTranscriptUntil });
  const interruptedPausedSpeech = endedAgentSpeech ? undefined : this.interruptPausedSpeech();

  if (
    !endedAgentSpeech &&
    this.audioRecognition &&
    (this.agentSession.agentState === 'speaking' || interruptedPausedSpeech !== undefined)
  ) {
    this.audioRecognition.onEndOfAgentSpeech(ignoreUserTranscriptUntil);
  }
}
Expand Down Expand Up @@ -1374,6 +1386,7 @@ export class AgentActivity implements RecognitionHooks {
if (
!preemptiveOpts.enabled ||
this.schedulingPaused ||
(this.pausedSpeech !== undefined && !this.pausedSpeech.handle.interrupted) ||
(this._currentSpeech !== undefined && !this._currentSpeech.interrupted) ||
!(this.llm instanceof LLM)
) {
Expand Down Expand Up @@ -1807,7 +1820,11 @@ export class AgentActivity implements RecognitionHooks {

/**
 * Called when a pipeline reply finishes. Once no speech is queued and the
 * current speech (if any) is done, moves the session to `'listening'` and,
 * if the session was `'speaking'` beforehand, tells audio recognition that
 * the agent speech ended.
 */
private onPipelineReplyDone(): void {
  // More speech is waiting in the queue: nothing has ended yet.
  if (this.speechQueue.peek()) {
    return;
  }

  // The current speech is still in progress.
  const current = this._currentSpeech;
  if (current && !current.done()) {
    return;
  }

  // Capture the state before it is overwritten by the transition below.
  const session = this.agentSession;
  const wasSpeaking = session.agentState === 'speaking';
  session._updateAgentState('listening');

  if (!wasSpeaking) {
    return;
  }
  this.audioRecognition?.onEndOfAgentSpeech(Date.now());
}

Expand Down Expand Up @@ -2562,6 +2579,10 @@ export class AgentActivity implements RecognitionHooks {
this._backgroundSpeeches.delete(speechHandle);
}

if (speechHandle.interrupted) {
return;
Comment thread
toubatbrian marked this conversation as resolved.
}

if (toolOutput.output.length === 0) return;

// important: no agent output should be used after this point
Expand Down Expand Up @@ -3586,6 +3607,25 @@ export class AgentActivity implements RecognitionHooks {
}, timeout);
}

/**
 * Clears any pending false-interruption timer and drops the paused speech,
 * interrupting its handle when it allows interruptions and has not been
 * interrupted yet.
 *
 * @returns the handle that was paused, or `undefined` if nothing was paused.
 */
private interruptPausedSpeech(): SpeechHandle | undefined {
  const pendingTimer = this.falseInterruptionTimer;
  if (pendingTimer !== undefined) {
    clearTimeout(pendingTimer);
    this.falseInterruptionTimer = undefined;
  }

  const paused = this.pausedSpeech;
  if (!paused) {
    return undefined;
  }

  const { handle } = paused;
  const shouldInterrupt = !handle.interrupted && handle.allowInterruptions;
  if (shouldInterrupt) {
    handle.interrupt();
  }

  // Cleared only after the interrupt call, matching the original ordering.
  this.pausedSpeech = undefined;
  return handle;
}
Comment thread
toubatbrian marked this conversation as resolved.

private async cancelSpeechPause(options?: { interrupt?: boolean }): Promise<void> {
const { interrupt = true } = options ?? {};

Expand Down
Loading
Loading