diff --git a/.changeset/chubby-buckets-drop.md b/.changeset/chubby-buckets-drop.md new file mode 100644 index 0000000000..a5678201c5 --- /dev/null +++ b/.changeset/chubby-buckets-drop.md @@ -0,0 +1,5 @@ +--- +'livekit-client': patch +--- + +Add new RPC protocol updates to support infinite payload length in requests / responses diff --git a/RPC_SPEC.md b/RPC_SPEC.md new file mode 100644 index 0000000000..857bbaf57d --- /dev/null +++ b/RPC_SPEC.md @@ -0,0 +1,349 @@ +# RPC v2 Specification + +## Overview + +RPC (Remote Procedure Call) allows participants in a LiveKit room to invoke methods on each other +and receive responses. RPC v1 used inline protobuf packets (`RpcRequest` / `RpcResponse`) with a +hard 15 KB payload limit. + +RPC v2 lifts this limit by transporting request and response payloads over **data streams**, while +retaining v1 as a fallback for legacy clients. This removes the previously set 15kb request / +response payload size limitation, making both effectively limitless. + +A v2 client should seamlessly communicate with v1 clients by detecting the remote participant's +client protocol version and falling back to v1 packets if it doesn't support rpc v2. + +--- + +## Part 1: Client protocol + +### What is client protocol? + +`clientProtocol` is a new integer that each participant advertises in their `ParticipantInfo` via +the LiveKit signaling channel. It tells other participants what client-to-client features this SDK +supports. It is distinct from the existing `protocol` field (which tracks signaling protocol +version) - `clientProtocol` specifically governs peer-to-peer feature negotiation. + +| Value | Constant name | Meaning | +|-------|---------------|---------| +| `0` | `CLIENT_PROTOCOL_DEFAULT` | Legacy client. Only supports RPC v1. | +| `1` | `CLIENT_PROTOCOL_DATA_STREAM_RPC` | Supports RPC v2 (data stream-based payloads). | + +### What SDKs need to do + +1. **Advertise**: Set your SDK's `clientProtocol` to `1` (`CLIENT_PROTOCOL_DATA_STREAM_RPC`) in the + `ParticipantInfo` sent during the join handshake. + +2. **Read**: When a remote participant joins or updates, store their `clientProtocol` value. This is + available on the `ParticipantInfo` protobuf. If the field is absent or unrecognized, treat it as + `0` (`CLIENT_PROTOCOL_DEFAULT`). + +3. **Use**: Before sending an RPC request or response, look up the remote participant's + `clientProtocol` to decide whether to use the v1 (packet) or v2 (data stream) transport. + +--- + +## Part 2: RPC protocol updates + +As a review, here is how RPC v1 works today: + +``` +Caller Handler + | | + |--- RpcRequest (DataPacket) ------->| + | | (looks up handler, invokes it) + |<-- RpcAck (DataPacket) -----------| + |<-- RpcResponse (DataPacket) ------| + | | +``` + +1. The **caller** sends a `DataPacket` containing a `RpcRequest` protobuf: + - `id`: a UUID identifying this request + - `method`: the method name + - `payload`: the request payload string (must be <= 15 KB) + - `responseTimeoutMs`: the effective timeout in milliseconds + - `version`: `1` + - Packet kind: `RELIABLE` + - Destination: the handler's identity + +2. The **handler** receives the `RpcRequest` packet and immediately sends an **ack** - a + `DataPacket` containing a `RpcAck` protobuf with the `requestId`. This tells the caller that + the handler is alive and processing. + +3. The handler invokes the registered method handler. When it completes, the handler sends a + `DataPacket` containing a `RpcResponse` protobuf: + - `requestId`: matches the original request + - `value`: either `{ case: 'payload', value: responseString }` for success, or + `{ case: 'error', value: RpcError protobuf }` for failure + - The response payload is also subject to the 15 KB limit. + +4. The **caller** receives the `RpcResponse` and resolves or rejects the pending promise. + +### RPC v2 Example + +v2 replaces the `RpcRequest` and `RpcResponse` protobuf packets with **text data streams** for +carrying payloads. The ack mechanism is unchanged. This removes the payload size limit while +remaining backward-compatible with v1 clients. + +``` +Caller Handler + | | + |--- Text data stream (request) --->| + | topic: "lk.rpc_request" | + | attrs: request_id, method, | + | timeout, version=2 | + | body: | + | | (reads stream, looks up handler, invokes it) + |<-- RpcAck (DataPacket) -----------| + |<-- Text data stream (response) ---| + | topic: "lk.rpc_response" | + | attrs: request_id | + | body: | + | | +``` + +1. The **caller** opens a text data stream with: + - **Topic**: `lk.rpc_request` + - **Destination identities**: `[destinationIdentity]` + - **Attributes**: + - `lk.rpc_request_id`: a newly generated UUID + - `lk.rpc_request_method`: the method name + - `lk.rpc_request_response_timeout_ms`: the effective timeout in milliseconds, as a string + - `lk.rpc_request_version`: `"2"` + - Writes the payload string to the stream, then closes it. + +2. The **handler** receives the data stream on topic `lk.rpc_request`. It parses the attributes + to extract the request ID, method, timeout, and version. It sends an **ack** (same `RpcAck` + packet as v1), then reads the full stream payload. + +3. The handler invokes the registered method handler. On success, it sends the response as a + text data stream: + - **Topic**: `lk.rpc_response` + - **Destination identities**: `[callerIdentity]` + - **Attributes**: `{ "lk.rpc_request_id": requestId }` + - Writes the response payload, then closes the stream. + +4. The **caller** receives the data stream on topic `lk.rpc_response`. It reads the + `lk.rpc_request_id` attribute to match it to the pending request, reads the full stream, + and resolves the pending promise with the payload. + +The user-facing API should be identical for v1 and v2. + +The protocol version negotiation is invisible to the user. The only visible difference that a user +should see is that if they send a rpc request or receive a rpc response from a participant +supporting rpc v2 with a length greater than 15kb, they will NOT receive a +`REQUEST_PAYLOAD_TOO_LARGE` / `RESPONSE_PAYLOAD_TOO_LARGE` error - it will "just work". With rpc v2, +these errors are effectively deprecated. + +#### Error responses in v2 + +**Error responses are always sent as v1 `RpcResponse` packets**, even when both sides are v2. This +is because error payloads tend to be small (code + message + optional data) and using packets keeps +the error path simple and uniform. This means: + +- Success responses between two v2 clients: **data stream** +- Error responses between two v2 clients: **packet** (`RpcResponse` with `error` case) +- All responses to v1 clients: **packet** + +#### Data stream topic routing + +RPC requests and responses use separate data stream topics: + +- **`lk.rpc_request`**: Register a text stream handler for this topic. Incoming streams are RPC + requests. Route to the handler-side logic, passing the sender identity and the stream attributes. +- **`lk.rpc_response`**: Register a text stream handler for this topic. Incoming streams are RPC + responses. Read the `lk.rpc_request_id` attribute to match the response to a pending request, + then route to the caller-side logic. + +### Version negotiation and backward compatibility + +The transport used for a given RPC call depends on what both sides support. The caller decides the +request transport; the handler decides the response transport. + +| Caller | Handler | Request transport | Response transport | +|--------|---------|------------------|--------------------| +| v2 | v2 | Data stream | Data stream (success) / Packet (error) | +| v2 | v1 | Packet (`RpcRequest`) | Packet (`RpcResponse`) | +| v1 | v2 | Packet (`RpcRequest`) | Packet (`RpcResponse`) | +| v1 | v1 | Packet (`RpcRequest`) | Packet (`RpcResponse`) | + +**Data streams are only used when both sides are v2.** Cross-version interactions always fall back +to v1 packets. This is because: + +- The **caller** checks the remote participant's `clientProtocol` before sending. If the remote is + v1, the caller sends a v1 `RpcRequest` packet. +- The **handler** checks the caller's `clientProtocol` before responding. If the caller is v1, the + handler sends a v1 `RpcResponse` packet. (The handler knows the caller is v1 because the request + arrived as a v1 packet, and it can also check the caller's `clientProtocol`.) + +### Timeout and ack behavior + +These behaviors are the same for v1 and v2. + +## Minimum required test cases + +The following tests represent the minimum set that must pass for a conforming implementation. They +are organized by the version interaction being tested. Since this spec describes implementing a v2 +SDK, at least one side of every interaction is always v2. Each test describes the full lifecycle +from both the caller and handler perspectives. + +### v2 -> v2 (both sides support data streams) + +1. **Caller happy path (short payload)** + - The caller opens a text data stream on topic `lk.rpc_request` with attributes + `lk.rpc_request_id`, `lk.rpc_request_method`, `lk.rpc_request_response_timeout_ms`, and + `lk.rpc_request_version: "2"`. + - The caller writes the payload string to the stream and closes it. + - Verify no `RpcRequest` packet is produced. + - Simulate the handler sending a `RpcAck` packet and a successful response. + - Verify the caller resolves with the response payload. + +2. **Caller happy path (large payload > 15 KB)** + - The caller opens a text data stream on topic `lk.rpc_request` with the same attributes as + above, but with a payload exceeding 15 KB (e.g., 20,000 bytes). + - The caller writes the large payload to the stream and closes it. + - Verify no `REQUEST_PAYLOAD_TOO_LARGE` error is raised - the data stream path has no size + limit. + - Simulate the handler sending a `RpcAck` packet and a successful response. + - Verify the caller resolves with the response payload. + +3. **Handler happy path** + - The handler receives a text data stream on topic `lk.rpc_request` with valid attributes. + - The handler sends a `RpcAck` packet with the request ID. + - The handler reads the full stream payload and invokes the registered method handler with + `{ requestId, callerIdentity, payload, responseTimeout }`. + - The method handler returns a response string. + - The handler sends the response as a text data stream on topic `lk.rpc_response` with + attribute `lk.rpc_request_id` set to the request ID. + - Verify no `RpcResponse` packet is produced - successful v2 responses use data streams. + +4. **Unhandled error in handler** + - The handler receives a v2 data stream request. + - The handler sends a `RpcAck` packet. + - The registered method handler throws a non-RpcError exception (e.g., a generic `Error`). + - The handler sends a `RpcResponse` **packet** (not a data stream) with error code + `APPLICATION_ERROR` (1500). + - Verify error responses always use packets, even between two v2 clients. + +5. **RpcError passthrough in handler** + - The handler receives a v2 data stream request. + - The handler sends a `RpcAck` packet. + - The registered method handler throws a `RpcError` with a custom code (e.g., 101) and + message. + - The handler sends a `RpcResponse` packet preserving the original error code and message. + +6. **Response timeout** + - The caller sends a v2 data stream request with a short response timeout (e.g., 50ms). + - No `RpcAck` or response arrives. + - After the timeout elapses, the caller rejects with `RESPONSE_TIMEOUT` (code 1502). + +7. **Error response** + - The caller sends a v2 data stream request. + - Simulate the handler sending a `RpcAck` packet, then a `RpcResponse` packet with an error + (e.g., code 101, message "Test error message"). + - Verify the caller rejects with the correct error code and message. + +8. **Participant disconnection** + - The caller sends a v2 data stream request. + - Before any ack or response arrives, the remote participant disconnects. + - Verify the caller rejects with `RECIPIENT_DISCONNECTED` (code 1503). + +### v2 -> v1 (v2 caller, v1 handler) + +10. **Caller happy path (request fallback)** + - The caller detects the remote's `clientProtocol` is 0. + - The caller sends a v1 `RpcRequest` packet (not a data stream) with correct `id`, `method`, + `payload`, `responseTimeoutMs`, and `version: 1`. + - Verify no data stream is opened. + - Simulate the handler sending a `RpcAck` packet, then a `RpcResponse` packet with a + success payload. + - Verify the caller resolves with the response payload. + +11. **Handler happy path (v1 request)** + - The handler receives a v1 `RpcRequest` packet with `version: 1`. + - The handler sends a `RpcAck` packet with the request ID. + - The handler invokes the registered method handler with `{ requestId, callerIdentity, + payload, responseTimeout }`. + - The method handler returns a response string. + - The handler detects the caller's `clientProtocol` is 0 and sends the response as a v1 + `RpcResponse` packet (not a data stream). + +12. **Payload too large** + - The caller detects the remote's `clientProtocol` is 0. + - The caller attempts to send a payload exceeding 15 KB. + - Verify it rejects immediately with `REQUEST_PAYLOAD_TOO_LARGE` (code 1402) without producing + any packet or data stream. + +13. **Response timeout** + - The caller detects the remote's `clientProtocol` is 0. + - The caller sends a v1 `RpcRequest` packet with a short response timeout (e.g., 50ms). + - No `RpcAck` or response arrives. + - After the timeout elapses, the caller rejects with `RESPONSE_TIMEOUT` (code 1502). + +14. **Error response** + - The caller detects the remote's `clientProtocol` is 0. + - The caller sends a v1 `RpcRequest` packet. + - Simulate the handler sending a `RpcAck` packet, then a `RpcResponse` packet with an + error (e.g., code 101, message "Test error message"). + - Verify the caller rejects with the correct error code and message. + +15. **Participant disconnection** + - The caller detects the remote's `clientProtocol` is 0. + - The caller sends a v1 `RpcRequest` packet. + - Before any ack or response arrives, the remote participant disconnects. + - Verify the caller rejects with `RECIPIENT_DISCONNECTED` (code 1503). + +### v1 -> v2 (v1 caller, v2 handler) + +16. **Handler happy path (response fallback)** + - A v1 caller sends a v1 `RpcRequest` packet with `version: 1`. + - The v2-capable handler receives it and sends a `RpcAck` packet. + - The handler invokes the registered method handler, which returns a response string. + - The handler detects the caller's `clientProtocol` is 0 and sends the response as a v1 + `RpcResponse` packet (not a data stream), even though it supports v2. + - Verify no data stream is opened for the response. + +17. **Unhandled error in handler (v1 caller)** + - A v1 caller sends a v1 `RpcRequest` packet. + - The handler sends a `RpcAck` packet. + - The registered method handler throws a non-RpcError exception (e.g., a generic `Error`). + - The handler sends a `RpcResponse` packet with error code `APPLICATION_ERROR` (1500). + +18. **RpcError passthrough (v1 caller)** + - A v1 caller sends a v1 `RpcRequest` packet. + - The handler sends a `RpcAck` packet. + - The registered method handler throws a `RpcError` with a custom code (e.g., 101) and + message. + - The handler sends a `RpcResponse` packet preserving the original error code and message. + +--- + +## Benchmarking + +Implementing a benchmark is not required, but could be useful for validating correctness and +performance. The below outlines the test criteria used for `client-sdk-cpp` and `client-sdk-js`. + +For an exact reference implementation, see https://github.com/livekit/client-sdk-js/commit/da26fa022197326a8f31db5421f175fad2fe4651. + +### Approach + +The benchmark connects two participants to the same room in a single process: + +1. **Setup**: A "caller" and "receiver" join the same room. +2. **Echo handler**: The receiver registers an RPC method (`benchmark-echo`) that returns the + received payload unchanged. +3. **Payload**: Pre-generate a payload of the desired size. Compute a checksum (e.g., sum of + character codes) for integrity verification. +4. **Caller loop**: N concurrent workers each loop for a configured duration, calling the + echo method and verifying the response matches the original payload (length + checksum). +5. **Metrics**: Track success/failure counts, latency percentiles (p50, p95, p99), throughput + (calls/sec), and error breakdown. + +### Suggested parameters + +| Parameter | Suggested default | Description | +|-----------|-------------------|-------------| +| Payload size | 15360 bytes | Size of the RPC payload in bytes | +| Duration | 30 seconds | How long the benchmark runs | +| Concurrency | 3 | Number of parallel caller loops | +| Delay between calls | 10ms | Pause between consecutive calls per thread | diff --git a/examples/rpc/rpc-demo.ts b/examples/rpc/rpc-demo.ts index 4815754781..95ab445e9a 100644 --- a/examples/rpc/rpc-demo.ts +++ b/examples/rpc/rpc-demo.ts @@ -36,6 +36,13 @@ async function main() { console.error('Error:', error); } + try { + console.log('\n\nRunning send long info example...'); + await Promise.all([performSendVeryLongInfo(callersRoom)]); + } catch (error) { + console.error('Error:', error); + } + try { console.log('\n\nRunning error handling example...'); await Promise.all([performDivision(callersRoom)]); @@ -85,6 +92,18 @@ const registerReceiverMethods = async (greetersRoom: Room, mathGeniusRoom: Room) }, ); + await greetersRoom.registerRpcMethod( + 'exchanging-long-info', + // eslint-disable-next-line @typescript-eslint/no-unused-vars + async (data: RpcInvocationData) => { + console.log( + `[Greeter] ${data.callerIdentity} has arrived and said that its long info is "${data.payload}"`, + ); + await new Promise((resolve) => setTimeout(resolve, 2000)); + return new Array(20_000).fill('Y').join(''); + }, + ); + await mathGeniusRoom.registerRpcMethod('square-root', async (data: RpcInvocationData) => { const jsonData = JSON.parse(data.payload); const number = jsonData.number; @@ -136,6 +155,21 @@ const performGreeting = async (room: Room): Promise => { } }; +const performSendVeryLongInfo = async (room: Room): Promise => { + console.log('[Caller] Sending the greeter a very long message'); + try { + const response = await room.localParticipant.performRpc({ + destinationIdentity: 'greeter', + method: 'exchanging-long-info', + payload: new Array(20_000).fill('X').join(''), + }); + console.log(`[Caller] The greeter's long info is: "${response}"`); + } catch (error) { + console.error('[Caller] RPC call failed:', error); + throw error; + } +}; + const performDisconnection = async (room: Room): Promise => { console.log('[Caller] Checking back in on the greeter...'); try { diff --git a/package.json b/package.json index 02c97a227f..f7de11dbc6 100644 --- a/package.json +++ b/package.json @@ -57,7 +57,7 @@ }, "dependencies": { "@livekit/mutex": "1.1.1", - "@livekit/protocol": "1.44.0", + "@livekit/protocol": "1.45.0", "events": "^3.3.0", "jose": "^6.1.0", "loglevel": "^1.9.2", diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 4a968f6b2f..e1400fe9c8 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -12,8 +12,8 @@ importers: specifier: 1.1.1 version: 1.1.1 '@livekit/protocol': - specifier: 1.44.0 - version: 1.44.0 + specifier: 1.45.0 + version: 1.45.0 '@types/dom-mediacapture-record': specifier: ^1 version: 1.0.22 @@ -1128,8 +1128,8 @@ packages: '@livekit/mutex@1.1.1': resolution: {integrity: sha512-EsshAucklmpuUAfkABPxJNhzj9v2sG7JuzFDL4ML1oJQSV14sqrpTYnsaOudMAw9yOaW53NU3QQTlUQoRs4czw==} - '@livekit/protocol@1.44.0': - resolution: {integrity: sha512-/vfhDUGcUKO8Q43r6i+5FrDhl5oZjm/X3U4x2Iciqvgn5C8qbj+57YPcWSJ1kyIZm5Cm6AV2nAPjMm3ETD/iyg==} + '@livekit/protocol@1.45.0': + resolution: {integrity: sha512-z22Ej7RRBFm5uVZpU7kBHOdDwZV6Hz+1crCOrse2g7yx8TcHXG0bKnOKwyN/meD233nEDlU2IHNCoT8Vq8lvtg==} '@livekit/throws-transformer@0.1.3': resolution: {integrity: sha512-PBttE6W6g/2ALGu6kWOunZ5qdrXwP9Ge1An2/62OfE6Rhc0Abd4yp6ex2pWhwUfGxDsSZvFgoB1Ia/5mWAMuKQ==} @@ -5279,7 +5279,7 @@ snapshots: '@livekit/mutex@1.1.1': {} - '@livekit/protocol@1.44.0': + '@livekit/protocol@1.45.0': dependencies: '@bufbuild/protobuf': 1.10.1 diff --git a/src/room/RTCEngine.ts b/src/room/RTCEngine.ts index 69f9023552..c6943c9e00 100644 --- a/src/room/RTCEngine.ts +++ b/src/room/RTCEngine.ts @@ -25,7 +25,6 @@ import { Room as RoomModel, RoomMovedResponse, RpcAck, - RpcResponse, ServerInfo, SessionDescription, SignalTarget, @@ -74,7 +73,6 @@ import { UnexpectedConnectionState, } from './errors'; import { EngineEvent } from './events'; -import { RpcError } from './rpc'; import CriticalTimers from './timers'; import type LocalTrack from './track/LocalTrack'; import type LocalTrackPublication from './track/LocalTrackPublication'; @@ -1391,30 +1389,6 @@ export default class RTCEngine extends (EventEmitter as new () => TypedEventEmit }); }; - /** @internal */ - async publishRpcResponse( - destinationIdentity: string, - requestId: string, - payload: string | null, - error: RpcError | null, - ) { - const packet = new DataPacket({ - destinationIdentities: [destinationIdentity], - kind: DataPacket_Kind.RELIABLE, - value: { - case: 'rpcResponse', - value: new RpcResponse({ - requestId, - value: error - ? { case: 'error', value: error.toProto() } - : { case: 'payload', value: payload ?? '' }, - }), - }, - }); - - await this.sendDataPacket(packet, DataChannelKind.RELIABLE); - } - /** @internal */ async publishRpcAck(destinationIdentity: string, requestId: string) { const packet = new DataPacket({ diff --git a/src/room/Room.ts b/src/room/Room.ts index 8183ae0a22..d2bcbde39b 100644 --- a/src/room/Room.ts +++ b/src/room/Room.ts @@ -45,6 +45,7 @@ import type { } from '../options'; import TypedPromise from '../utils/TypedPromise'; import { getBrowser } from '../utils/browserParser'; +import { CLIENT_PROTOCOL_DEFAULT } from '../version'; import { BackOffStrategy } from './BackOffStrategy'; import DeviceManager from './DeviceManager'; import RTCEngine, { DataChannelKind } from './RTCEngine'; @@ -78,7 +79,14 @@ import LocalParticipant from './participant/LocalParticipant'; import Participant from './participant/Participant'; import { type ConnectionQuality, ParticipantKind } from './participant/Participant'; import RemoteParticipant from './participant/RemoteParticipant'; -import { MAX_PAYLOAD_BYTES, RpcError, type RpcInvocationData, byteLength } from './rpc'; +import { + RPC_REQUEST_DATA_STREAM_TOPIC, + RPC_RESPONSE_DATA_STREAM_TOPIC, + RpcClientManager, + RpcError, + type RpcInvocationData, + RpcServerManager, +} from './rpc'; import CriticalTimers from './timers'; import LocalAudioTrack from './track/LocalAudioTrack'; import type LocalTrack from './track/LocalTrack'; @@ -214,7 +222,9 @@ class Room extends (EventEmitter as new () => TypedEmitter) private outgoingDataTrackManager: OutgoingDataTrackManager; - private rpcHandlers: Map Promise> = new Map(); + private rpcClientManager: RpcClientManager; + + private rpcServerManager: RpcServerManager; get hasE2EESetup(): boolean { return this.e2eeManager !== undefined; @@ -292,6 +302,26 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.engine.sendLossyBytes(bytes, DataChannelKind.DATA_TRACK_LOSSY, 'wait'); }); + this.registerRpcDataStreamHandler(); + + this.rpcClientManager = new RpcClientManager( + this.log, + this.outgoingDataStreamManager, + this.getRemoteParticipantClientProtocol, + () => this.engine.latestJoinResponse?.serverInfo?.version, + ); + this.rpcClientManager.on('sendDataPacket', ({ packet }) => { + this.engine?.sendDataPacket(packet, DataChannelKind.RELIABLE); + }); + this.rpcServerManager = new RpcServerManager( + this.log, + this.outgoingDataStreamManager, + this.getRemoteParticipantClientProtocol, + ); + this.rpcServerManager.on('sendDataPacket', ({ packet }) => { + this.engine?.sendDataPacket(packet, DataChannelKind.RELIABLE); + }); + this.disconnectLock = new Mutex(); this.localParticipant = new LocalParticipant( @@ -299,9 +329,10 @@ class Room extends (EventEmitter as new () => TypedEmitter) '', this.engine, this.options, - this.rpcHandlers, this.outgoingDataStreamManager, this.outgoingDataTrackManager, + this.rpcClientManager, + this.rpcServerManager, ); if (this.options.e2ee || this.options.encryption) { @@ -390,12 +421,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) * Other errors thrown in your handler will not be transmitted as-is, and will instead arrive to the caller as `1500` ("Application Error"). */ registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { - if (this.rpcHandlers.has(method)) { - throw Error( - `RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`, - ); - } - this.rpcHandlers.set(method, handler); + this.rpcServerManager.registerRpcMethod(method, handler); } /** @@ -404,7 +430,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) * @param method - The name of the RPC method to unregister */ unregisterRpcMethod(method: string) { - this.rpcHandlers.delete(method); + this.rpcServerManager.unregisterRpcMethod(method); } /** @@ -1795,7 +1821,7 @@ class Room extends (EventEmitter as new () => TypedEmitter) }); this.emit(RoomEvent.ParticipantDisconnected, participant); participant.setDisconnected(); - this.localParticipant?.handleParticipantDisconnected(participant.identity); + this.rpcClientManager.handleParticipantDisconnected(participant.identity); } // updates are sent only when there's a change to speaker ordering @@ -1939,14 +1965,31 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.handleDataStream(packet, encryptionType); } else if (packet.value.case === 'rpcRequest') { const rpc = packet.value.value; - this.handleIncomingRpcRequest( - packet.participantIdentity, - rpc.id, - rpc.method, - rpc.payload, - rpc.responseTimeoutMs, - rpc.version, - ); + this.rpcServerManager.handleIncomingRpcRequest(packet.participantIdentity, rpc); + } else if (packet.value.case === 'rpcResponse') { + const rpcResponse = packet.value.value; + switch (rpcResponse.value.case) { + case 'payload': + this.rpcClientManager.handleIncomingRpcResponseSuccess( + rpcResponse.requestId, + rpcResponse.value.value, + ); + break; + case 'error': + this.rpcClientManager.handleIncomingRpcResponseFailure( + rpcResponse.requestId, + RpcError.fromProto(rpcResponse.value.value), + ); + break; + default: + this.log.warn( + `Unknown rpcResponse.value.case: ${rpcResponse.value.case}`, + this.logContext, + ); + break; + } + } else if (packet.value.case === 'rpcAck') { + this.rpcClientManager.handleIncomingRpcAck(packet.value.value.requestId); } }; @@ -2010,68 +2053,6 @@ class Room extends (EventEmitter as new () => TypedEmitter) this.incomingDataStreamManager.handleDataStreamPacket(packet, encryptionType); }; - private async handleIncomingRpcRequest( - callerIdentity: string, - requestId: string, - method: string, - payload: string, - responseTimeout: number, - version: number, - ) { - await this.engine.publishRpcAck(callerIdentity, requestId); - - if (version !== 1) { - await this.engine.publishRpcResponse( - callerIdentity, - requestId, - null, - RpcError.builtIn('UNSUPPORTED_VERSION'), - ); - return; - } - - const handler = this.rpcHandlers.get(method); - - if (!handler) { - await this.engine.publishRpcResponse( - callerIdentity, - requestId, - null, - RpcError.builtIn('UNSUPPORTED_METHOD'), - ); - return; - } - - let responseError: RpcError | null = null; - let responsePayload: string | null = null; - - try { - const response = await handler({ - requestId, - callerIdentity, - payload, - responseTimeout, - }); - if (byteLength(response) > MAX_PAYLOAD_BYTES) { - responseError = RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE'); - this.log.warn(`RPC Response payload too large for ${method}`); - } else { - responsePayload = response; - } - } catch (error) { - if (error instanceof RpcError) { - responseError = error; - } else { - this.log.warn( - `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, - error, - ); - responseError = RpcError.builtIn('APPLICATION_ERROR'); - } - } - await this.engine.publishRpcResponse(callerIdentity, requestId, responsePayload, responseError); - } - bufferedSegments: Map = new Map(); private handleAudioPlaybackStarted = () => { @@ -2412,6 +2393,27 @@ class Room extends (EventEmitter as new () => TypedEmitter) } } + private getRemoteParticipantClientProtocol = (identity: Participant['identity']) => { + return this.remoteParticipants.get(identity)?.clientProtocol ?? CLIENT_PROTOCOL_DEFAULT; + }; + + private registerRpcDataStreamHandler() { + this.incomingDataStreamManager.registerTextStreamHandler( + RPC_REQUEST_DATA_STREAM_TOPIC, + async (reader, { identity }) => { + const attributes = reader.info.attributes ?? {}; + await this.rpcServerManager.handleIncomingDataStream(reader, identity, attributes); + }, + ); + this.incomingDataStreamManager.registerTextStreamHandler( + RPC_RESPONSE_DATA_STREAM_TOPIC, + async (reader) => { + const attributes = reader.info.attributes ?? {}; + await this.rpcClientManager.handleIncomingDataStream(reader, attributes); + }, + ); + } + private registerConnectionReconcile() { this.clearConnectionReconcile(); let consecutiveFailures = 0; diff --git a/src/room/participant/LocalParticipant.ts b/src/room/participant/LocalParticipant.ts index b7dd493d60..d13876c06e 100644 --- a/src/room/participant/LocalParticipant.ts +++ b/src/room/participant/LocalParticipant.ts @@ -11,9 +11,6 @@ import { ParticipantInfo, RequestResponse, RequestResponse_Reason, - RpcAck, - RpcRequest, - RpcResponse, SimulcastCodec, SipDTMF, SubscribedQualityUpdate, @@ -45,11 +42,11 @@ import { } from '../errors'; import { EngineEvent, ParticipantEvent, TrackEvent } from '../events'; import { - MAX_PAYLOAD_BYTES, type PerformRpcParams, + RpcClientManager, RpcError, type RpcInvocationData, - byteLength, + RpcServerManager, } from '../rpc'; import LocalAudioTrack from '../track/LocalAudioTrack'; import LocalTrack from '../track/LocalTrack'; @@ -85,7 +82,6 @@ import { } from '../types'; import { Future, - compareVersions, isAudioTrack, isE2EESimulcastSupported, isFireFox, @@ -151,12 +147,14 @@ export default class LocalParticipant extends Participant { private firstActiveAgent?: RemoteParticipant; - private rpcHandlers: Map Promise>; - private roomOutgoingDataStreamManager: OutgoingDataStreamManager; private roomOutgoingDataTrackManager: OutgoingDataTrackManager; + private rpcClientManager: RpcClientManager; + + private rpcServerManager: RpcServerManager; + private pendingSignalRequests: Map< number, { @@ -168,25 +166,16 @@ export default class LocalParticipant extends Participant { private enabledPublishVideoCodecs: Codec[] = []; - private pendingAcks = new Map void; participantIdentity: string }>(); - - private pendingResponses = new Map< - string, - { - resolve: (payload: string | null, error: RpcError | null) => void; - participantIdentity: string; - } - >(); - /** @internal */ constructor( sid: string, identity: string, engine: RTCEngine, options: InternalRoomOptions, - roomRpcHandlers: Map Promise>, roomOutgoingDataStreamManager: OutgoingDataStreamManager, roomOutgoingDataTrackManager: OutgoingDataTrackManager, + rpcClientManager: RpcClientManager, + rpcServerManager: RpcServerManager, ) { super(sid, identity, undefined, undefined, undefined, { loggerName: options.loggerName, @@ -204,9 +193,10 @@ export default class LocalParticipant extends Participant { ['audiooutput', 'default'], ]); this.pendingSignalRequests = new Map(); - this.rpcHandlers = roomRpcHandlers; this.roomOutgoingDataStreamManager = roomOutgoingDataStreamManager; this.roomOutgoingDataTrackManager = roomOutgoingDataTrackManager; + this.rpcClientManager = rpcClientManager; + this.rpcServerManager = rpcServerManager; } get lastCameraError(): Error | undefined { @@ -266,8 +256,7 @@ export default class LocalParticipant extends Participant { .on(EngineEvent.LocalTrackUnpublished, this.handleLocalTrackUnpublished) .on(EngineEvent.SubscribedQualityUpdate, this.handleSubscribedQualityUpdate) .on(EngineEvent.Closing, this.handleClosing) - .on(EngineEvent.SignalRequestResponse, this.handleSignalRequestResponse) - .on(EngineEvent.DataPacketReceived, this.handleDataPacket); + .on(EngineEvent.SignalRequestResponse, this.handleSignalRequestResponse); } private handleReconnecting = () => { @@ -351,27 +340,6 @@ export default class LocalParticipant extends Participant { } }; - private handleDataPacket = (packet: DataPacket) => { - switch (packet.value.case) { - case 'rpcResponse': - let rpcResponse = packet.value.value as RpcResponse; - let payload: string | null = null; - let error: RpcError | null = null; - - if (rpcResponse.value.case === 'payload') { - payload = rpcResponse.value.value; - } else if (rpcResponse.value.case === 'error') { - error = RpcError.fromProto(rpcResponse.value.value); - } - this.handleIncomingRpcResponse(rpcResponse.requestId, payload, error); - break; - case 'rpcAck': - let rpcAck = packet.value.value as RpcAck; - this.handleIncomingRpcAck(rpcAck.requestId); - break; - } - }; - /** * Sets and updates the metadata of the local participant. * Note: this requires `canUpdateOwnMetadata` permission. @@ -1824,69 +1792,9 @@ export default class LocalParticipant extends Participant { * @returns A promise that resolves with the response payload or rejects with an error. * @throws Error on failure. Details in `message`. */ - performRpc({ - destinationIdentity, - method, - payload, - responseTimeout = 15000, - }: PerformRpcParams): TypedPromise { - const maxRoundTripLatency = 7000; - const minEffectiveTimeout = maxRoundTripLatency + 1000; - - return new TypedPromise(async (resolve, reject) => { - if (byteLength(payload) > MAX_PAYLOAD_BYTES) { - reject(RpcError.builtIn('REQUEST_PAYLOAD_TOO_LARGE')); - return; - } - - if ( - this.engine.latestJoinResponse?.serverInfo?.version && - compareVersions(this.engine.latestJoinResponse?.serverInfo?.version, '1.8.0') < 0 - ) { - reject(RpcError.builtIn('UNSUPPORTED_SERVER')); - return; - } - - const effectiveTimeout = Math.max(responseTimeout, minEffectiveTimeout); - const id = crypto.randomUUID(); - await this.publishRpcRequest(destinationIdentity, id, method, payload, effectiveTimeout); - - const ackTimeoutId = setTimeout(() => { - this.pendingAcks.delete(id); - reject(RpcError.builtIn('CONNECTION_TIMEOUT')); - this.pendingResponses.delete(id); - clearTimeout(responseTimeoutId); - }, maxRoundTripLatency); - - this.pendingAcks.set(id, { - resolve: () => { - clearTimeout(ackTimeoutId); - }, - participantIdentity: destinationIdentity, - }); - - const responseTimeoutId = setTimeout(() => { - this.pendingResponses.delete(id); - reject(RpcError.builtIn('RESPONSE_TIMEOUT')); - }, responseTimeout); - - this.pendingResponses.set(id, { - resolve: (responsePayload: string | null, responseError: RpcError | null) => { - clearTimeout(responseTimeoutId); - if (this.pendingAcks.has(id)) { - this.log.warn('RPC response received before ack', id); - this.pendingAcks.delete(id); - clearTimeout(ackTimeoutId); - } - - if (responseError) { - reject(responseError); - } else { - resolve(responsePayload ?? ''); - } - }, - participantIdentity: destinationIdentity, - }); + performRpc(params: PerformRpcParams): TypedPromise { + return this.rpcClientManager.performRpc(params).then(([_id, completionPromise]) => { + return completionPromise; }); } @@ -1894,20 +1802,14 @@ export default class LocalParticipant extends Participant { * @deprecated use `room.registerRpcMethod` instead */ registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { - if (this.rpcHandlers.has(method)) { - this.log.warn( - `you're overriding the RPC handler for method ${method}, in the future this will throw an error`, - ); - } - - this.rpcHandlers.set(method, handler); + this.rpcServerManager.registerRpcMethod(method, handler); } /** * @deprecated use `room.unregisterRpcMethod` instead */ unregisterRpcMethod(method: string) { - this.rpcHandlers.delete(method); + this.rpcServerManager.unregisterRpcMethod(method); } /** @@ -1938,72 +1840,6 @@ export default class LocalParticipant extends Participant { } } - private handleIncomingRpcAck(requestId: string) { - const handler = this.pendingAcks.get(requestId); - if (handler) { - handler.resolve(); - this.pendingAcks.delete(requestId); - } else { - console.error('Ack received for unexpected RPC request', requestId); - } - } - - private handleIncomingRpcResponse( - requestId: string, - payload: string | null, - error: RpcError | null, - ) { - const handler = this.pendingResponses.get(requestId); - if (handler) { - handler.resolve(payload, error); - this.pendingResponses.delete(requestId); - } else { - console.error('Response received for unexpected RPC request', requestId); - } - } - - /** @internal */ - private async publishRpcRequest( - destinationIdentity: string, - requestId: string, - method: string, - payload: string, - responseTimeout: number, - ) { - const packet = new DataPacket({ - destinationIdentities: [destinationIdentity], - kind: DataPacket_Kind.RELIABLE, - value: { - case: 'rpcRequest', - value: new RpcRequest({ - id: requestId, - method, - payload, - responseTimeoutMs: responseTimeout, - version: 1, - }), - }, - }); - - await this.engine.sendDataPacket(packet, DataChannelKind.RELIABLE); - } - - /** @internal */ - handleParticipantDisconnected(participantIdentity: string) { - for (const [id, { participantIdentity: pendingIdentity }] of this.pendingAcks) { - if (pendingIdentity === participantIdentity) { - this.pendingAcks.delete(id); - } - } - - for (const [id, { participantIdentity: pendingIdentity, resolve }] of this.pendingResponses) { - if (pendingIdentity === participantIdentity) { - resolve(null, RpcError.builtIn('RECIPIENT_DISCONNECTED')); - this.pendingResponses.delete(id); - } - } - } - /** @internal */ setEnabledPublishCodecs(codecs: Codec[]) { this.enabledPublishVideoCodecs = codecs.filter( diff --git a/src/room/participant/RemoteParticipant.ts b/src/room/participant/RemoteParticipant.ts index 8c9b8436dd..012697af81 100644 --- a/src/room/participant/RemoteParticipant.ts +++ b/src/room/participant/RemoteParticipant.ts @@ -6,6 +6,7 @@ import type { } from '@livekit/protocol'; import type { SignalClient } from '../../api/SignalClient'; import { DeferrableMap } from '../../utils/deferrable-map'; +import { CLIENT_PROTOCOL_DEFAULT } from '../../version'; import type RemoteDataTrack from '../data-track/RemoteDataTrack'; import { ParticipantEvent, TrackEvent } from '../events'; import RemoteAudioTrack from '../track/RemoteAudioTrack'; @@ -39,6 +40,11 @@ export default class RemoteParticipant extends Participant { signalClient: SignalClient; + /** A version number indicating the set of features that the report participant's client supports. + * @internal + **/ + clientProtocol: number; + private volumeMap: Map; private audioOutput?: AudioOutputOptions; @@ -58,6 +64,7 @@ export default class RemoteParticipant extends Participant { pi.attributes, loggerOptions, pi.kind, + pi.clientProtocol, ); } @@ -79,6 +86,7 @@ export default class RemoteParticipant extends Participant { attributes?: Record, loggerOptions?: LoggerOptions, kind: ParticipantKind = ParticipantKind.STANDARD, + clientProtocol: number = CLIENT_PROTOCOL_DEFAULT, ) { super(sid, identity || '', name, metadata, attributes, loggerOptions, kind); this.signalClient = signalClient; @@ -87,6 +95,7 @@ export default class RemoteParticipant extends Participant { this.videoTrackPublications = new Map(); this.dataTracks = new DeferrableMap(); this.volumeMap = new Map(); + this.clientProtocol = clientProtocol; } protected addTrackPublication(publication: RemoteTrackPublication) { diff --git a/src/room/rpc.test.ts b/src/room/rpc.test.ts deleted file mode 100644 index 5ac3d78606..0000000000 --- a/src/room/rpc.test.ts +++ /dev/null @@ -1,301 +0,0 @@ -import { DataPacket, DataPacket_Kind } from '@livekit/protocol'; -import { beforeEach, describe, expect, it, vi } from 'vitest'; -import type { InternalRoomOptions } from '../options'; -import type RTCEngine from './RTCEngine'; -import Room from './Room'; -import LocalParticipant from './participant/LocalParticipant'; -import { ParticipantKind } from './participant/Participant'; -import RemoteParticipant from './participant/RemoteParticipant'; -import { RpcError } from './rpc'; - -describe('LocalParticipant', () => { - describe('registerRpcMethod', () => { - let room: Room; - let mockSendDataPacket: ReturnType; - - beforeEach(() => { - mockSendDataPacket = vi.fn(); - - room = new Room(); - room.engine.client = { - sendUpdateLocalMetadata: vi.fn(), - }; - room.engine.on = vi.fn().mockReturnThis(); - room.engine.sendDataPacket = mockSendDataPacket; - - room.localParticipant.sid = 'test-sid'; - room.localParticipant.identity = 'test-identity'; - }); - - it('should register an RPC method handler', async () => { - const methodName = 'testMethod'; - const handler = vi.fn().mockResolvedValue('test response'); - - room.registerRpcMethod(methodName, handler); - - const mockCaller = new RemoteParticipant( - {} as any, - 'remote-sid', - 'remote-identity', - 'Remote Participant', - '', - undefined, - undefined, - ParticipantKind.STANDARD, - ); - - await room.handleIncomingRpcRequest( - mockCaller.identity, - 'test-request-id', - methodName, - 'test payload', - 5000, - 1, - ); - - expect(handler).toHaveBeenCalledWith({ - requestId: 'test-request-id', - callerIdentity: mockCaller.identity, - payload: 'test payload', - responseTimeout: 5000, - }); - - // Check if sendDataPacket was called twice (once for ACK and once for response) - expect(mockSendDataPacket).toHaveBeenCalledTimes(2); - - // Check if the first call was for ACK - expect(mockSendDataPacket.mock.calls[0][0].value.case).toBe('rpcAck'); - expect(mockSendDataPacket.mock.calls[0][1]).toBe(DataPacket_Kind.RELIABLE); - - // Check if the second call was for response - expect(mockSendDataPacket.mock.calls[1][0].value.case).toBe('rpcResponse'); - expect(mockSendDataPacket.mock.calls[1][1]).toBe(DataPacket_Kind.RELIABLE); - }); - - it('should catch and transform unhandled errors in the RPC method handler', async () => { - const methodName = 'errorMethod'; - const errorMessage = 'Test error'; - const handler = vi.fn().mockRejectedValue(new Error(errorMessage)); - - room.registerRpcMethod(methodName, handler); - - const mockCaller = new RemoteParticipant( - {} as any, - 'remote-sid', - 'remote-identity', - 'Remote Participant', - '', - undefined, - undefined, - ParticipantKind.STANDARD, - ); - - await room.handleIncomingRpcRequest( - mockCaller.identity, - 'test-error-request-id', - methodName, - 'test payload', - 5000, - 1, - ); - - expect(handler).toHaveBeenCalledWith({ - requestId: 'test-error-request-id', - callerIdentity: mockCaller.identity, - payload: 'test payload', - responseTimeout: 5000, - }); - - // Check if sendDataPacket was called twice (once for ACK and once for error response) - expect(mockSendDataPacket).toHaveBeenCalledTimes(2); - - // Check if the second call was for error response - const errorResponse = mockSendDataPacket.mock.calls[1][0].value.value.value.value; - expect(errorResponse.code).toBe(RpcError.ErrorCode.APPLICATION_ERROR); - }); - - it('should pass through RpcError thrown by the RPC method handler', async () => { - const methodName = 'rpcErrorMethod'; - const errorCode = 101; - const errorMessage = 'some-error-message'; - const handler = vi.fn().mockRejectedValue(new RpcError(errorCode, errorMessage)); - - room.localParticipant.registerRpcMethod(methodName, handler); - - const mockCaller = new RemoteParticipant( - {} as any, - 'remote-sid', - 'remote-identity', - 'Remote Participant', - '', - undefined, - undefined, - ParticipantKind.STANDARD, - ); - - await room.handleIncomingRpcRequest( - mockCaller.identity, - 'test-rpc-error-request-id', - methodName, - 'test payload', - 5000, - 1, - ); - - expect(handler).toHaveBeenCalledWith({ - requestId: 'test-rpc-error-request-id', - callerIdentity: mockCaller.identity, - payload: 'test payload', - responseTimeout: 5000, - }); - - // Check if sendDataPacket was called twice (once for ACK and once for error response) - expect(mockSendDataPacket).toHaveBeenCalledTimes(2); - - // Check if the second call was for error response - const errorResponse = mockSendDataPacket.mock.calls[1][0].value.value.value.value; - expect(errorResponse.code).toBe(errorCode); - expect(errorResponse.message).toBe(errorMessage); - }); - }); - - describe('performRpc', () => { - let localParticipant: LocalParticipant; - let mockRemoteParticipant: RemoteParticipant; - let mockEngine: RTCEngine; - let mockRoomOptions: InternalRoomOptions; - let mockSendDataPacket: ReturnType; - - beforeEach(() => { - mockSendDataPacket = vi.fn(); - mockEngine = { - client: { - sendUpdateLocalMetadata: vi.fn(), - }, - on: vi.fn().mockReturnThis(), - sendDataPacket: mockSendDataPacket, - } as unknown as RTCEngine; - - mockRoomOptions = {} as InternalRoomOptions; - - localParticipant = new LocalParticipant( - 'local-sid', - 'local-identity', - mockEngine, - mockRoomOptions, - ); - - mockRemoteParticipant = new RemoteParticipant( - {} as any, - 'remote-sid', - 'remote-identity', - 'Remote Participant', - '', - undefined, - undefined, - ParticipantKind.STANDARD, - ); - }); - - it('should send RPC request and receive successful response', async () => { - const method = 'testMethod'; - const payload = 'testPayload'; - const responsePayload = 'responsePayload'; - - mockSendDataPacket.mockImplementationOnce((packet: DataPacket) => { - const requestId = packet.value.value.id; - setTimeout(() => { - localParticipant.handleIncomingRpcAck(requestId); - setTimeout(() => { - localParticipant.handleIncomingRpcResponse(requestId, responsePayload, null); - }, 10); - }, 10); - }); - - const result = await localParticipant.performRpc({ - destinationIdentity: mockRemoteParticipant.identity, - method, - payload, - }); - - expect(mockSendDataPacket).toHaveBeenCalledTimes(1); - expect(result).toBe(responsePayload); - }); - - it('should handle RPC request timeout', async () => { - const method = 'timeoutMethod'; - const payload = 'timeoutPayload'; - - const timeout = 50; - - const resultPromise = localParticipant.performRpc({ - destinationIdentity: mockRemoteParticipant.identity, - method, - payload, - responseTimeout: timeout, - }); - - mockSendDataPacket.mockImplementationOnce(() => { - return new Promise((resolve) => { - setTimeout(resolve, timeout + 10); - }); - }); - - const startTime = Date.now(); - - await expect(resultPromise).rejects.toThrow('Response timeout'); - - const elapsedTime = Date.now() - startTime; - expect(elapsedTime).toBeGreaterThanOrEqual(timeout); - expect(elapsedTime).toBeLessThan(timeout + 50); // Allow some margin for test execution - - expect(mockSendDataPacket).toHaveBeenCalledTimes(1); - }); - - it('should handle RPC error response', async () => { - const method = 'errorMethod'; - const payload = 'errorPayload'; - const errorCode = 101; - const errorMessage = 'Test error message'; - - mockSendDataPacket.mockImplementationOnce((packet: DataPacket) => { - const requestId = packet.value.value.id; - setTimeout(() => { - localParticipant.handleIncomingRpcAck(requestId); - localParticipant.handleIncomingRpcResponse( - requestId, - null, - new RpcError(errorCode, errorMessage), - ); - }, 10); - }); - - await expect( - localParticipant.performRpc({ - destinationIdentity: mockRemoteParticipant.identity, - method, - payload, - }), - ).rejects.toThrow(errorMessage); - }); - - it('should handle participant disconnection during RPC request', async () => { - const method = 'disconnectMethod'; - const payload = 'disconnectPayload'; - - mockSendDataPacket.mockImplementationOnce(() => Promise.resolve()); - - const resultPromise = localParticipant.performRpc({ - destinationIdentity: mockRemoteParticipant.identity, - method, - payload, - }); - - // Simulate a small delay before disconnection - await new Promise((resolve) => setTimeout(resolve, 200)); - localParticipant.handleParticipantDisconnected(mockRemoteParticipant.identity); - - await expect(resultPromise).rejects.toThrow('Recipient disconnected'); - }); - }); -}); diff --git a/src/room/rpc/client/RpcClientManager.test.ts b/src/room/rpc/client/RpcClientManager.test.ts new file mode 100644 index 0000000000..5aaf8b17e6 --- /dev/null +++ b/src/room/rpc/client/RpcClientManager.test.ts @@ -0,0 +1,297 @@ +import { assert, beforeEach, describe, expect, it, vi } from 'vitest'; +import log from '../../../logger'; +import { subscribeToEvents } from '../../../utils/subscribeToEvents'; +import { CLIENT_PROTOCOL_DATA_STREAM_RPC, CLIENT_PROTOCOL_DEFAULT } from '../../../version'; +import type RTCEngine from '../../RTCEngine'; +import OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingDataStreamManager'; +import { sleep } from '../../utils'; +import { + RPC_REQUEST_DATA_STREAM_TOPIC, + RPC_REQUEST_ID_ATTR, + RPC_REQUEST_METHOD_ATTR, + RPC_REQUEST_VERSION_ATTR, + RpcError, +} from '../utils'; +import RpcClientManager from './RpcClientManager'; +import type { RpcClientManagerCallbacks } from './events'; + +describe('RpcClientManager', () => { + describe('v2 -> v1', () => { + let rpcClientManager: RpcClientManager; + + beforeEach(() => { + const outgoingDataStreamManager = new OutgoingDataStreamManager( + {} as unknown as RTCEngine, + log, + ); + + rpcClientManager = new RpcClientManager( + log, + outgoingDataStreamManager, + (_identity) => CLIENT_PROTOCOL_DEFAULT, // (other participant is "v1") + () => undefined, + ); + }); + + it('should send v1 RPC request to a "legacy" client (happy path)', async () => { + const managerEvents = subscribeToEvents(rpcClientManager, [ + 'sendDataPacket', + ]); + + const [requestId, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remoteIdentity', + method: 'testMethod', + payload: 'testPayload', + }); + + // Verify exactly one packet was emitted + const { packet } = await managerEvents.waitFor('sendDataPacket'); + assert(packet.value.case === 'rpcRequest'); + expect(packet.value.value.id).toStrictEqual(requestId); + expect(packet.value.value.method).toStrictEqual('testMethod'); + expect(packet.value.value.payload).toStrictEqual('testPayload'); + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + + // Asynchronously send a response back + await sleep(10); + rpcClientManager.handleIncomingRpcAck(requestId); + await sleep(10); + rpcClientManager.handleIncomingRpcResponseSuccess(requestId, 'response payload'); + + // Make sure the response came out the other end + const result = await completionPromise; + expect(result).toStrictEqual('response payload'); + }); + + it('should fail to send long (> 15kb) v1 RPC request', async () => { + const longPayload = new Array(20_000).fill('A').join(''); + + const performRpcPromise = rpcClientManager.performRpc({ + destinationIdentity: 'destination-identity', + method: 'test-method', + payload: longPayload, + }); + + await expect(performRpcPromise).rejects.toThrow('Request payload too large'); + }); + + it('should handle v1 RPC request timeout', async () => { + vi.useFakeTimers(); + + try { + const method = 'timeoutMethod'; + const payload = 'timeoutPayload'; + const timeout = 50; + + const [, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remote-identity', + method, + payload, + responseTimeout: timeout, + }); + + // Register the rejection handler before advancing so the rejection is caught + const rejectPromise = expect(completionPromise).rejects.toThrow('Response timeout'); + + // Response timeout (50ms) fires before ack timeout (7000ms) + await vi.advanceTimersByTimeAsync(timeout); + + await rejectPromise; + } finally { + vi.useRealTimers(); + } + }); + + it('should handle v1 RPC error response', async () => { + const method = 'errorMethod'; + const payload = 'errorPayload'; + const errorCode = 101; + const errorMessage = 'Test error message'; + + const [requestId, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remote-identity', + method, + payload, + }); + + rpcClientManager.handleIncomingRpcAck(requestId); + rpcClientManager.handleIncomingRpcResponseFailure( + requestId, + new RpcError(errorCode, errorMessage), + ); + + await expect(completionPromise).rejects.toThrow(errorMessage); + }); + + it('should handle participant disconnection during v1 RPC request', async () => { + const method = 'disconnectMethod'; + const payload = 'disconnectPayload'; + + const [, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remote-identity', + method, + payload, + }); + + // Simulate a small delay before disconnection + await sleep(200); + rpcClientManager.handleParticipantDisconnected('remote-identity'); + + await expect(completionPromise).rejects.toThrow('Recipient disconnected'); + }); + }); + + describe('v2 -> v2', () => { + let rpcClientManager: RpcClientManager; + let mockStreamTextWriter: { + write: ReturnType; + close: ReturnType; + }; + let mockOutgoingDataStreamManager: OutgoingDataStreamManager; + + beforeEach(() => { + mockStreamTextWriter = { + write: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + }; + mockOutgoingDataStreamManager = { + streamText: vi.fn().mockResolvedValue(mockStreamTextWriter), + } as unknown as OutgoingDataStreamManager; + + rpcClientManager = new RpcClientManager( + log, + mockOutgoingDataStreamManager, + (_identity) => CLIENT_PROTOCOL_DATA_STREAM_RPC, // (other participant is "v2") + () => undefined, + ); + }); + + it('should send short v2 RPC request via data stream (happy path)', async () => { + const managerEvents = subscribeToEvents(rpcClientManager, [ + 'sendDataPacket', + ]); + + const [requestId, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'destination-identity', + method: 'test-method', + payload: 'request-payload', + }); + + // Verify the data stream was used with correct attributes + expect(mockOutgoingDataStreamManager.streamText).toHaveBeenCalledWith( + expect.objectContaining({ + topic: RPC_REQUEST_DATA_STREAM_TOPIC, + destinationIdentities: ['destination-identity'], + attributes: expect.objectContaining({ + [RPC_REQUEST_ID_ATTR]: requestId, + [RPC_REQUEST_METHOD_ATTR]: 'test-method', + [RPC_REQUEST_VERSION_ATTR]: '2', + }), + }), + ); + expect(mockStreamTextWriter.write).toHaveBeenCalledWith('request-payload'); + expect(mockStreamTextWriter.close).toHaveBeenCalled(); + + // No packet should have been emitted + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + + // Asynchronously send a response back + await sleep(10); + rpcClientManager.handleIncomingRpcAck(requestId); + await sleep(10); + rpcClientManager.handleIncomingRpcResponseSuccess(requestId, 'response-payload'); + + await expect(completionPromise).resolves.toStrictEqual('response-payload'); + }); + + it('should send long (> 15kb) v2 RPC request via data stream', async () => { + const managerEvents = subscribeToEvents(rpcClientManager, [ + 'sendDataPacket', + ]); + + const longPayload = new Array(20_000).fill('A').join(''); + + const [requestId, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'destination-identity', + method: 'test-method', + payload: longPayload, + }); + + // Verify the data stream was used with correct attributes + expect(mockOutgoingDataStreamManager.streamText).toHaveBeenCalledWith( + expect.objectContaining({ + topic: RPC_REQUEST_DATA_STREAM_TOPIC, + destinationIdentities: ['destination-identity'], + attributes: expect.objectContaining({ + [RPC_REQUEST_ID_ATTR]: requestId, + [RPC_REQUEST_METHOD_ATTR]: 'test-method', + [RPC_REQUEST_VERSION_ATTR]: '2', + }), + }), + ); + expect(mockStreamTextWriter.write).toHaveBeenCalledWith(longPayload); + expect(mockStreamTextWriter.close).toHaveBeenCalled(); + + // No packet should have been emitted + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + + rpcClientManager.handleIncomingRpcAck(requestId); + rpcClientManager.handleIncomingRpcResponseSuccess(requestId, 'response-payload'); + + await expect(completionPromise).resolves.toStrictEqual('response-payload'); + }); + + it('should handle a v2 RPC request timeout', async () => { + vi.useFakeTimers(); + + try { + const timeout = 50; + + const [, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remote-identity', + method: 'timeoutMethod', + payload: 'timeoutPayload', + responseTimeout: timeout, + }); + + const rejectPromise = expect(completionPromise).rejects.toThrow('Response timeout'); + await vi.advanceTimersByTimeAsync(timeout); + await rejectPromise; + } finally { + vi.useRealTimers(); + } + }); + + it('should handle a v2 RPC error response', async () => { + const errorCode = 101; + const errorMessage = 'Test error message'; + + const [requestId, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remote-identity', + method: 'errorMethod', + payload: 'errorPayload', + }); + + rpcClientManager.handleIncomingRpcAck(requestId); + rpcClientManager.handleIncomingRpcResponseFailure( + requestId, + new RpcError(errorCode, errorMessage), + ); + + await expect(completionPromise).rejects.toThrow(errorMessage); + }); + + it('should handle participant disconnection during v2 RPC request', async () => { + const [, completionPromise] = await rpcClientManager.performRpc({ + destinationIdentity: 'remote-identity', + method: 'disconnectMethod', + payload: 'disconnectPayload', + }); + + await sleep(200); + rpcClientManager.handleParticipantDisconnected('remote-identity'); + + await expect(completionPromise).rejects.toThrow('Recipient disconnected'); + }); + }); +}); diff --git a/src/room/rpc/client/RpcClientManager.ts b/src/room/rpc/client/RpcClientManager.ts new file mode 100644 index 0000000000..55df050651 --- /dev/null +++ b/src/room/rpc/client/RpcClientManager.ts @@ -0,0 +1,259 @@ +import { DataPacket, DataPacket_Kind, RpcRequest } from '@livekit/protocol'; +import EventEmitter from 'events'; +import type TypedEmitter from 'typed-emitter'; +import { type StructuredLogger } from '../../../logger'; +import { CLIENT_PROTOCOL_DATA_STREAM_RPC } from '../../../version'; +import { type TextStreamReader } from '../../data-stream/incoming/StreamReader'; +import type OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingDataStreamManager'; +import type Participant from '../../participant/Participant'; +import { Future, compareVersions } from '../../utils'; +import { + MAX_V1_PAYLOAD_BYTES, + type PerformRpcParams, + RPC_REQUEST_DATA_STREAM_TOPIC, + RPC_REQUEST_ID_ATTR, + RPC_REQUEST_METHOD_ATTR, + RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR, + RPC_REQUEST_VERSION_ATTR, + RPC_VERSION_V1, + RPC_VERSION_V2, + RpcError, + byteLength, +} from '../utils'; +import type { RpcClientManagerCallbacks } from './events'; + +/** + * Manages the client (caller) side of RPC: sending requests, tracking pending + * ack/response state, and handling incoming ack/response packets. + * @internal + */ +export default class RpcClientManager extends (EventEmitter as new () => TypedEmitter) { + private log: StructuredLogger; + + private outgoingDataStreamManager: OutgoingDataStreamManager; + + private getRemoteParticipantClientProtocol: (identity: Participant['identity']) => number; + + private getServerVersion: () => string | undefined; + + private pendingAcks = new Map void; participantIdentity: string }>(); + + private pendingResponses = new Map< + string /* request id */, + { + completionFuture: Future; + participantIdentity: string; + } + >(); + + constructor( + log: StructuredLogger, + outgoingDataStreamManager: OutgoingDataStreamManager, + getRemoteParticipantClientProtocol: (identity: Participant['identity']) => number, + getServerVersion: () => string | undefined, + ) { + super(); + this.log = log; + this.outgoingDataStreamManager = outgoingDataStreamManager; + this.getRemoteParticipantClientProtocol = getRemoteParticipantClientProtocol; + this.getServerVersion = getServerVersion; + } + + async performRpc({ + destinationIdentity, + method, + payload, + responseTimeout: responseTimeoutMs = 15000, + }: PerformRpcParams): Promise<[id: string, completionPromise: Promise]> { + const maxRoundTripLatencyMs = 7000; + const minEffectiveTimeoutMs = maxRoundTripLatencyMs + 1000; + + const remoteClientProtocol = this.getRemoteParticipantClientProtocol(destinationIdentity); + const payloadBytes = byteLength(payload); + + // Only enforce the legacy size limit when on rpc v1 + if (payloadBytes > MAX_V1_PAYLOAD_BYTES && remoteClientProtocol < 1) { + throw RpcError.builtIn('REQUEST_PAYLOAD_TOO_LARGE'); + } + + const serverVersion = this.getServerVersion(); + if (serverVersion && compareVersions(serverVersion, '1.8.0') < 0) { + throw RpcError.builtIn('UNSUPPORTED_SERVER'); + } + + const effectiveTimeoutMs = Math.max(responseTimeoutMs, minEffectiveTimeoutMs); + const id = crypto.randomUUID(); + + await this.publishRpcRequest( + destinationIdentity, + id, + method, + payload, + effectiveTimeoutMs, + remoteClientProtocol, + ); + + const completionFuture = new Future(); + + const ackTimeoutId = setTimeout(() => { + this.pendingAcks.delete(id); + completionFuture.reject?.(RpcError.builtIn('CONNECTION_TIMEOUT')); + this.pendingResponses.delete(id); + clearTimeout(responseTimeoutId); + }, maxRoundTripLatencyMs); + + this.pendingAcks.set(id, { + resolve: () => { + clearTimeout(ackTimeoutId); + }, + participantIdentity: destinationIdentity, + }); + + const responseTimeoutId = setTimeout(() => { + this.pendingResponses.delete(id); + completionFuture.reject?.(RpcError.builtIn('RESPONSE_TIMEOUT')); + }, responseTimeoutMs); + + this.pendingResponses.set(id, { + completionFuture, + participantIdentity: destinationIdentity, + }); + + const completionPromise = completionFuture.promise.finally(() => { + clearTimeout(responseTimeoutId); + + if (this.pendingAcks.has(id)) { + this.log.warn('RPC response received before ack', id); + this.pendingAcks.delete(id); + clearTimeout(ackTimeoutId); + } + }); + + return [id, completionPromise]; + } + + private async publishRpcRequest( + destinationIdentity: string, + requestId: string, + method: string, + payload: string, + responseTimeout: number, + remoteClientProtocol: number, + ) { + if (remoteClientProtocol >= CLIENT_PROTOCOL_DATA_STREAM_RPC) { + // Send payload as a data stream - a "version 2" rpc request. + const writer = await this.outgoingDataStreamManager.streamText({ + topic: RPC_REQUEST_DATA_STREAM_TOPIC, + destinationIdentities: [destinationIdentity], + attributes: { + [RPC_REQUEST_ID_ATTR]: requestId, + [RPC_REQUEST_METHOD_ATTR]: method, + [RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR]: `${responseTimeout}`, + [RPC_REQUEST_VERSION_ATTR]: `${RPC_VERSION_V2}`, + }, + }); + + await writer.write(payload); + await writer.close(); + return; + } + + // Fallback to sending a literal RpcRequest - a "version 1" rpc request. + this.emit('sendDataPacket', { + packet: new DataPacket({ + destinationIdentities: [destinationIdentity], + kind: DataPacket_Kind.RELIABLE, + value: { + case: 'rpcRequest', + value: new RpcRequest({ + id: requestId, + method, + payload, + responseTimeoutMs: responseTimeout, + version: RPC_VERSION_V1, + }), + }, + }), + }); + } + + /** + * Handle an incoming data stream containing an RPC response payload. + * @internal + */ + async handleIncomingDataStream(reader: TextStreamReader, attributes: Record) { + const associatedRequestId = attributes[RPC_REQUEST_ID_ATTR]; + if (!associatedRequestId) { + this.log.warn(`RPC data stream malformed: ${RPC_REQUEST_ID_ATTR} not set.`); + this.handleIncomingRpcResponseFailure( + associatedRequestId, + RpcError.builtIn('APPLICATION_ERROR'), + ); + return; + } + + let payload: string; + try { + payload = await reader.readAll(); + } catch (e) { + this.log.warn(`Error reading RPC response payload: ${e}`); + this.handleIncomingRpcResponseFailure( + associatedRequestId, + RpcError.builtIn('APPLICATION_ERROR'), + ); + return; + } + + this.handleIncomingRpcResponseSuccess(associatedRequestId, payload); + } + + /** @internal */ + handleIncomingRpcResponseSuccess(requestId: string, payload: string) { + const handler = this.pendingResponses.get(requestId); + if (handler) { + handler.completionFuture.resolve?.(payload); + this.pendingResponses.delete(requestId); + } else { + this.log.error('Response received for unexpected RPC request', requestId); + } + } + + /** @internal */ + handleIncomingRpcResponseFailure(requestId: string, error: RpcError) { + const handler = this.pendingResponses.get(requestId); + if (handler) { + handler.completionFuture.reject?.(error); + this.pendingResponses.delete(requestId); + } else { + this.log.error('Response received for unexpected RPC request', requestId); + } + } + + /** @internal */ + handleIncomingRpcAck(requestId: string) { + const handler = this.pendingAcks.get(requestId); + if (handler) { + handler.resolve(); + this.pendingAcks.delete(requestId); + } else { + this.log.error(`Ack received for unexpected RPC request: ${requestId}`); + } + } + + /** @internal */ + handleParticipantDisconnected(participantIdentity: string) { + for (const [id, { participantIdentity: pendingIdentity }] of this.pendingAcks) { + if (pendingIdentity === participantIdentity) { + this.pendingAcks.delete(id); + } + } + + for (const [id, { participantIdentity: pendingIdentity, completionFuture }] of this + .pendingResponses) { + if (pendingIdentity === participantIdentity) { + completionFuture.reject?.(RpcError.builtIn('RECIPIENT_DISCONNECTED')); + this.pendingResponses.delete(id); + } + } + } +} diff --git a/src/room/rpc/client/events.ts b/src/room/rpc/client/events.ts new file mode 100644 index 0000000000..427fedef75 --- /dev/null +++ b/src/room/rpc/client/events.ts @@ -0,0 +1,9 @@ +import type { DataPacket } from '@livekit/protocol'; + +export type EventSendDataPacket = { + packet: DataPacket; +}; + +export type RpcClientManagerCallbacks = { + sendDataPacket: (event: EventSendDataPacket) => void; +}; diff --git a/src/room/rpc/index.ts b/src/room/rpc/index.ts new file mode 100644 index 0000000000..368eface8d --- /dev/null +++ b/src/room/rpc/index.ts @@ -0,0 +1,14 @@ +export { default as RpcClientManager } from './client/RpcClientManager'; +export type { RpcClientManagerCallbacks } from './client/events'; +export { default as RpcServerManager } from './server/RpcServerManager'; +export type { RpcServerManagerCallbacks } from './server/events'; +export { + type PerformRpcParams, + RPC_REQUEST_DATA_STREAM_TOPIC, + RPC_RESPONSE_DATA_STREAM_TOPIC, + RPC_REQUEST_ID_ATTR, + RpcError, + type RpcInvocationData, + byteLength, + truncateBytes, +} from './utils'; diff --git a/src/room/rpc/server/RpcServerManager.test.ts b/src/room/rpc/server/RpcServerManager.test.ts new file mode 100644 index 0000000000..e5742f20bd --- /dev/null +++ b/src/room/rpc/server/RpcServerManager.test.ts @@ -0,0 +1,414 @@ +import { RpcRequest } from '@livekit/protocol'; +import { assert, beforeEach, describe, expect, it, vi } from 'vitest'; +import log from '../../../logger'; +import { subscribeToEvents } from '../../../utils/subscribeToEvents'; +import { CLIENT_PROTOCOL_DATA_STREAM_RPC, CLIENT_PROTOCOL_DEFAULT } from '../../../version'; +import type RTCEngine from '../../RTCEngine'; +import OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingDataStreamManager'; +import { + RPC_REQUEST_ID_ATTR, + RPC_REQUEST_METHOD_ATTR, + RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR, + RPC_REQUEST_VERSION_ATTR, + RPC_RESPONSE_DATA_STREAM_TOPIC, + RpcError, +} from '../utils'; +import RpcServerManager from './RpcServerManager'; +import type { RpcServerManagerCallbacks } from './events'; + +describe('RpcServerManager', () => { + describe('v1 -> v1', () => { + let rpcServerManager: RpcServerManager; + + beforeEach(() => { + const outgoingDataStreamManager = new OutgoingDataStreamManager( + {} as unknown as RTCEngine, + log, + ); + + rpcServerManager = new RpcServerManager( + log, + outgoingDataStreamManager, + (_identity) => CLIENT_PROTOCOL_DEFAULT, + ); + }); + + it('should receive a rpc message from a participant', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const handler = async () => 'response payload'; + rpcServerManager.registerRpcMethod('test-method', handler); + + const requestId = crypto.randomUUID(); + const responseTimeoutMs = 10_000; + await rpcServerManager.handleIncomingRpcRequest( + 'caller-identity', + new RpcRequest({ + id: requestId, + method: 'test-method', + payload: 'request payload', + responseTimeoutMs, + version: 1, + }), + ); + + // The first event is an acknowledgement of the request + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + expect(ackEvent.packet.value.value.requestId).toStrictEqual(requestId); + + // And the second being the actual response + const responseEvent = await managerEvents.waitFor('sendDataPacket'); + assert(responseEvent.packet.value.case === 'rpcResponse'); + const rpcResponse = responseEvent.packet.value.value; + expect(rpcResponse.requestId).toStrictEqual(requestId); + assert(rpcResponse.value.case === 'payload'); + expect(rpcResponse.value.value).toStrictEqual('response payload'); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + + it('should register an RPC method handler', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const methodName = 'testMethod'; + const handler = vi.fn().mockResolvedValue('test response'); + + rpcServerManager.registerRpcMethod(methodName, handler); + + await rpcServerManager.handleIncomingRpcRequest( + 'remote-identity', + new RpcRequest({ + id: 'test-request-id', + method: methodName, + payload: 'test payload', + responseTimeoutMs: 5000, + version: 1, + }), + ); + + expect(handler).toHaveBeenCalledWith({ + requestId: 'test-request-id', + callerIdentity: 'remote-identity', + payload: 'test payload', + responseTimeout: 5000, + }); + + // Ensure the first event was for the ack + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + expect(ackEvent.packet.value.case).toStrictEqual('rpcAck'); + + // And the second event was for the response + const responseEvent = await managerEvents.waitFor('sendDataPacket'); + expect(responseEvent.packet.value.case).toStrictEqual('rpcResponse'); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + + it('should catch and transform unhandled errors in the RPC method handler', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const methodName = 'errorMethod'; + const errorMessage = 'Test error'; + const handler = async () => { + throw new Error(errorMessage); + }; + + rpcServerManager.registerRpcMethod(methodName, handler); + + await rpcServerManager.handleIncomingRpcRequest( + 'remote-identity', + new RpcRequest({ + id: 'test-error-request-id', + method: methodName, + payload: 'test payload', + responseTimeoutMs: 5000, + version: 1, + }), + ); + + // Ensure the first event was for the ack + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + + // And the second event was for the error response + const errorEvent = await managerEvents.waitFor('sendDataPacket'); + assert(errorEvent.packet.value.case === 'rpcResponse'); + assert(errorEvent.packet.value.value.value.case === 'error'); + const errorResponse = errorEvent.packet.value.value.value.value; + expect(errorResponse.code).toStrictEqual(RpcError.ErrorCode.APPLICATION_ERROR); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + + it('should pass through RpcError thrown by the RPC method handler', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const methodName = 'rpcErrorMethod'; + const errorCode = 101; + const errorMessage = 'some-error-message'; + const handler = async () => { + throw new RpcError(errorCode, errorMessage); + }; + + rpcServerManager.registerRpcMethod(methodName, handler); + + await rpcServerManager.handleIncomingRpcRequest( + 'remote-identity', + new RpcRequest({ + id: 'test-rpc-error-request-id', + method: methodName, + payload: 'test payload', + responseTimeoutMs: 5000, + version: 1, + }), + ); + + // Ensure the first event was for the ack + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + + // And the second event was for the error response + const errorEvent = await managerEvents.waitFor('sendDataPacket'); + assert(errorEvent.packet.value.case === 'rpcResponse'); + assert(errorEvent.packet.value.value.value.case === 'error'); + const errorResponse = errorEvent.packet.value.value.value.value; + expect(errorResponse.code).toStrictEqual(errorCode); + expect(errorResponse.message).toStrictEqual(errorMessage); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + }); + + describe('v2 -> v2', () => { + let rpcServerManager: RpcServerManager; + let outgoingDataStreamManager: OutgoingDataStreamManager; + let mockStreamTextWriter: { + write: ReturnType; + close: ReturnType; + }; + + beforeEach(() => { + outgoingDataStreamManager = new OutgoingDataStreamManager({} as unknown as RTCEngine, log); + + mockStreamTextWriter = { + write: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), + }; + vi.spyOn(outgoingDataStreamManager, 'streamText').mockResolvedValue( + mockStreamTextWriter as any, + ); + + rpcServerManager = new RpcServerManager( + log, + outgoingDataStreamManager, + (_identity) => CLIENT_PROTOCOL_DATA_STREAM_RPC, + ); + }); + + function makeDataStreamAttrs(requestId: string, method: string, responseTimeout: number) { + return { + [RPC_REQUEST_ID_ATTR]: requestId, + [RPC_REQUEST_METHOD_ATTR]: method, + [RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR]: `${responseTimeout}`, + [RPC_REQUEST_VERSION_ATTR]: '2', + }; + } + + function mockTextStreamReader(payload: string) { + return { readAll: vi.fn().mockResolvedValue(payload) } as any; + } + + it('should receive a rpc message via data stream from a participant', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const handler = async () => 'response payload'; + rpcServerManager.registerRpcMethod('test-method', handler); + + const requestId = crypto.randomUUID(); + const responseTimeoutMs = 10_000; + await rpcServerManager.handleIncomingDataStream( + mockTextStreamReader('request payload'), + 'caller-identity', + makeDataStreamAttrs(requestId, 'test-method', responseTimeoutMs), + ); + + // The first event is an acknowledgement of the request + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + expect(ackEvent.packet.value.value.requestId).toStrictEqual(requestId); + + // The response should have been sent via data stream, not packet + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + expect(outgoingDataStreamManager.streamText).toHaveBeenCalledWith( + expect.objectContaining({ + topic: RPC_RESPONSE_DATA_STREAM_TOPIC, + destinationIdentities: ['caller-identity'], + attributes: { [RPC_REQUEST_ID_ATTR]: requestId }, + }), + ); + expect(mockStreamTextWriter.write).toHaveBeenCalledWith('response payload'); + expect(mockStreamTextWriter.close).toHaveBeenCalled(); + }); + + it('should register an RPC method handler', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const methodName = 'testMethod'; + const handler = vi.fn().mockResolvedValue('test response'); + + rpcServerManager.registerRpcMethod(methodName, handler); + + await rpcServerManager.handleIncomingDataStream( + mockTextStreamReader('test payload'), + 'remote-identity', + makeDataStreamAttrs('test-request-id', methodName, 5000), + ); + + expect(handler).toHaveBeenCalledWith({ + requestId: 'test-request-id', + callerIdentity: 'remote-identity', + payload: 'test payload', + responseTimeout: 5000, + }); + + // Ensure the ack was sent + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + expect(ackEvent.packet.value.case).toStrictEqual('rpcAck'); + + // Response goes via data stream, not packet + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + expect(outgoingDataStreamManager.streamText).toHaveBeenCalled(); + }); + + it('should catch and transform unhandled errors in the RPC method handler', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const methodName = 'errorMethod'; + const errorMessage = 'Test error'; + const handler = async () => { + throw new Error(errorMessage); + }; + + rpcServerManager.registerRpcMethod(methodName, handler); + + await rpcServerManager.handleIncomingDataStream( + mockTextStreamReader('test payload'), + 'remote-identity', + makeDataStreamAttrs('test-error-request-id', methodName, 5000), + ); + + // Ensure the first event was for the ack + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + + // Error responses always go via packet, even for v2 callers + const errorEvent = await managerEvents.waitFor('sendDataPacket'); + assert(errorEvent.packet.value.case === 'rpcResponse'); + assert(errorEvent.packet.value.value.value.case === 'error'); + const errorResponse = errorEvent.packet.value.value.value.value; + expect(errorResponse.code).toStrictEqual(RpcError.ErrorCode.APPLICATION_ERROR); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + + it('should pass through RpcError thrown by the RPC method handler', async () => { + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const methodName = 'rpcErrorMethod'; + const errorCode = 101; + const errorMessage = 'some-error-message'; + const handler = async () => { + throw new RpcError(errorCode, errorMessage); + }; + + rpcServerManager.registerRpcMethod(methodName, handler); + + await rpcServerManager.handleIncomingDataStream( + mockTextStreamReader('test payload'), + 'remote-identity', + makeDataStreamAttrs('test-rpc-error-request-id', methodName, 5000), + ); + + // Ensure the first event was for the ack + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + + // Error responses always go via packet, even for v2 callers + const errorEvent = await managerEvents.waitFor('sendDataPacket'); + assert(errorEvent.packet.value.case === 'rpcResponse'); + assert(errorEvent.packet.value.value.value.case === 'error'); + const errorResponse = errorEvent.packet.value.value.value.value; + expect(errorResponse.code).toStrictEqual(errorCode); + expect(errorResponse.message).toStrictEqual(errorMessage); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + }); + + describe('v1 -> v2', () => { + it('should use v1 protocol (RpcResponse packet) when responding to a v1 caller', async () => { + const outgoingDataStreamManager = new OutgoingDataStreamManager( + {} as unknown as RTCEngine, + log, + ); + const streamTextSpy = vi.spyOn(outgoingDataStreamManager, 'streamText'); + + const rpcServerManager = new RpcServerManager( + log, + outgoingDataStreamManager, + (_identity) => CLIENT_PROTOCOL_DEFAULT, + ); + + const managerEvents = subscribeToEvents(rpcServerManager, [ + 'sendDataPacket', + ]); + + const handler = async () => 'response payload'; + rpcServerManager.registerRpcMethod('test-method', handler); + + const requestId = crypto.randomUUID(); + await rpcServerManager.handleIncomingRpcRequest( + 'v1-caller', + new RpcRequest({ + id: requestId, + method: 'test-method', + payload: 'request payload', + responseTimeoutMs: 10_000, + version: 1, + }), + ); + + // Ack via packet + const ackEvent = await managerEvents.waitFor('sendDataPacket'); + assert(ackEvent.packet.value.case === 'rpcAck'); + + // Response should be a v1 RpcResponse packet, not a data stream + expect(streamTextSpy).not.toHaveBeenCalled(); + const responseEvent = await managerEvents.waitFor('sendDataPacket'); + assert(responseEvent.packet.value.case === 'rpcResponse'); + const rpcResponse = responseEvent.packet.value.value; + expect(rpcResponse.requestId).toStrictEqual(requestId); + assert(rpcResponse.value.case === 'payload'); + expect(rpcResponse.value.value).toStrictEqual('response payload'); + + expect(managerEvents.areThereBufferedEvents('sendDataPacket')).toBe(false); + }); + }); +}); diff --git a/src/room/rpc/server/RpcServerManager.ts b/src/room/rpc/server/RpcServerManager.ts new file mode 100644 index 0000000000..a0e38d5b9a --- /dev/null +++ b/src/room/rpc/server/RpcServerManager.ts @@ -0,0 +1,289 @@ +import { DataPacket, DataPacket_Kind, RpcAck, RpcRequest, RpcResponse } from '@livekit/protocol'; +import EventEmitter from 'events'; +import type TypedEmitter from 'typed-emitter'; +import { type StructuredLogger } from '../../../logger'; +import { CLIENT_PROTOCOL_DATA_STREAM_RPC } from '../../../version'; +import { type TextStreamReader } from '../../data-stream/incoming/StreamReader'; +import type OutgoingDataStreamManager from '../../data-stream/outgoing/OutgoingDataStreamManager'; +import type Participant from '../../participant/Participant'; +import { + MAX_V1_PAYLOAD_BYTES, + RPC_REQUEST_ID_ATTR, + RPC_REQUEST_METHOD_ATTR, + RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR, + RPC_REQUEST_VERSION_ATTR, + RPC_RESPONSE_DATA_STREAM_TOPIC, + RPC_VERSION_V2, + RpcError, + type RpcInvocationData, + byteLength, +} from '../utils'; +import type { RpcServerManagerCallbacks } from './events'; + +/** + * Manages the server (handler) side of RPC: processing incoming requests, + * managing registered method handlers, and sending responses. + * @internal + */ +export default class RpcServerManager extends (EventEmitter as new () => TypedEmitter) { + private log: StructuredLogger; + + private outgoingDataStreamManager: OutgoingDataStreamManager; + + private getRemoteParticipantClientProtocol: (identity: Participant['identity']) => number; + + private rpcHandlers: Map Promise> = new Map(); + + constructor( + log: StructuredLogger, + outgoingDataStreamManager: OutgoingDataStreamManager, + getRemoteParticipantClientProtocol: (identity: Participant['identity']) => number, + ) { + super(); + this.log = log; + this.outgoingDataStreamManager = outgoingDataStreamManager; + this.getRemoteParticipantClientProtocol = getRemoteParticipantClientProtocol; + } + + registerRpcMethod(method: string, handler: (data: RpcInvocationData) => Promise) { + if (this.rpcHandlers.has(method)) { + throw Error( + `RPC handler already registered for method ${method}, unregisterRpcMethod before trying to register again`, + ); + } + this.rpcHandlers.set(method, handler); + } + + unregisterRpcMethod(method: string) { + this.rpcHandlers.delete(method); + } + + /** + * Handle an incoming RPCRequest message containing a payload. + * This handles "version 1" of rpc requests. + * @internal + */ + async handleIncomingRpcRequest(callerIdentity: string, rpcRequest: RpcRequest) { + this.publishRpcAck(callerIdentity, rpcRequest.id); + + if (rpcRequest.version !== 1) { + this.publishRpcResponsePacket( + callerIdentity, + rpcRequest.id, + null, + RpcError.builtIn('UNSUPPORTED_VERSION'), + ); + return; + } + + const handler = this.rpcHandlers.get(rpcRequest.method); + + if (!handler) { + this.publishRpcResponsePacket( + callerIdentity, + rpcRequest.id, + null, + RpcError.builtIn('UNSUPPORTED_METHOD'), + ); + return; + } + + let response: string | null = null; + try { + response = await handler({ + requestId: rpcRequest.id, + callerIdentity, + payload: rpcRequest.payload, + responseTimeout: rpcRequest.responseTimeoutMs, + }); + } catch (error) { + let responseError; + if (error instanceof RpcError) { + responseError = error; + } else { + this.log.warn( + `Uncaught error returned by RPC handler for ${rpcRequest.method}. Returning APPLICATION_ERROR instead.`, + error, + ); + responseError = RpcError.builtIn('APPLICATION_ERROR'); + } + + this.publishRpcResponsePacket(callerIdentity, rpcRequest.id, null, responseError); + return; + } + + await this.publishRpcResponse(callerIdentity, rpcRequest.id, response ?? ''); + } + + /** + * Handle an incoming data stream containing a RPC request payload. + * This handles "version 2" of rpc requests. + * @internal + */ + async handleIncomingDataStream( + reader: TextStreamReader, + callerIdentity: Participant['identity'], + dataStreamAttrs: Record, + ) { + const requestId = dataStreamAttrs[RPC_REQUEST_ID_ATTR]; + const method = dataStreamAttrs[RPC_REQUEST_METHOD_ATTR]; + const responseTimeout = parseInt(dataStreamAttrs[RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR], 10); + const version = parseInt(dataStreamAttrs[RPC_REQUEST_VERSION_ATTR], 10); + + if (!requestId || !method || Number.isNaN(responseTimeout) || Number.isNaN(version)) { + this.log.warn( + `RPC data stream malformed: ${RPC_REQUEST_ID_ATTR} / ${RPC_REQUEST_METHOD_ATTR} / ${RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR} / ${RPC_REQUEST_VERSION_ATTR} not set.`, + ); + this.publishRpcResponsePacket( + callerIdentity, + requestId, + null, + RpcError.builtIn('APPLICATION_ERROR'), + ); + return; + } + + this.publishRpcAck(callerIdentity, requestId); + + if (version !== RPC_VERSION_V2) { + this.publishRpcResponsePacket( + callerIdentity, + requestId, + null, + RpcError.builtIn('UNSUPPORTED_VERSION'), + ); + return; + } + + let payload: string; + try { + payload = await reader.readAll(); + } catch (e) { + this.log.warn(`Error reading RPC request payload: ${e}`); + this.publishRpcResponsePacket( + callerIdentity, + requestId, + null, + RpcError.builtIn('APPLICATION_ERROR'), + ); + return; + } + + const handler = this.rpcHandlers.get(method); + + if (!handler) { + this.publishRpcResponsePacket( + callerIdentity, + requestId, + null, + RpcError.builtIn('UNSUPPORTED_METHOD'), + ); + return; + } + + let response: string | null = null; + try { + response = await handler({ + requestId, + callerIdentity, + payload, + responseTimeout, + }); + } catch (error) { + let responseError; + if (error instanceof RpcError) { + responseError = error; + } else { + this.log.warn( + `Uncaught error returned by RPC handler for ${method}. Returning APPLICATION_ERROR instead.`, + error, + ); + responseError = RpcError.builtIn('APPLICATION_ERROR'); + } + + this.publishRpcResponsePacket(callerIdentity, requestId, null, responseError); + return; + } + + await this.publishRpcResponse(callerIdentity, requestId, response ?? ''); + } + + private publishRpcAck(destinationIdentity: string, requestId: string) { + this.emit('sendDataPacket', { + packet: new DataPacket({ + destinationIdentities: [destinationIdentity], + kind: DataPacket_Kind.RELIABLE, + value: { + case: 'rpcAck', + value: new RpcAck({ + requestId, + }), + }, + }), + }); + } + + private publishRpcResponsePacket( + destinationIdentity: string, + requestId: string, + payload: string | null, + error: RpcError | null, + ) { + this.emit('sendDataPacket', { + packet: new DataPacket({ + destinationIdentities: [destinationIdentity], + kind: DataPacket_Kind.RELIABLE, + value: { + case: 'rpcResponse', + value: new RpcResponse({ + requestId, + value: error + ? { case: 'error', value: error.toProto() } + : { case: 'payload', value: payload ?? '' }, + }), + }, + }), + }); + } + + /** + * Send a successful RPC response payload, choosing the transport based on + * the caller's client protocol version. + */ + private async publishRpcResponse( + destinationIdentity: string, + requestId: string, + payload: string, + ) { + const callerClientProtocol = this.getRemoteParticipantClientProtocol(destinationIdentity); + + if (callerClientProtocol >= CLIENT_PROTOCOL_DATA_STREAM_RPC) { + // Send response as a data stream + const writer = await this.outgoingDataStreamManager.streamText({ + topic: RPC_RESPONSE_DATA_STREAM_TOPIC, + destinationIdentities: [destinationIdentity], + attributes: { [RPC_REQUEST_ID_ATTR]: requestId }, + }); + await writer.write(payload); + await writer.close(); + return; + } + + // Legacy client: enforce size limit and send uncompressed payload inline + const responseBytes = byteLength(payload); + if (responseBytes > MAX_V1_PAYLOAD_BYTES) { + this.log.warn( + `RPC Response payload too large for request ${requestId}. To send larger responses, consider updating the sending client.`, + ); + this.publishRpcResponsePacket( + destinationIdentity, + requestId, + null, + RpcError.builtIn('RESPONSE_PAYLOAD_TOO_LARGE'), + ); + return; + } + + this.publishRpcResponsePacket(destinationIdentity, requestId, payload, null); + } +} diff --git a/src/room/rpc/server/events.ts b/src/room/rpc/server/events.ts new file mode 100644 index 0000000000..913c935e44 --- /dev/null +++ b/src/room/rpc/server/events.ts @@ -0,0 +1,9 @@ +import type { DataPacket } from '@livekit/protocol'; + +export type EventSendDataPacket = { + packet: DataPacket; +}; + +export type RpcServerManagerCallbacks = { + sendDataPacket: (event: EventSendDataPacket) => void; +}; diff --git a/src/room/rpc.ts b/src/room/rpc/utils.ts similarity index 80% rename from src/room/rpc.ts rename to src/room/rpc/utils.ts index 60a6dfc8c8..b01d94a5c9 100644 --- a/src/room/rpc.ts +++ b/src/room/rpc/utils.ts @@ -1,6 +1,3 @@ -// SPDX-FileCopyrightText: 2024 LiveKit, Inc. -// -// SPDX-License-Identifier: Apache-2.0 import { RpcError as RpcError_Proto } from '@livekit/protocol'; /** Parameters for initiating an RPC call */ @@ -139,10 +136,47 @@ export class RpcError extends Error { } /* - * Maximum payload size for RPC requests and responses. If a payload exceeds this size, + * Maximum payload size for RPC requests and responses for clients with a clientProtocol of less + * than CLIENT_PROTOCOL_DATA_STREAM_RPC. + * + * If a payload exceeds this size and the remote client does not support compression, * the RPC call will fail with a REQUEST_PAYLOAD_TOO_LARGE(1402) or RESPONSE_PAYLOAD_TOO_LARGE(1504) error. */ -export const MAX_PAYLOAD_BYTES = 15360; // 15 KB +export const MAX_V1_PAYLOAD_BYTES = 15360; // 15 KB + +/** + * Topic used for v2 RPC request data streams. + * @internal + */ +export const RPC_REQUEST_DATA_STREAM_TOPIC = 'lk.rpc_request'; + +/** + * Topic used for v2 RPC response data streams. + * @internal + */ +export const RPC_RESPONSE_DATA_STREAM_TOPIC = 'lk.rpc_response'; + +/** @internal */ +export const RPC_REQUEST_ID_ATTR = 'lk.rpc_request_id'; + +/** @internal */ +export const RPC_REQUEST_METHOD_ATTR = 'lk.rpc_request_method'; + +/** @internal */ +export const RPC_REQUEST_RESPONSE_TIMEOUT_MS_ATTR = 'lk.rpc_request_response_timeout_ms'; + +/** @internal */ +export const RPC_REQUEST_VERSION_ATTR = 'lk.rpc_request_version'; + +/** Initial version of rpc which uses RpcRequest / RpcResponse messages. + * @internal + **/ +export const RPC_VERSION_V1 = 1; + +/** Rpc version backed by data streams instead of RpcRequest / RpcResponse. + * @internal + **/ +export const RPC_VERSION_V2 = 2; /** * @internal diff --git a/src/room/utils.ts b/src/room/utils.ts index 49df9b6fca..42cb401e8d 100644 --- a/src/room/utils.ts +++ b/src/room/utils.ts @@ -9,7 +9,7 @@ import { type Throws } from '@livekit/throws-transformer/throws'; import TypedPromise from '../utils/TypedPromise'; import { getBrowser } from '../utils/browserParser'; import type { BrowserDetails } from '../utils/browserParser'; -import { protocolVersion, version } from '../version'; +import { clientProtocol, protocolVersion, version } from '../version'; import { type ConnectionError, ConnectionErrorReason } from './errors'; import type LocalParticipant from './participant/LocalParticipant'; import type Participant from './participant/Participant'; @@ -368,6 +368,7 @@ export function getClientInfo(): ClientInfo { const info = new ClientInfo({ sdk: ClientInfo_SDK.JS, protocol: protocolVersion, + clientProtocol, version, }); diff --git a/src/version.ts b/src/version.ts index e0d96ee71b..ea24368829 100644 --- a/src/version.ts +++ b/src/version.ts @@ -2,3 +2,13 @@ import { version as v } from '../package.json'; export const version = v; export const protocolVersion = 16; + +/** Initial client protocol. */ +export const CLIENT_PROTOCOL_DEFAULT = 0; +/** Replaces RPC v1 protocol with a v2 data streams based one to support unlimited request / + * response payload length. */ +export const CLIENT_PROTOCOL_DATA_STREAM_RPC = 1; + +/** The client protocol version indicates what level of support that the client has for + * client <-> client api interactions. */ +export const clientProtocol = CLIENT_PROTOCOL_DATA_STREAM_RPC;