Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 19 additions & 2 deletions packages/sdk/src/realtime/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,15 @@ const realTimeClientConnectOptionsSchema = z.object({
resolution: z.enum(["720p", "1080p"]).optional(),
/** Local track publish codec. Desktop Safari is always pinned to vp8 and ignores this value. */
preferredVideoCodec: z.enum(["h264", "vp9"]).optional(),
/**
* Play remote audio tracks. Default `false`.
*
* When `false`, any audio the model emits is dropped on the client — no
* playback element is attached and audio is not added to the stream
* passed to `onRemoteStream`. Set `true` when the model emits audio you
* want the user to hear.
*/
playRemoteAudio: z.boolean().optional(),
});
export type RealTimeClientConnectOptions = Omit<z.infer<typeof realTimeClientConnectOptionsSchema>, "model"> & {
model: ModelDefinition | CustomModelDefinition;
Expand Down Expand Up @@ -102,8 +111,15 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
const parsedOptions = realTimeClientConnectOptionsSchema.safeParse(options);
if (!parsedOptions.success) throw parsedOptions.error;

const { onRemoteStream, onConnectionChange, onQueuePosition, initialState, resolution, preferredVideoCodec } =
parsedOptions.data;
const {
onRemoteStream,
onConnectionChange,
onQueuePosition,
initialState,
resolution,
preferredVideoCodec,
playRemoteAudio,
} = parsedOptions.data;
const mirror = parsedOptions.data.mirror ?? false;
let inputStream: MediaStream = stream ?? new MediaStream();

Expand Down Expand Up @@ -169,6 +185,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
initialPrompt,
logger,
videoCodec: publishCodec,
playRemoteAudio,
});

let sessionId: string | null = null;
Expand Down
10 changes: 10 additions & 0 deletions packages/sdk/src/realtime/media-channel.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,15 @@ export interface MediaChannelConfig {
localStream: MediaStream | null;
logger?: Logger;
videoCodec?: VideoCodec;
/**
* Play remote audio tracks. Default `false`.
*
* When `false`, any audio tracks delivered to this room are dropped on the
* client: no playback element is attached and the track is not added to
* the stream emitted on `remoteStream`. Set to `true` when the model emits
* audio you want the user to hear.
*/
playRemoteAudio?: boolean;
}

export type MediaConnectOptions = {
Expand Down Expand Up @@ -81,6 +90,7 @@ export class MediaChannel {
room.on(RoomEvent.TrackSubscribed, (track: RemoteTrack, _pub, participant: RemoteParticipant) => {
if (!participant.identity.startsWith(REALTIME_CONFIG.livekit.inferenceServerIdentityPrefix)) return;
if (track.kind !== Track.Kind.Video && track.kind !== Track.Kind.Audio) return;
if (track.kind === Track.Kind.Audio && !this.config.playRemoteAudio) return;

track.attach();
const mediaStreamTrack = track.mediaStreamTrack;
Expand Down
2 changes: 2 additions & 0 deletions packages/sdk/src/realtime/stream-session.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ interface StreamSessionConfig {
initialPrompt?: InitialPrompt;
logger?: Logger;
videoCodec?: VideoCodec;
playRemoteAudio?: boolean;
}

export class StreamSession {
Expand Down Expand Up @@ -317,6 +318,7 @@ export class StreamSession {
localStream: this.config.localStream,
logger: this.logger,
videoCodec: this.config.videoCodec,
playRemoteAudio: this.config.playRemoteAudio,
});
this.wireSignalingEvents();
this.wireMediaEvents();
Expand Down
10 changes: 10 additions & 0 deletions packages/sdk/src/realtime/subscribe-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ export type SubscribeOptions = {
token: string;
onRemoteStream: (stream: MediaStream) => void;
onConnectionChange?: (state: ConnectionState) => void;
/**
* Play remote audio tracks. Default `false`.
*
* When `false`, any audio the model emits is dropped on the client — no
* playback element is attached and audio is not added to the stream
* passed to `onRemoteStream`. Set `true` when the model emits audio you
* want the viewer to hear.
*/
playRemoteAudio?: boolean;
};

export type RealTimeSubscribeClientOptions = {
Expand Down Expand Up @@ -148,6 +157,7 @@ export const createRealTimeSubscribeClient = (opts: RealTimeSubscribeClientOptio
activeRoom.on(RoomEvent.TrackSubscribed, (track: RemoteTrack, _pub, participant: RemoteParticipant) => {
if (!participant.identity.startsWith(REALTIME_CONFIG.livekit.inferenceServerIdentityPrefix)) return;
if (track.kind !== Track.Kind.Video && track.kind !== Track.Kind.Audio) return;
if (track.kind === Track.Kind.Audio && !options.playRemoteAudio) return;

track.attach();
const mediaStreamTrack = track.mediaStreamTrack;
Expand Down
63 changes: 60 additions & 3 deletions packages/sdk/tests/realtime.unit.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -743,16 +743,18 @@ describe("StreamSession startup orchestration", () => {
});
};

const subscribeRemoteTrack = () => {
const subscribeRemoteTrack = (kind: "video" | "audio" = "video") => {
const room = liveKitMock.roomInstances.at(-1) as InstanceType<typeof liveKitMock.MockRoom>;
const mediaStreamTrack = { id: "remote-video", kind: "video" };
const mediaStreamTrack = { id: `remote-${kind}`, kind };
const trackKind = kind === "video" ? liveKitMock.Track.Kind.Video : liveKitMock.Track.Kind.Audio;
const track = {
kind: liveKitMock.Track.Kind.Video,
kind: trackKind,
mediaStreamTrack,
attach: vi.fn(),
on: vi.fn(),
};
room.emit(liveKitMock.RoomEvent.TrackSubscribed, track, {}, { identity: "inference-server-1" });
return track;
};

const createLocalStream = () =>
Expand Down Expand Up @@ -976,6 +978,61 @@ describe("StreamSession startup orchestration", () => {
expect(firstRoom.localParticipant.publishTrack).not.toHaveBeenCalled();
await expect(connectPromise).rejects.toThrow("WebSocket closed: 1000 closed");
});

it("drops remote audio tracks by default — no attach, not added to remoteStream", async () => {
const { StreamSession } = await import("../src/realtime/stream-session.js");
const session = new StreamSession({
url: "wss://example.test/realtime",
localStream: null,
initialPrompt: { text: "go" },
});
const remoteStreams: MediaStream[] = [];
session.on("remoteStream", (stream) => remoteStreams.push(stream));

session.connect().catch(() => {});
const ws = FakeWebSocket.instances[0];
ws.onopen?.();
await flushMicrotasks();
sendRoomInfo(ws);
await flushMicrotasks();

const audioTrack = subscribeRemoteTrack("audio");
expect(audioTrack.attach).not.toHaveBeenCalled();
expect(remoteStreams).toHaveLength(0);

const videoTrack = subscribeRemoteTrack("video");
expect(videoTrack.attach).toHaveBeenCalledTimes(1);
expect(remoteStreams).toHaveLength(1);
expect((remoteStreams[0] as unknown as FakeMediaStream).getTracks()).toEqual([
expect.objectContaining({ kind: "video" }),
]);
});

it("plays remote audio tracks when playRemoteAudio is true", async () => {
const { StreamSession } = await import("../src/realtime/stream-session.js");
const session = new StreamSession({
url: "wss://example.test/realtime",
localStream: null,
initialPrompt: { text: "go" },
playRemoteAudio: true,
});
const remoteStreams: MediaStream[] = [];
session.on("remoteStream", (stream) => remoteStreams.push(stream));

session.connect().catch(() => {});
const ws = FakeWebSocket.instances[0];
ws.onopen?.();
await flushMicrotasks();
sendRoomInfo(ws);
await flushMicrotasks();

const audioTrack = subscribeRemoteTrack("audio");
expect(audioTrack.attach).toHaveBeenCalledTimes(1);
expect(remoteStreams).toHaveLength(1);
expect((remoteStreams[0] as unknown as FakeMediaStream).getTracks()).toEqual([
expect.objectContaining({ kind: "audio" }),
]);
});
});

describe("WebRTC Error Classification", () => {
Expand Down
Loading