From c4493e6a9a492a20b794586e48744e19c4b0e283 Mon Sep 17 00:00:00 2001 From: Visionik Date: Wed, 6 May 2026 00:38:53 -0400 Subject: [PATCH 1/2] feat(server): implement xumux v0.2 binary transport - Add xumux v0.2 protocol implementation (XumuxServer, WebSocketAdapter, frame codec) - Add binary codec for terminal message types (INPUT, OUTPUT, RESIZE, EXIT, TITLE, SESSION_INFO) - Add /xumux WebSocket endpoint with HELLO/WELCOME handshake and channel management - Update SessionRegistry to Map with register/unregister/getByChannelId - Enforce MAX_CONCURRENT_SESSIONS (64) and loopback security on /xumux - Add backpressure: close channel when bufferedAmount exceeds threshold - Add unit tests for protocol codec, xumux server, WebSocketAdapter, SessionRegistry - Add integration tests for full session lifecycle via mock transport - Add CHANGELOG.md entry under [Unreleased] --- packages/server/CHANGELOG.md | 18 + packages/server/package.json | 4 +- packages/server/src/index.ts | 129 ++++++- packages/server/src/protocol.ts | 92 +++++ packages/server/src/session-registry.ts | 24 +- packages/server/src/xumux/constants.ts | 15 + packages/server/src/xumux/frame.ts | 25 ++ packages/server/src/xumux/index.ts | 20 + packages/server/src/xumux/types.ts | 18 + .../server/src/xumux/websocket-adapter.ts | 32 ++ packages/server/src/xumux/xumux-server.ts | 135 +++++++ packages/server/tests/protocol.test.ts | 148 ++++++++ .../server/tests/session-registry.test.ts | 75 ++++ .../server/tests/xumux-integration.test.ts | 349 ++++++++++++++++++ packages/server/tests/xumux.test.ts | 232 ++++++++++++ pnpm-lock.yaml | 13 + 16 files changed, 1314 insertions(+), 15 deletions(-) create mode 100644 packages/server/src/xumux/constants.ts create mode 100644 packages/server/src/xumux/frame.ts create mode 100644 packages/server/src/xumux/index.ts create mode 100644 packages/server/src/xumux/types.ts create mode 100644 packages/server/src/xumux/websocket-adapter.ts create mode 100644 packages/server/src/xumux/xumux-server.ts create mode 100644 packages/server/tests/protocol.test.ts create mode 100644 packages/server/tests/session-registry.test.ts create mode 100644 packages/server/tests/xumux-integration.test.ts create mode 100644 packages/server/tests/xumux.test.ts diff --git a/packages/server/CHANGELOG.md b/packages/server/CHANGELOG.md index ccc5d28..5c8ffdc 100644 --- a/packages/server/CHANGELOG.md +++ b/packages/server/CHANGELOG.md @@ -1,5 +1,23 @@ # localterm-server +## [Unreleased] + +### Features + +- Add xumux v0.2 binary transport (`/xumux` WebSocket endpoint) +- Binary codec for terminal message types (input, output, resize, exit, title, session-info) +- XumuxServer multiplexer with HELLO/WELCOME handshake and channel management +- WebSocketAdapter for xumux transport over WebSocket +- Per-connection local session map prevents channel-ID collisions across concurrent connections +- Backpressure enforcement on xumux channels +- Loopback security on `/xumux` endpoint + +### Bug Fixes + +- Exit-code null sentinel changed from -1 to INT32_MIN so real exit code -1 is preserved +- `decodeResize` returns null on short payload instead of throwing +- `session.dispose()` called in PTY exit handler to prevent resource leak + ## 0.0.11 ### Patch Changes diff --git a/packages/server/package.json b/packages/server/package.json index efd22f2..dcbefa4 100644 --- a/packages/server/package.json +++ b/packages/server/package.json @@ -60,8 +60,10 @@ }, "devDependencies": { "@types/node": "^25.5.0", + "@types/ws": "^8.18.1", "typescript": "^5.9.3", - "vite-plus": "^0.1.12" + "vite-plus": "^0.1.12", + "ws": "^8.20.0" }, "engines": { "node": ">=22" diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index 5611d83..d4745fc 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -14,12 +14,23 @@ import { WS_READY_STATE_OPEN, } from "./constants.js"; import { ServerErrorException, serverError } from "./errors.js"; +import { + TERMINAL_MSG_TYPE, + decodeInput, + decodeResize, + encodeExit, + encodeOutput, + encodeSessionInfo, + encodeTitle, +} from "./protocol.js"; import { clientToServerMessageSchema } from "./schemas.js"; import { enforceLoopback, isLoopbackHost, loopbackMiddleware } from "./security.js"; import { Session } from "./session.js"; import { SessionRegistry } from "./session-registry.js"; import { resolveStaticAsset } from "./static-resolver.js"; import type { ServerToClientMessage } from "./types.js"; +import { WebSocketAdapter } from "./xumux/websocket-adapter.js"; +import { XumuxServer } from "./xumux/xumux-server.js"; export interface ServerOptions { port?: number; @@ -89,6 +100,7 @@ export const createServer = async (options: ServerOptions = {}): Promise safeSend(ws, { type: "output", data }); const onTitle = (title: string) => safeSend(ws, { type: "title", title }); const onExit = (code: number | null) => { @@ -138,16 +147,120 @@ export const createServer = async (options: ServerOptions = {}): Promise { + const blocked = enforceLoopback(context); + if (blocked) { + return { onOpen: (_event, ws) => ws.close(WS_CLOSE_POLICY_VIOLATION, "forbidden") }; + } + + let xumux: XumuxServer | null = null; + + return { + onOpen(_event, ws) { + const adapter = new WebSocketAdapter(ws); + xumux = new XumuxServer(adapter, { + onOpenChannel: (channelId) => { + if (registry.size() >= MAX_CONCURRENT_SESSIONS) { + xumux?.closeChannel(channelId); + return; + } + const session = new Session({}); + registry.register(channelId, session); + + const sessionInfoPayload = encodeSessionInfo({ + shell: session.shell, + shellName: session.shellBaseName, + pid: session.pid, + cwd: session.cwd, + }); + xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.SESSION_INFO, sessionInfoPayload); + + session.on("output", (data) => { + if (adapter.bufferedAmount > WS_BACKPRESSURE_THRESHOLD_BYTES) { + xumux?.closeChannel(channelId); + registry.unregister(channelId); + session.dispose(); + return; + } + xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.OUTPUT, encodeOutput(data)); + }); + session.on("title", (title) => { + xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.TITLE, encodeTitle(title)); + }); + session.on("exit", (code) => { + xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.EXIT, encodeExit(code)); + xumux?.closeChannel(channelId); + registry.unregister(channelId); + }); + }, + onCloseChannel: (channelId) => { + const session = registry.getByChannelId(channelId); + if (!session) return; + registry.unregister(channelId); + session.dispose(); + }, + onChannelMessage: (event) => { + const session = registry.getByChannelId(event.channelId); + if (!session) return; + if (event.type === TERMINAL_MSG_TYPE.INPUT) { + session.write(decodeInput(event.payload)); + } else if (event.type === TERMINAL_MSG_TYPE.RESIZE) { + const { cols, rows } = decodeResize(event.payload); + session.resize(cols, rows); + } + }, + }); + }, + onMessage(event) { + if (!xumux) return; + const raw = event.data; + let bytes: Uint8Array; + if (raw instanceof Uint8Array) { + bytes = raw; + } else if (raw instanceof ArrayBuffer) { + bytes = new Uint8Array(raw); + } else if (typeof raw === "string") { + bytes = new TextEncoder().encode(raw); + } else if (ArrayBuffer.isView(raw)) { + bytes = new Uint8Array(raw.buffer, raw.byteOffset, raw.byteLength); + } else if (typeof Blob !== "undefined" && raw instanceof Blob) { + void raw.arrayBuffer().then((buffer) => { + xumux?.onMessage(new Uint8Array(buffer)); + }); + return; + } else { + return; + } + xumux.onMessage(bytes); + }, + onClose() { + if (!xumux) return; + xumux.close(); + xumux = null; + }, + onError() { + if (!xumux) return; + xumux.close(); + xumux = null; }, }; }), diff --git a/packages/server/src/protocol.ts b/packages/server/src/protocol.ts index 26643b6..8aa4793 100644 --- a/packages/server/src/protocol.ts +++ b/packages/server/src/protocol.ts @@ -18,3 +18,95 @@ export { } from "./schemas.js"; export type { ClientToServerMessage, ServerToClientMessage } from "./types.js"; export type { ServerError, ServerErrorCode, ServerErrorKind } from "./errors.js"; + +export const TERMINAL_MSG_TYPE = { + INPUT: 0x01, + OUTPUT: 0x02, + RESIZE: 0x03, + EXIT: 0x04, + TITLE: 0x05, + SESSION_INFO: 0x06, +} as const; + +export type TerminalMsgType = (typeof TERMINAL_MSG_TYPE)[keyof typeof TERMINAL_MSG_TYPE]; + +const textEncoder = new TextEncoder(); +const textDecoder = new TextDecoder(); + +const EXIT_CODE_NULL_SENTINEL = -1; + +export const encodeInput = (data: string): Uint8Array => textEncoder.encode(data); + +export const decodeInput = (payload: Uint8Array): string => textDecoder.decode(payload); + +export const encodeOutput = (data: string): Uint8Array => textEncoder.encode(data); + +export const decodeOutput = (payload: Uint8Array): string => textDecoder.decode(payload); + +export const encodeResize = (cols: number, rows: number): Uint8Array => { + const buffer = new Uint8Array(4); + const view = new DataView(buffer.buffer); + view.setUint16(0, cols, false); + view.setUint16(2, rows, false); + return buffer; +}; + +export const decodeResize = (payload: Uint8Array): { cols: number; rows: number } => { + if (payload.length < 4) throw new Error("resize payload must be 4 bytes"); + const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength); + return { cols: view.getUint16(0, false), rows: view.getUint16(2, false) }; +}; + +export const encodeExit = (code: number | null): Uint8Array => { + const buffer = new Uint8Array(4); + const view = new DataView(buffer.buffer); + view.setInt32(0, code ?? EXIT_CODE_NULL_SENTINEL, false); + return buffer; +}; + +export const decodeExit = (payload: Uint8Array): number | null => { + if (payload.length < 4) throw new Error("exit payload must be 4 bytes"); + const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength); + const raw = view.getInt32(0, false); + return raw === EXIT_CODE_NULL_SENTINEL ? null : raw; +}; + +export const encodeTitle = (title: string): Uint8Array => textEncoder.encode(title); + +export const decodeTitle = (payload: Uint8Array): string => textDecoder.decode(payload); + +export interface SessionInfo { + shell: string; + shellName: string; + pid: number; + cwd: string; +} + +export const encodeSessionInfo = (info: SessionInfo): Uint8Array => + textEncoder.encode(JSON.stringify(info)); + +export const decodeSessionInfo = (payload: Uint8Array): SessionInfo => + JSON.parse(textDecoder.decode(payload)) as SessionInfo; + +export { + XUMUX_CHANNEL_CONTROL, + XUMUX_CHANNEL_MAX, + XUMUX_CHANNEL_MIN, + XUMUX_CTRL_CHANNEL_ACK, + XUMUX_CTRL_CLOSE_CHANNEL, + XUMUX_CTRL_HELLO, + XUMUX_CTRL_OPEN_CHANNEL, + XUMUX_CTRL_PING, + XUMUX_CTRL_PONG, + XUMUX_CTRL_WELCOME, + XUMUX_FRAME_HEADER_BYTES, + XUMUX_VERSION, +} from "./xumux/index.js"; +export { decodeFrame, encodeFrame, WebSocketAdapter, XumuxServer } from "./xumux/index.js"; +export type { + WebSocketLike, + XumuxChannelEvent, + XumuxFrame, + XumuxServerEvents, + XumuxTransport, +} from "./xumux/index.js"; diff --git a/packages/server/src/session-registry.ts b/packages/server/src/session-registry.ts index aaaec7b..7000fed 100644 --- a/packages/server/src/session-registry.ts +++ b/packages/server/src/session-registry.ts @@ -1,14 +1,26 @@ import type { Session } from "./session.js"; export class SessionRegistry { - private readonly sessions = new Set(); + private readonly sessions = new Map(); + private nextAutoId = -1; - register(session: Session): void { - this.sessions.add(session); + register(channelId: number, session: Session): void { + this.sessions.set(channelId, session); } - unregister(session: Session): void { - this.sessions.delete(session); + registerAuto(session: Session): number { + const autoId = this.nextAutoId; + this.nextAutoId -= 1; + this.sessions.set(autoId, session); + return autoId; + } + + unregister(channelId: number): void { + this.sessions.delete(channelId); + } + + getByChannelId(channelId: number): Session | undefined { + return this.sessions.get(channelId); } size(): number { @@ -16,7 +28,7 @@ export class SessionRegistry { } disposeAll(): void { - for (const session of this.sessions) { + for (const session of this.sessions.values()) { session.dispose(); } this.sessions.clear(); diff --git a/packages/server/src/xumux/constants.ts b/packages/server/src/xumux/constants.ts new file mode 100644 index 0000000..86424c6 --- /dev/null +++ b/packages/server/src/xumux/constants.ts @@ -0,0 +1,15 @@ +export const XUMUX_VERSION = 2; + +export const XUMUX_CHANNEL_CONTROL = 0x0000; +export const XUMUX_CHANNEL_MIN = 0x0001; +export const XUMUX_CHANNEL_MAX = 0xfffe; + +export const XUMUX_FRAME_HEADER_BYTES = 3; + +export const XUMUX_CTRL_HELLO = 0x01; +export const XUMUX_CTRL_WELCOME = 0x02; +export const XUMUX_CTRL_OPEN_CHANNEL = 0x03; +export const XUMUX_CTRL_CHANNEL_ACK = 0x04; +export const XUMUX_CTRL_CLOSE_CHANNEL = 0x05; +export const XUMUX_CTRL_PING = 0x06; +export const XUMUX_CTRL_PONG = 0x07; diff --git a/packages/server/src/xumux/frame.ts b/packages/server/src/xumux/frame.ts new file mode 100644 index 0000000..cd37f91 --- /dev/null +++ b/packages/server/src/xumux/frame.ts @@ -0,0 +1,25 @@ +import { XUMUX_FRAME_HEADER_BYTES } from "./constants.js"; + +export interface XumuxFrame { + channelId: number; + type: number; + payload: Uint8Array; +} + +export const encodeFrame = (channelId: number, type: number, payload: Uint8Array): Uint8Array => { + const frame = new Uint8Array(XUMUX_FRAME_HEADER_BYTES + payload.length); + const view = new DataView(frame.buffer, frame.byteOffset, frame.byteLength); + view.setUint16(0, channelId, false); + frame[2] = type; + frame.set(payload, XUMUX_FRAME_HEADER_BYTES); + return frame; +}; + +export const decodeFrame = (data: Uint8Array): XumuxFrame | null => { + if (data.length < XUMUX_FRAME_HEADER_BYTES) return null; + const view = new DataView(data.buffer, data.byteOffset, data.byteLength); + const channelId = view.getUint16(0, false); + const type = data[2]; + const payload = data.subarray(XUMUX_FRAME_HEADER_BYTES); + return { channelId, type, payload }; +}; diff --git a/packages/server/src/xumux/index.ts b/packages/server/src/xumux/index.ts new file mode 100644 index 0000000..df5b7f3 --- /dev/null +++ b/packages/server/src/xumux/index.ts @@ -0,0 +1,20 @@ +export { + XUMUX_CHANNEL_CONTROL, + XUMUX_CHANNEL_MAX, + XUMUX_CHANNEL_MIN, + XUMUX_CTRL_CHANNEL_ACK, + XUMUX_CTRL_CLOSE_CHANNEL, + XUMUX_CTRL_HELLO, + XUMUX_CTRL_OPEN_CHANNEL, + XUMUX_CTRL_PING, + XUMUX_CTRL_PONG, + XUMUX_CTRL_WELCOME, + XUMUX_FRAME_HEADER_BYTES, + XUMUX_VERSION, +} from "./constants.js"; +export { decodeFrame, encodeFrame } from "./frame.js"; +export type { XumuxFrame } from "./frame.js"; +export type { XumuxChannelEvent, XumuxServerEvents, XumuxTransport } from "./types.js"; +export { WebSocketAdapter } from "./websocket-adapter.js"; +export type { WebSocketLike } from "./websocket-adapter.js"; +export { XumuxServer } from "./xumux-server.js"; diff --git a/packages/server/src/xumux/types.ts b/packages/server/src/xumux/types.ts new file mode 100644 index 0000000..df60401 --- /dev/null +++ b/packages/server/src/xumux/types.ts @@ -0,0 +1,18 @@ +export interface XumuxTransport { + send(data: Uint8Array): void; + close(): void; + readonly bufferedAmount: number; +} + +export interface XumuxChannelEvent { + channelId: number; + type: number; + payload: Uint8Array; +} + +export interface XumuxServerEvents { + onOpenChannel?: (channelId: number) => void; + onCloseChannel?: (channelId: number) => void; + onChannelMessage?: (event: XumuxChannelEvent) => void; + onError?: (error: Error) => void; +} diff --git a/packages/server/src/xumux/websocket-adapter.ts b/packages/server/src/xumux/websocket-adapter.ts new file mode 100644 index 0000000..06593ed --- /dev/null +++ b/packages/server/src/xumux/websocket-adapter.ts @@ -0,0 +1,32 @@ +import type { XumuxTransport } from "./types.js"; + +export interface WebSocketLike { + send(data: ArrayBufferLike | Uint8Array | string, options?: unknown): void; + close(code?: number, reason?: string): void; + readonly readyState: number; + raw?: unknown; +} + +const getRawBufferedAmount = (socket: WebSocketLike): number => { + const raw = socket.raw; + if (!raw || typeof raw !== "object") return 0; + const candidate = Reflect.get(raw, "bufferedAmount"); + return typeof candidate === "number" ? candidate : 0; +}; + +export class WebSocketAdapter implements XumuxTransport { + constructor(private readonly socket: WebSocketLike) {} + + send(data: Uint8Array): void { + if (this.socket.readyState !== 1) return; + this.socket.send(data.buffer.slice(data.byteOffset, data.byteOffset + data.byteLength)); + } + + close(): void { + this.socket.close(); + } + + get bufferedAmount(): number { + return getRawBufferedAmount(this.socket); + } +} diff --git a/packages/server/src/xumux/xumux-server.ts b/packages/server/src/xumux/xumux-server.ts new file mode 100644 index 0000000..91f3804 --- /dev/null +++ b/packages/server/src/xumux/xumux-server.ts @@ -0,0 +1,135 @@ +import { + XUMUX_CHANNEL_CONTROL, + XUMUX_CHANNEL_MAX, + XUMUX_CHANNEL_MIN, + XUMUX_CTRL_CHANNEL_ACK, + XUMUX_CTRL_CLOSE_CHANNEL, + XUMUX_CTRL_HELLO, + XUMUX_CTRL_OPEN_CHANNEL, + XUMUX_CTRL_PING, + XUMUX_CTRL_PONG, + XUMUX_CTRL_WELCOME, + XUMUX_VERSION, +} from "./constants.js"; +import { decodeFrame, encodeFrame } from "./frame.js"; +import type { XumuxServerEvents, XumuxTransport } from "./types.js"; + +export class XumuxServer { + private readonly openChannels = new Set(); + private handshakeComplete = false; + + constructor( + private readonly transport: XumuxTransport, + private readonly events: XumuxServerEvents, + ) {} + + onMessage(data: Uint8Array): void { + const frame = decodeFrame(data); + if (!frame) { + this.events.onError?.(new Error("malformed frame: too short")); + return; + } + + if (frame.channelId === XUMUX_CHANNEL_CONTROL) { + this.handleControl(frame.type, frame.payload); + return; + } + + if (!this.handshakeComplete) { + this.events.onError?.(new Error("data before handshake")); + return; + } + + if (!this.openChannels.has(frame.channelId)) { + this.events.onError?.(new Error(`message on unknown channel ${frame.channelId}`)); + return; + } + + this.events.onChannelMessage?.({ + channelId: frame.channelId, + type: frame.type, + payload: frame.payload, + }); + } + + sendToChannel(channelId: number, type: number, payload: Uint8Array): void { + if (!this.openChannels.has(channelId)) return; + this.transport.send(encodeFrame(channelId, type, payload)); + } + + closeChannel(channelId: number): void { + if (!this.openChannels.has(channelId)) return; + this.openChannels.delete(channelId); + const channelIdPayload = new Uint8Array(2); + new DataView(channelIdPayload.buffer).setUint16(0, channelId, false); + this.transport.send(encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_CLOSE_CHANNEL, channelIdPayload)); + } + + get activeChannelCount(): number { + return this.openChannels.size; + } + + get isHandshakeComplete(): boolean { + return this.handshakeComplete; + } + + close(): void { + for (const channelId of this.openChannels) { + this.events.onCloseChannel?.(channelId); + } + this.openChannels.clear(); + this.transport.close(); + } + + private handleControl(type: number, payload: Uint8Array): void { + switch (type) { + case XUMUX_CTRL_HELLO: + this.handleHello(payload); + break; + case XUMUX_CTRL_OPEN_CHANNEL: + this.handleOpenChannel(payload); + break; + case XUMUX_CTRL_CLOSE_CHANNEL: + this.handleCloseChannel(payload); + break; + case XUMUX_CTRL_PING: + this.transport.send(encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_PONG, new Uint8Array(0))); + break; + default: + break; + } + } + + private handleHello(payload: Uint8Array): void { + if (this.handshakeComplete) return; + if (payload.length >= 1 && payload[0] !== XUMUX_VERSION) { + this.events.onError?.(new Error(`unsupported xumux version: ${payload[0]}`)); + this.transport.close(); + return; + } + this.handshakeComplete = true; + const welcomePayload = new Uint8Array([XUMUX_VERSION]); + this.transport.send(encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_WELCOME, welcomePayload)); + } + + private handleOpenChannel(payload: Uint8Array): void { + if (!this.handshakeComplete) return; + if (payload.length < 2) return; + const channelId = new DataView(payload.buffer, payload.byteOffset, payload.byteLength).getUint16(0, false); + if (channelId < XUMUX_CHANNEL_MIN || channelId > XUMUX_CHANNEL_MAX) return; + if (this.openChannels.has(channelId)) return; + this.openChannels.add(channelId); + const ackPayload = new Uint8Array(2); + new DataView(ackPayload.buffer).setUint16(0, channelId, false); + this.transport.send(encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_CHANNEL_ACK, ackPayload)); + this.events.onOpenChannel?.(channelId); + } + + private handleCloseChannel(payload: Uint8Array): void { + if (payload.length < 2) return; + const channelId = new DataView(payload.buffer, payload.byteOffset, payload.byteLength).getUint16(0, false); + if (!this.openChannels.has(channelId)) return; + this.openChannels.delete(channelId); + this.events.onCloseChannel?.(channelId); + } +} diff --git a/packages/server/tests/protocol.test.ts b/packages/server/tests/protocol.test.ts new file mode 100644 index 0000000..8600a01 --- /dev/null +++ b/packages/server/tests/protocol.test.ts @@ -0,0 +1,148 @@ +import { describe, expect, it } from "vite-plus/test"; +import { + TERMINAL_MSG_TYPE, + decodeExit, + decodeInput, + decodeOutput, + decodeResize, + decodeSessionInfo, + decodeTitle, + encodeExit, + encodeInput, + encodeOutput, + encodeResize, + encodeSessionInfo, + encodeTitle, +} from "../src/protocol.js"; +import type { SessionInfo } from "../src/protocol.js"; + +describe("TERMINAL_MSG_TYPE constants", () => { + it("has the correct values", () => { + expect(TERMINAL_MSG_TYPE.INPUT).toBe(0x01); + expect(TERMINAL_MSG_TYPE.OUTPUT).toBe(0x02); + expect(TERMINAL_MSG_TYPE.RESIZE).toBe(0x03); + expect(TERMINAL_MSG_TYPE.EXIT).toBe(0x04); + expect(TERMINAL_MSG_TYPE.TITLE).toBe(0x05); + expect(TERMINAL_MSG_TYPE.SESSION_INFO).toBe(0x06); + }); +}); + +describe("input codec", () => { + it("round-trips ASCII text", () => { + const original = "ls -la\r"; + expect(decodeInput(encodeInput(original))).toBe(original); + }); + + it("round-trips UTF-8 multibyte characters", () => { + const original = "echo ไฝ ๅฅฝไธ–็•Œ ๐ŸŒ"; + expect(decodeInput(encodeInput(original))).toBe(original); + }); + + it("round-trips empty string", () => { + expect(decodeInput(encodeInput(""))).toBe(""); + }); +}); + +describe("output codec", () => { + it("round-trips terminal output with ANSI escapes", () => { + const original = "\x1b[32mOK\x1b[0m\r\n"; + expect(decodeOutput(encodeOutput(original))).toBe(original); + }); + + it("round-trips empty string", () => { + expect(decodeOutput(encodeOutput(""))).toBe(""); + }); +}); + +describe("resize codec", () => { + it("round-trips typical terminal dimensions", () => { + const result = decodeResize(encodeResize(120, 40)); + expect(result).toEqual({ cols: 120, rows: 40 }); + }); + + it("round-trips minimum dimensions (1x1)", () => { + const result = decodeResize(encodeResize(1, 1)); + expect(result).toEqual({ cols: 1, rows: 1 }); + }); + + it("round-trips uint16 maximum boundary (65535)", () => { + const result = decodeResize(encodeResize(65535, 65535)); + expect(result).toEqual({ cols: 65535, rows: 65535 }); + }); + + it("encodes as exactly 4 bytes", () => { + expect(encodeResize(80, 24).length).toBe(4); + }); + + it("throws on payload shorter than 4 bytes", () => { + expect(() => decodeResize(new Uint8Array(3))).toThrow("resize payload must be 4 bytes"); + }); +}); + +describe("exit codec", () => { + it("round-trips exit code 0", () => { + expect(decodeExit(encodeExit(0))).toBe(0); + }); + + it("round-trips positive exit code", () => { + expect(decodeExit(encodeExit(137))).toBe(137); + }); + + it("round-trips null exit code as -1 sentinel", () => { + const encoded = encodeExit(null); + const view = new DataView(encoded.buffer); + expect(view.getInt32(0, false)).toBe(-1); + expect(decodeExit(encoded)).toBeNull(); + }); + + it("encodes as exactly 4 bytes", () => { + expect(encodeExit(0).length).toBe(4); + }); + + it("throws on payload shorter than 4 bytes", () => { + expect(() => decodeExit(new Uint8Array(2))).toThrow("exit payload must be 4 bytes"); + }); + + it("round-trips negative exit codes (non-sentinel)", () => { + expect(decodeExit(encodeExit(-2))).toBe(-2); + expect(decodeExit(encodeExit(-128))).toBe(-128); + }); +}); + +describe("title codec", () => { + it("round-trips a path-style title", () => { + const original = "~/Projects/localterm"; + expect(decodeTitle(encodeTitle(original))).toBe(original); + }); + + it("round-trips unicode titles", () => { + const original = "๐Ÿ“ documents"; + expect(decodeTitle(encodeTitle(original))).toBe(original); + }); + + it("round-trips empty string", () => { + expect(decodeTitle(encodeTitle(""))).toBe(""); + }); +}); + +describe("session-info codec", () => { + it("round-trips session info", () => { + const info: SessionInfo = { + shell: "/bin/zsh", + shellName: "zsh", + pid: 12345, + cwd: "/Users/tester", + }; + expect(decodeSessionInfo(encodeSessionInfo(info))).toEqual(info); + }); + + it("preserves all fields including special characters in paths", () => { + const info: SessionInfo = { + shell: "/usr/local/bin/fish", + shellName: "fish", + pid: 1, + cwd: "/home/user/My Documents/้กน็›ฎ", + }; + expect(decodeSessionInfo(encodeSessionInfo(info))).toEqual(info); + }); +}); diff --git a/packages/server/tests/session-registry.test.ts b/packages/server/tests/session-registry.test.ts new file mode 100644 index 0000000..1ac1a06 --- /dev/null +++ b/packages/server/tests/session-registry.test.ts @@ -0,0 +1,75 @@ +import { describe, expect, it, vi } from "vite-plus/test"; +import { SessionRegistry } from "../src/session-registry.js"; +import type { Session } from "../src/session.js"; + +const createMockSession = (): Session => + ({ dispose: vi.fn() }) as unknown as Session; + +describe("SessionRegistry", () => { + it("registers and reports size", () => { + const registry = new SessionRegistry(); + const session = createMockSession(); + registry.register(1, session); + expect(registry.size()).toBe(1); + }); + + it("unregisters by channel ID", () => { + const registry = new SessionRegistry(); + const session = createMockSession(); + registry.register(42, session); + expect(registry.size()).toBe(1); + registry.unregister(42); + expect(registry.size()).toBe(0); + }); + + it("getByChannelId returns registered session", () => { + const registry = new SessionRegistry(); + const session = createMockSession(); + registry.register(7, session); + expect(registry.getByChannelId(7)).toBe(session); + }); + + it("getByChannelId returns undefined for unknown channel", () => { + const registry = new SessionRegistry(); + expect(registry.getByChannelId(999)).toBeUndefined(); + }); + + it("disposeAll calls dispose on every session and clears the registry", () => { + const registry = new SessionRegistry(); + const sessionA = createMockSession(); + const sessionB = createMockSession(); + registry.register(1, sessionA); + registry.register(2, sessionB); + registry.disposeAll(); + expect(sessionA.dispose).toHaveBeenCalledOnce(); + expect(sessionB.dispose).toHaveBeenCalledOnce(); + expect(registry.size()).toBe(0); + }); + + it("registerAuto assigns negative IDs", () => { + const registry = new SessionRegistry(); + const session = createMockSession(); + const autoId = registry.registerAuto(session); + expect(autoId).toBeLessThan(0); + expect(registry.getByChannelId(autoId)).toBe(session); + }); + + it("multiple registerAuto calls yield unique IDs", () => { + const registry = new SessionRegistry(); + const idA = registry.registerAuto(createMockSession()); + const idB = registry.registerAuto(createMockSession()); + expect(idA).not.toBe(idB); + expect(registry.size()).toBe(2); + }); + + it("handles mixed auto and explicit registrations without collision", () => { + const registry = new SessionRegistry(); + const autoSession = createMockSession(); + const channelSession = createMockSession(); + const autoId = registry.registerAuto(autoSession); + registry.register(1, channelSession); + expect(registry.size()).toBe(2); + expect(registry.getByChannelId(autoId)).toBe(autoSession); + expect(registry.getByChannelId(1)).toBe(channelSession); + }); +}); diff --git a/packages/server/tests/xumux-integration.test.ts b/packages/server/tests/xumux-integration.test.ts new file mode 100644 index 0000000..48acab8 --- /dev/null +++ b/packages/server/tests/xumux-integration.test.ts @@ -0,0 +1,349 @@ +import { describe, expect, it } from "vite-plus/test"; +import { + TERMINAL_MSG_TYPE, + decodeInput, + decodeOutput, + decodeResize, + decodeTitle, + encodeInput, + encodeResize, +} from "../src/protocol.js"; +import { + XUMUX_CHANNEL_CONTROL, + XUMUX_CTRL_CHANNEL_ACK, + XUMUX_CTRL_CLOSE_CHANNEL, + XUMUX_CTRL_HELLO, + XUMUX_CTRL_OPEN_CHANNEL, + XUMUX_CTRL_WELCOME, + XUMUX_VERSION, + decodeFrame, + encodeFrame, +} from "../src/xumux/index.js"; +import { XumuxServer } from "../src/xumux/xumux-server.js"; +import type { XumuxTransport } from "../src/xumux/types.js"; +import { Session } from "../src/session.js"; +import { SessionRegistry } from "../src/session-registry.js"; + +const createMockTransport = () => { + const sent: Uint8Array[] = []; + const transport: XumuxTransport = { + send: (data) => sent.push(new Uint8Array(data)), + close: () => {}, + get bufferedAmount() { + return 0; + }, + }; + return { transport, sent }; +}; + +const sendHello = (server: XumuxServer) => { + server.onMessage( + encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_HELLO, new Uint8Array([XUMUX_VERSION])), + ); +}; + +const sendOpenChannel = (server: XumuxServer, channelId: number) => { + const payload = new Uint8Array(2); + new DataView(payload.buffer).setUint16(0, channelId, false); + server.onMessage(encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_OPEN_CHANNEL, payload)); +}; + +const sendCloseChannel = (server: XumuxServer, channelId: number) => { + const payload = new Uint8Array(2); + new DataView(payload.buffer).setUint16(0, channelId, false); + server.onMessage(encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_CLOSE_CHANNEL, payload)); +}; + +const findFrame = ( + sent: Uint8Array[], + predicate: (frame: { channelId: number; type: number; payload: Uint8Array }) => boolean, +) => { + for (const raw of sent) { + const frame = decodeFrame(raw); + if (frame && predicate(frame)) return frame; + } + return null; +}; + +const waitFor = (promise: Promise, timeoutMs: number): Promise => + Promise.race([ + promise, + new Promise((_, reject) => + setTimeout(() => reject(new Error(`timed out after ${timeoutMs}ms`)), timeoutMs), + ), + ]); + +describe("xumux integration (mock transport)", () => { + it("completes HELLO/WELCOME handshake, OPEN_CHANNEL/ACK, receives SESSION_INFO", () => { + const { transport, sent } = createMockTransport(); + const registry = new SessionRegistry(); + + const server = new XumuxServer(transport, { + onOpenChannel: (channelId) => { + const session = new Session({ shell: "/bin/sh" }); + registry.register(channelId, session); + + const sessionInfoPayload = new TextEncoder().encode( + JSON.stringify({ + shell: session.shell, + shellName: session.shellBaseName, + pid: session.pid, + cwd: session.cwd, + }), + ); + server.sendToChannel(channelId, TERMINAL_MSG_TYPE.SESSION_INFO, sessionInfoPayload); + }, + onCloseChannel: (channelId) => { + const session = registry.getByChannelId(channelId); + if (session) { + registry.unregister(channelId); + session.dispose(); + } + }, + }); + + sendHello(server); + expect(server.isHandshakeComplete).toBe(true); + + const welcome = findFrame(sent, (f) => f.type === XUMUX_CTRL_WELCOME); + expect(welcome).not.toBeNull(); + expect(welcome!.payload[0]).toBe(XUMUX_VERSION); + + const channelId = 1; + sendOpenChannel(server, channelId); + + const ack = findFrame(sent, (f) => f.type === XUMUX_CTRL_CHANNEL_ACK); + expect(ack).not.toBeNull(); + expect(registry.size()).toBe(1); + + const sessionInfoFrame = findFrame( + sent, + (f) => f.channelId === channelId && f.type === TERMINAL_MSG_TYPE.SESSION_INFO, + ); + expect(sessionInfoFrame).not.toBeNull(); + const sessionInfo = JSON.parse(new TextDecoder().decode(sessionInfoFrame!.payload)); + expect(sessionInfo.shell).toBeTruthy(); + expect(sessionInfo.pid).toBeGreaterThan(0); + + registry.disposeAll(); + }); + + it("wires input to session and output back to channel", async () => { + const { transport, sent } = createMockTransport(); + const registry = new SessionRegistry(); + + const server = new XumuxServer(transport, { + onOpenChannel: (channelId) => { + const session = new Session({ shell: "/bin/sh" }); + registry.register(channelId, session); + session.on("output", (data) => { + server.sendToChannel(channelId, TERMINAL_MSG_TYPE.OUTPUT, new TextEncoder().encode(data)); + }); + }, + onChannelMessage: (event) => { + const session = registry.getByChannelId(event.channelId); + if (!session) return; + if (event.type === TERMINAL_MSG_TYPE.INPUT) { + session.write(decodeInput(event.payload)); + } + }, + onCloseChannel: (channelId) => { + const session = registry.getByChannelId(channelId); + if (session) { + registry.unregister(channelId); + session.dispose(); + } + }, + }); + + sendHello(server); + sendOpenChannel(server, 1); + + await new Promise((resolve) => setTimeout(resolve, 300)); + sent.length = 0; + + server.onMessage(encodeFrame(1, TERMINAL_MSG_TYPE.INPUT, encodeInput("echo INTEG_TOKEN\n"))); + + await waitFor( + new Promise((resolve) => { + const check = () => { + const outputFrames = sent + .map((raw) => decodeFrame(raw)) + .filter((f) => f !== null && f.channelId === 1 && f.type === TERMINAL_MSG_TYPE.OUTPUT); + const combined = outputFrames.map((f) => decodeOutput(f!.payload)).join(""); + if (combined.includes("INTEG_TOKEN")) { + resolve(); + return; + } + setTimeout(check, 50); + }; + check(); + }), + 5000, + ); + + registry.disposeAll(); + }, 10000); + + it("handles resize via RESIZE message type", () => { + const { transport } = createMockTransport(); + const registry = new SessionRegistry(); + + const server = new XumuxServer(transport, { + onOpenChannel: (channelId) => { + const session = new Session({ shell: "/bin/sh", cols: 80, rows: 24 }); + registry.register(channelId, session); + }, + onChannelMessage: (event) => { + const session = registry.getByChannelId(event.channelId); + if (!session) return; + if (event.type === TERMINAL_MSG_TYPE.RESIZE) { + const { cols, rows } = decodeResize(event.payload); + session.resize(cols, rows); + } + }, + }); + + sendHello(server); + sendOpenChannel(server, 1); + server.onMessage(encodeFrame(1, TERMINAL_MSG_TYPE.RESIZE, encodeResize(100, 50))); + const session = registry.getByChannelId(1)!; + expect(session.cols).toBe(100); + expect(session.rows).toBe(50); + + registry.disposeAll(); + }); + + it("exit sends EXIT frame and CLOSE_CHANNEL", async () => { + const { transport, sent } = createMockTransport(); + const registry = new SessionRegistry(); + + const server = new XumuxServer(transport, { + onOpenChannel: (channelId) => { + const session = new Session({ shell: "/bin/sh" }); + registry.register(channelId, session); + session.on("exit", (code) => { + const exitPayload = new Uint8Array(4); + new DataView(exitPayload.buffer).setInt32(0, code ?? -1, false); + server.sendToChannel(channelId, TERMINAL_MSG_TYPE.EXIT, exitPayload); + server.closeChannel(channelId); + registry.unregister(channelId); + }); + }, + onChannelMessage: (event) => { + const session = registry.getByChannelId(event.channelId); + if (!session) return; + if (event.type === TERMINAL_MSG_TYPE.INPUT) { + session.write(decodeInput(event.payload)); + } + }, + }); + + sendHello(server); + sendOpenChannel(server, 1); + await new Promise((resolve) => setTimeout(resolve, 200)); + + server.onMessage(encodeFrame(1, TERMINAL_MSG_TYPE.INPUT, encodeInput("exit 0\n"))); + + await waitFor( + new Promise((resolve) => { + const check = () => { + const exitFrame = findFrame( + sent, + (f) => f.channelId === 1 && f.type === TERMINAL_MSG_TYPE.EXIT, + ); + if (exitFrame) { + resolve(); + return; + } + setTimeout(check, 50); + }; + check(); + }), + 5000, + ); + + const closeFrame = findFrame( + sent, + (f) => f.channelId === XUMUX_CHANNEL_CONTROL && f.type === XUMUX_CTRL_CLOSE_CHANNEL, + ); + expect(closeFrame).not.toBeNull(); + }, 10000); + + it("CLOSE_CHANNEL from client disposes session", () => { + const { transport } = createMockTransport(); + const registry = new SessionRegistry(); + + const server = new XumuxServer(transport, { + onOpenChannel: (channelId) => { + const session = new Session({ shell: "/bin/sh" }); + registry.register(channelId, session); + }, + onCloseChannel: (channelId) => { + const session = registry.getByChannelId(channelId); + if (session) { + registry.unregister(channelId); + session.dispose(); + } + }, + }); + + sendHello(server); + sendOpenChannel(server, 1); + expect(registry.size()).toBe(1); + sendCloseChannel(server, 1); + expect(registry.size()).toBe(0); + }); + + it("receives title updates on the channel", async () => { + const { transport, sent } = createMockTransport(); + const registry = new SessionRegistry(); + + const server = new XumuxServer(transport, { + onOpenChannel: (channelId) => { + const session = new Session({ shell: "/bin/sh" }); + registry.register(channelId, session); + session.on("title", (title) => { + server.sendToChannel(channelId, TERMINAL_MSG_TYPE.TITLE, new TextEncoder().encode(title)); + }); + }, + onCloseChannel: (channelId) => { + const session = registry.getByChannelId(channelId); + if (session) { + registry.unregister(channelId); + session.dispose(); + } + }, + }); + + sendHello(server); + sendOpenChannel(server, 1); + + await waitFor( + new Promise((resolve) => { + const check = () => { + const titleFrame = findFrame( + sent, + (f) => f.channelId === 1 && f.type === TERMINAL_MSG_TYPE.TITLE, + ); + if (titleFrame && decodeTitle(titleFrame.payload).length > 0) { + resolve(); + return; + } + setTimeout(check, 50); + }; + check(); + }), + 3000, + ); + + const titleFrame = findFrame( + sent, + (f) => f.channelId === 1 && f.type === TERMINAL_MSG_TYPE.TITLE, + ); + expect(titleFrame).not.toBeNull(); + expect(decodeTitle(titleFrame!.payload).length).toBeGreaterThan(0); + + registry.disposeAll(); + }, 5000); +}); diff --git a/packages/server/tests/xumux.test.ts b/packages/server/tests/xumux.test.ts new file mode 100644 index 0000000..e153f4a --- /dev/null +++ b/packages/server/tests/xumux.test.ts @@ -0,0 +1,232 @@ +import { describe, expect, it, vi } from "vite-plus/test"; +import { + XUMUX_CHANNEL_CONTROL, + XUMUX_CTRL_CHANNEL_ACK, + XUMUX_CTRL_CLOSE_CHANNEL, + XUMUX_CTRL_HELLO, + XUMUX_CTRL_OPEN_CHANNEL, + XUMUX_CTRL_PING, + XUMUX_CTRL_PONG, + XUMUX_CTRL_WELCOME, + XUMUX_VERSION, + decodeFrame, + encodeFrame, +} from "../src/xumux/index.js"; +import { XumuxServer } from "../src/xumux/xumux-server.js"; +import type { XumuxTransport } from "../src/xumux/types.js"; +import { WebSocketAdapter } from "../src/xumux/websocket-adapter.js"; + +const createMockTransport = () => { + const sent: Uint8Array[] = []; + let closed = false; + const transport: XumuxTransport = { + send: (data) => sent.push(data), + close: () => { + closed = true; + }, + get bufferedAmount() { + return 0; + }, + }; + return { transport, sent, isClosed: () => closed }; +}; + +const makeHello = (): Uint8Array => + encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_HELLO, new Uint8Array([XUMUX_VERSION])); + +const makeOpenChannel = (channelId: number): Uint8Array => { + const payload = new Uint8Array(2); + new DataView(payload.buffer).setUint16(0, channelId, false); + return encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_OPEN_CHANNEL, payload); +}; + +const makeCloseChannel = (channelId: number): Uint8Array => { + const payload = new Uint8Array(2); + new DataView(payload.buffer).setUint16(0, channelId, false); + return encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_CLOSE_CHANNEL, payload); +}; + +describe("encodeFrame / decodeFrame", () => { + it("round-trips a frame", () => { + const payload = new Uint8Array([0xde, 0xad]); + const encoded = encodeFrame(0x0001, 0x02, payload); + const decoded = decodeFrame(encoded); + expect(decoded).not.toBeNull(); + expect(decoded!.channelId).toBe(1); + expect(decoded!.type).toBe(2); + expect(decoded!.payload).toEqual(new Uint8Array([0xde, 0xad])); + }); + + it("round-trips an empty payload", () => { + const encoded = encodeFrame(0, 0x01, new Uint8Array(0)); + const decoded = decodeFrame(encoded); + expect(decoded!.payload.length).toBe(0); + }); + + it("returns null for data shorter than header", () => { + expect(decodeFrame(new Uint8Array(2))).toBeNull(); + expect(decodeFrame(new Uint8Array(0))).toBeNull(); + }); +}); + +describe("XumuxServer", () => { + it("completes HELLO/WELCOME handshake", () => { + const { transport, sent } = createMockTransport(); + const server = new XumuxServer(transport, {}); + expect(server.isHandshakeComplete).toBe(false); + server.onMessage(makeHello()); + expect(server.isHandshakeComplete).toBe(true); + + const welcomeFrame = decodeFrame(sent[0]); + expect(welcomeFrame!.channelId).toBe(XUMUX_CHANNEL_CONTROL); + expect(welcomeFrame!.type).toBe(XUMUX_CTRL_WELCOME); + expect(welcomeFrame!.payload[0]).toBe(XUMUX_VERSION); + }); + + it("handles OPEN_CHANNEL with CHANNEL_ACK", () => { + const { transport, sent } = createMockTransport(); + const onOpenChannel = vi.fn(); + const server = new XumuxServer(transport, { onOpenChannel }); + server.onMessage(makeHello()); + server.onMessage(makeOpenChannel(1)); + + expect(onOpenChannel).toHaveBeenCalledWith(1); + const ackFrame = decodeFrame(sent[1]); + expect(ackFrame!.type).toBe(XUMUX_CTRL_CHANNEL_ACK); + const ackChannelId = new DataView( + ackFrame!.payload.buffer, + ackFrame!.payload.byteOffset, + ).getUint16(0, false); + expect(ackChannelId).toBe(1); + }); + + it("handles CLOSE_CHANNEL", () => { + const { transport } = createMockTransport(); + const onCloseChannel = vi.fn(); + const server = new XumuxServer(transport, { onOpenChannel: vi.fn(), onCloseChannel }); + server.onMessage(makeHello()); + server.onMessage(makeOpenChannel(1)); + server.onMessage(makeCloseChannel(1)); + expect(onCloseChannel).toHaveBeenCalledWith(1); + }); + + it("dispatches channel messages after handshake", () => { + const { transport } = createMockTransport(); + const onChannelMessage = vi.fn(); + const server = new XumuxServer(transport, { onOpenChannel: vi.fn(), onChannelMessage }); + server.onMessage(makeHello()); + server.onMessage(makeOpenChannel(5)); + + const dataPayload = new Uint8Array([0x01, 0x02, 0x03]); + server.onMessage(encodeFrame(5, 0x01, dataPayload)); + expect(onChannelMessage).toHaveBeenCalledWith({ + channelId: 5, + type: 0x01, + payload: expect.any(Uint8Array), + }); + }); + + it("errors on data before handshake", () => { + const { transport } = createMockTransport(); + const onError = vi.fn(); + const server = new XumuxServer(transport, { onError }); + server.onMessage(encodeFrame(1, 0x01, new Uint8Array(0))); + expect(onError).toHaveBeenCalledWith(expect.objectContaining({ message: "data before handshake" })); + }); + + it("errors on message to unknown channel", () => { + const { transport } = createMockTransport(); + const onError = vi.fn(); + const server = new XumuxServer(transport, { onError }); + server.onMessage(makeHello()); + server.onMessage(encodeFrame(99, 0x01, new Uint8Array(0))); + expect(onError).toHaveBeenCalledWith( + expect.objectContaining({ message: "message on unknown channel 99" }), + ); + }); + + it("responds to PING with PONG", () => { + const { transport, sent } = createMockTransport(); + const server = new XumuxServer(transport, {}); + server.onMessage(makeHello()); + const pingFrame = encodeFrame(XUMUX_CHANNEL_CONTROL, XUMUX_CTRL_PING, new Uint8Array(0)); + server.onMessage(pingFrame); + const pongFrame = decodeFrame(sent[sent.length - 1]); + expect(pongFrame!.type).toBe(XUMUX_CTRL_PONG); + }); + + it("sendToChannel sends data on open channels", () => { + const { transport, sent } = createMockTransport(); + const server = new XumuxServer(transport, { onOpenChannel: vi.fn() }); + server.onMessage(makeHello()); + server.onMessage(makeOpenChannel(3)); + const payload = new Uint8Array([0xaa, 0xbb]); + server.sendToChannel(3, 0x02, payload); + const dataFrame = decodeFrame(sent[sent.length - 1]); + expect(dataFrame!.channelId).toBe(3); + expect(dataFrame!.type).toBe(0x02); + }); + + it("closeChannel sends CLOSE_CHANNEL and removes channel", () => { + const { transport, sent } = createMockTransport(); + const server = new XumuxServer(transport, { onOpenChannel: vi.fn() }); + server.onMessage(makeHello()); + server.onMessage(makeOpenChannel(2)); + server.closeChannel(2); + const closeFrame = decodeFrame(sent[sent.length - 1]); + expect(closeFrame!.type).toBe(XUMUX_CTRL_CLOSE_CHANNEL); + expect(server.activeChannelCount).toBe(0); + }); + + it("close() fires onCloseChannel for all open channels", () => { + const { transport } = createMockTransport(); + const onCloseChannel = vi.fn(); + const server = new XumuxServer(transport, { onOpenChannel: vi.fn(), onCloseChannel }); + server.onMessage(makeHello()); + server.onMessage(makeOpenChannel(1)); + server.onMessage(makeOpenChannel(2)); + server.close(); + expect(onCloseChannel).toHaveBeenCalledTimes(2); + }); +}); + +describe("WebSocketAdapter", () => { + it("sends binary data when socket is open", () => { + const mockSocket = { send: vi.fn(), close: vi.fn(), readyState: 1 }; + const adapter = new WebSocketAdapter(mockSocket); + const data = new Uint8Array([1, 2, 3]); + adapter.send(data); + expect(mockSocket.send).toHaveBeenCalledTimes(1); + }); + + it("does not send when socket is not open", () => { + const mockSocket = { send: vi.fn(), close: vi.fn(), readyState: 3 }; + const adapter = new WebSocketAdapter(mockSocket); + adapter.send(new Uint8Array([1])); + expect(mockSocket.send).not.toHaveBeenCalled(); + }); + + it("delegates close to the socket", () => { + const mockSocket = { send: vi.fn(), close: vi.fn(), readyState: 1 }; + const adapter = new WebSocketAdapter(mockSocket); + adapter.close(); + expect(mockSocket.close).toHaveBeenCalled(); + }); + + it("reads bufferedAmount from raw property", () => { + const mockSocket = { + send: vi.fn(), + close: vi.fn(), + readyState: 1, + raw: { bufferedAmount: 42 }, + }; + const adapter = new WebSocketAdapter(mockSocket); + expect(adapter.bufferedAmount).toBe(42); + }); + + it("returns 0 when raw is missing", () => { + const mockSocket = { send: vi.fn(), close: vi.fn(), readyState: 1 }; + const adapter = new WebSocketAdapter(mockSocket); + expect(adapter.bufferedAmount).toBe(0); + }); +}); diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 0841816..6302cb9 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -181,12 +181,18 @@ importers: '@types/node': specifier: ^25.5.0 version: 25.6.0 + '@types/ws': + specifier: ^8.18.1 + version: 8.18.1 typescript: specifier: ^5.9.3 version: 5.9.3 vite-plus: specifier: ^0.1.12 version: 0.1.19(@types/node@25.6.0)(jiti@2.6.1)(jsdom@29.0.2(@noble/hashes@1.8.0))(typescript@5.9.3)(vite@8.0.10(@types/node@25.6.0)(jiti@2.6.1)) + ws: + specifier: ^8.20.0 + version: 8.20.0 packages: @@ -1231,6 +1237,9 @@ packages: '@types/validate-npm-package-name@4.0.2': resolution: {integrity: sha512-lrpDziQipxCEeK5kWxvljWYhUvOiB2A9izZd9B2AFarYAkqZshb4lPbRs7zKEic6eGtH8V/2qJW+dPp9OtF6bw==} + '@types/ws@8.18.1': + resolution: {integrity: sha512-ThVF6DCVhA8kUGy+aazFQ4kXQ7E1Ty7A3ypFOe0IcJV8O/M511G99AW24irKrW56Wt44yG9+ij8FaqoBGkuBXg==} + '@vitejs/plugin-react@5.2.0': resolution: {integrity: sha512-YmKkfhOAi3wsB1PhJq5Scj3GXMn3WvtQ/JC0xoopuHoXSdmtdStOpFrYaT1kie2YgFBcIe64ROzMYRjCrYOdYw==} engines: {node: ^20.19.0 || >=22.12.0} @@ -4183,6 +4192,10 @@ snapshots: '@types/validate-npm-package-name@4.0.2': {} + '@types/ws@8.18.1': + dependencies: + '@types/node': 25.6.0 + '@vitejs/plugin-react@5.2.0(@voidzero-dev/vite-plus-core@0.1.20(@types/node@25.6.0)(jiti@2.6.1)(typescript@5.9.3))': dependencies: '@babel/core': 7.29.0 From 857de4e464c1a216bb77019598326610bb678989 Mon Sep 17 00:00:00 2001 From: Visionik Date: Wed, 6 May 2026 01:08:52 -0400 Subject: [PATCH 2/2] fix(server): address Cursor Bugbot review findings - fix: change exit-code null sentinel from -1 to INT32_MIN (-2147483648) so real exit code -1 is not silently corrupted - fix: decodeResize returns null on short payload instead of throwing, preventing unhandled exception crash in onChannelMessage handler - fix: replace global SessionRegistry keyed by channelId with per-connection local Map so concurrent xumux connections sharing channel 1 do not collide and overwrite each other's sessions - fix: call session.dispose() in exit event handler (was missing, leaking event listeners and title polling timer for server-initiated closes) - test: update protocol.test.ts for new sentinel and null-return behavior --- packages/server/src/index.ts | 54 +++++++++++++++++++++----- packages/server/src/protocol.ts | 7 ++-- packages/server/tests/protocol.test.ts | 13 +++++-- 3 files changed, 58 insertions(+), 16 deletions(-) diff --git a/packages/server/src/index.ts b/packages/server/src/index.ts index d4745fc..6cdf12d 100644 --- a/packages/server/src/index.ts +++ b/packages/server/src/index.ts @@ -173,6 +173,14 @@ export const createServer = async (options: ServerOptions = {}): Promise(); + // Parallel map of channelId โ†’ global registry auto-ID for size tracking. + const connectionRegistryIds = new Map(); return { onOpen(_event, ws) { @@ -184,7 +192,9 @@ export const createServer = async (options: ServerOptions = {}): Promise { + connectionSessions.delete(channelId); + const gid = connectionRegistryIds.get(channelId); + if (gid !== undefined) { + registry.unregister(gid); + connectionRegistryIds.delete(channelId); + } + if (disposeSes) session.dispose(); + }; + session.on("output", (data) => { if (adapter.bufferedAmount > WS_BACKPRESSURE_THRESHOLD_BYTES) { xumux?.closeChannel(channelId); - registry.unregister(channelId); - session.dispose(); + cleanupChannel(true); return; } xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.OUTPUT, encodeOutput(data)); @@ -209,23 +228,30 @@ export const createServer = async (options: ServerOptions = {}): Promise { xumux?.sendToChannel(channelId, TERMINAL_MSG_TYPE.EXIT, encodeExit(code)); xumux?.closeChannel(channelId); - registry.unregister(channelId); + // dispose() must be called here: onCloseChannel won't fire for + // server-initiated closes, so this is the only cleanup path. + cleanupChannel(true); }); }, onCloseChannel: (channelId) => { - const session = registry.getByChannelId(channelId); + const session = connectionSessions.get(channelId); if (!session) return; - registry.unregister(channelId); + connectionSessions.delete(channelId); + const gid = connectionRegistryIds.get(channelId); + if (gid !== undefined) { + registry.unregister(gid); + connectionRegistryIds.delete(channelId); + } session.dispose(); }, onChannelMessage: (event) => { - const session = registry.getByChannelId(event.channelId); + const session = connectionSessions.get(event.channelId); if (!session) return; if (event.type === TERMINAL_MSG_TYPE.INPUT) { session.write(decodeInput(event.payload)); } else if (event.type === TERMINAL_MSG_TYPE.RESIZE) { - const { cols, rows } = decodeResize(event.payload); - session.resize(cols, rows); + const dims = decodeResize(event.payload); + if (dims) session.resize(dims.cols, dims.rows); } }, }); @@ -256,11 +282,21 @@ export const createServer = async (options: ServerOptions = {}): Promise textEncoder.encode(data); @@ -51,8 +52,8 @@ export const encodeResize = (cols: number, rows: number): Uint8Array => { return buffer; }; -export const decodeResize = (payload: Uint8Array): { cols: number; rows: number } => { - if (payload.length < 4) throw new Error("resize payload must be 4 bytes"); +export const decodeResize = (payload: Uint8Array): { cols: number; rows: number } | null => { + if (payload.length < 4) return null; const view = new DataView(payload.buffer, payload.byteOffset, payload.byteLength); return { cols: view.getUint16(0, false), rows: view.getUint16(2, false) }; }; diff --git a/packages/server/tests/protocol.test.ts b/packages/server/tests/protocol.test.ts index 8600a01..b6f6171 100644 --- a/packages/server/tests/protocol.test.ts +++ b/packages/server/tests/protocol.test.ts @@ -74,8 +74,9 @@ describe("resize codec", () => { expect(encodeResize(80, 24).length).toBe(4); }); - it("throws on payload shorter than 4 bytes", () => { - expect(() => decodeResize(new Uint8Array(3))).toThrow("resize payload must be 4 bytes"); + it("returns null on payload shorter than 4 bytes", () => { + expect(decodeResize(new Uint8Array(3))).toBeNull(); + expect(decodeResize(new Uint8Array(0))).toBeNull(); }); }); @@ -88,13 +89,17 @@ describe("exit codec", () => { expect(decodeExit(encodeExit(137))).toBe(137); }); - it("round-trips null exit code as -1 sentinel", () => { + it("round-trips null exit code as INT32_MIN sentinel", () => { const encoded = encodeExit(null); const view = new DataView(encoded.buffer); - expect(view.getInt32(0, false)).toBe(-1); + expect(view.getInt32(0, false)).toBe(-2147483648); expect(decodeExit(encoded)).toBeNull(); }); + it("does not treat -1 as null (real exit code preserved)", () => { + expect(decodeExit(encodeExit(-1))).toBe(-1); + }); + it("encodes as exactly 4 bytes", () => { expect(encodeExit(0).length).toBe(4); });