Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
54 commits
Select commit Hold shift + click to select a range
f647f84
feat: add client protocol tracking to remote participants
1egoman Mar 9, 2026
44568bc
feat: add client protocol advertisement code, only advertise client p…
1egoman Mar 9, 2026
4621792
feat: add initial first implementation pass
1egoman Mar 9, 2026
0d07cf3
feat: ensure that payload is streamed when compressed, not all buffer…
1egoman Mar 9, 2026
f7e2af3
feat: use a data streams transmission approach much closer to what lu…
1egoman Mar 9, 2026
e3475fe
fix: add long rpc message to example
1egoman Mar 9, 2026
c7ca54b
feat: cleanup code and don't create RPCRequest / RPCResponse in data …
1egoman Mar 10, 2026
23fd785
fix: make functions into arrow fns to work around `this` being unset
1egoman Mar 10, 2026
d6332fb
refactor: break up long if / else if / else chain
1egoman Mar 10, 2026
1678a63
fix: add isCallerStillConnected to ensure that rpc drops responses fo…
1egoman Mar 10, 2026
7c3eafd
fix: adjust copyright headers and remove obsolete rpc exports
1egoman Mar 10, 2026
fe4e887
fix: flip around response timeout nan check
1egoman Mar 10, 2026
42eceef
fix: adjust compression / data stream thresholds to not be inclusive
1egoman Mar 10, 2026
5f89148
fix: remove small payload uncompressed path
1egoman Mar 13, 2026
1f67e47
refactor: run npm run format
1egoman Mar 13, 2026
1774046
feat: commit (updated) rpc benchmark
1egoman Mar 13, 2026
b1248eb
fix: remove references to "small response", uncompressed payloads are…
1egoman Mar 27, 2026
8da9c59
refactor: move ack code into handleIncomingRpcAck
1egoman Mar 27, 2026
9b20f07
refactor: extract all rpc related packet sending code out of the engi…
1egoman Mar 27, 2026
e24cd15
feat: add initial rpc client manager / rpc server manager focused tests
1egoman Mar 27, 2026
bf666c2
feat: port over all existing rpc tests to call RpcClientManager / Rpc…
1egoman Mar 27, 2026
1cfb5c4
fix: address tsc warnings
1egoman Mar 27, 2026
671664c
fix: remove stale throws import
1egoman Mar 27, 2026
cbae080
feat: migrate rpc to use data streams for sending rpc requests
1egoman Mar 30, 2026
18bac37
feat: migrate RpcClientManager / RpcServerManager to not be tightly c…
1egoman Mar 31, 2026
bc788e5
fix: address fallout from bad rebase
1egoman Apr 2, 2026
8a75a8a
fix: remove DataPacket_Kind from RpcClientManager / RpcServerManager
1egoman Apr 2, 2026
d97becb
fix: adjust rpc client / server manager tests to fix type errors
1egoman Apr 2, 2026
5597802
feat: remove compression from this for now
1egoman Apr 2, 2026
bc44f20
feat: consistency use text streams everywhere for rpc, not byte streams
1egoman Apr 3, 2026
4e40eb6
fix: add ack to rpc messages also in the data stream case
1egoman Apr 3, 2026
5ed8c39
fix: make rpc resilient to engine teardowns
1egoman Apr 3, 2026
d7a8a1e
fix: remove LLM generated file headers
1egoman Apr 3, 2026
f2bc189
feat: make handleIncomingRpcRequest take a RpcRequest
1egoman Apr 3, 2026
ca28e0d
feat: add explicit version field to data streams rpc (this is now exp…
1egoman Apr 3, 2026
e4e985f
fix: rename client protocol version var to match new implementation p…
1egoman Apr 3, 2026
d048d8b
fix: convert console.error -> log.error
1egoman Apr 3, 2026
73e300d
refactor: rename MAX_LEGACY_PAYLOAD_BYTES => MAX_V1_PAYLOAD_BYTES
1egoman Apr 3, 2026
0fb650f
fix: add docs comments for v1 / v2 rpcs
1egoman Apr 3, 2026
bf2a484
fix: adjust tests over to use new handleIncomingRpcRequest signature
1egoman Apr 3, 2026
c2c42d9
fix: run npm run format
1egoman Apr 3, 2026
4b6b23d
fix: remove dead code in rpc client manager test
1egoman Apr 3, 2026
63a4e0c
feat: add tests for v2 -> v2 rpc messages
1egoman Apr 3, 2026
ff75778
fix: update tests to exercise new v1 data streams with long payload p…
1egoman Apr 3, 2026
241bbe5
fix: make rpc example payload size longer
1egoman Apr 3, 2026
0632d40
fix: run npm run format
1egoman Apr 3, 2026
4ccc761
feat: make a few small changes to the rpc v2 protocol
1egoman Apr 3, 2026
d9e1e6e
fix: npm run format
1egoman Apr 3, 2026
0ac0561
fix: add missing changeset
1egoman Apr 3, 2026
0dd63ca
fix: remove more LLM added copyright notices
1egoman Apr 3, 2026
a4ed27d
refactor: reorganize some code / cosmetic changes
1egoman Apr 3, 2026
07ff61a
feat: stop checking in rpc-benchmark
1egoman Apr 3, 2026
f2b194f
docs: check in draft spec
1egoman Apr 3, 2026
f69d8a0
refactor: use named constants for rpc versions
1egoman Apr 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/chubby-buckets-drop.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
'livekit-client': patch
---

Add new RPC protocol updates to support infinite payload length in requests / responses
349 changes: 349 additions & 0 deletions RPC_SPEC.md

Large diffs are not rendered by default.

34 changes: 34 additions & 0 deletions examples/rpc/rpc-demo.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,13 @@ async function main() {
console.error('Error:', error);
}

try {
console.log('\n\nRunning send long info example...');
await Promise.all([performSendVeryLongInfo(callersRoom)]);
} catch (error) {
console.error('Error:', error);
}

try {
console.log('\n\nRunning error handling example...');
await Promise.all([performDivision(callersRoom)]);
Expand Down Expand Up @@ -85,6 +92,18 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room)
},
);

await greetersRoom.registerRpcMethod(
'exchanging-long-info',
// eslint-disable-next-line @typescript-eslint/no-unused-vars
async (data: RpcInvocationData) => {
console.log(
`[Greeter] ${data.callerIdentity} has arrived and said that its long info is "${data.payload}"`,
);
await new Promise((resolve) => setTimeout(resolve, 2000));
return new Array<string>(20_000).fill('Y').join('');
},
);

await mathGeniusRoom.registerRpcMethod('square-root', async (data: RpcInvocationData) => {
const jsonData = JSON.parse(data.payload);
const number = jsonData.number;
Expand Down Expand Up @@ -136,6 +155,21 @@ const performGreeting = async (room: Room): Promise<void> => {
}
};

const performSendVeryLongInfo = async (room: Room): Promise<void> => {
console.log('[Caller] Sending the greeter a very long message');
try {
const response = await room.localParticipant.performRpc({
destinationIdentity: 'greeter',
method: 'exchanging-long-info',
payload: new Array<string>(20_000).fill('X').join(''),
});
console.log(`[Caller] The greeter's long info is: "${response}"`);
} catch (error) {
console.error('[Caller] RPC call failed:', error);
throw error;
}
};

const performDisconnection = async (room: Room): Promise<void> => {
console.log('[Caller] Checking back in on the greeter...');
try {
Expand Down
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@
},
"dependencies": {
"@livekit/mutex": "1.1.1",
"@livekit/protocol": "1.44.0",
"@livekit/protocol": "1.45.0",
"events": "^3.3.0",
"jose": "^6.1.0",
"loglevel": "^1.9.2",
Expand Down
10 changes: 5 additions & 5 deletions pnpm-lock.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 0 additions & 26 deletions src/room/RTCEngine.ts
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,6 @@ import {
Room as RoomModel,
RoomMovedResponse,
RpcAck,
RpcResponse,
ServerInfo,
SessionDescription,
SignalTarget,
Expand Down Expand Up @@ -74,7 +73,6 @@ import {
UnexpectedConnectionState,
} from './errors';
import { EngineEvent } from './events';
import { RpcError } from './rpc';
import CriticalTimers from './timers';
import type LocalTrack from './track/LocalTrack';
import type LocalTrackPublication from './track/LocalTrackPublication';
Expand Down Expand Up @@ -1391,30 +1389,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit
});
};

/** @internal */
async publishRpcResponse(
destinationIdentity: string,
requestId: string,
payload: string | null,
error: RpcError | null,
) {
const packet = new DataPacket({
destinationIdentities: [destinationIdentity],
kind: DataPacket_Kind.RELIABLE,
value: {
case: 'rpcResponse',
value: new RpcResponse({
requestId,
value: error
? { case: 'error', value: error.toProto() }
: { case: 'payload', value: payload ?? '' },
}),
},
});

await this.sendDataPacket(packet, DataChannelKind.RELIABLE);
}

/** @internal */
async publishRpcAck(destinationIdentity: string, requestId: string) {
const packet = new DataPacket({
Expand Down
164 changes: 83 additions & 81 deletions src/room/Room.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import type {
} from '../options';
import TypedPromise from '../utils/TypedPromise';
import { getBrowser } from '../utils/browserParser';
import { CLIENT_PROTOCOL_DEFAULT } from '../version';
import { BackOffStrategy } from './BackOffStrategy';
import DeviceManager from './DeviceManager';
import RTCEngine, { DataChannelKind } from './RTCEngine';
Expand Down Expand Up @@ -78,7 +79,14 @@ import LocalParticipant from './participant/LocalParticipant';
import Participant from './participant/Participant';
import { type ConnectionQuality, ParticipantKind } from './participant/Participant';
import RemoteParticipant from './participant/RemoteParticipant';
import { MAX_PAYLOAD_BYTES, RpcError, type RpcInvocationData, byteLength } from './rpc';
import {
RPC_REQUEST_DATA_STREAM_TOPIC,
RPC_RESPONSE_DATA_STREAM_TOPIC,
RpcClientManager,
RpcError,
type RpcInvocationData,
RpcServerManager,
} from './rpc';
import CriticalTimers from './timers';
import LocalAudioTrack from './track/LocalAudioTrack';
import type LocalTrack from './track/LocalTrack';
Expand Down Expand Up @@ -214,7 +222,9 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)

private outgoingDataTrackManager: OutgoingDataTrackManager;

private rpcHandlers: Map<string, (data: RpcInvocationData) => Promise<string>> = new Map();
private rpcClientManager: RpcClientManager;

private rpcServerManager: RpcServerManager;

get hasE2EESetup(): boolean {
return this.e2eeManager !== undefined;
Expand Down Expand Up @@ -292,16 +302,37 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.engine.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait');
});

this.registerRpcDataStreamHandler();

this.rpcClientManager = new RpcClientManager(
this.log,
this.outgoingDataStreamManager,
this.getRemoteParticipantClientProtocol,
() => this.engine.latestJoinResponse?.serverInfo?.version,
);
this.rpcClientManager.on('sendDataPacket', ({ packet }) => {
this.engine?.sendDataPacket(packet, DataChannelKind.RELIABLE);
});
this.rpcServerManager = new RpcServerManager(
this.log,
this.outgoingDataStreamManager,
this.getRemoteParticipantClientProtocol,
);
this.rpcServerManager.on('sendDataPacket', ({ packet }) => {
this.engine?.sendDataPacket(packet, DataChannelKind.RELIABLE);
});

this.disconnectLock = new Mutex();

this.localParticipant = new LocalParticipant(
'',
'',
this.engine,
this.options,
this.rpcHandlers,
this.outgoingDataStreamManager,
this.outgoingDataTrackManager,
this.rpcClientManager,
this.rpcServerManager,
);

if (this.options.e2ee || this.options.encryption) {
Expand Down Expand Up @@ -390,12 +421,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
* Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error").
*/
registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise<string>) {
if (this.rpcHandlers.has(method)) {
throw Error(
`RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`,
);
}
this.rpcHandlers.set(method, handler);
this.rpcServerManager.registerRpcMethod(method, handler);
}

/**
Expand All @@ -404,7 +430,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
* @param method - The name of the RPC method to unregister
*/
unregisterRpcMethod(method: string) {
this.rpcHandlers.delete(method);
this.rpcServerManager.unregisterRpcMethod(method);
}

/**
Expand Down Expand Up @@ -1795,7 +1821,7 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
});
this.emit(RoomEvent.ParticipantDisconnected, participant);
participant.setDisconnected();
this.localParticipant?.handleParticipantDisconnected(participant.identity);
this.rpcClientManager.handleParticipantDisconnected(participant.identity);
}

// updates are sent only when there's a change to speaker ordering
Expand Down Expand Up @@ -1939,14 +1965,31 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.handleDataStream(packet, encryptionType);
} else if (packet.value.case === 'rpcRequest') {
const rpc = packet.value.value;
this.handleIncomingRpcRequest(
packet.participantIdentity,
rpc.id,
rpc.method,
rpc.payload,
rpc.responseTimeoutMs,
rpc.version,
);
this.rpcServerManager.handleIncomingRpcRequest(packet.participantIdentity, rpc);
} else if (packet.value.case === 'rpcResponse') {
const rpcResponse = packet.value.value;
switch (rpcResponse.value.case) {
case 'payload':
this.rpcClientManager.handleIncomingRpcResponseSuccess(
rpcResponse.requestId,
rpcResponse.value.value,
);
break;
case 'error':
this.rpcClientManager.handleIncomingRpcResponseFailure(
rpcResponse.requestId,
RpcError.fromProto(rpcResponse.value.value),
);
break;
default:
this.log.warn(
`Unknown rpcResponse.value.case: ${rpcResponse.value.case}`,
this.logContext,
);
break;
}
} else if (packet.value.case === 'rpcAck') {
this.rpcClientManager.handleIncomingRpcAck(packet.value.value.requestId);
}
};

Expand Down Expand Up @@ -2010,68 +2053,6 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
this.incomingDataStreamManager.handleDataStreamPacket(packet, encryptionType);
};

private async handleIncomingRpcRequest(
callerIdentity: string,
requestId: string,
method: string,
payload: string,
responseTimeout: number,
version: number,
) {
await this.engine.publishRpcAck(callerIdentity, requestId);

if (version !== 1) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
null,
RpcError.builtIn('UNSUPPORTED_VERSION'),
);
return;
}

const handler = this.rpcHandlers.get(method);

if (!handler) {
await this.engine.publishRpcResponse(
callerIdentity,
requestId,
null,
RpcError.builtIn('UNSUPPORTED_METHOD'),
);
return;
}

let responseError: RpcError | null = null;
let responsePayload: string | null = null;

try {
const response = await handler({
requestId,
callerIdentity,
payload,
responseTimeout,
});
if (byteLength(response) > MAX_PAYLOAD_BYTES) {
responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE');
this.log.warn(`RPC Response payload too large for ${method}`);
} else {
responsePayload = response;
}
} catch (error) {
if (error instanceof RpcError) {
responseError = error;
} else {
this.log.warn(
`Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`,
error,
);
responseError = RpcError.builtIn('APPLICATION_ERROR');
}
}
await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError);
}

bufferedSegments: Map<string, TranscriptionSegmentModel> = new Map();

private handleAudioPlaybackStarted = () => {
Expand Down Expand Up @@ -2412,6 +2393,27 @@ class Room extends (EventEmitter as new () => TypedEmitter<RoomEventCallbacks>)
}
}

private getRemoteParticipantClientProtocol = (identity: Participant['identity']) => {
return this.remoteParticipants.get(identity)?.clientProtocol ?? CLIENT_PROTOCOL_DEFAULT;
};

private registerRpcDataStreamHandler() {
this.incomingDataStreamManager.registerTextStreamHandler(
RPC_REQUEST_DATA_STREAM_TOPIC,
async (reader, { identity }) => {
const attributes = reader.info.attributes ?? {};
await this.rpcServerManager.handleIncomingDataStream(reader, identity, attributes);
},
);
this.incomingDataStreamManager.registerTextStreamHandler(
RPC_RESPONSE_DATA_STREAM_TOPIC,
async (reader) => {
const attributes = reader.info.attributes ?? {};
await this.rpcClientManager.handleIncomingDataStream(reader, attributes);
},
);
}

private registerConnectionReconcile() {
this.clearConnectionReconcile();
let consecutiveFailures = 0;
Expand Down
Loading
Loading