From f5173bc11c5195a27a34b4f0be9cc6d0660de5a4 Mon Sep 17 00:00:00 2001 From: longcw Date: Mon, 18 May 2026 07:18:57 +0000 Subject: [PATCH 1/3] feat(voice): add user turn limits --- .changeset/user-turn-limit-options.md | 5 + agents/src/voice/agent.ts | 13 ++ agents/src/voice/agent_activity.ts | 119 +++++++++++++++++- agents/src/voice/audio_recognition.ts | 64 ++++++++++ agents/src/voice/events.ts | 34 +++++ agents/src/voice/index.ts | 1 + agents/src/voice/turn_config/turn_handling.ts | 7 ++ .../src/voice/turn_config/user_turn_limit.ts | 30 +++++ agents/src/voice/turn_config/utils.ts | 8 ++ 9 files changed, 280 insertions(+), 1 deletion(-) create mode 100644 .changeset/user-turn-limit-options.md create mode 100644 agents/src/voice/turn_config/user_turn_limit.ts diff --git a/.changeset/user-turn-limit-options.md b/.changeset/user-turn-limit-options.md new file mode 100644 index 000000000..e13e0bc0c --- /dev/null +++ b/.changeset/user-turn-limit-options.md @@ -0,0 +1,5 @@ +--- +"@livekit/agents": minor +--- + +Add user turn limit options for interrupting long user speech. diff --git a/agents/src/voice/agent.ts b/agents/src/voice/agent.ts index 890d7ea7d..782e1785a 100644 --- a/agents/src/voice/agent.ts +++ b/agents/src/voice/agent.ts @@ -33,6 +33,7 @@ import { Future, Task } from '../utils.js'; import type { VAD } from '../vad.js'; import { type AgentActivity, agentActivityStorage } from './agent_activity.js'; import type { AgentSession, TurnDetectionMode } from './agent_session.js'; +import type { UserTurnExceededEvent } from './events.js'; import type { TimedString } from './io.js'; import type { SpeechHandle } from './speech_handle.js'; import type { TurnHandlingOptions } from './turn_config/turn_handling.js'; @@ -297,6 +298,18 @@ export class Agent { async onUserTurnCompleted(_chatCtx: ChatContext, _newMessage: ChatMessage): Promise {} + async onUserTurnExceeded(ev: UserTurnExceededEvent): Promise { + await this.session.generateReply({ + userInput: ev.transcript, + instructions: + 'The user has been speaking too long without giving a chance to reply. ' + + 'Politely cut in with a short reply or notice. Keep it short since the user cannot interrupt it.', + allowInterruptions: false, + toolChoice: 'none', + inputModality: 'audio', + }); + } + async sttNode( audio: ReadableStream, modelSettings: ModelSettings, diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index f2afae247..385e62372 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -61,9 +61,11 @@ import { IdleTimeoutError, Task, cancelAndWait, + delay, isDevMode, isHosted, waitFor, + waitForAbort, waitUntilTimeout, } from '../utils.js'; import { VAD, type VADEvent } from '../vad.js'; @@ -83,7 +85,7 @@ import { type RecognitionHooks, type STTPipeline, } from './audio_recognition.js'; -import type { AgentState } from './events.js'; +import type { AgentState, AgentStateChangedEvent, UserTurnExceededEvent } from './events.js'; import { AgentSessionEventTypes, createAgentFalseInterruptionEvent, @@ -223,6 +225,8 @@ export class AgentActivity implements RecognitionHooks { private pausedSpeech?: PausedSpeechInfo; private falseInterruptionTimer?: NodeJS.Timeout; private cancelSpeechPauseTask?: Promise; + private userTurnExceededLocked = false; + private userTurnExceededTask?: Task; private readonly onRealtimeGenerationCreated = (ev: GenerationCreatedEvent): void => this.onGenerationCreated(ev); @@ -521,6 +525,7 @@ export class AgentActivity implements RecognitionHooks { ...this.agentSession.sessionOptions.turnHandling.endpointing, ...(this.agent.turnHandling?.endpointing ?? {}), }), + userTurnLimit: this.agentSession.sessionOptions.turnHandling.userTurnLimit, rootSpanContext: this.agentSession.rootSpanContext, sttModel: this.stt?.label, sttProvider: this.getSttProvider(), @@ -1420,6 +1425,78 @@ export class AgentActivity implements RecognitionHooks { }; } + onUserTurnExceeded(ev: UserTurnExceededEvent): void { + if (this.userTurnExceededLocked) { + return; + } + + this.userTurnExceededTask?.cancel(); + this.userTurnExceededTask = this.createSpeechTask({ + taskFn: (controller) => this.runUserTurnExceededTask(ev, controller.signal), + name: 'AgentActivity.userTurnExceeded', + }); + } + + private async runUserTurnExceededTask( + ev: UserTurnExceededEvent, + signal: AbortSignal, + ): Promise { + // Let the current STT event finish scheduling the regular EOU task first. + await delay(0, { signal }); + + const agentSpeaking = new Future(); + const onAgentStateChanged = (stateEv: AgentStateChangedEvent): void => { + if (stateEv.newState === 'speaking' && !agentSpeaking.done) { + agentSpeaking.resolve(); + } + }; + + if (this.agentSession.agentState === 'speaking') { + agentSpeaking.resolve(); + } else { + this.agentSession.on(AgentSessionEventTypes.AgentStateChanged, onAgentStateChanged); + } + + const waitInactiveTask = Task.from( + () => this.waitForInactive({ waitForAgent: true, waitForUser: false }, signal), + undefined, + 'AgentActivity.waitForInactiveForUserTurnExceeded', + ); + + try { + await ThrowsPromise.race([ + agentSpeaking.await, + waitInactiveTask.result, + waitForAbort(signal), + ]); + if (signal.aborted) { + return; + } + if (agentSpeaking.done) { + return; + } + } finally { + this.agentSession.off(AgentSessionEventTypes.AgentStateChanged, onAgentStateChanged); + if (!waitInactiveTask.done) { + waitInactiveTask.cancel(); + } + } + + this.logger.debug( + { numWords: ev.accumulatedWordCount, duration: ev.duration }, + 'user turn limit exceeded', + ); + this.userTurnExceededLocked = true; + try { + await this.agent.onUserTurnExceeded(ev); + } catch (error) { + this.logger.error({ error }, 'error in onUserTurnExceeded callback'); + } finally { + this.userTurnExceededLocked = false; + this.userTurnExceededTask = undefined; + } + } + private cancelPreemptiveGeneration(): void { if (this._preemptiveGeneration !== undefined) { this._preemptiveGeneration.speechHandle._cancel(); @@ -1536,6 +1613,46 @@ export class AgentActivity implements RecognitionHooks { return this.agentSession.chatCtx; } + private async waitForInactive( + options: { waitForAgent?: boolean; waitForUser?: boolean }, + signal: AbortSignal, + ): Promise { + const waitForAgent = options.waitForAgent ?? true; + const waitForUser = options.waitForUser ?? true; + let agentActive = true; + let userActive = true; + + while ((waitForAgent && agentActive) || (waitForUser && userActive)) { + if (waitForAgent) { + if (this.audioRecognition) { + await ThrowsPromise.race([ + this.audioRecognition.waitForEndOfTurnTask(), + waitForAbort(signal), + ]); + } + + if (!this._currentSpeech && this.speechQueue.size() === 0) { + agentActive = false; + } else { + agentActive = true; + const currentUpdate = this.q_updated; + if (currentUpdate.done) { + await delay(0, { signal }); + } else { + await ThrowsPromise.race([currentUpdate.await, waitForAbort(signal)]); + } + } + } + + if (waitForUser) { + userActive = this.agentSession.userState !== 'listening'; + if (userActive) { + await delay(0, { signal }); + } + } + } + } + private async mainTask(signal: AbortSignal): Promise { const abortFuture = new Future(); const abortHandler = () => { diff --git a/agents/src/voice/audio_recognition.ts b/agents/src/voice/audio_recognition.ts index 3d2e03b21..cbf637dce 100644 --- a/agents/src/voice/audio_recognition.ts +++ b/agents/src/voice/audio_recognition.ts @@ -33,15 +33,18 @@ import { mergeReadableStreams } from '../stream/merge_readable_streams.js'; import { type StreamChannel, createStreamChannel } from '../stream/stream_channel.js'; import { type SpeechEvent, SpeechEventType } from '../stt/stt.js'; import { traceTypes, tracer } from '../telemetry/index.js'; +import { splitWords } from '../tokenize/basic/word.js'; import { Task, cancelAndWait, delay, readStream, waitForAbort } from '../utils.js'; import { type VAD, type VADEvent, VADEventType } from '../vad.js'; import type { TurnDetectionMode } from './agent_session.js'; +import { type UserTurnExceededEvent, createUserTurnExceededEvent } from './events.js'; import type { STTNode } from './io.js'; import { type BaseEndpointing, createEndpointing, defaultEndpointingOptions, } from './turn_config/endpointing.js'; +import type { UserTurnLimitOptions } from './turn_config/user_turn_limit.js'; import { setParticipantSpanAttributes } from './utils.js'; export interface EndOfTurnInfo { @@ -75,10 +78,17 @@ export interface RecognitionHooks { onFinalTranscript: (ev: SpeechEvent, speaking: boolean | undefined) => void; onEndOfTurn: (info: EndOfTurnInfo) => Promise; onPreemptiveGeneration: (info: PreemptiveGenerationInfo) => void; + onUserTurnExceeded: (ev: UserTurnExceededEvent) => void; retrieveChatCtx: () => ChatContext; } +interface UserTurnTracker { + words: number; + transcript: string; + startedAt?: number; +} + export class STTPipeline { static readonly PUMP_TASK_CANCEL_TIMEOUT = 5000; @@ -153,6 +163,8 @@ export interface AudioRecognitionOptions { backchannelBoundary?: number | [number, number] | null; /** Endpointing delay strategy. */ endpointing?: BaseEndpointing; + /** User turn limit configuration. */ + userTurnLimit?: UserTurnLimitOptions; /** @deprecated Use endpointing instead. */ minEndpointingDelay?: number; /** @deprecated Use endpointing instead. */ @@ -188,6 +200,7 @@ export class AudioRecognition { private turnDetector?: _TurnDetector; private turnDetectionMode?: TurnDetectionMode; private endpointing: BaseEndpointing; + private userTurnLimit?: UserTurnLimitOptions; private lastLanguage?: LanguageCode; private rootSpanContext?: Context; private sttModel?: string; @@ -209,6 +222,7 @@ export class AudioRecognition { private sampleRate?: number; private userTurnSpan?: Span; + private userTurnTracker: UserTurnTracker = { words: 0, transcript: '' }; // Provider-known STT ids for the current user turn. Written to the // `user_turn` span when it ends so we can correlate traces with the // provider's logs for debugging. @@ -264,6 +278,7 @@ export class AudioRecognition { this.vad = opts.vad; this.turnDetector = opts.turnDetector; this.turnDetectionMode = opts.turnDetectionMode; + this.userTurnLimit = opts.userTurnLimit; this.endpointing = opts.endpointing ?? createEndpointing({ @@ -446,6 +461,7 @@ export class AudioRecognition { async onStartOfAgentSpeech(startedAt: number) { this.isAgentSpeaking = true; this.endpointing.onStartOfAgentSpeech(startedAt); + this.userTurnTracker = { words: 0, transcript: '' }; if (this.backchannelBoundary && this.backchannelBoundary[0] > 0) { this.cancelBackchannelBoundary(); @@ -819,6 +835,8 @@ export class AudioRecognition { this.lastSpeakingTime = Date.now(); } + this.checkUserTurnLimit(transcript); + if (this.vadBaseTurnDetection || this.userTurnCommitted) { if (transcriptChanged) { this.logger.debug( @@ -1167,6 +1185,52 @@ export class AudioRecognition { }); } + async waitForEndOfTurnTask(): Promise { + if (!this.bounceEOUTask || this.bounceEOUTask.done) { + return; + } + + try { + await this.bounceEOUTask.result; + } catch (error) { + if (error instanceof Error && error.message.includes('This operation was aborted')) { + return; + } + throw error; + } + } + + private checkUserTurnLimit(transcript: string): void { + const maxWords = this.userTurnLimit?.maxWords ?? null; + const maxDuration = this.userTurnLimit?.maxDuration ?? null; + + if (maxWords === null && maxDuration === null) { + return; + } + + const now = Date.now(); + this.userTurnTracker.startedAt ??= this.speechStartTime ?? now; + this.userTurnTracker.words += splitWords(transcript, true).length; + this.userTurnTracker.transcript = `${this.userTurnTracker.transcript} ${transcript}`.trim(); + + const duration = now - this.userTurnTracker.startedAt; + const timeExceeded = maxDuration !== null && duration >= maxDuration; + const wordsExceeded = maxWords !== null && this.userTurnTracker.words >= maxWords; + + if (!timeExceeded && !wordsExceeded) { + return; + } + + this.hooks.onUserTurnExceeded( + createUserTurnExceededEvent({ + transcript: this.currentTranscript, + accumulatedTranscript: this.userTurnTracker.transcript, + accumulatedWordCount: this.userTurnTracker.words, + duration, + }), + ); + } + private startSttTasks(reusePipeline?: STTPipeline) { if (!this.stt) return; diff --git a/agents/src/voice/events.ts b/agents/src/voice/events.ts index 8c39d6d47..55cd13e0f 100644 --- a/agents/src/voice/events.ts +++ b/agents/src/voice/events.ts @@ -245,6 +245,40 @@ export const createSpeechCreatedEvent = ({ createdAt, }); +export type UserTurnExceededEvent = { + type: 'user_turn_exceeded'; + /** Transcript from the current uncommitted user turn only. */ + transcript: string; + /** Full transcript since the start of user speaking in the accumulation window. */ + accumulatedTranscript: string; + /** Total word count since the start of user speaking in the accumulation window. */ + accumulatedWordCount: number; + /** Duration of the user turn accumulation window in milliseconds. */ + duration: number; + createdAt: number; +}; + +export const createUserTurnExceededEvent = ({ + transcript, + accumulatedTranscript, + accumulatedWordCount, + duration, + createdAt = Date.now(), +}: { + transcript: string; + accumulatedTranscript: string; + accumulatedWordCount: number; + duration: number; + createdAt?: number; +}): UserTurnExceededEvent => ({ + type: 'user_turn_exceeded', + transcript, + accumulatedTranscript, + accumulatedWordCount, + duration, + createdAt, +}); + export type ErrorEvent = { type: 'error'; error: RealtimeModelError | STTError | TTSError | LLMError | InterruptionDetectionError | unknown; diff --git a/agents/src/voice/index.ts b/agents/src/voice/index.ts index e8813e460..16998da50 100644 --- a/agents/src/voice/index.ts +++ b/agents/src/voice/index.ts @@ -40,5 +40,6 @@ export * from './report.js'; export * from './room_io/index.js'; export { RunContext } from './run_context.js'; export * from './turn_config/endpointing.js'; +export * from './turn_config/user_turn_limit.js'; export * as testing from './testing/index.js'; export * as textTransforms from './transcription/text_transforms.js'; diff --git a/agents/src/voice/turn_config/turn_handling.ts b/agents/src/voice/turn_config/turn_handling.ts index 786c034de..a7d3a51be 100644 --- a/agents/src/voice/turn_config/turn_handling.ts +++ b/agents/src/voice/turn_config/turn_handling.ts @@ -8,6 +8,7 @@ import { type PreemptiveGenerationOptions, defaultPreemptiveGenerationOptions, } from './preemptive_generation.js'; +import { type UserTurnLimitOptions, defaultUserTurnLimitOptions } from './user_turn_limit.js'; /** * Configuration for the turn handling system. Used to configure the turn taking behavior of the @@ -52,12 +53,17 @@ export interface TurnHandlingOptions { * Preemptive generation configuration. Use `{ enabled: false }` to disable. */ preemptiveGeneration: Partial; + /** + * User turn limit configuration. Use `{ maxWords: 50 }` to enable. + */ + userTurnLimit?: Partial; } export interface InternalTurnHandlingOptions extends TurnHandlingOptions { endpointing: EndpointingOptions; interruption: InterruptionOptions; preemptiveGeneration: PreemptiveGenerationOptions; + userTurnLimit: UserTurnLimitOptions; } export const defaultTurnHandlingOptions: InternalTurnHandlingOptions = { @@ -65,4 +71,5 @@ export const defaultTurnHandlingOptions: InternalTurnHandlingOptions = { interruption: defaultInterruptionOptions, endpointing: defaultEndpointingOptions, preemptiveGeneration: defaultPreemptiveGenerationOptions, + userTurnLimit: defaultUserTurnLimitOptions, }; diff --git a/agents/src/voice/turn_config/user_turn_limit.ts b/agents/src/voice/turn_config/user_turn_limit.ts new file mode 100644 index 000000000..7c2cba761 --- /dev/null +++ b/agents/src/voice/turn_config/user_turn_limit.ts @@ -0,0 +1,30 @@ +// SPDX-FileCopyrightText: 2026 LiveKit, Inc. +// +// SPDX-License-Identifier: Apache-2.0 + +/** + * Configuration for detecting when a user has been speaking too long without the agent + * successfully responding. + * + * The framework tracks accumulated word count and wall-clock duration across consecutive user + * turns. Counters only reset when the agent transitions to speaking state. + * + * Both thresholds default to `null` (disabled). Set at least one to enable the feature. + */ +export interface UserTurnLimitOptions { + /** + * Maximum accumulated word count before triggering. `null` disables word-based limiting. + */ + maxWords: number | null; + + /** + * Maximum wall-clock duration in milliseconds since the user first started speaking in the + * current accumulation window. `null` disables duration-based limiting. + */ + maxDuration: number | null; +} + +export const defaultUserTurnLimitOptions: UserTurnLimitOptions = { + maxWords: null, + maxDuration: null, +}; diff --git a/agents/src/voice/turn_config/utils.ts b/agents/src/voice/turn_config/utils.ts index 5886ce354..8db23d49a 100644 --- a/agents/src/voice/turn_config/utils.ts +++ b/agents/src/voice/turn_config/utils.ts @@ -12,6 +12,7 @@ import { defaultEndpointingOptions } from './endpointing.js'; import { defaultInterruptionOptions } from './interruption.js'; import { defaultPreemptiveGenerationOptions } from './preemptive_generation.js'; import { type TurnHandlingOptions, defaultTurnHandlingOptions } from './turn_handling.js'; +import { defaultUserTurnLimitOptions } from './user_turn_limit.js'; const defaultSessionOptions = { maxToolSteps: 3, @@ -68,6 +69,9 @@ export function migrateLegacyOptions(legacyOptions: AgentSessionOption preemptiveGeneration: { ...sessionOptions.turnHandling?.preemptiveGeneration, }, + userTurnLimit: { + ...sessionOptions.turnHandling?.userTurnLimit, + }, turnDetection: sessionOptions?.turnHandling?.turnDetection ?? turnDetection, } as const; @@ -137,6 +141,10 @@ export function mergeWithDefaults(config: TurnHandlingOptions) { ...defaultPreemptiveGenerationOptions, ...stripUndefined(config.preemptiveGeneration ?? {}), }, + userTurnLimit: { + ...defaultUserTurnLimitOptions, + ...stripUndefined(config.userTurnLimit ?? {}), + }, } as const; } From 54bd8f455c653d0a100ccfe023257b878416101f Mon Sep 17 00:00:00 2001 From: longcw Date: Mon, 18 May 2026 07:24:42 +0000 Subject: [PATCH 2/3] fix(voice): handle user turn limit waits --- agents/src/voice/agent_activity.ts | 27 +++++++++++++++++++++++---- 1 file changed, 23 insertions(+), 4 deletions(-) diff --git a/agents/src/voice/agent_activity.ts b/agents/src/voice/agent_activity.ts index 385e62372..c3f695c40 100644 --- a/agents/src/voice/agent_activity.ts +++ b/agents/src/voice/agent_activity.ts @@ -1625,10 +1625,11 @@ export class AgentActivity implements RecognitionHooks { while ((waitForAgent && agentActive) || (waitForUser && userActive)) { if (waitForAgent) { if (this.audioRecognition) { - await ThrowsPromise.race([ + await this.waitForOrAbort( this.audioRecognition.waitForEndOfTurnTask(), - waitForAbort(signal), - ]); + signal, + 'error waiting for end-of-turn task', + ); } if (!this._currentSpeech && this.speechQueue.size() === 0) { @@ -1639,7 +1640,11 @@ export class AgentActivity implements RecognitionHooks { if (currentUpdate.done) { await delay(0, { signal }); } else { - await ThrowsPromise.race([currentUpdate.await, waitForAbort(signal)]); + await this.waitForOrAbort( + currentUpdate.await, + signal, + 'error waiting for speech queue update', + ); } } } @@ -1653,6 +1658,20 @@ export class AgentActivity implements RecognitionHooks { } } + private async waitForOrAbort( + promise: Promise, + signal: AbortSignal, + errorMessage: string, + ): Promise { + try { + await Promise.race([promise, waitForAbort(signal)]); + } catch (error) { + if (!signal.aborted) { + this.logger.error({ error }, errorMessage); + } + } + } + private async mainTask(signal: AbortSignal): Promise { const abortFuture = new Future(); const abortHandler = () => { From f9a391e241d9e29db4a9954c3992264864be86a5 Mon Sep 17 00:00:00 2001 From: longcw Date: Mon, 18 May 2026 07:28:38 +0000 Subject: [PATCH 3/3] fix(voice): include user turn limit defaults in test --- agents/src/voice/turn_config/utils.test.ts | 2 ++ 1 file changed, 2 insertions(+) diff --git a/agents/src/voice/turn_config/utils.test.ts b/agents/src/voice/turn_config/utils.test.ts index 90010c2c6..d2aef0a74 100644 --- a/agents/src/voice/turn_config/utils.test.ts +++ b/agents/src/voice/turn_config/utils.test.ts @@ -8,6 +8,7 @@ import { defaultEndpointingOptions } from './endpointing.js'; import { defaultInterruptionOptions } from './interruption.js'; import { defaultPreemptiveGenerationOptions } from './preemptive_generation.js'; import { defaultTurnHandlingOptions } from './turn_handling.js'; +import { defaultUserTurnLimitOptions } from './user_turn_limit.js'; import { migrateLegacyOptions, migrateTurnHandling } from './utils.js'; beforeAll(() => { @@ -23,6 +24,7 @@ describe('migrateLegacyOptions', () => { endpointing: defaultEndpointingOptions, interruption: defaultInterruptionOptions, preemptiveGeneration: defaultPreemptiveGenerationOptions, + userTurnLimit: defaultUserTurnLimitOptions, }); expect(result.maxToolSteps).toBe(defaultAgentSessionOptions.maxToolSteps); expect(result.userAwayTimeout).toBe(defaultAgentSessionOptions.userAwayTimeout);