Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2,110 changes: 1,992 additions & 118 deletions packages/sdk/index.html

Large diffs are not rendered by default.

11 changes: 9 additions & 2 deletions packages/sdk/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,14 @@ export type {
SubscribeEvents,
SubscribeOptions,
} from "./realtime/subscribe-client";
export type { ConnectionState, GenerationEndedMessage, QueuePosition, QueuePositionMessage } from "./realtime/types";
export type {
ConnectionState,
GenerationEndedMessage,
QueuePosition,
QueuePositionMessage,
RealtimeWebSocketErrorMessage,
RealtimeWebSocketErrorType,
} from "./realtime/types";
export {
type CanonicalModel,
type CustomModelDefinition,
Expand All @@ -67,7 +74,7 @@ export {
} from "./shared/model";
export type { ModelState } from "./shared/types";
export type { CreateTokenOptions, CreateTokenResponse, TokensClient } from "./tokens/client";
export { type DecartSDKError, ERROR_CODES } from "./utils/errors";
export { type DecartSDKError, ERROR_CODES, type RealtimeServerErrorData } from "./utils/errors";
export { createConsoleLogger, type Logger, type LogLevel, noopLogger } from "./utils/logger";

// Schema with validation to ensure proxy and apiKey are mutually exclusive
Expand Down
28 changes: 27 additions & 1 deletion packages/sdk/src/realtime/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ import { RealtimeObservability } from "./observability/realtime-observability";
import type { WebRTCStats } from "./observability/webrtc-stats";
import { StreamSession } from "./stream-session";
import type { ConnectionState, GenerationEnded, GenerationTick, ImageSetOptions, QueuePosition } from "./types";
import type { RoomOptions, TrackPublishOptions } from "livekit-client";

import type { RealtimeVideoStats } from "./media-channel";

export type RealTimeClientOptions = {
baseUrl: string;
Expand Down Expand Up @@ -52,6 +55,15 @@ const realTimeClientConnectOptionsSchema = z.object({
queryParams: z.record(z.string(), z.string()).optional(),
mirror: z.union([z.literal("auto"), z.boolean()]).optional(),
resolution: z.enum(["720p", "1080p"]).optional(),
publishOptions: z
.custom<Partial<TrackPublishOptions>>((val) => val === undefined || typeof val === "object")
.optional(),
roomOptions: z.custom<Partial<RoomOptions>>((val) => val === undefined || typeof val === "object").optional(),
remoteVideoElement: z
.custom<HTMLVideoElement>(
(val) => val === undefined || (typeof val === "object" && val !== null && "srcObject" in val),
)
.optional(),
});
export type RealTimeClientConnectOptions = Omit<z.infer<typeof realTimeClientConnectOptionsSchema>, "model"> & {
model: ModelDefinition | CustomModelDefinition;
Expand Down Expand Up @@ -79,6 +91,7 @@ export type RealTimeClient = {
subscribeToken: string | null;
getSubscribeToken: () => string | null;
setImage: (image: Blob | File | string | null, options?: ImageSetOptions) => Promise<void>;
getVideoStats: () => Promise<RealtimeVideoStats>;
};

export const createRealTimeClient = (opts: RealTimeClientOptions) => {
Expand All @@ -92,7 +105,16 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
const parsedOptions = realTimeClientConnectOptionsSchema.safeParse(options);
if (!parsedOptions.success) throw parsedOptions.error;

const { onRemoteStream, onConnectionChange, onQueuePosition, initialState, resolution } = parsedOptions.data;
const {
onRemoteStream,
onConnectionChange,
onQueuePosition,
initialState,
resolution,
publishOptions,
roomOptions,
remoteVideoElement,
} = parsedOptions.data;
const mirror = parsedOptions.data.mirror ?? false;
let inputStream: MediaStream = stream ?? new MediaStream();

Expand Down Expand Up @@ -154,6 +176,9 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
initialPrompt,
logger,
videoCodec: safariCodec,
publishOptions,
roomOptions,
remoteVideoElement,
});

let sessionId: string | null = null;
Expand Down Expand Up @@ -209,6 +234,7 @@ export const createRealTimeClient = (opts: RealTimeClientOptions) => {
return subscribeToken;
},
getSubscribeToken: () => subscribeToken,
getVideoStats: () => activeSession.getVideoStats(),
setImage: async (image: Blob | File | string | null, imgOptions?: ImageSetOptions) => {
if (image === null) return activeSession.setImage(null, imgOptions);
const base64 = await imageToBase64(image);
Expand Down
230 changes: 217 additions & 13 deletions packages/sdk/src/realtime/media-channel.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
import {
type DisconnectReason,
LocalVideoTrack,
type RemoteParticipant,
type RemoteTrack,
RemoteVideoTrack,
Room,
RoomEvent,
type RoomOptions,
Track,
TrackEvent,
type TrackPublishOptions,
type VideoReceiverStats,
type VideoSenderStats,
} from "livekit-client";
import mitt, { type Emitter } from "mitt";

Expand All @@ -16,17 +21,129 @@ import type { RealtimeObservability } from "./observability/realtime-observabili

export type VideoCodec = "h264" | "vp8" | "vp9" | "av1";

export function getDefaultVideoPublishOptions(videoCodec?: VideoCodec): TrackPublishOptions {
const videoEncoding = {
export interface RealtimeVideoSenderStats extends VideoSenderStats {
qpSum?: number;
framesEncoded?: number;
totalEncodeTime?: number;
keyFramesEncoded?: number;
codecMimeType?: string;
}

export interface RealtimeVideoReceiverStats extends VideoReceiverStats {
qpSum?: number;
freezeCount?: number;
totalFreezesDuration?: number;
pauseCount?: number;
totalPausesDuration?: number;
keyFramesDecoded?: number;
framesPerSecond?: number;
codecMimeType?: string;
}

export interface RealtimeVideoStats {
sender: RealtimeVideoSenderStats[];
receiver: RealtimeVideoReceiverStats[];
}

type RawOutboundRtp = {
type?: string;
kind?: string;
rid?: string;
ssrc?: number;
qpSum?: number;
framesEncoded?: number;
totalEncodeTime?: number;
keyFramesEncoded?: number;
codecId?: string;
};

type RawInboundRtp = {
type?: string;
kind?: string;
qpSum?: number;
framesDropped?: number;
freezeCount?: number;
totalFreezesDuration?: number;
pauseCount?: number;
totalPausesDuration?: number;
keyFramesDecoded?: number;
framesPerSecond?: number;
codecId?: string;
};

type RawCodec = { type?: string; id?: string; mimeType?: string };

function shortCodecName(mimeType: string | undefined): string | undefined {
if (!mimeType) return undefined;
const slash = mimeType.indexOf("/");
return slash >= 0 ? mimeType.slice(slash + 1).toUpperCase() : mimeType.toUpperCase();
}

function collectCodecMimeMap(report: RTCStatsReport | null): Map<string, string> {
const map = new Map<string, string>();
if (!report) return map;
report.forEach((stat: unknown) => {
const c = stat as RawCodec;
if (c.type === "codec" && c.id && typeof c.mimeType === "string") {
map.set(c.id, c.mimeType);
}
});
return map;
}

function mergeSenderLayer(
layer: VideoSenderStats,
outbound: RawOutboundRtp | undefined,
codecMime: Map<string, string>,
): RealtimeVideoSenderStats {
return {
...layer,
qpSum: outbound?.qpSum,
framesEncoded: outbound?.framesEncoded,
totalEncodeTime: outbound?.totalEncodeTime,
keyFramesEncoded: outbound?.keyFramesEncoded,
codecMimeType: shortCodecName(outbound?.codecId ? codecMime.get(outbound.codecId) : undefined),
};
}

function mergeReceiver(
base: VideoReceiverStats,
inbound: RawInboundRtp | undefined,
codecMime: Map<string, string>,
): RealtimeVideoReceiverStats {
const merged: RealtimeVideoReceiverStats = {
...base,
qpSum: inbound?.qpSum,
freezeCount: inbound?.freezeCount,
totalFreezesDuration: inbound?.totalFreezesDuration,
pauseCount: inbound?.pauseCount,
totalPausesDuration: inbound?.totalPausesDuration,
keyFramesDecoded: inbound?.keyFramesDecoded,
framesPerSecond: inbound?.framesPerSecond,
codecMimeType: shortCodecName(inbound?.codecId ? codecMime.get(inbound.codecId) : undefined),
};
if (typeof inbound?.framesDropped === "number") merged.framesDropped = inbound.framesDropped;
return merged;
}

export function getDefaultVideoPublishOptions(
videoCodec?: VideoCodec,
overrides?: Partial<TrackPublishOptions>,
): TrackPublishOptions {
const defaultEncoding = {
maxBitrate: REALTIME_CONFIG.livekit.defaultMaxVideoBitrateBps,
maxFramerate: REALTIME_CONFIG.livekit.defaultPublishFps,
};

return {
source: Track.Source.Camera,
videoCodec: videoCodec ?? REALTIME_CONFIG.livekit.defaultVideoCodec,
simulcast: true,
videoEncoding,
videoCodec: videoCodec ?? REALTIME_CONFIG.livekit.defaultVideoCodec,
...overrides,
videoEncoding: {
...defaultEncoding,
...overrides?.videoEncoding,
},
};
}

Expand All @@ -41,6 +158,9 @@ export interface MediaChannelConfig {
localStream: MediaStream | null;
logger?: Logger;
videoCodec?: VideoCodec;
publishOptions?: Partial<TrackPublishOptions>;
roomOptions?: Partial<RoomOptions>;
remoteVideoElement?: HTMLVideoElement;
}

export type MediaConnectOptions = {
Expand Down Expand Up @@ -71,14 +191,21 @@ export class MediaChannel {
}

async connect(opts: MediaConnectOptions): Promise<void> {
this.room ??= new Room(REALTIME_CONFIG.livekit.roomOptions);
this.room ??= new Room({
...REALTIME_CONFIG.livekit.roomOptions,
...this.config.roomOptions,
});
const room = this.room;

room.on(RoomEvent.TrackSubscribed, (track: RemoteTrack, _pub, participant: RemoteParticipant) => {
if (!participant.identity.startsWith(REALTIME_CONFIG.livekit.inferenceServerIdentityPrefix)) return;
if (track.kind !== Track.Kind.Video && track.kind !== Track.Kind.Audio) return;
if (track.kind !== Track.Kind.Video) return;

track.attach();
if (this.config.remoteVideoElement) {
track.attach(this.config.remoteVideoElement);
} else {
track.attach();
}
const mediaStreamTrack = track.mediaStreamTrack;
if (mediaStreamTrack) {
this.remoteStream ??= new MediaStream();
Expand All @@ -103,6 +230,78 @@ export class MediaChannel {
this.config.observability?.setLiveKitRoom(room);
}

async getVideoStats(): Promise<RealtimeVideoStats> {
const room = this.room;
if (!room) return { sender: [], receiver: [] };

const sender: RealtimeVideoSenderStats[] = [];
for (const pub of room.localParticipant.videoTrackPublications.values()) {
const track = pub.track;
if (!(track instanceof LocalVideoTrack)) continue;

let layers: VideoSenderStats[] = [];
try {
layers = await track.getSenderStats();
} catch (error) {
this.logger.debug("getSenderStats failed", { error: (error as Error).message });
}

let report: RTCStatsReport | null = null;
try {
report = (await track.getRTCStatsReport()) ?? null;
} catch (error) {
this.logger.debug("getRTCStatsReport (sender) failed", { error: (error as Error).message });
}

const codecMime = collectCodecMimeMap(report);
const outbounds: RawOutboundRtp[] = [];
report?.forEach((stat: unknown) => {
const s = stat as RawOutboundRtp;
if (s.type === "outbound-rtp" && s.kind === "video") outbounds.push(s);
});

for (const layer of layers) {
const match = outbounds.find((o) => (o.rid ?? "") === (layer.rid ?? "")) ?? outbounds[0];
sender.push(mergeSenderLayer(layer, match, codecMime));
}
}

const receiver: RealtimeVideoReceiverStats[] = [];
for (const participant of room.remoteParticipants.values()) {
if (!participant.identity.startsWith(REALTIME_CONFIG.livekit.inferenceServerIdentityPrefix)) continue;
for (const pub of participant.videoTrackPublications.values()) {
const track = pub.track;
if (!(track instanceof RemoteVideoTrack)) continue;

let stats: VideoReceiverStats | undefined;
try {
stats = await track.getReceiverStats();
} catch (error) {
this.logger.debug("getReceiverStats failed", { error: (error as Error).message });
}
if (!stats) continue;

let report: RTCStatsReport | null = null;
try {
report = (await track.getRTCStatsReport()) ?? null;
} catch (error) {
this.logger.debug("getRTCStatsReport (receiver) failed", { error: (error as Error).message });
}

const codecMime = collectCodecMimeMap(report);
let inbound: RawInboundRtp | undefined;
report?.forEach((stat: unknown) => {
const s = stat as RawInboundRtp;
if (!inbound && s.type === "inbound-rtp" && s.kind === "video") inbound = s;
});

receiver.push(mergeReceiver(stats, inbound, codecMime));
}
}

return { sender, receiver };
}

async publishLocalTracks(): Promise<void> {
if (!this.config.localStream) return;
this.config.observability?.startPhase("publish-local-track");
Expand All @@ -122,12 +321,17 @@ export class MediaChannel {

private async publishTracks(stream: MediaStream): Promise<void> {
if (!this.room) return;
for (const track of stream.getTracks()) {
if (track.kind === "video") {
await this.room.localParticipant.publishTrack(track, getDefaultVideoPublishOptions(this.config.videoCodec));
} else {
await this.room.localParticipant.publishTrack(track);
}
for (const track of stream.getVideoTracks()) {
const publishOptions = getDefaultVideoPublishOptions(this.config.videoCodec, this.config.publishOptions);
this.logger.info("livekit: publishing video track", {
videoCodec: publishOptions.videoCodec,
simulcast: publishOptions.simulcast,
scalabilityMode: publishOptions.scalabilityMode,
degradationPreference: publishOptions.degradationPreference,
maxBitrate: publishOptions.videoEncoding?.maxBitrate,
maxFramerate: publishOptions.videoEncoding?.maxFramerate,
});
await this.room.localParticipant.publishTrack(track, publishOptions);
}
}
}
Loading
Loading