diff --git a/PROGRESS.md b/PROGRESS.md index 7f8dd73..8a55f3d 100644 --- a/PROGRESS.md +++ b/PROGRESS.md @@ -105,6 +105,10 @@ This file is the canonical live tracker for the Infomaniak SDK and OpenClaw inte - Tightened local ignore rules for credential files and generated build leftovers. - Restricted CI workflow permissions to read-only contents access and disabled checkout credential persistence. - Added `SECURITY.md` and Dependabot updates for npm and GitHub Actions. +- `feat: add kchat websocket client` + - Added reviewed `client.kchat.websocket` helpers for hosted Infomaniak Echo/Pusher sockets and plain Mattermost `/api/v4/websocket` sockets. + - Added injectable `fetch` and `WebSocket` transports, team/user discovery for Echo subscriptions, private-channel `/broadcasting/auth`, Mattermost authentication challenge frames, normalized `posted` events, sender/channel filters, abort handling, and no-secret error messages. + - Added mocked unit coverage for Echo subscription/auth flows, Mattermost auth/post flows, filtering, malformed frames, binary payloads, socket API variants, and failure cases without live network calls. ## Next Task Queue @@ -142,6 +146,7 @@ This file is the canonical live tracker for the Infomaniak SDK and OpenClaw inte - npm publication: `liquid-potassium@0.1.0` is published to `https://registry.npmjs.org/` with `latest` pointing to `0.1.0`. - Post-publish install verification: a fresh temporary project installed `liquid-potassium@0.1.0` from npm with 0 vulnerabilities and successfully imported the main SDK, OpenClaw plugin, and OpenClaw tools subpaths. - Public GitHub security preflight: passing `npm run ci`, `npm audit`, `npm audit --omit=dev`, `npm pack --dry-run --json`, `git diff --check`, and high-confidence secret scans across current tracked files and git history. +- kChat WebSocket client: passing `npm run typecheck` and `npm run test:coverage` with 100% statements, branches, functions, and lines; all tests use mocked `fetch` and WebSocket implementations. ## Blockers And Risks diff --git a/README.md b/README.md index 19fb9ca..2fbb5e3 100644 --- a/README.md +++ b/README.md @@ -14,6 +14,7 @@ This is an unofficial community project and is not affiliated with Infomaniak. - Searchable operation catalog with docs-enriched metadata from a committed Infomaniak Developer Portal snapshot. - Resource discovery helpers for common opaque IDs such as kDrive drive IDs. - Reviewed Mail application helpers for mailbox consumption routes that are not present in the public OpenAPI spec. +- Reviewed kChat WebSocket helpers for hosted Infomaniak Echo/Pusher events and plain Mattermost WebSocket events. - Curated domain workflow actions for migration-friendly tasks such as mailbox reads, kDrive browsing/uploads, kChat posts, and resource discovery. - Native OpenClaw plugin with six compact tools. - No real network calls in tests; runtime HTTP goes through injected `fetch`. @@ -147,6 +148,29 @@ const message = await client.mail.application.getMessage( `mailApplicationBaseUrl` defaults to `https://mail.infomaniak.com`. Route provenance and maintenance rules are documented in [`docs/mail-application-api.md`](docs/mail-application-api.md). +## kChat WebSocket API + +Hosted Infomaniak kChat exposes live events through an Echo/Pusher-compatible socket at `websocket.kchat.infomaniak.com`, with private subscription auth through the kChat team host. The reviewed SDK surface lives under `client.kchat.websocket` and keeps all network dependencies injectable: + +```ts +const client = createInfomaniakClient({ + token: process.env.INFOMANIAK_TOKEN, + fetch, +}); + +const abort = new AbortController(); +await client.kchat.websocket.runConnection({ + teamName: "my-team", + channelIds: ["channel-id-to-accept"], + signal: abort.signal, + onPost(event) { + console.log(event.text, event.channelId, event.userName); + }, +}); +``` + +By default the client resolves `private-team.` and `presence-teamUser.` from `teamName`, authenticates them through `/broadcasting/auth`, and emits normalized `posted` events. For a plain Mattermost server, set `protocol: "mattermost"` and provide `apiBaseUrl` or a full `url`. + ## OpenClaw Plugin The native plugin entry is exported at: diff --git a/src/client/create-client.ts b/src/client/create-client.ts index d0c6485..acdfad2 100644 --- a/src/client/create-client.ts +++ b/src/client/create-client.ts @@ -5,6 +5,7 @@ import { createDomainWorkflowClient, type DomainWorkflowClient } from "../workfl import type { OperationMetadata } from "../generated/catalog/types.js"; import type { GeneratedOperationClient, OperationRequest } from "./generated-operation-client.js"; import { InfomaniakError, InfomaniakOperationError } from "./errors.js"; +import { createKchatWebSocketClient, type KchatWebSocketClient, type KchatWebSocketConstructor } from "./kchat-websocket.js"; import { createMailApplicationClient, type MailApplicationClient } from "./mail-application.js"; export interface InfomaniakClientConfig { @@ -14,6 +15,7 @@ export interface InfomaniakClientConfig { fetch?: typeof fetch; headers?: HeadersInit | (() => HeadersInit | Promise); userAgent?: string; + webSocket?: KchatWebSocketConstructor; } export interface RawRequestOptions extends OperationRequest { @@ -34,10 +36,14 @@ export type DomainClients = ReturnType; export type MailDomainClient = DomainClients["mail"] & { application: MailApplicationClient; }; +export type KchatDomainClient = DomainClients["kchat"] & { + websocket: KchatWebSocketClient; +}; export type InfomaniakClient = GeneratedOperationClient & - Omit & { + Omit & { discovery: ResourceDiscovery; + kchat: KchatDomainClient; mail: MailDomainClient; raw: RawClient; workflows: DomainWorkflowClient; @@ -81,6 +87,12 @@ export function createInfomaniakClient(config: InfomaniakClientConfig = {}): Inf DELETE: (path, options) => requestPath(fetchImpl, config, path, { ...options, method: "DELETE" }), }; const domainClients = createDomainOperations(generatedClient); + const kchatWebSocketOptions = { + fetch: fetchImpl, + ...(config.token === undefined ? {} : { token: config.token }), + ...(config.webSocket === undefined ? {} : { webSocket: config.webSocket }), + }; + const kchatWebSocket = createKchatWebSocketClient(kchatWebSocketOptions); const mailApplicationBaseUrl = config.mailApplicationBaseUrl ?? defaultMailApplicationBaseUrl; const mailApplication = createMailApplicationClient({ apiRequest: (path, options) => requestPath(fetchImpl, config, path, options as RawRequestOptions | undefined), @@ -98,6 +110,10 @@ export function createInfomaniakClient(config: InfomaniakClientConfig = {}): Inf ...generatedClient, ...domainClients, discovery, + kchat: { + ...domainClients.kchat, + websocket: kchatWebSocket, + }, mail: { ...domainClients.mail, application: mailApplication, diff --git a/src/client/kchat-websocket.ts b/src/client/kchat-websocket.ts new file mode 100644 index 0000000..bc9a6c2 --- /dev/null +++ b/src/client/kchat-websocket.ts @@ -0,0 +1,635 @@ +import { InfomaniakError } from "./errors.js"; + +export type KchatWebSocketProtocol = "infomaniak-echo" | "mattermost"; +export type KchatTokenProvider = string | (() => string | undefined | Promise); + +export interface KchatWebSocketLike { + send(data: string): void; + close(code?: number, reason?: string): void; + addEventListener?(eventName: string, handler: (event: unknown) => void): void; + removeEventListener?(eventName: string, handler: (event: unknown) => void): void; + on?(eventName: string, handler: (event: unknown) => void): void; + off?(eventName: string, handler: (event: unknown) => void): void; + removeListener?(eventName: string, handler: (event: unknown) => void): void; +} + +export type KchatWebSocketConstructor = new (url: string) => KchatWebSocketLike; + +export interface KchatJsonObject { + [key: string]: KchatJsonValue; +} + +export type KchatJsonValue = string | number | boolean | null | KchatJsonObject | KchatJsonValue[]; + +export interface KchatWebSocketFrame { + event?: string; + channel?: string; + data?: unknown; + broadcast?: unknown; + status?: string; + seq_reply?: number; + error?: unknown; + [key: string]: unknown; +} + +export interface KchatWebSocketPostEvent { + id: string; + postId?: string; + rootId?: string; + timestamp?: number; + text: string; + channelId?: string; + channelName?: string; + teamId?: string; + teamDomain?: string; + userId?: string; + userName?: string; + rawPost: KchatJsonObject; + rawFrame: KchatWebSocketFrame; +} + +export interface KchatWebSocketUrlOptions { + protocol?: KchatWebSocketProtocol; + teamName?: string; + apiBaseUrl?: string; + url?: string; + host?: string; + appKey?: string; +} + +export interface KchatWebSocketRunOptions extends KchatWebSocketUrlOptions { + token?: KchatTokenProvider; + fetch?: typeof fetch; + webSocket?: KchatWebSocketConstructor; + signal?: AbortSignal; + authEndpoint?: string; + subscriptions?: readonly string[]; + teamId?: string; + teamUserId?: string; + channelIds?: readonly string[]; + ignoredUserIds?: readonly string[]; + ignoredUserNames?: readonly string[]; + onFrame?: (frame: KchatWebSocketFrame) => void; + onAuthenticated?: () => void; + onSubscribed?: (channelName: string) => void; + onPost?: (event: KchatWebSocketPostEvent) => void | Promise; +} + +export interface KchatWebSocketClientFactoryOptions { + token?: KchatTokenProvider; + fetch?: typeof fetch; + webSocket?: KchatWebSocketConstructor; +} + +export interface KchatWebSocketClient { + resolveUrl(options?: KchatWebSocketUrlOptions): string; + runConnection(options?: KchatWebSocketRunOptions): Promise; +} + +const defaultProtocol: KchatWebSocketProtocol = "infomaniak-echo"; +const defaultEchoHost = "websocket.kchat.infomaniak.com"; +const defaultEchoAppKey = "kchat-key"; +const echoClientName = "liquid-potassium"; +const echoClientVersion = "0.2.0"; +const mattermostAuthSeq = 1; + +export function createKchatWebSocketClient(options: KchatWebSocketClientFactoryOptions = {}): KchatWebSocketClient { + return { + resolveUrl: (urlOptions = {}) => resolveKchatWebSocketUrl(urlOptions), + runConnection: (runOptions = {}) => { + const mergedOptions: KchatWebSocketRunOptions = { ...runOptions }; + if (mergedOptions.token === undefined && options.token !== undefined) { + mergedOptions.token = options.token; + } + if (mergedOptions.fetch === undefined && options.fetch !== undefined) { + mergedOptions.fetch = options.fetch; + } + if (mergedOptions.webSocket === undefined && options.webSocket !== undefined) { + mergedOptions.webSocket = options.webSocket; + } + return runKchatWebSocketConnection(mergedOptions); + }, + }; +} + +export function resolveKchatWebSocketUrl(options: KchatWebSocketUrlOptions = {}): string { + const explicitUrl = readOptionalString(options.url); + if (explicitUrl) { + return explicitUrl; + } + + if (resolveKchatWebSocketProtocol(options.protocol) === "mattermost") { + const url = new URL(resolveKchatApiBaseUrl(options)); + url.protocol = url.protocol === "http:" ? "ws:" : "wss:"; + url.pathname = "/api/v4/websocket"; + url.search = ""; + url.hash = ""; + return url.toString(); + } + + const host = readOptionalString(options.host) ?? defaultEchoHost; + const appKey = readOptionalString(options.appKey) ?? defaultEchoAppKey; + const url = new URL(`wss://${host}/app/${encodeURIComponent(appKey)}`); + url.searchParams.set("protocol", "7"); + url.searchParams.set("client", echoClientName); + url.searchParams.set("version", echoClientVersion); + url.searchParams.set("flash", "false"); + return url.toString(); +} + +export function createKchatMattermostAuthFrame(token: string, seq = mattermostAuthSeq): KchatJsonObject { + return { + seq, + action: "authentication_challenge", + data: { + token, + }, + }; +} + +export async function runKchatWebSocketConnection(options: KchatWebSocketRunOptions = {}): Promise { + if (resolveKchatWebSocketProtocol(options.protocol) === "mattermost") { + await runMattermostWebSocketConnection(options); + return; + } + + await runEchoWebSocketConnection(options); +} + +export function normalizeKchatWebSocketPostEvent( + frame: KchatWebSocketFrame, + options: Pick = {}, +): KchatWebSocketPostEvent | undefined { + if (frame.event !== "posted") { + return undefined; + } + + const data = parseJsonObject(frame.data) ?? {}; + const broadcast = parseJsonObject(frame.broadcast) ?? {}; + const post = parseJsonObject(data.post); + if (!post) { + return undefined; + } + + const text = readOptionalString(post.message) ?? readOptionalString(data.message); + const channelId = readOptionalString(post.channel_id) ?? readOptionalString(post.channelId) ?? readOptionalString(data.channel_id) ?? readOptionalString(broadcast.channel_id); + const channelName = + readOptionalString(data.channel_name) ?? + readOptionalString(data.channelName) ?? + readOptionalString(data.channel_display_name) ?? + readOptionalString(data.channelDisplayName); + const postId = readOptionalString(post.id) ?? readOptionalString(post.post_id); + const fallbackId = channelId && text ? `${channelId}:${readOptionalNumber(post.create_at) ?? text}` : undefined; + const id = postId ?? fallbackId; + + if (!id || !text || (!channelId && !channelName)) { + return undefined; + } + + const teamId = readOptionalString(post.team_id) ?? readOptionalString(post.teamId) ?? readOptionalString(data.team_id) ?? readOptionalString(broadcast.team_id); + const teamDomain = + readOptionalString(data.team_domain) ?? + readOptionalString(data.teamDomain) ?? + readOptionalString(data.team_name) ?? + readOptionalString(options.teamName); + const rootId = readOptionalString(post.root_id) ?? readOptionalString(post.rootId); + const userId = readOptionalString(post.user_id) ?? readOptionalString(post.userId); + const userName = + readOptionalString(data.sender_name) ?? + readOptionalString(data.senderName) ?? + readOptionalString(post.user_name) ?? + readOptionalString(post.userName); + const timestamp = normalizeKchatTimestamp(readOptionalNumber(post.create_at) ?? readOptionalNumber(post.createAt)); + + return { + id, + ...(postId ? { postId } : {}), + ...(rootId ? { rootId } : {}), + ...(timestamp === undefined ? {} : { timestamp }), + text, + ...(channelId ? { channelId } : {}), + ...(channelName ? { channelName } : {}), + ...(teamId ? { teamId } : {}), + ...(teamDomain ? { teamDomain } : {}), + ...(userId ? { userId } : {}), + ...(userName ? { userName } : {}), + rawPost: post, + rawFrame: frame, + }; +} + +async function runMattermostWebSocketConnection(options: KchatWebSocketRunOptions): Promise { + const token = await resolveRequiredToken(options.token); + const socket = createSocket(options); + + await runSocket(socket, options, { + open: async () => { + socket.send(JSON.stringify(createKchatMattermostAuthFrame(token))); + }, + frame: async (frame, close, fail) => { + if (isMattermostAuthReply(frame)) { + if (frame.status !== "OK") { + fail(new InfomaniakError("kChat Mattermost WebSocket authentication failed.")); + close(); + return; + } + options.onAuthenticated?.(); + return; + } + + await maybeEmitPost(frame, options); + }, + }); +} + +async function runEchoWebSocketConnection(options: KchatWebSocketRunOptions): Promise { + const token = await resolveRequiredToken(options.token); + const socket = createSocket(options); + let subscribed = false; + + await runSocket(socket, options, { + frame: async (frame, close, fail) => { + if (frame.event === "pusher:connection_established") { + if (subscribed) { + return; + } + subscribed = true; + const socketId = readOptionalString(parseJsonObject(frame.data)?.socket_id); + if (!socketId) { + fail(new InfomaniakError("kChat Echo WebSocket did not return a socket id.")); + close(); + return; + } + const channels = await resolveEchoSubscriptionChannels(options, token); + for (const channelName of channels) { + const auth = await authorizeEchoSubscription(options, token, socketId, channelName); + socket.send(JSON.stringify(createEchoSubscribeFrame(channelName, auth))); + } + options.onAuthenticated?.(); + return; + } + + if (frame.event === "pusher:ping") { + socket.send(JSON.stringify({ event: "pusher:pong", data: {} })); + return; + } + + if (frame.event === "pusher:error") { + fail(new InfomaniakError(readOptionalString(parseJsonObject(frame.data)?.message) ?? "kChat Echo WebSocket returned an error.")); + close(); + return; + } + + if (frame.event === "pusher_internal:subscription_succeeded") { + if (frame.channel) { + options.onSubscribed?.(frame.channel); + } + return; + } + + if (frame.event?.startsWith("pusher:")) { + return; + } + + await maybeEmitPost(frame, options); + }, + }); +} + +interface SocketHandlers { + open?: () => void | Promise; + frame: (frame: KchatWebSocketFrame, close: () => void, fail: (error: Error) => void) => void | Promise; +} + +async function runSocket(socket: KchatWebSocketLike, options: KchatWebSocketRunOptions, handlers: SocketHandlers): Promise { + await new Promise((resolve, reject) => { + let settled = false; + const cleanups: (() => void)[] = []; + + const settleResolve = (): void => { + /* v8 ignore next 3 -- duplicate close/abort events can race after a socket is already settled. */ + if (settled) { + return; + } + settled = true; + for (const cleanup of cleanups.splice(0)) { + cleanup(); + } + options.signal?.removeEventListener("abort", onAbort); + resolve(); + }; + const settleReject = (error: Error): void => { + /* v8 ignore next 3 -- duplicate error events can race after a socket is already settled. */ + if (settled) { + return; + } + settled = true; + for (const cleanup of cleanups.splice(0)) { + cleanup(); + } + options.signal?.removeEventListener("abort", onAbort); + reject(error); + }; + const fail = (error: Error): void => settleReject(error); + const close = (): void => closeSocket(socket); + const onAbort = (): void => { + close(); + settleResolve(); + }; + const onOpen = (): void => { + void Promise.resolve(handlers.open?.()).catch(fail); + }; + const onMessage = (event: unknown): void => { + const frame = parseFrame(resolveWebSocketMessageData(event)); + if (!frame) { + return; + } + options.onFrame?.(frame); + void Promise.resolve(handlers.frame(frame, close, fail)).catch(fail); + }; + const onError = (event: unknown): void => fail(resolveWebSocketError(event)); + const onClose = (): void => settleResolve(); + + cleanups.push(addSocketListener(socket, "open", onOpen)); + cleanups.push(addSocketListener(socket, "message", onMessage)); + cleanups.push(addSocketListener(socket, "error", onError)); + cleanups.push(addSocketListener(socket, "close", onClose)); + + if (options.signal?.aborted) { + onAbort(); + return; + } + options.signal?.addEventListener("abort", onAbort, { once: true }); + }); +} + +async function maybeEmitPost(frame: KchatWebSocketFrame, options: KchatWebSocketRunOptions): Promise { + const event = normalizeKchatWebSocketPostEvent(frame, options); + if (!event || !isAllowedChannel(event, options) || isIgnoredSender(event, options)) { + return; + } + + await options.onPost?.(event); +} + +async function resolveEchoSubscriptionChannels(options: KchatWebSocketRunOptions, token: string): Promise { + if (options.subscriptions && options.subscriptions.length > 0) { + return options.subscriptions; + } + + const [teamId, teamUserId] = await Promise.all([resolveEchoTeamId(options, token), resolveEchoTeamUserId(options, token)]); + return [`private-team.${teamId}`, `presence-teamUser.${teamUserId}`]; +} + +async function resolveEchoTeamId(options: KchatWebSocketRunOptions, token: string): Promise { + const configuredTeamId = readOptionalString(options.teamId); + if (configuredTeamId) { + return configuredTeamId; + } + + const teamName = readOptionalString(options.teamName); + if (!teamName) { + throw new InfomaniakError("Configure teamName, teamId, or subscriptions before connecting to the kChat Echo WebSocket."); + } + + const team = await fetchKchatJson(options, token, `${resolveKchatApiBaseUrl(options)}/api/v4/teams/name/${encodeURIComponent(teamName)}`); + const teamId = readOptionalString(team.id); + if (!teamId) { + throw new InfomaniakError(`kChat team lookup did not return an id for ${teamName}.`); + } + return teamId; +} + +async function resolveEchoTeamUserId(options: KchatWebSocketRunOptions, token: string): Promise { + const configuredTeamUserId = readOptionalString(options.teamUserId); + if (configuredTeamUserId) { + return configuredTeamUserId; + } + + const user = await fetchKchatJson(options, token, `${resolveKchatApiBaseUrl(options)}/api/v4/users/me`); + const userId = readOptionalString(user.id); + if (!userId) { + throw new InfomaniakError("kChat current-user lookup did not return an id."); + } + return userId; +} + +async function authorizeEchoSubscription( + options: KchatWebSocketRunOptions, + token: string, + socketId: string, + channelName: string, +): Promise { + const response = await fetchKchat(options, token, resolveEchoAuthEndpoint(options), { + method: "POST", + headers: { accept: "application/json", "content-type": "application/x-www-form-urlencoded" }, + body: new URLSearchParams({ socket_id: socketId, channel_name: channelName }), + }); + if (!response.ok) { + throw new InfomaniakError(`kChat Echo WebSocket channel auth failed for ${channelName}: HTTP ${response.status}.`); + } + const auth = parseJsonObject(await response.json()); + if (!auth || !readOptionalString(auth.auth)) { + throw new InfomaniakError(`kChat Echo WebSocket channel auth did not return a signature for ${channelName}.`); + } + return auth; +} + +function createEchoSubscribeFrame(channelName: string, auth: KchatJsonObject): KchatJsonObject { + const channelData = readOptionalString(auth.channel_data); + return { + event: "pusher:subscribe", + data: { + channel: channelName, + auth: readOptionalString(auth.auth)!, + ...(channelData ? { channel_data: channelData } : {}), + }, + }; +} + +function resolveEchoAuthEndpoint(options: KchatWebSocketRunOptions): string { + return readOptionalString(options.authEndpoint) ?? `${resolveKchatApiBaseUrl(options)}/broadcasting/auth`; +} + +async function fetchKchatJson(options: KchatWebSocketRunOptions, token: string, url: string): Promise { + const response = await fetchKchat(options, token, url); + if (!response.ok) { + throw new InfomaniakError(`kChat API request failed: HTTP ${response.status}.`); + } + const parsed = parseJsonObject(await response.json()); + if (!parsed) { + throw new InfomaniakError("kChat API response did not return a JSON object."); + } + return parsed; +} + +async function fetchKchat(options: KchatWebSocketRunOptions, token: string, url: string, init: RequestInit = {}): Promise { + const fetchImpl = options.fetch ?? globalThis.fetch; + if (typeof fetchImpl !== "function") { + throw new InfomaniakError("No fetch implementation is available. Provide options.fetch."); + } + const headers = new Headers(init.headers); + headers.set("authorization", `Bearer ${token}`); + return fetchImpl(url, { + ...init, + headers, + ...(options.signal ? { signal: options.signal } : {}), + }); +} + +function createSocket(options: KchatWebSocketRunOptions): KchatWebSocketLike { + const WebSocketImpl = options.webSocket ?? globalThis.WebSocket; + if (typeof WebSocketImpl !== "function") { + throw new InfomaniakError("No WebSocket implementation is available. Provide options.webSocket."); + } + return new WebSocketImpl(resolveKchatWebSocketUrl(options)); +} + +async function resolveRequiredToken(tokenProvider: KchatTokenProvider | undefined): Promise { + const token = readOptionalString(typeof tokenProvider === "function" ? await tokenProvider() : tokenProvider); + if (!token) { + throw new InfomaniakError("A kChat WebSocket bearer token is required."); + } + return token; +} + +function resolveKchatWebSocketProtocol(protocol: KchatWebSocketProtocol | undefined): KchatWebSocketProtocol { + return protocol ?? defaultProtocol; +} + +function resolveKchatApiBaseUrl(options: Pick): string { + const apiBaseUrl = readOptionalString(options.apiBaseUrl); + if (apiBaseUrl) { + return apiBaseUrl; + } + const teamName = readOptionalString(options.teamName); + if (!teamName) { + throw new InfomaniakError("Configure apiBaseUrl or a DNS-safe teamName before using kChat WebSocket helpers."); + } + const hostLabel = teamName.toLowerCase(); + if (!/^[a-z0-9-]+$/.test(hostLabel)) { + throw new InfomaniakError("kChat teamName must be a DNS-safe team slug when apiBaseUrl is omitted."); + } + return `https://${hostLabel}.kchat.infomaniak.com`; +} + +function isMattermostAuthReply(frame: KchatWebSocketFrame): boolean { + return readOptionalNumber(frame.seq_reply) === mattermostAuthSeq && typeof frame.status === "string"; +} + +function isAllowedChannel(event: KchatWebSocketPostEvent, options: Pick): boolean { + return !options.channelIds || options.channelIds.length === 0 || Boolean(event.channelId && options.channelIds.includes(event.channelId)); +} + +function isIgnoredSender(event: KchatWebSocketPostEvent, options: Pick): boolean { + return Boolean( + (event.userId && options.ignoredUserIds?.includes(event.userId)) || (event.userName && options.ignoredUserNames?.includes(event.userName)), + ); +} + +function parseFrame(data: unknown): KchatWebSocketFrame | undefined { + return parseJsonObject(readWebSocketText(data)); +} + +function parseJsonObject(value: unknown): KchatJsonObject | undefined { + if (isRecord(value)) { + return value as KchatJsonObject; + } + if (typeof value !== "string" || !value.trim()) { + return undefined; + } + try { + const parsed: unknown = JSON.parse(value); + return isRecord(parsed) ? (parsed as KchatJsonObject) : undefined; + } catch { + return undefined; + } +} + +function readWebSocketText(data: unknown): string | undefined { + if (typeof data === "string") { + return data; + } + if (data instanceof ArrayBuffer) { + return Buffer.from(data).toString("utf8"); + } + if (Buffer.isBuffer(data)) { + return data.toString("utf8"); + } + if (ArrayBuffer.isView(data)) { + return Buffer.from(data.buffer, data.byteOffset, data.byteLength).toString("utf8"); + } + return undefined; +} + +function resolveWebSocketMessageData(event: unknown): unknown { + if (isRecord(event) && "data" in event) { + return event.data; + } + return event; +} + +function addSocketListener(socket: KchatWebSocketLike, eventName: string, handler: (event: unknown) => void): () => void { + if (socket.addEventListener) { + socket.addEventListener(eventName, handler); + return () => socket.removeEventListener?.(eventName, handler); + } + if (socket.on) { + socket.on(eventName, handler); + return () => { + if (socket.off) { + socket.off(eventName, handler); + } else { + socket.removeListener?.(eventName, handler); + } + }; + } + const socketRecord = socket as unknown as Record; + const propertyName = `on${eventName}`; + const previous = socketRecord[propertyName]; + socketRecord[propertyName] = handler; + return () => { + if (socketRecord[propertyName] === handler) { + socketRecord[propertyName] = previous; + } + }; +} + +function closeSocket(socket: KchatWebSocketLike): void { + socket.close(); +} + +function resolveWebSocketError(event: unknown): Error { + if (event instanceof Error) { + return event; + } + const message = isRecord(event) ? readOptionalString(event.message) ?? readOptionalString(parseJsonObject(event.error)?.message) : undefined; + return new InfomaniakError(message ?? "kChat WebSocket connection failed."); +} + +function normalizeKchatTimestamp(value: number | undefined): number | undefined { + return value === undefined ? undefined : value > 0 && value < 10_000_000_000 ? value * 1000 : value; +} + +function readOptionalString(value: unknown): string | undefined { + if (typeof value !== "string") { + return undefined; + } + const trimmed = value.trim(); + return trimmed ? trimmed : undefined; +} + +function readOptionalNumber(value: unknown): number | undefined { + if (typeof value === "number" && Number.isFinite(value)) { + return value; + } + if (typeof value !== "string" || !value.trim()) { + return undefined; + } + const parsed = Number(value); + return Number.isFinite(parsed) ? parsed : undefined; +} + +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} diff --git a/src/index.ts b/src/index.ts index 8f5c18f..5a21782 100644 --- a/src/index.ts +++ b/src/index.ts @@ -16,6 +16,25 @@ export { export { createInfomaniakClient, type DomainClients, type InfomaniakClient, type InfomaniakClientConfig, type RawClient, type RawRequestOptions } from "./client/create-client.js"; export { InfomaniakError, InfomaniakOperationError, type OperationErrorDetails } from "./client/errors.js"; export type { GeneratedOperationClient, OperationRequest } from "./client/generated-operation-client.js"; +export { + createKchatMattermostAuthFrame, + createKchatWebSocketClient, + normalizeKchatWebSocketPostEvent, + resolveKchatWebSocketUrl, + runKchatWebSocketConnection, + type KchatJsonObject, + type KchatJsonValue, + type KchatTokenProvider, + type KchatWebSocketClient, + type KchatWebSocketClientFactoryOptions, + type KchatWebSocketConstructor, + type KchatWebSocketFrame, + type KchatWebSocketLike, + type KchatWebSocketPostEvent, + type KchatWebSocketProtocol, + type KchatWebSocketRunOptions, + type KchatWebSocketUrlOptions, +} from "./client/kchat-websocket.js"; export type { CurrentMyKSuite, GetMailboxQuotaOptions, diff --git a/tests/unit/client/kchat-websocket.test.ts b/tests/unit/client/kchat-websocket.test.ts new file mode 100644 index 0000000..fd0c911 --- /dev/null +++ b/tests/unit/client/kchat-websocket.test.ts @@ -0,0 +1,854 @@ +import { describe, expect, it, vi } from "vitest"; + +import { + createInfomaniakClient, + createKchatMattermostAuthFrame, + createKchatWebSocketClient, + InfomaniakError, + normalizeKchatWebSocketPostEvent, + resolveKchatWebSocketUrl, + runKchatWebSocketConnection, + type KchatWebSocketLike, +} from "../../../src/index.js"; + +const fixtureToken = "test"; +const factoryToken = "factory"; +const runToken = "run"; + +describe("kChat WebSocket client", () => { + it("exposes kChat WebSocket helpers on the generated kChat domain client", async () => { + EventTargetWebSocket.instances = []; + const fetchMock = vi.fn().mockImplementation(async (url) => { + if (String(url).endsWith("/api/v4/teams/name/main-team")) { + return jsonResponse({ id: "team-123" }); + } + if (String(url).endsWith("/api/v4/users/me")) { + return jsonResponse({ id: "user-self" }); + } + return jsonResponse({ auth: "kchat-key:signature" }); + }); + const client = createInfomaniakClient({ + token: async () => "placeholder-token", + fetch: fetchMock, + webSocket: EventTargetWebSocket, + }); + expect(client.kchat.websocket.resolveUrl()).toBe( + "wss://websocket.kchat.infomaniak.com/app/kchat-key?protocol=7&client=liquid-potassium&version=0.2.0&flash=false", + ); + const posts: string[] = []; + const abort = new AbortController(); + const connection = client.kchat.websocket.runConnection({ + teamName: "main-team", + channelIds: ["channel-123"], + signal: abort.signal, + onPost(event) { + posts.push(event.text); + abort.abort(); + }, + }); + + await waitImmediate(); + const socket = EventTargetWebSocket.instances[0]!; + socket.emitMessage({ + event: "pusher:connection_established", + data: JSON.stringify({ socket_id: "1.2", activity_timeout: 30 }), + }); + await waitImmediate(); + socket.emitMessage({ + event: "posted", + channel: "presence-teamUser.user-self", + data: JSON.stringify({ + channel_name: "test", + channel_id: "channel-123", + sender_name: "alice", + post: JSON.stringify({ + id: "post-123", + channel_id: "channel-123", + user_id: "user-123", + message: "hello from client facade", + create_at: 1710000000000, + }), + }), + }); + await connection; + + expect(posts).toEqual(["hello from client facade"]); + expect(socket.closed).toBe(true); + }); + + it("derives Echo and Mattermost WebSocket URLs", () => { + expect(resolveKchatWebSocketUrl()).toBe( + "wss://websocket.kchat.infomaniak.com/app/kchat-key?protocol=7&client=liquid-potassium&version=0.2.0&flash=false", + ); + expect(resolveKchatWebSocketUrl({ host: "ws.example.test", appKey: "custom key" })).toBe( + "wss://ws.example.test/app/custom%20key?protocol=7&client=liquid-potassium&version=0.2.0&flash=false", + ); + expect(resolveKchatWebSocketUrl({ url: " ", host: " ", appKey: " " })).toBe( + "wss://websocket.kchat.infomaniak.com/app/kchat-key?protocol=7&client=liquid-potassium&version=0.2.0&flash=false", + ); + expect(resolveKchatWebSocketUrl({ protocol: "mattermost", apiBaseUrl: "http://mattermost.example.test/path?ignored=true" })).toBe( + "ws://mattermost.example.test/api/v4/websocket", + ); + expect(resolveKchatWebSocketUrl({ protocol: "mattermost", teamName: "main-team" })).toBe( + "wss://main-team.kchat.infomaniak.com/api/v4/websocket", + ); + expect(resolveKchatWebSocketUrl({ url: "wss://custom.example.test/socket" })).toBe("wss://custom.example.test/socket"); + expect(() => resolveKchatWebSocketUrl({ protocol: "mattermost" })).toThrow(InfomaniakError); + expect(() => resolveKchatWebSocketUrl({ protocol: "mattermost", teamName: "Not Safe" })).toThrow(/DNS-safe/); + }); + + it("normalizes Mattermost and Echo posted events", () => { + const normalized = normalizeKchatWebSocketPostEvent( + { + event: "posted", + data: JSON.stringify({ + channel_display_name: "Test", + channel_name: "test", + channel_id: "channel-123", + sender_name: "alice", + team_id: "team-123", + post: JSON.stringify({ + id: "post-123", + create_at: 1710000000, + channel_id: "channel-123", + root_id: "root-123", + user_id: "user-123", + message: "hello over websocket", + }), + }), + }, + { teamName: "main-team" }, + ); + + expect(normalized).toMatchObject({ + id: "post-123", + postId: "post-123", + rootId: "root-123", + timestamp: 1710000000000, + text: "hello over websocket", + channelId: "channel-123", + channelName: "test", + teamId: "team-123", + teamDomain: "main-team", + userId: "user-123", + userName: "alice", + }); + expect(normalizeKchatWebSocketPostEvent({ event: "hello" })).toBeUndefined(); + expect(normalizeKchatWebSocketPostEvent({ event: "posted", data: {} })).toBeUndefined(); + expect(normalizeKchatWebSocketPostEvent({ event: "posted", data: { post: "{}" } })).toBeUndefined(); + expect( + normalizeKchatWebSocketPostEvent({ + event: "posted", + broadcast: { channel_id: "broadcast-channel", team_id: "team-broadcast" }, + data: { + channelDisplayName: "Display", + teamDomain: "team-domain", + senderName: "bob", + post: { + createAt: "1710000000000", + message: "fallback id", + post_id: "post-alt", + rootId: "root-alt", + teamId: "team-alt", + userName: "bob-post", + }, + }, + }), + ).toMatchObject({ + id: "post-alt", + channelId: "broadcast-channel", + channelName: "Display", + rootId: "root-alt", + teamId: "team-alt", + teamDomain: "team-domain", + userName: "bob", + timestamp: 1710000000000, + }); + expect( + normalizeKchatWebSocketPostEvent({ + event: "posted", + data: { + channelName: "display-only", + post: { + id: "post-display-only", + message: "no channel id", + userName: "post-user", + create_at: "not-a-number", + }, + }, + }), + ).toMatchObject({ + id: "post-display-only", + channelName: "display-only", + text: "no channel id", + userName: "post-user", + }); + expect( + normalizeKchatWebSocketPostEvent({ + event: "posted", + data: { + channel_id: "fallback-channel", + post: { + channel_id: "fallback-channel", + message: "fallback id", + }, + }, + }), + ).toMatchObject({ + id: "fallback-channel:fallback id", + text: "fallback id", + channelId: "fallback-channel", + }); + expect(normalizeKchatWebSocketPostEvent({ event: "posted", data: "not json" })).toBeUndefined(); + expect(normalizeKchatWebSocketPostEvent({ event: "posted", data: { post: { id: "post-no-channel", message: "missing channel" } } })).toBeUndefined(); + }); + + it("authenticates and subscribes to the Infomaniak Echo socket", async () => { + EventTargetWebSocket.instances = []; + const fetchCalls: Array<{ url: string; method: string; body: string; authorization: string | null }> = []; + const fetchMock = vi.fn().mockImplementation(async (url, init = {}) => { + const headers = new Headers(init.headers); + fetchCalls.push({ + url: String(url), + method: init.method ?? "GET", + body: String(init.body ?? ""), + authorization: headers.get("authorization"), + }); + if (String(url).endsWith("/api/v4/teams/name/main-team")) { + return jsonResponse({ id: "team-123" }); + } + if (String(url).endsWith("/api/v4/users/me")) { + return jsonResponse({ id: "user-self" }); + } + const body = new URLSearchParams(String(init.body ?? "")); + return jsonResponse({ + auth: `kchat-key:${body.get("channel_name")}`, + ...(body.get("channel_name")?.startsWith("presence-") ? { channel_data: JSON.stringify({ user_id: "user-self" }) } : {}), + }); + }); + const frames: string[] = []; + const subscriptions: string[] = []; + const posts: string[] = []; + const abort = new AbortController(); + const connection = runKchatWebSocketConnection({ + token: fixtureToken, + fetch: fetchMock, + webSocket: EventTargetWebSocket, + teamName: "main-team", + channelIds: ["channel-123"], + ignoredUserNames: ["bot-user"], + signal: abort.signal, + onFrame(frame) { + frames.push(frame.event ?? ""); + }, + onSubscribed(channelName) { + subscriptions.push(channelName); + }, + onPost(event) { + posts.push(event.text); + abort.abort(); + }, + }); + + await waitImmediate(); + const socket = EventTargetWebSocket.instances[0]!; + socket.emitMessage({ + event: "pusher:connection_established", + data: JSON.stringify({ socket_id: "1.2" }), + }); + socket.emitMessage({ + event: "pusher:connection_established", + data: JSON.stringify({ socket_id: "1.2" }), + }); + await waitImmediate(); + await waitImmediate(); + socket.emitMessage({ event: "pusher:ping", data: {} }); + socket.emitMessage({ event: "pusher_internal:subscription_succeeded", data: "{}" }); + socket.emitMessage({ event: "pusher:cache_miss", data: "{}" }); + socket.emitMessage({ event: "pusher_internal:subscription_succeeded", channel: "private-team.team-123", data: "{}" }); + socket.emitMessage({ event: "pusher_internal:subscription_succeeded", channel: "presence-teamUser.user-self", data: "{}" }); + socket.emitMessage({ + event: "posted", + data: { + channel_id: "other-channel", + post: JSON.stringify({ id: "ignored-channel", channel_id: "other-channel", user_id: "user-1", message: "ignored" }), + }, + }); + socket.emitMessage({ + event: "posted", + data: { + channel_id: "channel-123", + sender_name: "bot-user", + post: JSON.stringify({ id: "ignored-sender", channel_id: "channel-123", user_id: "user-1", message: "ignored" }), + }, + }); + socket.emitMessage({ + event: "posted", + data: { + channel_id: "channel-123", + sender_name: "alice", + post: JSON.stringify({ id: "post-123", channel_id: "channel-123", user_id: "user-1", message: "accepted" }), + }, + }); + await connection; + + expect(frames).toContain("pusher:connection_established"); + expect(subscriptions).toEqual(["private-team.team-123", "presence-teamUser.user-self"]); + expect(posts).toEqual(["accepted"]); + expect(fetchCalls).toMatchObject([ + { url: "https://main-team.kchat.infomaniak.com/api/v4/teams/name/main-team", method: "GET", authorization: `Bearer ${fixtureToken}` }, + { url: "https://main-team.kchat.infomaniak.com/api/v4/users/me", method: "GET", authorization: `Bearer ${fixtureToken}` }, + { url: "https://main-team.kchat.infomaniak.com/broadcasting/auth", method: "POST", authorization: `Bearer ${fixtureToken}` }, + { url: "https://main-team.kchat.infomaniak.com/broadcasting/auth", method: "POST", authorization: `Bearer ${fixtureToken}` }, + ]); + expect(socket.sent.map((value) => JSON.parse(value) as { event: string; data: { channel?: string } })).toEqual([ + { event: "pusher:subscribe", data: { channel: "private-team.team-123", auth: "kchat-key:private-team.team-123" } }, + { + event: "pusher:subscribe", + data: { channel: "presence-teamUser.user-self", auth: "kchat-key:presence-teamUser.user-self", channel_data: JSON.stringify({ user_id: "user-self" }) }, + }, + { event: "pusher:pong", data: {} }, + ]); + }); + + it("uses explicit Echo subscriptions and rejects Echo server errors", async () => { + EventTargetWebSocket.instances = []; + const client = createKchatWebSocketClient({ + token: fixtureToken, + fetch: vi.fn().mockResolvedValue(jsonResponse({ auth: "signature" })), + webSocket: EventTargetWebSocket, + }); + const connection = client.runConnection({ + apiBaseUrl: "https://team.example.test", + authEndpoint: "https://team.example.test/auth", + subscriptions: ["private-team.team-123"], + }); + + await waitImmediate(); + const socket = EventTargetWebSocket.instances[0]!; + socket.emitMessage({ event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }); + await waitImmediate(); + expect(JSON.parse(socket.sent[0] ?? "{}")).toEqual({ + event: "pusher:subscribe", + data: { channel: "private-team.team-123", auth: "signature" }, + }); + socket.emitMessage({ event: "pusher:error", data: JSON.stringify({ message: "denied" }) }); + + await expect(connection).rejects.toThrow(/denied/); + }); + + it("uses explicit Echo team identifiers without discovery calls", async () => { + EventTargetWebSocket.instances = []; + const fetchMock = vi.fn().mockImplementation(async () => jsonResponse({ auth: "signature" })); + const abort = new AbortController(); + const connection = runKchatWebSocketConnection({ + token: fixtureToken, + fetch: fetchMock, + webSocket: EventTargetWebSocket, + apiBaseUrl: "https://team.example.test", + teamId: "team-123", + teamUserId: "user-self", + signal: abort.signal, + onAuthenticated() { + abort.abort(); + }, + }); + + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emitMessage({ + event: "pusher:connection_established", + data: JSON.stringify({ socket_id: "1.2" }), + }); + await connection; + + expect(fetchMock).toHaveBeenCalledTimes(2); + expect(EventTargetWebSocket.instances[0]!.sent.map((value) => JSON.parse(value).data.channel)).toEqual([ + "private-team.team-123", + "presence-teamUser.user-self", + ]); + }); + + it("authenticates and receives posts over a Mattermost socket", async () => { + NodeStyleWebSocket.instances = []; + const posts: string[] = []; + const abort = new AbortController(); + const connection = runKchatWebSocketConnection({ + protocol: "mattermost", + apiBaseUrl: "https://mattermost.example.test", + token: () => "placeholder-token", + webSocket: NodeStyleWebSocket, + signal: abort.signal, + onPost(event) { + posts.push(event.text); + abort.abort(); + }, + }); + + await waitImmediate(); + const socket = NodeStyleWebSocket.instances[0]!; + socket.emit("open", {}); + expect(JSON.parse(socket.sent[0] ?? "{}")).toEqual(createKchatMattermostAuthFrame("placeholder-token")); + socket.emitMessage({ status: "OK", seq_reply: 1 }); + socket.emitMessage({ + event: "posted", + data: { + channel_name: "test", + post: JSON.stringify({ id: "post-mm", channel_id: "channel-mm", message: "from mattermost", create_at: 1710000000000 }), + }, + }); + await connection; + + expect(posts).toEqual(["from mattermost"]); + expect(socket.closed).toBe(true); + }); + + it("lets run options override WebSocket client factory defaults", async () => { + EventTargetWebSocket.instances = []; + const factoryFetch = vi.fn(); + const runFetch = vi.fn().mockImplementation(async () => jsonResponse({ auth: "run-signature" })); + const client = createKchatWebSocketClient({ + token: factoryToken, + fetch: factoryFetch, + webSocket: ThrowingSendWebSocket, + }); + const abort = new AbortController(); + const connection = client.runConnection({ + token: runToken, + fetch: runFetch, + webSocket: EventTargetWebSocket, + apiBaseUrl: "https://team.example.test", + subscriptions: ["private-team.team-123"], + signal: abort.signal, + onAuthenticated() { + abort.abort(); + }, + }); + + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emitMessage({ + event: "pusher:connection_established", + data: JSON.stringify({ socket_id: "1.2" }), + }); + await connection; + + expect(factoryFetch).not.toHaveBeenCalled(); + expect(runFetch).toHaveBeenCalledOnce(); + expect(new Headers(runFetch.mock.calls[0]?.[1]?.headers).get("authorization")).toBe(`Bearer ${runToken}`); + }); + + it("fails closed for missing token, missing WebSocket, and failed auth", async () => { + await expect(runKchatWebSocketConnection({ webSocket: EventTargetWebSocket })).rejects.toThrow(/bearer token/); + const originalWebSocket = globalThis.WebSocket; + vi.stubGlobal("WebSocket", undefined); + await expect(runKchatWebSocketConnection({ token: fixtureToken })).rejects.toThrow(/No WebSocket/); + vi.stubGlobal("WebSocket", originalWebSocket); + + EventTargetWebSocket.instances = []; + const connection = runKchatWebSocketConnection({ + protocol: "mattermost", + apiBaseUrl: "https://mattermost.example.test", + token: fixtureToken, + webSocket: EventTargetWebSocket, + }); + await waitImmediate(); + const socket = EventTargetWebSocket.instances[0]!; + socket.emitOpen(); + socket.emitMessage({ status: "FAIL", seq_reply: 1 }); + + await expect(connection).rejects.toThrow(/authentication failed/); + }); + + it("fails closed for malformed Echo handshakes and API discovery/auth failures", async () => { + await expectEchoFailure({ + fetchImpl: vi.fn(), + message: { event: "pusher:connection_established", data: "{}" }, + error: /socket id/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn(), + message: { event: "pusher:error", data: "{}" }, + error: /returned an error/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn(), + options: { teamUserId: "user-self" }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /teamName/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn().mockResolvedValue(jsonResponse({})), + options: { teamName: "main-team", teamUserId: "user-self" }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /team lookup/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn().mockImplementation(async (url) => { + if (String(url).endsWith("/api/v4/teams/name/main-team")) { + return jsonResponse({ id: "team-123" }); + } + return jsonResponse({}); + }), + options: { teamName: "main-team" }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /current-user/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn().mockResolvedValue(jsonResponse({ error: "denied" }, { status: 403 })), + options: { apiBaseUrl: "https://team.example.test", subscriptions: ["private-team.team-123"] }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /channel auth failed/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn().mockResolvedValue(jsonResponse({ missing: "auth" })), + options: { apiBaseUrl: "https://team.example.test", subscriptions: ["private-team.team-123"] }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /did not return a signature/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn().mockResolvedValue(jsonResponse({ error: "nope" }, { status: 500 })), + options: { teamName: "main-team", teamUserId: "user-self" }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /API request failed/, + }); + await expectEchoFailure({ + fetchImpl: vi.fn().mockResolvedValue(new Response(JSON.stringify([]), { status: 200, headers: { "content-type": "application/json" } })), + options: { teamName: "main-team", teamUserId: "user-self" }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /JSON object/, + }); + }); + + it("handles alternate socket APIs, payload encodings, aborts, and socket errors", async () => { + await expect( + runKchatWebSocketConnection({ + protocol: "mattermost", + apiBaseUrl: "https://mattermost.example.test", + token: fixtureToken, + webSocket: ThrowingSendWebSocket, + }), + ).rejects.toThrow(/send failed/); + + const preAborted = new AbortController(); + preAborted.abort(); + await expect( + runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + signal: preAborted.signal, + }), + ).resolves.toBeUndefined(); + + PropertyWebSocket.instances = []; + const abort = new AbortController(); + const frames: string[] = []; + const connection = runKchatWebSocketConnection({ + protocol: "mattermost", + apiBaseUrl: "https://mattermost.example.test", + token: fixtureToken, + webSocket: PropertyWebSocket, + signal: abort.signal, + onFrame(frame) { + frames.push(frame.event ?? String(frame.status)); + }, + }); + await waitImmediate(); + const propertySocket = PropertyWebSocket.instances[0]!; + propertySocket.emitOpen(); + propertySocket.emitRaw(Buffer.from(JSON.stringify({ status: "OK", seq_reply: 1 }))); + const arrayBufferFrame = Buffer.from( + JSON.stringify({ event: "posted", data: { channel_name: "test", post: JSON.stringify({ id: "post-buffer", channel_id: "channel", message: "buffer" }) } }), + ); + propertySocket.emitRaw(arrayBufferFrame.buffer.slice(arrayBufferFrame.byteOffset, arrayBufferFrame.byteOffset + arrayBufferFrame.byteLength)); + propertySocket.emitRaw( + new Uint8Array( + Buffer.from( + JSON.stringify({ + event: "posted", + data: { post: "{}" }, + }), + ), + ), + ); + propertySocket.emitRaw(Buffer.from("not json")); + propertySocket.emitRaw(Buffer.from(JSON.stringify([]))); + propertySocket.emitRaw({ event: "posted", data: { post: "{}" } }); + propertySocket.onmessage = () => undefined; + abort.abort(); + await connection; + expect(frames).toEqual(["OK", "posted", "posted"]); + expect(propertySocket.closed).toBe(true); + + RemoveListenerWebSocket.instances = []; + const removeListenerAbort = new AbortController(); + const removeListenerConnection = runKchatWebSocketConnection({ + protocol: "mattermost", + apiBaseUrl: "https://mattermost.example.test", + token: fixtureToken, + webSocket: RemoveListenerWebSocket, + signal: removeListenerAbort.signal, + }); + await waitImmediate(); + RemoveListenerWebSocket.instances[0]!.emit("open", {}); + removeListenerAbort.abort(); + await removeListenerConnection; + expect(RemoveListenerWebSocket.instances[0]!.removed).toContain("message"); + + EventTargetWebSocket.instances = []; + const errored = runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + subscriptions: ["private-team.team-123"], + fetch: vi.fn().mockResolvedValue(jsonResponse({ auth: "signature" })), + }); + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emit("error", new Error("socket exploded")); + await expect(errored).rejects.toThrow(/socket exploded/); + + EventTargetWebSocket.instances = []; + const messageErrored = runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + subscriptions: ["private-team.team-123"], + fetch: vi.fn().mockResolvedValue(jsonResponse({ auth: "signature" })), + }); + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emit("error", { error: JSON.stringify({ message: "nested socket error" }) }); + await expect(messageErrored).rejects.toThrow(/nested socket error/); + + EventTargetWebSocket.instances = []; + const directMessageErrored = runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + subscriptions: ["private-team.team-123"], + fetch: vi.fn().mockResolvedValue(jsonResponse({ auth: "signature" })), + }); + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emit("error", { message: "direct socket error" }); + await expect(directMessageErrored).rejects.toThrow(/direct socket error/); + + EventTargetWebSocket.instances = []; + const fallbackErrored = runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + subscriptions: ["private-team.team-123"], + fetch: vi.fn().mockResolvedValue(jsonResponse({ auth: "signature" })), + }); + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emit("error", {}); + await expect(fallbackErrored).rejects.toThrow(/connection failed/); + + EventTargetWebSocket.instances = []; + const primitiveErrored = runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + subscriptions: ["private-team.team-123"], + fetch: vi.fn().mockResolvedValue(jsonResponse({ auth: "signature" })), + }); + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emit("error", "primitive error"); + await expect(primitiveErrored).rejects.toThrow(/connection failed/); + + vi.stubGlobal("fetch", undefined); + await expectEchoFailure({ + options: { apiBaseUrl: "https://team.example.test", subscriptions: ["private-team.team-123"] }, + message: { event: "pusher:connection_established", data: JSON.stringify({ socket_id: "1.2" }) }, + error: /No fetch implementation/, + }); + }); +}); + +class EventTargetWebSocket implements KchatWebSocketLike { + static instances: EventTargetWebSocket[] = []; + readonly sent: string[] = []; + readonly listeners = new Map void>>(); + closed = false; + + constructor(readonly url: string) { + EventTargetWebSocket.instances.push(this); + queueMicrotask(() => this.emitOpen()); + } + + send(data: string): void { + this.sent.push(data); + } + + close(): void { + this.closed = true; + this.emit("close", {}); + } + + addEventListener(eventName: string, handler: (event: unknown) => void): void { + this.listeners.set(eventName, [...(this.listeners.get(eventName) ?? []), handler]); + } + + removeEventListener(eventName: string, handler: (event: unknown) => void): void { + this.listeners.set( + eventName, + (this.listeners.get(eventName) ?? []).filter((listener) => listener !== handler), + ); + } + + emitOpen(): void { + this.emit("open", {}); + } + + emitMessage(frame: unknown): void { + this.emit("message", { data: JSON.stringify(frame) }); + } + + emit(eventName: string, event: unknown): void { + for (const listener of this.listeners.get(eventName) ?? []) { + listener(event); + } + } +} + +class NodeStyleWebSocket implements KchatWebSocketLike { + static instances: NodeStyleWebSocket[] = []; + readonly sent: string[] = []; + readonly listeners = new Map void>>(); + closed = false; + + constructor(readonly url: string) { + NodeStyleWebSocket.instances.push(this); + } + + send(data: string): void { + this.sent.push(data); + } + + close(): void { + this.closed = true; + this.emit("close", {}); + } + + on(eventName: string, handler: (event: unknown) => void): void { + this.listeners.set(eventName, [...(this.listeners.get(eventName) ?? []), handler]); + } + + off(eventName: string, handler: (event: unknown) => void): void { + this.listeners.set( + eventName, + (this.listeners.get(eventName) ?? []).filter((listener) => listener !== handler), + ); + } + + emitMessage(frame: unknown): void { + this.emit("message", { data: JSON.stringify(frame) }); + } + + emit(eventName: string, event: unknown): void { + for (const listener of this.listeners.get(eventName) ?? []) { + listener(event); + } + } +} + +class RemoveListenerWebSocket implements KchatWebSocketLike { + static instances: RemoveListenerWebSocket[] = []; + readonly sent: string[] = []; + readonly removed: string[] = []; + readonly listeners = new Map void>>(); + + constructor(readonly url: string) { + RemoveListenerWebSocket.instances.push(this); + } + + send(data: string): void { + this.sent.push(data); + } + + close(): void { + this.emit("close", {}); + } + + on(eventName: string, handler: (event: unknown) => void): void { + this.listeners.set(eventName, [...(this.listeners.get(eventName) ?? []), handler]); + } + + removeListener(eventName: string, handler: (event: unknown) => void): void { + this.removed.push(eventName); + this.listeners.set( + eventName, + (this.listeners.get(eventName) ?? []).filter((listener) => listener !== handler), + ); + } + + emit(eventName: string, event: unknown): void { + for (const listener of this.listeners.get(eventName) ?? []) { + listener(event); + } + } +} + +class PropertyWebSocket implements KchatWebSocketLike { + static instances: PropertyWebSocket[] = []; + readonly sent: string[] = []; + closed = false; + onopen?: (event: unknown) => void; + onmessage?: (event: unknown) => void; + onerror?: (event: unknown) => void; + onclose?: (event: unknown) => void; + + constructor(readonly url: string) { + PropertyWebSocket.instances.push(this); + } + + send(data: string): void { + this.sent.push(data); + } + + close(): void { + this.closed = true; + this.onclose?.({}); + } + + emitOpen(): void { + this.onopen?.({}); + } + + emitRaw(data: unknown): void { + this.onmessage?.(data); + } +} + +class ThrowingSendWebSocket extends EventTargetWebSocket { + override send(): void { + throw new Error("send failed"); + } +} + +function jsonResponse(value: unknown, init?: ResponseInit): Response { + return new Response(JSON.stringify(value), { + status: 200, + headers: { "content-type": "application/json" }, + ...init, + }); +} + +async function waitImmediate(): Promise { + await new Promise((resolve) => setImmediate(resolve)); +} + +async function expectEchoFailure({ + fetchImpl, + options = {}, + message, + error, +}: { + fetchImpl?: typeof fetch; + options?: Parameters[0]; + message: unknown; + error: RegExp; +}): Promise { + EventTargetWebSocket.instances = []; + const connection = runKchatWebSocketConnection({ + token: fixtureToken, + webSocket: EventTargetWebSocket, + ...(fetchImpl === undefined ? {} : { fetch: fetchImpl }), + ...options, + }); + await waitImmediate(); + EventTargetWebSocket.instances[0]!.emitMessage(message); + await expect(connection).rejects.toThrow(error); +}