Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/user-turn-limit-options.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"@livekit/agents": minor
---

Add user turn limit options for interrupting long user speech.
13 changes: 13 additions & 0 deletions agents/src/voice/agent.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import { Future, Task } from '../utils.js';
import type { VAD } from '../vad.js';
import { type AgentActivity, agentActivityStorage } from './agent_activity.js';
import type { AgentSession, TurnDetectionMode } from './agent_session.js';
import type { UserTurnExceededEvent } from './events.js';
import type { TimedString } from './io.js';
import type { SpeechHandle } from './speech_handle.js';
import type { TurnHandlingOptions } from './turn_config/turn_handling.js';
Expand Down Expand Up @@ -297,6 +298,18 @@ export class Agent<UserData = any> {

async onUserTurnCompleted(_chatCtx: ChatContext, _newMessage: ChatMessage): Promise<void> {}

async onUserTurnExceeded(ev: UserTurnExceededEvent): Promise<void> {
await this.session.generateReply({
userInput: ev.transcript,
instructions:
'The user has been speaking too long without giving a chance to reply. ' +
'Politely cut in with a short reply or notice. Keep it short since the user cannot interrupt it.',
allowInterruptions: false,
toolChoice: 'none',
inputModality: 'audio',
});
}

async sttNode(
audio: ReadableStream<AudioFrame>,
modelSettings: ModelSettings,
Expand Down
138 changes: 137 additions & 1 deletion agents/src/voice/agent_activity.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ import {
IdleTimeoutError,
Task,
cancelAndWait,
delay,
isDevMode,
isHosted,
waitFor,
waitForAbort,
waitUntilTimeout,
} from '../utils.js';
import { VAD, type VADEvent } from '../vad.js';
Expand All @@ -83,7 +85,7 @@ import {
type RecognitionHooks,
type STTPipeline,
} from './audio_recognition.js';
import type { AgentState } from './events.js';
import type { AgentState, AgentStateChangedEvent, UserTurnExceededEvent } from './events.js';
import {
AgentSessionEventTypes,
createAgentFalseInterruptionEvent,
Expand Down Expand Up @@ -223,6 +225,8 @@ export class AgentActivity implements RecognitionHooks {
private pausedSpeech?: PausedSpeechInfo;
private falseInterruptionTimer?: NodeJS.Timeout;
private cancelSpeechPauseTask?: Promise<void>;
private userTurnExceededLocked = false;
private userTurnExceededTask?: Task<void>;

private readonly onRealtimeGenerationCreated = (ev: GenerationCreatedEvent): void =>
this.onGenerationCreated(ev);
Expand Down Expand Up @@ -521,6 +525,7 @@ export class AgentActivity implements RecognitionHooks {
...this.agentSession.sessionOptions.turnHandling.endpointing,
...(this.agent.turnHandling?.endpointing ?? {}),
}),
userTurnLimit: this.agentSession.sessionOptions.turnHandling.userTurnLimit,
rootSpanContext: this.agentSession.rootSpanContext,
sttModel: this.stt?.label,
sttProvider: this.getSttProvider(),
Expand Down Expand Up @@ -1420,6 +1425,78 @@ export class AgentActivity implements RecognitionHooks {
};
}

onUserTurnExceeded(ev: UserTurnExceededEvent): void {
if (this.userTurnExceededLocked) {
return;
}

this.userTurnExceededTask?.cancel();
this.userTurnExceededTask = this.createSpeechTask({
taskFn: (controller) => this.runUserTurnExceededTask(ev, controller.signal),
name: 'AgentActivity.userTurnExceeded',
});
}

private async runUserTurnExceededTask(
ev: UserTurnExceededEvent,
signal: AbortSignal,
): Promise<void> {
// Let the current STT event finish scheduling the regular EOU task first.
await delay(0, { signal });

const agentSpeaking = new Future<void, never>();
const onAgentStateChanged = (stateEv: AgentStateChangedEvent): void => {
if (stateEv.newState === 'speaking' && !agentSpeaking.done) {
agentSpeaking.resolve();
}
};

if (this.agentSession.agentState === 'speaking') {
agentSpeaking.resolve();
} else {
this.agentSession.on(AgentSessionEventTypes.AgentStateChanged, onAgentStateChanged);
}

const waitInactiveTask = Task.from(
() => this.waitForInactive({ waitForAgent: true, waitForUser: false }, signal),
undefined,
'AgentActivity.waitForInactiveForUserTurnExceeded',
);
Comment on lines +1460 to +1464
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 waitInactiveTask.cancel() is ineffective because the task ignores its own AbortController

In runUserTurnExceededTask, the waitInactiveTask is created with Task.from(() => this.waitForInactive(..., signal), ...) where signal is the outer task's signal. The arrow function ignores the Task's own AbortController (it doesn't destructure the controller parameter). When waitInactiveTask.cancel() is called in the finally block at line 1481, it aborts the Task's internal controller — but waitForInactive is listening to the outer signal, not the Task's controller. This means the cancel is effectively a no-op: the waitForInactive promise continues running as a detached background operation.

Additionally, since waitInactiveTask.result is part of the ThrowsPromise.race at line 1467-1471, if the race resolves via agentSpeaking or waitForAbort(signal), the losing waitInactiveTask.result promise can later reject (e.g., when waitForInactive hits delay(0, { signal }) after signal abort), causing an unhandled promise rejection.

Prompt for agents
In agents/src/voice/agent_activity.ts, the `waitInactiveTask` at line 1460-1464 in `runUserTurnExceededTask` is created with `Task.from(() => this.waitForInactive(..., signal), ...)`. The problem is the function ignores the Task's own AbortController and passes the outer `signal` to `waitForInactive`. This means `waitInactiveTask.cancel()` at line 1481 does not actually cancel the underlying work.

To fix this, the task should use its own controller's signal so that `cancel()` works correctly. You also need to link the outer signal to the inner task so that outer cancellation propagates. For example:

1. Change the Task.from to use the task's controller: `Task.from((controller) => this.waitForInactive({...}, controller.signal), ...)`
2. Add a signal listener to propagate the outer abort: `const onAbort = () => waitInactiveTask.cancel(); signal.addEventListener('abort', onAbort, { once: true });` and clean it up in the finally block.
3. Add a `.catch(() => {})` on `waitInactiveTask.result` before the race (or after the race resolves) to prevent unhandled promise rejections from the losing promise.
Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.


try {
await ThrowsPromise.race([
agentSpeaking.await,
waitInactiveTask.result,
waitForAbort(signal),
]);
if (signal.aborted) {
return;
}
if (agentSpeaking.done) {
return;
}
} finally {
this.agentSession.off(AgentSessionEventTypes.AgentStateChanged, onAgentStateChanged);
if (!waitInactiveTask.done) {
waitInactiveTask.cancel();
}
}

this.logger.debug(
{ numWords: ev.accumulatedWordCount, duration: ev.duration },
'user turn limit exceeded',
);
this.userTurnExceededLocked = true;
try {
await this.agent.onUserTurnExceeded(ev);
} catch (error) {
this.logger.error({ error }, 'error in onUserTurnExceeded callback');
} finally {
this.userTurnExceededLocked = false;
this.userTurnExceededTask = undefined;
}
}

private cancelPreemptiveGeneration(): void {
if (this._preemptiveGeneration !== undefined) {
this._preemptiveGeneration.speechHandle._cancel();
Expand Down Expand Up @@ -1536,6 +1613,65 @@ export class AgentActivity implements RecognitionHooks {
return this.agentSession.chatCtx;
}

private async waitForInactive(
options: { waitForAgent?: boolean; waitForUser?: boolean },
signal: AbortSignal,
): Promise<void> {
const waitForAgent = options.waitForAgent ?? true;
const waitForUser = options.waitForUser ?? true;
let agentActive = true;
let userActive = true;

while ((waitForAgent && agentActive) || (waitForUser && userActive)) {
if (waitForAgent) {
if (this.audioRecognition) {
await this.waitForOrAbort(
this.audioRecognition.waitForEndOfTurnTask(),
signal,
'error waiting for end-of-turn task',
);
}

if (!this._currentSpeech && this.speechQueue.size() === 0) {
agentActive = false;
} else {
agentActive = true;
const currentUpdate = this.q_updated;
if (currentUpdate.done) {
await delay(0, { signal });
} else {
await this.waitForOrAbort(
currentUpdate.await,
signal,
'error waiting for speech queue update',
);
}
}
}

if (waitForUser) {
userActive = this.agentSession.userState !== 'listening';
if (userActive) {
await delay(0, { signal });
}
}
}
}

private async waitForOrAbort(
promise: Promise<void>,
signal: AbortSignal,
errorMessage: string,
): Promise<void> {
try {
await Promise.race([promise, waitForAbort(signal)]);
} catch (error) {
if (!signal.aborted) {
this.logger.error({ error }, errorMessage);
}
}
}

private async mainTask(signal: AbortSignal): Promise<void> {
const abortFuture = new Future<void, never>();
const abortHandler = () => {
Expand Down
64 changes: 64 additions & 0 deletions agents/src/voice/audio_recognition.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,15 +33,18 @@ import { mergeReadableStreams } from '../stream/merge_readable_streams.js';
import { type StreamChannel, createStreamChannel } from '../stream/stream_channel.js';
import { type SpeechEvent, SpeechEventType } from '../stt/stt.js';
import { traceTypes, tracer } from '../telemetry/index.js';
import { splitWords } from '../tokenize/basic/word.js';
import { Task, cancelAndWait, delay, readStream, waitForAbort } from '../utils.js';
import { type VAD, type VADEvent, VADEventType } from '../vad.js';
import type { TurnDetectionMode } from './agent_session.js';
import { type UserTurnExceededEvent, createUserTurnExceededEvent } from './events.js';
import type { STTNode } from './io.js';
import {
type BaseEndpointing,
createEndpointing,
defaultEndpointingOptions,
} from './turn_config/endpointing.js';
import type { UserTurnLimitOptions } from './turn_config/user_turn_limit.js';
import { setParticipantSpanAttributes } from './utils.js';

export interface EndOfTurnInfo {
Expand Down Expand Up @@ -75,10 +78,17 @@ export interface RecognitionHooks {
onFinalTranscript: (ev: SpeechEvent, speaking: boolean | undefined) => void;
onEndOfTurn: (info: EndOfTurnInfo) => Promise<boolean>;
onPreemptiveGeneration: (info: PreemptiveGenerationInfo) => void;
onUserTurnExceeded: (ev: UserTurnExceededEvent) => void;

retrieveChatCtx: () => ChatContext;
}

interface UserTurnTracker {
words: number;
transcript: string;
startedAt?: number;
}

export class STTPipeline {
static readonly PUMP_TASK_CANCEL_TIMEOUT = 5000;

Expand Down Expand Up @@ -153,6 +163,8 @@ export interface AudioRecognitionOptions {
backchannelBoundary?: number | [number, number] | null;
/** Endpointing delay strategy. */
endpointing?: BaseEndpointing;
/** User turn limit configuration. */
userTurnLimit?: UserTurnLimitOptions;
/** @deprecated Use endpointing instead. */
minEndpointingDelay?: number;
/** @deprecated Use endpointing instead. */
Expand Down Expand Up @@ -188,6 +200,7 @@ export class AudioRecognition {
private turnDetector?: _TurnDetector;
private turnDetectionMode?: TurnDetectionMode;
private endpointing: BaseEndpointing;
private userTurnLimit?: UserTurnLimitOptions;
private lastLanguage?: LanguageCode;
private rootSpanContext?: Context;
private sttModel?: string;
Expand All @@ -209,6 +222,7 @@ export class AudioRecognition {
private sampleRate?: number;

private userTurnSpan?: Span;
private userTurnTracker: UserTurnTracker = { words: 0, transcript: '' };
// Provider-known STT ids for the current user turn. Written to the
// `user_turn` span when it ends so we can correlate traces with the
// provider's logs for debugging.
Expand Down Expand Up @@ -264,6 +278,7 @@ export class AudioRecognition {
this.vad = opts.vad;
this.turnDetector = opts.turnDetector;
this.turnDetectionMode = opts.turnDetectionMode;
this.userTurnLimit = opts.userTurnLimit;
this.endpointing =
opts.endpointing ??
createEndpointing({
Expand Down Expand Up @@ -446,6 +461,7 @@ export class AudioRecognition {
async onStartOfAgentSpeech(startedAt: number) {
this.isAgentSpeaking = true;
this.endpointing.onStartOfAgentSpeech(startedAt);
this.userTurnTracker = { words: 0, transcript: '' };

if (this.backchannelBoundary && this.backchannelBoundary[0] > 0) {
this.cancelBackchannelBoundary();
Expand Down Expand Up @@ -819,6 +835,8 @@ export class AudioRecognition {
this.lastSpeakingTime = Date.now();
}

this.checkUserTurnLimit(transcript);

if (this.vadBaseTurnDetection || this.userTurnCommitted) {
if (transcriptChanged) {
this.logger.debug(
Expand Down Expand Up @@ -1167,6 +1185,52 @@ export class AudioRecognition {
});
}

async waitForEndOfTurnTask(): Promise<void> {
if (!this.bounceEOUTask || this.bounceEOUTask.done) {
return;
}

try {
await this.bounceEOUTask.result;
} catch (error) {
if (error instanceof Error && error.message.includes('This operation was aborted')) {
return;
}
throw error;
}
}

private checkUserTurnLimit(transcript: string): void {
const maxWords = this.userTurnLimit?.maxWords ?? null;
const maxDuration = this.userTurnLimit?.maxDuration ?? null;

if (maxWords === null && maxDuration === null) {
return;
}

const now = Date.now();
this.userTurnTracker.startedAt ??= this.speechStartTime ?? now;
this.userTurnTracker.words += splitWords(transcript, true).length;
this.userTurnTracker.transcript = `${this.userTurnTracker.transcript} ${transcript}`.trim();

const duration = now - this.userTurnTracker.startedAt;
const timeExceeded = maxDuration !== null && duration >= maxDuration;
const wordsExceeded = maxWords !== null && this.userTurnTracker.words >= maxWords;

if (!timeExceeded && !wordsExceeded) {
return;
}

this.hooks.onUserTurnExceeded(
createUserTurnExceededEvent({
transcript: this.currentTranscript,
accumulatedTranscript: this.userTurnTracker.transcript,
accumulatedWordCount: this.userTurnTracker.words,
duration,
}),
);
}

private startSttTasks(reusePipeline?: STTPipeline) {
if (!this.stt) return;

Expand Down
34 changes: 34 additions & 0 deletions agents/src/voice/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -245,6 +245,40 @@ export const createSpeechCreatedEvent = ({
createdAt,
});

export type UserTurnExceededEvent = {
type: 'user_turn_exceeded';
/** Transcript from the current uncommitted user turn only. */
transcript: string;
/** Full transcript since the start of user speaking in the accumulation window. */
accumulatedTranscript: string;
/** Total word count since the start of user speaking in the accumulation window. */
accumulatedWordCount: number;
/** Duration of the user turn accumulation window in milliseconds. */
duration: number;
createdAt: number;
};

export const createUserTurnExceededEvent = ({
transcript,
accumulatedTranscript,
accumulatedWordCount,
duration,
createdAt = Date.now(),
}: {
transcript: string;
accumulatedTranscript: string;
accumulatedWordCount: number;
duration: number;
createdAt?: number;
}): UserTurnExceededEvent => ({
type: 'user_turn_exceeded',
transcript,
accumulatedTranscript,
accumulatedWordCount,
duration,
createdAt,
});

export type ErrorEvent = {
type: 'error';
error: RealtimeModelError | STTError | TTSError | LLMError | InterruptionDetectionError | unknown;
Expand Down
Loading
Loading