From a46a7eea51fd53c7dae57404481ccd77212cdf81 Mon Sep 17 00:00:00 2001 From: longcw Date: Mon, 18 May 2026 08:26:49 +0000 Subject: [PATCH 1/2] feat(avatar): add avatar metrics --- .changeset/avatar-metrics.md | 11 ++ agents/src/metrics/base.ts | 15 ++- agents/src/metrics/index.ts | 1 + agents/src/metrics/utils.ts | 13 +++ agents/src/utils.ts | 127 +++++++++++++++++++- agents/src/voice/avatar/avatar_session.ts | 134 +++++++++++++++++++++- plugins/anam/src/avatar.ts | 12 +- plugins/bey/src/avatar.ts | 10 +- plugins/lemonslice/src/avatar.ts | 12 +- plugins/liveavatar/src/avatar.ts | 26 +++-- plugins/runway/src/avatar.ts | 11 +- plugins/trugen/src/avatar.ts | 12 +- 12 files changed, 354 insertions(+), 30 deletions(-) create mode 100644 .changeset/avatar-metrics.md diff --git a/.changeset/avatar-metrics.md b/.changeset/avatar-metrics.md new file mode 100644 index 000000000..0e2be82ea --- /dev/null +++ b/.changeset/avatar-metrics.md @@ -0,0 +1,11 @@ +--- +'@livekit/agents': patch +'@livekit/agents-plugin-anam': patch +'@livekit/agents-plugin-bey': patch +'@livekit/agents-plugin-lemonslice': patch +'@livekit/agents-plugin-liveavatar': patch +'@livekit/agents-plugin-runway': patch +'@livekit/agents-plugin-trugen': patch +--- + +Add avatar join and playback latency metrics. diff --git a/agents/src/metrics/base.ts b/agents/src/metrics/base.ts index 1c9c317c1..f6af79ec9 100644 --- a/agents/src/metrics/base.ts +++ b/agents/src/metrics/base.ts @@ -16,7 +16,8 @@ export type AgentMetrics = | VADMetrics | EOUMetrics | RealtimeModelMetrics - | InterruptionMetrics; + | InterruptionMetrics + | AvatarMetrics; export type LLMMetrics = { type: 'llm_metrics'; @@ -213,3 +214,15 @@ export type InterruptionMetrics = { numRequests: number; metadata?: MetricsMetadata; }; + +export type AvatarMetrics = { + type: 'avatar_metrics'; + timestamp: number; + /** Delay between forwarding the first audio frame to the avatar and playback start. */ + playbackLatencyMs?: number; + /** Time when the avatar session was started. */ + sessionStartedAt?: number; + /** Time when the avatar participant joined and published a video track. */ + avatarJoinedAt?: number; + metadata?: MetricsMetadata; +}; diff --git a/agents/src/metrics/index.ts b/agents/src/metrics/index.ts index f3cce796c..0438b0219 100644 --- a/agents/src/metrics/index.ts +++ b/agents/src/metrics/index.ts @@ -4,6 +4,7 @@ export type { AgentMetrics, + AvatarMetrics, EOUMetrics, InterruptionMetrics, LLMMetrics, diff --git a/agents/src/metrics/utils.ts b/agents/src/metrics/utils.ts index ced021e63..de28f4b83 100644 --- a/agents/src/metrics/utils.ts +++ b/agents/src/metrics/utils.ts @@ -71,5 +71,18 @@ export const logMetrics = (metrics: AgentMetrics) => { numRequests: metrics.numRequests, }) .info('Interruption metrics'); + } else if (metrics.type === 'avatar_metrics') { + const extra: Record = {}; + if (metrics.metadata?.modelProvider) extra.modelProvider = metrics.metadata.modelProvider; + if (metrics.metadata?.modelName) extra.modelName = metrics.metadata.modelName; + if (metrics.sessionStartedAt !== undefined && metrics.avatarJoinedAt !== undefined) { + extra.avatarJoinLatencyMs = roundTwoDecimals( + metrics.avatarJoinedAt - metrics.sessionStartedAt, + ); + } + if (metrics.playbackLatencyMs !== undefined) { + extra.playbackLatencyMs = roundTwoDecimals(metrics.playbackLatencyMs); + } + logger.child(extra).info('Avatar metrics'); } }; diff --git a/agents/src/utils.ts b/agents/src/utils.ts index 6343c932f..a4b8a7ee9 100644 --- a/agents/src/utils.ts +++ b/agents/src/utils.ts @@ -2,6 +2,8 @@ // // SPDX-License-Identifier: Apache-2.0 import type { + LocalTrackPublication, + Participant, ParticipantKind, RemoteParticipant, RemoteTrackPublication, @@ -951,18 +953,51 @@ export async function waitForParticipant({ room, identity, kind, + includeLocal, + signal, +}: { + room: Room; + identity?: string; + kind?: ParticipantKind | ParticipantKind[]; + includeLocal: true; + signal?: AbortSignal; +}): Promise; +export async function waitForParticipant({ + room, + identity, + kind, + includeLocal, + signal, }: { room: Room; identity?: string; kind?: ParticipantKind | ParticipantKind[]; -}): Promise { + includeLocal?: false; + signal?: AbortSignal; +}): Promise; +export async function waitForParticipant({ + room, + identity, + kind, + includeLocal = false, + signal, +}: { + room: Room; + identity?: string; + kind?: ParticipantKind | ParticipantKind[]; + includeLocal?: boolean; + signal?: AbortSignal; +}): Promise { if (!room.isConnected) { throw new Error('Room is not connected'); } + if (signal?.aborted) { + throw new Error('waitForParticipant aborted'); + } - const fut = new Future(); + const fut = new Future(); - const kindMatch = (participant: RemoteParticipant) => { + const kindMatch = (participant: Participant) => { if (kind === undefined) return true; if (Array.isArray(kind)) { @@ -984,10 +1019,27 @@ export async function waitForParticipant({ fut.reject(new Error('Got disconnected from room while waiting for participant')); }; + const onAbort = () => { + if (!fut.done) { + fut.reject(new Error('waitForParticipant aborted')); + } + }; + room.on(RoomEvent.ParticipantConnected, onParticipantConnected); room.on(RoomEvent.Disconnected, onDisconnected); + signal?.addEventListener('abort', onAbort, { once: true }); try { + const localParticipant = room.localParticipant; + if ( + includeLocal && + localParticipant && + (identity === undefined || localParticipant.identity === identity) && + kindMatch(localParticipant) + ) { + return localParticipant; + } + for (const p of room.remoteParticipants.values()) { onParticipantConnected(p); if (fut.done) { @@ -999,6 +1051,7 @@ export async function waitForParticipant({ } finally { room.off(RoomEvent.ParticipantConnected, onParticipantConnected); room.off(RoomEvent.Disconnected, onDisconnected); + signal?.removeEventListener('abort', onAbort); } } @@ -1006,8 +1059,24 @@ export async function waitForTrackPublication({ room, identity, kind, - waitForSubscription = false, + waitForSubscription, signal, + includeLocal, +}: { + room: Room; + identity?: string; + kind: TrackKind; + waitForSubscription?: boolean; + signal?: AbortSignal; + includeLocal: true; +}): Promise; +export async function waitForTrackPublication({ + room, + identity, + kind, + waitForSubscription, + signal, + includeLocal, }: { room: Room; /** @@ -1029,7 +1098,23 @@ export async function waitForTrackPublication({ * publication leak listeners until the room disconnects. */ signal?: AbortSignal; -}): Promise { + includeLocal?: boolean; +}): Promise; +export async function waitForTrackPublication({ + room, + identity, + kind, + waitForSubscription = false, + signal, + includeLocal = false, +}: { + room: Room; + identity?: string; + kind: TrackKind; + waitForSubscription?: boolean; + signal?: AbortSignal; + includeLocal?: boolean; +}): Promise { if (!room.isConnected) { throw new Error('Room is not connected'); } @@ -1037,7 +1122,7 @@ export async function waitForTrackPublication({ throw new Error('waitForTrackPublication aborted'); } - const fut = new Future(); + const fut = new Future(); const kindMatch = (k: TrackKind | undefined) => { if (kind === undefined || kind === null) { @@ -1077,11 +1162,24 @@ export async function waitForTrackPublication({ } }; + const onLocalTrackPublished = (publication: LocalTrackPublication | undefined) => { + if (fut.done || !publication) return; + const localParticipant = room.localParticipant; + if (localParticipant && (identity === undefined || localParticipant.identity === identity)) { + if (kindMatch(publication.kind)) { + fut.resolve(publication); + } + } + }; + if (waitForSubscription) { room.on(RoomEvent.TrackSubscribed, onTrackSubscribed); } else { room.on(RoomEvent.TrackPublished, onTrackPublished); } + if (includeLocal) { + room.on(RoomEvent.LocalTrackPublished, onLocalTrackPublished); + } const onAbort = () => { if (!fut.done) { @@ -1091,6 +1189,20 @@ export async function waitForTrackPublication({ signal?.addEventListener('abort', onAbort, { once: true }); try { + const localParticipant = room.localParticipant; + if ( + includeLocal && + localParticipant && + (identity === undefined || localParticipant.identity === identity) + ) { + for (const publication of localParticipant.trackPublications.values()) { + if (kindMatch(publication.kind)) { + fut.resolve(publication); + break; + } + } + } + for (const p of room.remoteParticipants.values()) { for (const publication of p.trackPublications.values()) { if (matches(publication, p)) { @@ -1108,6 +1220,9 @@ export async function waitForTrackPublication({ } else { room.off(RoomEvent.TrackPublished, onTrackPublished); } + if (includeLocal) { + room.off(RoomEvent.LocalTrackPublished, onLocalTrackPublished); + } signal?.removeEventListener('abort', onAbort); } } diff --git a/agents/src/voice/avatar/avatar_session.ts b/agents/src/voice/avatar/avatar_session.ts index 87707e893..15f2f998d 100644 --- a/agents/src/voice/avatar/avatar_session.ts +++ b/agents/src/voice/avatar/avatar_session.ts @@ -2,9 +2,23 @@ // // SPDX-License-Identifier: Apache-2.0 import type { Room } from '@livekit/rtc-node'; +import { RoomEvent, TrackKind } from '@livekit/rtc-node'; +import type { TypedEventEmitter as TypedEmitter } from '@livekit/typed-emitter'; +import { EventEmitter } from 'node:events'; import { getJobContext } from '../../job.js'; import { log } from '../../log.js'; +import type { AvatarMetrics } from '../../metrics/base.js'; +import { waitForParticipant, waitForTrackPublication } from '../../utils.js'; import type { AgentSession } from '../agent_session.js'; +import { + AgentSessionEventTypes, + type ConversationItemAddedEvent, + createMetricsCollectedEvent, +} from '../events.js'; + +export type AvatarSessionCallbacks = { + metrics_collected: (metrics: AvatarMetrics) => void; +}; /** * Base class for avatar plugin sessions. @@ -16,8 +30,20 @@ import type { AgentSession } from '../agent_session.js'; * - Warns when the avatar session is started after {@link AgentSession.start} — in that * case the existing audio output will be replaced by the avatar's. */ -export class AvatarSession { +export class AvatarSession extends (EventEmitter as new () => TypedEmitter) { #logger = log(); + #agentSession?: AgentSession; + #room?: Room; + #waitAvatarJoinAbort?: AbortController; + #waitAvatarJoinPromise?: Promise; + + get avatarIdentity(): string { + return 'unknown'; + } + + get provider(): string { + return 'unknown'; + } /** * Start the avatar session. @@ -26,7 +52,7 @@ export class AvatarSession { * top of their implementation. Subclasses may widen the return type (e.g. returning a * session id), matching the `# type: ignore[override]` escape hatch used in Python. */ - async start(agentSession: AgentSession, _room: Room): Promise { + async start(agentSession: AgentSession, room: Room): Promise { const jobCtx = getJobContext(false); if (jobCtx !== undefined) { jobCtx.addShutdownCallback(() => this.aclose()); @@ -46,6 +72,19 @@ export class AvatarSession { 'Please start the avatar session before AgentSession.start() to avoid this.', ); } + + this.#agentSession = agentSession; + this.#room = room; + this.#agentSession.on( + AgentSessionEventTypes.ConversationItemAdded, + this.#onConversationItemAdded, + ); + + if (room.isConnected) { + this.#startWaitAvatarJoin(); + } else { + room.on(RoomEvent.ConnectionStateChanged, this.#onConnectionStateChanged); + } return undefined; } @@ -53,5 +92,94 @@ export class AvatarSession { * Release any resources owned by this avatar session. Default implementation is a no-op; * subclasses can override to perform cleanup. */ - async aclose(): Promise {} + async aclose(): Promise { + if (this.#agentSession) { + this.#agentSession.off( + AgentSessionEventTypes.ConversationItemAdded, + this.#onConversationItemAdded, + ); + this.#agentSession = undefined; + } + + if (this.#room) { + this.#room.off(RoomEvent.ConnectionStateChanged, this.#onConnectionStateChanged); + this.#room = undefined; + } + + this.#waitAvatarJoinAbort?.abort(); + if (this.#waitAvatarJoinPromise) { + await this.#waitAvatarJoinPromise; + this.#waitAvatarJoinPromise = undefined; + } + this.#waitAvatarJoinAbort = undefined; + } + + #startWaitAvatarJoin() { + if (this.#waitAvatarJoinPromise) return; + + const abortController = new AbortController(); + this.#waitAvatarJoinAbort = abortController; + this.#waitAvatarJoinPromise = this.#waitAvatarJoin(abortController.signal).catch((error) => { + if (!abortController.signal.aborted) { + this.#logger.warn({ error: String(error) }, 'failed while waiting for avatar participant'); + } + }); + } + + async #waitAvatarJoin(signal: AbortSignal): Promise { + const room = this.#room; + if (!room) return; + + const startedAt = Date.now(); + await waitForParticipant({ + room, + identity: this.avatarIdentity, + includeLocal: true, + signal, + }); + await waitForTrackPublication({ + room, + identity: this.avatarIdentity, + kind: TrackKind.KIND_VIDEO, + includeLocal: true, + signal, + }); + const joinedAt = Date.now(); + this.#emitMetrics({ + type: 'avatar_metrics', + timestamp: joinedAt, + sessionStartedAt: startedAt, + avatarJoinedAt: joinedAt, + metadata: { modelProvider: this.provider }, + }); + } + + #onConversationItemAdded = (ev: ConversationItemAddedEvent) => { + const { item } = ev; + if (item.type !== 'message' || item.role !== 'assistant') return; + + const { playbackLatency } = item.metrics; + if (playbackLatency === undefined) return; + + this.#emitMetrics({ + type: 'avatar_metrics', + timestamp: ev.createdAt, + playbackLatencyMs: playbackLatency * 1000, + metadata: { modelProvider: this.provider }, + }); + }; + + #onConnectionStateChanged = () => { + if (this.#room?.isConnected) { + this.#startWaitAvatarJoin(); + } + }; + + #emitMetrics(metrics: AvatarMetrics) { + this.emit('metrics_collected', metrics); + this.#agentSession?.emit( + AgentSessionEventTypes.MetricsCollected, + createMetricsCollectedEvent({ metrics }), + ); + } } diff --git a/plugins/anam/src/avatar.ts b/plugins/anam/src/avatar.ts index 22d64fe6e..9e2f40e62 100644 --- a/plugins/anam/src/avatar.ts +++ b/plugins/anam/src/avatar.ts @@ -53,6 +53,14 @@ export class AvatarSession extends voice.AvatarSession { super(); } + override get avatarIdentity(): string { + return this.opts.avatarParticipantIdentity ?? AVATAR_IDENTITY; + } + + override get provider(): string { + return 'anam'; + } + async start( agentSession: voice.AgentSession, room: Room, @@ -93,7 +101,7 @@ export class AvatarSession extends voice.AvatarSession { const jwt = await mintAvatarJoinToken({ roomName: room.name!, - avatarIdentity: this.opts.avatarParticipantIdentity ?? AVATAR_IDENTITY, + avatarIdentity: this.avatarIdentity, publishOnBehalf: localIdentity, apiKey: lkKey, apiSecret: lkSecret, @@ -116,7 +124,7 @@ export class AvatarSession extends voice.AvatarSession { agentSession.output.audio = new voice.DataStreamAudioOutput({ room, - destinationIdentity: this.opts.avatarParticipantIdentity ?? AVATAR_IDENTITY, + destinationIdentity: this.avatarIdentity, waitRemoteTrack: TrackKind.KIND_VIDEO, }); } diff --git a/plugins/bey/src/avatar.ts b/plugins/bey/src/avatar.ts index 6bc3a0321..ce23bb31e 100644 --- a/plugins/bey/src/avatar.ts +++ b/plugins/bey/src/avatar.ts @@ -130,6 +130,14 @@ export class AvatarSession extends voice.AvatarSession { this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; } + override get avatarIdentity(): string { + return this.avatarParticipantIdentity; + } + + override get provider(): string { + return 'bey'; + } + /** * Starts the avatar session and connects it to the agent. * @@ -198,7 +206,7 @@ export class AvatarSession extends voice.AvatarSession { agentSession.output.audio = new voice.DataStreamAudioOutput({ room, - destinationIdentity: this.avatarParticipantIdentity, + destinationIdentity: this.avatarIdentity, waitRemoteTrack: TrackKind.KIND_VIDEO, }); } diff --git a/plugins/lemonslice/src/avatar.ts b/plugins/lemonslice/src/avatar.ts index 333b89083..e4d6a970b 100644 --- a/plugins/lemonslice/src/avatar.ts +++ b/plugins/lemonslice/src/avatar.ts @@ -173,6 +173,14 @@ export class AvatarSession extends voice.AvatarSession { this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; } + override get avatarIdentity(): string { + return this.avatarParticipantIdentity; + } + + override get provider(): string { + return 'lemonslice'; + } + /** * Starts the avatar session and connects it to the agent. * @@ -224,7 +232,7 @@ export class AvatarSession extends voice.AvatarSession { } const at = new AccessToken(livekitApiKey, livekitApiSecret, { - identity: this.avatarParticipantIdentity, + identity: this.avatarIdentity, name: this.avatarParticipantName, }); at.kind = 'agent'; @@ -246,7 +254,7 @@ export class AvatarSession extends voice.AvatarSession { agentSession.output.audio = new voice.DataStreamAudioOutput({ room, - destinationIdentity: this.avatarParticipantIdentity, + destinationIdentity: this.avatarIdentity, sampleRate: SAMPLE_RATE, waitRemoteTrack: TrackKind.KIND_VIDEO, waitPlaybackStart: true, diff --git a/plugins/liveavatar/src/avatar.ts b/plugins/liveavatar/src/avatar.ts index 18c702e7d..ce0575d3b 100644 --- a/plugins/liveavatar/src/avatar.ts +++ b/plugins/liveavatar/src/avatar.ts @@ -80,7 +80,7 @@ export interface StartOptions { * * Ref: python livekit-plugins/livekit-plugins-liveavatar/livekit/plugins/liveavatar/avatar.py */ -export class AvatarSession { +export class AvatarSession extends voice.AvatarSession { private avatarId: string | null; private apiUrl?: string; private apiKey: string; @@ -117,6 +117,7 @@ export class AvatarSession { #logger = log(); constructor(options: AvatarSessionOptions = {}) { + super(); this.avatarId = options.avatarId ?? process.env.LIVEAVATAR_AVATAR_ID ?? null; this.apiUrl = options.apiUrl; this.apiKey = options.apiKey ?? process.env.LIVEAVATAR_API_KEY ?? ''; @@ -133,6 +134,14 @@ export class AvatarSession { }); } + override get avatarIdentity(): string { + return this.avatarParticipantIdentity; + } + + override get provider(): string { + return 'liveavatar'; + } + /** * Start the avatar session and wire it into the agent session's audio output. * @@ -143,6 +152,8 @@ export class AvatarSession { room: Room, options: StartOptions = {}, ): Promise { + await super.start(agentSession, room); + this.agentSession = agentSession; this.room = room; @@ -172,7 +183,7 @@ export class AvatarSession { } const at = new AccessToken(livekitApiKey, livekitApiSecret, { - identity: this.avatarParticipantIdentity, + identity: this.avatarIdentity, name: this.avatarParticipantName, }); at.kind = 'agent'; @@ -235,16 +246,6 @@ export class AvatarSession { this.mainTaskPromise = this.mainTask().catch((e) => { this.#logger.warn({ error: String(e) }, 'LiveAvatar main task failed'); }); - - // Best-effort cleanup on job shutdown. - try { - const jobCtx = getJobContext(); - jobCtx.addShutdownCallback(async () => { - await this.aclose(); - }); - } catch { - // No active job context — caller is expected to manage lifecycle. - } } /** @@ -263,6 +264,7 @@ export class AvatarSession { // logged in mainTask } } + await super.aclose(); } /** diff --git a/plugins/runway/src/avatar.ts b/plugins/runway/src/avatar.ts index ff1777b05..c1a23c228 100644 --- a/plugins/runway/src/avatar.ts +++ b/plugins/runway/src/avatar.ts @@ -105,6 +105,14 @@ export class AvatarSession extends voice.AvatarSession { this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; } + override get avatarIdentity(): string { + return this.avatarParticipantIdentity; + } + + override get provider(): string { + return 'runway'; + } + async start( agentSession: voice.AgentSession, room: Room, @@ -149,7 +157,7 @@ export class AvatarSession extends voice.AvatarSession { agentSession.output.audio = new voice.DataStreamAudioOutput({ room, - destinationIdentity: this.avatarParticipantIdentity, + destinationIdentity: this.avatarIdentity, waitRemoteTrack: TrackKind.KIND_VIDEO, sampleRate: SAMPLE_RATE, }); @@ -309,5 +317,6 @@ export class AvatarSession extends voice.AvatarSession { override async aclose(): Promise { await this.ensureEndSessionPromise(); + await super.aclose(); } } diff --git a/plugins/trugen/src/avatar.ts b/plugins/trugen/src/avatar.ts index 25620535e..71276e012 100644 --- a/plugins/trugen/src/avatar.ts +++ b/plugins/trugen/src/avatar.ts @@ -98,6 +98,14 @@ export class AvatarSession extends voice.AvatarSession { this.connOptions = options.connOptions || DEFAULT_API_CONNECT_OPTIONS; } + override get avatarIdentity(): string { + return this.avatarParticipantIdentity; + } + + override get provider(): string { + return 'trugen'; + } + /** * Starts the avatar session and connects it to the agent. * @@ -141,7 +149,7 @@ export class AvatarSession extends voice.AvatarSession { } const at = new AccessToken(livekitApiKey, livekitApiSecret, { - identity: this.avatarParticipantIdentity, + identity: this.avatarIdentity, name: this.avatarParticipantName, }); at.kind = 'agent'; @@ -161,7 +169,7 @@ export class AvatarSession extends voice.AvatarSession { agentSession.output.audio = new voice.DataStreamAudioOutput({ room, - destinationIdentity: this.avatarParticipantIdentity, + destinationIdentity: this.avatarIdentity, waitRemoteTrack: TrackKind.KIND_VIDEO, }); } From 9cea79bc846595d7dcee246636848488b01319a2 Mon Sep 17 00:00:00 2001 From: longcw Date: Mon, 18 May 2026 08:31:53 +0000 Subject: [PATCH 2/2] fix(avatar): tolerate minimal session mocks --- agents/src/voice/avatar/avatar_session.ts | 36 ++++++++++++++--------- 1 file changed, 22 insertions(+), 14 deletions(-) diff --git a/agents/src/voice/avatar/avatar_session.ts b/agents/src/voice/avatar/avatar_session.ts index 15f2f998d..2904692ae 100644 --- a/agents/src/voice/avatar/avatar_session.ts +++ b/agents/src/voice/avatar/avatar_session.ts @@ -75,14 +75,16 @@ export class AvatarSession extends (EventEmitter as new () => TypedEmitter TypedEmitter { if (this.#agentSession) { - this.#agentSession.off( - AgentSessionEventTypes.ConversationItemAdded, - this.#onConversationItemAdded, - ); + if (typeof this.#agentSession.off === 'function') { + this.#agentSession.off( + AgentSessionEventTypes.ConversationItemAdded, + this.#onConversationItemAdded, + ); + } this.#agentSession = undefined; } if (this.#room) { - this.#room.off(RoomEvent.ConnectionStateChanged, this.#onConnectionStateChanged); + if (typeof this.#room.off === 'function') { + this.#room.off(RoomEvent.ConnectionStateChanged, this.#onConnectionStateChanged); + } this.#room = undefined; } @@ -177,9 +183,11 @@ export class AvatarSession extends (EventEmitter as new () => TypedEmitter