-
Notifications
You must be signed in to change notification settings - Fork 10
feat(server): implement xumux v0.2 binary transport #7
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -14,12 +14,23 @@ import { | |
| WS_READY_STATE_OPEN, | ||
| } from "./constants.js"; | ||
| import { ServerErrorException, serverError } from "./errors.js"; | ||
| import { | ||
| TERMINAL_MSG_TYPE, | ||
| decodeInput, | ||
| decodeResize, | ||
| encodeExit, | ||
| encodeOutput, | ||
| encodeSessionInfo, | ||
| encodeTitle, | ||
| } from "./protocol.js"; | ||
| import { clientToServerMessageSchema } from "./schemas.js"; | ||
| import { enforceLoopback, isLoopbackHost, loopbackMiddleware } from "./security.js"; | ||
| import { Session } from "./session.js"; | ||
| import { SessionRegistry } from "./session-registry.js"; | ||
| import { resolveStaticAsset } from "./static-resolver.js"; | ||
| import type { ServerToClientMessage } from "./types.js"; | ||
| import { WebSocketAdapter } from "./xumux/websocket-adapter.js"; | ||
| import { XumuxServer } from "./xumux/xumux-server.js"; | ||
|
|
||
| export interface ServerOptions { | ||
| port?: number; | ||
|
|
@@ -89,6 +100,7 @@ export const createServer = async (options: ServerOptions = {}): Promise<Running | |
| } | ||
|
|
||
| let session: Session | null = null; | ||
| let registryId: number | null = null; | ||
|
|
||
| return { | ||
| onOpen(_event, ws) { | ||
|
|
@@ -97,11 +109,8 @@ export const createServer = async (options: ServerOptions = {}): Promise<Running | |
| return; | ||
| } | ||
| session = new Session({}); | ||
| registry.register(session); | ||
| registryId = registry.registerAuto(session); | ||
|
|
||
| // Wire listeners BEFORE the first safeSend so any synchronous emit | ||
| // from Session (current or future) reaches the client. Today | ||
| // node-pty's data/exit are async, but this guards against drift. | ||
| const onOutput = (data: string) => safeSend(ws, { type: "output", data }); | ||
| const onTitle = (title: string) => safeSend(ws, { type: "title", title }); | ||
| const onExit = (code: number | null) => { | ||
|
|
@@ -138,16 +147,156 @@ export const createServer = async (options: ServerOptions = {}): Promise<Running | |
| } | ||
| }, | ||
| onClose() { | ||
| if (!session) return; | ||
| registry.unregister(session); | ||
| if (!session || registryId === null) return; | ||
| registry.unregister(registryId); | ||
| session.dispose(); | ||
| session = null; | ||
| registryId = null; | ||
| }, | ||
| onError() { | ||
| if (!session) return; | ||
| registry.unregister(session); | ||
| if (!session || registryId === null) return; | ||
| registry.unregister(registryId); | ||
| session.dispose(); | ||
| session = null; | ||
| registryId = null; | ||
| }, | ||
| }; | ||
| }), | ||
| ); | ||
|
|
||
| app.get( | ||
| "/xumux", | ||
| upgradeWebSocket((context) => { | ||
| const blocked = enforceLoopback(context); | ||
| if (blocked) { | ||
| return { onOpen: (_event, ws) => ws.close(WS_CLOSE_POLICY_VIOLATION, "forbidden") }; | ||
| } | ||
|
|
||
| let xumux: XumuxServer | null = null; | ||
| // Per-connection session map: channelId → session. | ||
| // Channel IDs are only unique within a single xumux connection, so using | ||
| // the global registry (keyed by channelId) would cause collisions when two | ||
| // concurrent connections both open channel 1. This local map is scoped to | ||
| // the current WebSocket connection closure. | ||
| const connectionSessions = new Map<number, Session>(); | ||
| // Parallel map of channelId → global registry auto-ID for size tracking. | ||
| const connectionRegistryIds = new Map<number, number>(); | ||
|
|
||
| return { | ||
| onOpen(_event, ws) { | ||
| const adapter = new WebSocketAdapter(ws); | ||
| xumux = new XumuxServer(adapter, { | ||
| onOpenChannel: (channelId) => { | ||
| if (registry.size() >= MAX_CONCURRENT_SESSIONS) { | ||
| xumux?.closeChannel(channelId); | ||
| return; | ||
| } | ||
| const session = new Session({}); | ||
| const globalId = registry.registerAuto(session); | ||
| connectionSessions.set(channelId, session); | ||
| connectionRegistryIds.set(channelId, globalId); | ||
|
|
||
| const sessionInfoPayload = encodeSessionInfo({ | ||
| shell: session.shell, | ||
| shellName: session.shellBaseName, | ||
| pid: session.pid, | ||
| cwd: session.cwd, | ||
| }); | ||
| xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.SESSION_INFO, sessionInfoPayload); | ||
|
|
||
| const cleanupChannel = (disposeSes: boolean) => { | ||
| connectionSessions.delete(channelId); | ||
| const gid = connectionRegistryIds.get(channelId); | ||
| if (gid !== undefined) { | ||
| registry.unregister(gid); | ||
| connectionRegistryIds.delete(channelId); | ||
| } | ||
| if (disposeSes) session.dispose(); | ||
| }; | ||
|
|
||
| session.on("output", (data) => { | ||
| if (adapter.bufferedAmount > WS_BACKPRESSURE_THRESHOLD_BYTES) { | ||
| xumux?.closeChannel(channelId); | ||
| cleanupChannel(true); | ||
| return; | ||
| } | ||
| xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.OUTPUT, encodeOutput(data)); | ||
| }); | ||
| session.on("title", (title) => { | ||
| xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.TITLE, encodeTitle(title)); | ||
| }); | ||
| session.on("exit", (code) => { | ||
| xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.EXIT, encodeExit(code)); | ||
| xumux?.closeChannel(channelId); | ||
| // dispose() must be called here: onCloseChannel won't fire for | ||
| // server-initiated closes, so this is the only cleanup path. | ||
| cleanupChannel(true); | ||
| }); | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| }, | ||
| onCloseChannel: (channelId) => { | ||
| const session = connectionSessions.get(channelId); | ||
| if (!session) return; | ||
| connectionSessions.delete(channelId); | ||
| const gid = connectionRegistryIds.get(channelId); | ||
| if (gid !== undefined) { | ||
| registry.unregister(gid); | ||
| connectionRegistryIds.delete(channelId); | ||
| } | ||
| session.dispose(); | ||
| }, | ||
| onChannelMessage: (event) => { | ||
| const session = connectionSessions.get(event.channelId); | ||
| if (!session) return; | ||
| if (event.type === TERMINAL_MSG_TYPE.INPUT) { | ||
| session.write(decodeInput(event.payload)); | ||
| } else if (event.type === TERMINAL_MSG_TYPE.RESIZE) { | ||
| const dims = decodeResize(event.payload); | ||
| if (dims) session.resize(dims.cols, dims.rows); | ||
| } | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Binary xumux path lacks input size validationMedium Severity The Reviewed by Cursor Bugbot for commit 3ce56b7. Configure here. |
||
| }, | ||
| }); | ||
| }, | ||
| onMessage(event) { | ||
| if (!xumux) return; | ||
| const raw = event.data; | ||
| let bytes: Uint8Array; | ||
| if (raw instanceof Uint8Array) { | ||
| bytes = raw; | ||
| } else if (raw instanceof ArrayBuffer) { | ||
| bytes = new Uint8Array(raw); | ||
| } else if (typeof raw === "string") { | ||
| bytes = new TextEncoder().encode(raw); | ||
| } else if (ArrayBuffer.isView(raw)) { | ||
| bytes = new Uint8Array(raw.buffer, raw.byteOffset, raw.byteLength); | ||
| } else if (typeof Blob !== "undefined" && raw instanceof Blob) { | ||
| void raw.arrayBuffer().then((buffer) => { | ||
| xumux?.onMessage(new Uint8Array(buffer)); | ||
| }); | ||
| return; | ||
|
cursor[bot] marked this conversation as resolved.
|
||
| } else { | ||
| return; | ||
| } | ||
| xumux.onMessage(bytes); | ||
| }, | ||
| onClose() { | ||
| if (!xumux) return; | ||
| xumux.close(); | ||
| xumux = null; | ||
| // Dispose any sessions that didn't get an explicit CLOSE_CHANNEL | ||
| // (e.g. client disconnected mid-session). | ||
| for (const [, session] of connectionSessions) session.dispose(); | ||
| for (const [, gid] of connectionRegistryIds) registry.unregister(gid); | ||
| connectionSessions.clear(); | ||
| connectionRegistryIds.clear(); | ||
| }, | ||
| onError() { | ||
| if (!xumux) return; | ||
| xumux.close(); | ||
| xumux = null; | ||
| for (const [, session] of connectionSessions) session.dispose(); | ||
| for (const [, gid] of connectionRegistryIds) registry.unregister(gid); | ||
| connectionSessions.clear(); | ||
| connectionRegistryIds.clear(); | ||
| }, | ||
| }; | ||
| }), | ||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,15 @@ | ||
| export const XUMUX_VERSION = 2; | ||
|
|
||
| export const XUMUX_CHANNEL_CONTROL = 0x0000; | ||
| export const XUMUX_CHANNEL_MIN = 0x0001; | ||
| export const XUMUX_CHANNEL_MAX = 0xfffe; | ||
|
|
||
| export const XUMUX_FRAME_HEADER_BYTES = 3; | ||
|
|
||
| export const XUMUX_CTRL_HELLO = 0x01; | ||
| export const XUMUX_CTRL_WELCOME = 0x02; | ||
| export const XUMUX_CTRL_OPEN_CHANNEL = 0x03; | ||
| export const XUMUX_CTRL_CHANNEL_ACK = 0x04; | ||
| export const XUMUX_CTRL_CLOSE_CHANNEL = 0x05; | ||
| export const XUMUX_CTRL_PING = 0x06; | ||
| export const XUMUX_CTRL_PONG = 0x07; |


Uh oh!
There was an error while loading. Please reload this page.