diff --git a/default.nix b/default.nix index 6af2bae2a1..4b9dd756c8 100644 --- a/default.nix +++ b/default.nix @@ -82,8 +82,6 @@ let # set +x runHook postInstall ''; - - passthru.tests = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix self); }; in pkgs.stdenv.mkDerivation { @@ -122,5 +120,25 @@ pkgs.stdenv.mkDerivation { makeWrapper ${pkgs.nodejs_24}/bin/node $out/bin/apply-migrations --prefix NODE_PATH : $out/node_modules --add-flags --enable-source-maps --add-flags $out/dist/apply-migrations.js ''; - passthru.tests = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix self); + passthru.tests = pkgs.runCommand "spacebar-server-ts-all-tests" rec { + bundleStarts = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix { inherit self; }); + bundleStartsRabbitMqSingle = pkgs.testers.runNixOSTest ( + import ./nix/tests/test-bundle-starts.nix { + inherit self; + withIpc = "rabbitmq-single"; + } + ); + bundleStartsRabbitMqLegacy = pkgs.testers.runNixOSTest ( + import ./nix/tests/test-bundle-starts.nix { + inherit self; + withIpc = "rabbitmq-legacy"; + } + ); + + nativeBuildInputs = [ + bundleStarts + bundleStartsRabbitMqSingle + bundleStartsRabbitMqLegacy + ]; + } "touch $out"; } diff --git a/nix/modules/default/default.nix b/nix/modules/default/default.nix index 5602c25895..876ad941ae 100644 --- a/nix/modules/default/default.nix +++ b/nix/modules/default/default.nix @@ -55,6 +55,20 @@ in description = "Path to store CDN files."; }; + ipcMethod = lib.mkOption { + type = lib.types.enum [ + "unix" + "rabbitmq-single" + "rabbitmq-legacy" + ]; + default = "unix"; + description = '' + How messages should be passed between services. + Note that the C# services (eg. Admin API) currently only supports the "unix" method! + Read more at https://docs.spacebar.chat/setup/server/installation/generic/ipc/. + ''; + }; + extraEnvironment = lib.mkOption { default = { }; description = '' @@ -98,7 +112,6 @@ in environment = builtins.mapAttrs (_: val: builtins.toString val) ( { # things we set by default... - EVENT_TRANSMISSION = "unix"; EVENT_SOCKET_PATH = "/run/spacebar/"; } // cfg.extraEnvironment @@ -108,6 +121,7 @@ in CONFIG_READONLY = 1; PORT = toString cfg.apiEndpoint.localPort; STORAGE_LOCATION = cfg.cdnPath; + EVENT_TRANSMISSION = cfg.ipcMethod; } ); serviceConfig = { @@ -121,7 +135,6 @@ in environment = builtins.mapAttrs (_: val: builtins.toString val) ( { # things we set by default... - EVENT_TRANSMISSION = "unix"; EVENT_SOCKET_PATH = "/run/spacebar/"; } // cfg.extraEnvironment @@ -131,6 +144,7 @@ in CONFIG_READONLY = 1; PORT = toString cfg.gatewayEndpoint.localPort; STORAGE_LOCATION = cfg.cdnPath; + EVENT_TRANSMISSION = cfg.ipcMethod; APPLY_DB_MIGRATIONS = "false"; } ); @@ -144,7 +158,6 @@ in environment = builtins.mapAttrs (_: val: builtins.toString val) ( { # things we set by default... - EVENT_TRANSMISSION = "unix"; EVENT_SOCKET_PATH = "/run/spacebar/"; } // cfg.extraEnvironment @@ -154,6 +167,7 @@ in CONFIG_READONLY = 1; PORT = toString cfg.cdnEndpoint.localPort; STORAGE_LOCATION = cfg.cdnPath; + EVENT_TRANSMISSION = cfg.ipcMethod; APPLY_DB_MIGRATIONS = "false"; } ); diff --git a/nix/tests/test-bundle-starts.nix b/nix/tests/test-bundle-starts.nix index 86b998cf76..2e088f6de9 100644 --- a/nix/tests/test-bundle-starts.nix +++ b/nix/tests/test-bundle-starts.nix @@ -1,4 +1,7 @@ -self: +{ + self, + withIpc ? "unix", +}: { config, lib, @@ -8,11 +11,13 @@ self: let sb = import ../lib/mkEndpoint.nix; + isRabbitMqTest = lib.strings.hasPrefix "rabbitmq" withIpc; in { - name = "test-bundle-starts"; + name = "test-bundle-starts" + lib.optionalString (withIpc != "unix") ("_ipc=" + withIpc); skipTypeCheck = true; skipLint = true; + globalTimeout = 120; nodes.machine = { imports = [ self.nixosModules.default ]; @@ -30,12 +35,20 @@ in LOG_REQUESTS = "-"; # Log all requests LOG_VALIDATION_ERRORS = true; }; + ipcMethod = withIpc; + + settings = { + rabbitmq = { + host = lib.mkIf isRabbitMqTest "amqp://guest:guest@127.0.0.1:5672"; + }; + }; nginx.enable = true; }; in lib.trace ("Testing with config: " + builtins.toJSON cfg) cfg; services.nginx.enable = true; + services.rabbitmq.enable = isRabbitMqTest; services.postgresql = { enable = true; initdbArgs = [ diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts deleted file mode 100644 index 0e66288649..0000000000 --- a/src/util/util/Event.ts +++ /dev/null @@ -1,529 +0,0 @@ -/* - Spacebar: A FOSS re-implementation and extension of the Discord.com backend. - Copyright (C) 2023 Spacebar and Spacebar Contributors - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -import { Channel } from "amqplib"; -import { RabbitMQ } from "./RabbitMQ"; -import EventEmitter from "node:events"; -import { EVENT, Event } from "../interfaces"; -import { randomUUID } from "node:crypto"; -import path from "node:path"; -import net, { Socket } from "node:net"; -import fs, { FSWatcher } from "node:fs"; -import { Stopwatch } from "./Stopwatch"; -import { Config } from "./Config"; -import { red } from "picocolors"; - -export const events = new EventEmitter(); -let unixSocketListener: UnixSocketListener | null = null; -let unixSocketWriter: UnixSocketWriter | null = null; - -export async function emitEvent(payload: Omit) { - const id = (payload.guild_id || payload.channel_id || payload.user_id || payload.session_id) as string; - if (!id) return console.error("event doesn't contain any id", payload); - - if (RabbitMQ.connection) { - const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission - - const publishEvent = async (retryCount = 0): Promise => { - const channel = await RabbitMQ.getSafeChannel(); - try { - await channel.assertExchange(id, "fanout", { - durable: false, - }); - - // assertQueue isn't needed, because a queue will automatically created if it doesn't exist - const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); - if (!successful) throw new Error("failed to send event"); - } catch (e) { - // Check if this is a channel closed error and if we should retry - const errorMessage = e instanceof Error ? e.message : String(e); - const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); - - if (isChannelError && retryCount < 1) { - console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); - // Force the cached channel to be discarded by calling getSafeChannel which will create a new one - return publishEvent(retryCount + 1); - } - - console.log("[RabbitMQ] ", e); - } - }; - - await publishEvent(); - } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketWriter) { - console.error("[Event] Unix socket writer not initialized, cannot emit event!"); - throw new Error("Unix socket writer not initialized"); - } - await unixSocketWriter.emit(payload); - } else if (process.env.EVENT_TRANSMISSION === "process") { - process.send?.({ type: "event", event: payload, id } as ProcessEvent); - } else { - events.emit(id, payload); - } -} - -export async function initEvent() { - await RabbitMQ.init(); // does nothing if rabbitmq is not setup - - if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketWriter) { - unixSocketWriter = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); - await unixSocketWriter.init(); - } - } - - // Set up the spacebar event listener (used for config reload, etc.) - const setupSpacebarListener = async () => { - console.log("[Event] Setting up spacebar event listener"); - await listenEvent("spacebar", async (event) => { - console.log("[Event] Received spacebar event:", event); - if ((event.event as string) === "SB_RELOAD_CONFIG") { - console.log("[Event] Reloading config due to RELOAD_CONFIG event"); - await Config.init(true); - } - }); - }; - - // Initial setup - await setupSpacebarListener(); - - // Re-establish listener on reconnection - RabbitMQ.on("reconnected", async () => { - console.log("[Event] RabbitMQ reconnected, re-establishing spacebar listener"); - await setupSpacebarListener(); - }); -} - -export interface EventOpts extends Event { - acknowledge?: () => unknown; - channel?: Channel; - cancel: (id?: string) => unknown; -} - -export interface ListenEventOpts { - channel?: Channel; - acknowledge?: boolean; -} - -export interface ProcessEvent { - type: "event"; - event: Event; - id: string; -} - -export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { - if (RabbitMQ.connection) { - const rabbitMQChannel = await RabbitMQ.getSafeChannel(); - const channel = opts?.channel || rabbitMQChannel; - if (!channel) throw new Error("[Events] An event was sent without an associated channel"); - return await rabbitListen(channel, event, callback, { - acknowledge: opts?.acknowledge, - }); - } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketListener) { - unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); - await unixSocketListener.init(); - } - return await unixSocketListener.listen(event, callback); - } else if (process.env.EVENT_TRANSMISSION === "process") { - const cancel = async () => { - process.removeListener("message", listener); - process.setMaxListeners(process.getMaxListeners() - 1); - }; - - const listener = (msg: ProcessEvent) => { - // eslint-disable-next-line @typescript-eslint/no-unused-expressions - msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel }); - }; - - // TODO: assert the type is correct? - process.addListener("message", (msg) => listener(msg as ProcessEvent)); - process.setMaxListeners(process.getMaxListeners() + 1); - - return cancel; - } else { - const listener = (opts: EventOpts) => callback({ ...opts, cancel }); - const cancel = async () => { - events.removeListener(event, listener); - events.setMaxListeners(events.getMaxListeners() - 1); - }; - events.setMaxListeners(events.getMaxListeners() + 1); - events.addListener(event, listener); - - return cancel; - } -} - -async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { - await channel.assertExchange(id, "fanout", { durable: false }); - // messageTtl ensures any orphaned messages are cleaned up quickly if the consumer disconnects. - const q = await channel.assertQueue("", { - exclusive: true, - autoDelete: true, - messageTtl: 5000, // Messages expire after 5 seconds if not consumed - }); - - const consumerTag = randomUUID(); - - const cancel = async () => { - try { - // Order matters here to prevent RESOURCE_ERROR, due to potential race condition: - // 1. Unbind first - stops new messages from being routed to this queue - await channel.unbindQueue(q.queue, id, ""); - // 2. Cancel consumer - with autoDelete: true, this triggers queue deletion - // after RabbitMQ ensures no messages are in-flight to this queue - await channel.cancel(consumerTag); - // Don't explicitly delete the queue - let autoDelete handle it safely. - // Explicitly deleting can race with in-flight message delivery. - } catch (e) { - // Channel might already be closed or queue already deleted - that's fine - console.log("[RabbitMQ] Error during consumer cancel (may be expected):", e instanceof Error ? e.message : e); - } - }; - - await channel.bindQueue(q.queue, id, ""); - await channel.consume( - q.queue, - (opts) => { - if (!opts) return; - - const data = JSON.parse(opts.content.toString()); - const event = opts.properties.type as EVENT; - - callback({ - event, - data, - acknowledge() { - channel.ack(opts); - }, - channel, - cancel, - }); - // rabbitCh.ack(opts); - }, - { - noAck: !opts?.acknowledge, - consumerTag: consumerTag, - }, - ); - - return cancel; -} - -class UnixSocketListener { - eventEmitter: EventEmitter; - socketPath: string; - - constructor(socketPath: string) { - this.eventEmitter = new EventEmitter(); - this.socketPath = socketPath; - } - - async init() { - // remove stale socket file if it exists - // can happen if there's a PID conflict (across containers/PID namespaces) - try { - if (fs.existsSync(this.socketPath)) { - fs.unlinkSync(this.socketPath); - console.log("[Events] Removed stale socket file:", this.socketPath); - } - } catch (e) { - console.error("[Events] Failed to remove stale socket:", e); - } - - const server = net.createServer((socket) => { - socket.on("connect", () => { - console.log("[Events] Unix socket client connected"); - }); - let buffer = Buffer.alloc(0); - socket.on("data", (data: Buffer) => { - buffer = Buffer.concat([buffer, data]); - while (buffer.length >= 4) { - const msgLen = buffer.readUInt32BE(0); - if (buffer.length < 4 + msgLen) break; - const msgBuf = buffer.subarray(4, 4 + msgLen); - buffer = buffer.subarray(4 + msgLen); - try { - const payload = JSON.parse(msgBuf.toString()); - this.eventEmitter.emit(payload.id, payload.event); - } catch (e) { - console.error("[Events] Failed to parse unix socket data:", e); - } - } - }); - socket.on("error", (err) => { - console.error("[Events] Unix socket error:", err); - }); - socket.on("close", () => { - console.log("[Events] Unix socket client disconnected"); - }); - }); - - server.listen(this.socketPath, () => { - console.log(`Unix socket server listening on ${this.socketPath}`); - }); - - const shutdown = () => { - console.log("[Events] Closing unix socket server"); - server.close(); - - // clean up socket file - try { - fs.unlinkSync(this.socketPath); - } catch (e) { - console.error("[Events] Failed to unlink socket file:", e); - } - - process.exit(0); - }; - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, shutdown); - } - } - - async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { - const listener = (data: Event) => { - callback({ - ...data, - cancel, - }); - }; - - this.eventEmitter.addListener(event, listener); - - const cancel = async () => { - this.eventEmitter.removeListener(event, listener); - this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); - }; - - this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); - - return cancel; - } -} - -function getPidCmdline(pid: number): string | null { - try { - const cmdline = fs.readFileSync(`/proc/${pid}/cmdline`, "utf-8"); - return cmdline.replaceAll("\0", " ").trim(); - } catch (e) { - return null; - } -} - -class UnixSocketWriter { - socketPath: string; - clients: { [key: string]: Socket } = {}; - watcher?: FSWatcher; - backlog: Event[] = []; - broadcastLock: Promise = Promise.resolve(); - replayLock: Promise = Promise.resolve(); - isInitializing = true; - - constructor(socketPath: string) { - this.socketPath = socketPath; - } - - async init() { - if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath); - - console.log("[Events] Unix socket writer initializing for", this.socketPath); - - const connect = (file: string) => { - const fullPath = path.join(this.socketPath, file); - const pid = Number(path.basename(file, ".sock")); - console.log("[Events] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); - - // avoid duplicate connections - if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { - console.log("[Events] Unix socket client already connected to", fullPath); - return; - } - - // clean up old connection if it exists - if (this.clients[fullPath]) { - console.log("[Events] Removing stale unix socket client for", fullPath); - try { - this.clients[fullPath].destroy(); - } catch (e) { - // ignore - } - delete this.clients[fullPath]; - } - - // check if it's actually a socket file (not a ghost/regular file) - try { - const stats = fs.statSync(fullPath); - if (!stats.isSocket()) { - console.log("[Events] Ignoring non-socket file:", fullPath); - return; - } - } catch (e) { - console.log("[Events] Cannot stat socket file:", fullPath); - return; - } - - try { - this.clients[fullPath] = net.createConnection(fullPath, () => { - console.log("[Events] Unix socket client connected to", fullPath); - }); - - this.clients[fullPath].on("error", (err) => { - console.error("[Events] Unix socket client error on", fullPath, ":", err); - // clean up after error - if (this.clients[fullPath]) { - delete this.clients[fullPath]; - } - }); - - // handle clean socket closure - this.clients[fullPath].on("close", () => { - console.log("[Events] Unix socket client closed:", fullPath); - delete this.clients[fullPath]; - }); - } catch (e) { - console.error("[Events] Failed to create connection to", fullPath, ":", e); - delete this.clients[fullPath]; - } - }; - - // connect to all sockets, now and in the future - this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { - console.log("[Events] Unix socket writer received watch sig", eventType, filename); - if (eventType === "rename" && filename?.endsWith(".sock")) { - try { - const fullPath = path.join(this.socketPath, filename!); - if (fs.existsSync(fullPath)) { - connect(filename!); - } else { - if (this.clients[fullPath]) { - console.log("[Events] Unix socket writer detected removed socket:", fullPath); - try { - this.clients[fullPath].destroy(); - } catch (e) { - // socket may already be destroyed - } - delete this.clients[fullPath]; - } - } - } catch (e) { - // don't - } - } - }); - - this.watcher.on("error", (err) => { - console.error("[Events] Unix socket watcher error:", err); - }); - - // connect to existing sockets if any - try { - const files = fs.readdirSync(this.socketPath); - console.log("[Events] Unix socket writer found existing sockets:", files); - files.forEach((file) => { - if (file.endsWith(".sock")) { - connect(file); - } - }); - } catch (err) { - console.error("[Events] Unix socket writer failed to read directory:", err); - } - - this.isInitializing = false; - } - - async emit(event: Event) { - if (!this.clients) throw new Error("UnixSocketWriter not initialized"); - - // check if there are any listeners - const clientCount = Object.entries(this.clients).length; - if (clientCount === 0) { - console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); - this.backlog.push(event); - if (!this.isInitializing) { - this.isInitializing = true; - console.log("[Events] Re-initializing unix socket writer due to new event with no listeners"); - await this.close(); - await this.init(); - } - return; - } - - await this.replayLock; - await (this.replayLock = Promise.resolve().then(async () => { - if (this.backlog.length > 0) { - console.log(`[Events] Replaying ${this.backlog.length} backlog events`); - for (const backlogEvent of this.backlog) { - await this.broadcast(backlogEvent); - } - this.backlog = []; - } - })); - - await this.broadcast(event); - } - - private async broadcast(event: Event) { - await this.broadcastLock; - return await (this.broadcastLock = new Promise((res) => { - const tsw = Stopwatch.startNew(); - const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })); - const lenBuf = Buffer.alloc(4); - lenBuf.writeUInt32BE(payloadBuf.length, 0); - const framed = Buffer.concat([lenBuf, payloadBuf]); - - for (const [socketPath, socket] of Object.entries(this.clients)) { - if (socket.destroyed) { - console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); - delete this.clients[socketPath]; - continue; - } - - try { - socket.write(framed); - } catch (e) { - console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); - } - } - - if (tsw.elapsed().totalMilliseconds > 5) - // else it's too noisy - console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); - res(); - })); - } - - async close() { - console.log("[Events] Closing Unix socket writer"); - - if (this.watcher) { - this.watcher.close(); - this.watcher = undefined; - } - - for (const [path, socket] of Object.entries(this.clients)) { - try { - socket.destroy(); - } catch (e) { - console.error("[Events] Error closing socket", path, ":", e); - } - } - this.clients = {}; - } -} diff --git a/src/util/util/index.ts b/src/util/util/index.ts index 368fcc509d..f6de6a9554 100644 --- a/src/util/util/index.ts +++ b/src/util/util/index.ts @@ -27,7 +27,7 @@ export * from "./Database"; export * from "./DateBuilder"; export * from "./email"; export * from "./ElapsedTime"; -export * from "./Event"; +export * from "./ipc/Event"; export * from "./FieldError"; export * from "./Intents"; export * from "./InvisibleCharacters"; @@ -37,7 +37,7 @@ export * from "./Logo"; export * from "./MessageFlags"; export * from "./networking"; export * from "./Permissions"; -export * from "./RabbitMQ"; +export * from "./ipc/RabbitMQ"; export * from "./Regex"; export * from "./Rights"; export * from "./Snowflake"; diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts new file mode 100644 index 0000000000..e836c269ea --- /dev/null +++ b/src/util/util/ipc/Event.ts @@ -0,0 +1,170 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2023 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import path from "node:path"; +import { Channel } from "amqplib"; +import { yellow } from "picocolors"; +import { Event } from "../../interfaces"; +import { Config } from "../Config"; +import { rabbitListen, RabbitMQ } from "./RabbitMQ"; +import { BaseEventListener } from "./listener/BaseEventListener"; +import { BaseEventWriter } from "./writer/BaseEventWriter"; +import { UnixSocketWriter } from "./writer/UnixSocketWriter"; +import { UnixSocketListener } from "./listener/UnixSocketListener"; +import { RabbitMqSingleListener } from "./listener/RabbitMqSingleListener"; +import { RabbitMqSingleWriter } from "./writer/RabbitMqSingleWriter"; + +export const events = new EventEmitter(); +let listener: BaseEventListener | null = null; +let writer: BaseEventWriter | null = null; + +export async function emitEvent(payload: Omit) { + const id = (payload.guild_id || payload.channel_id || payload.user_id || payload.session_id) as string; + if (!id) return console.error("event doesn't contain any id", payload); + + if (writer) { + await writer.emit(payload); + } else if (process.env.EVENT_TRANSMISSION === "process") { + process.send?.({ type: "event", event: payload, id } as ProcessEvent); + } else if (RabbitMQ.connection) { + await RabbitMQ.publishEvent(id, payload); + } else { + events.emit(id, payload); + } +} + +export async function initEvent() { + if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { + if (!Config.get().rabbitmq.host!) { + throw new Error("[Events] rabbitmq.host is not configured."); + } + + if (!writer) { + writer = new RabbitMqSingleWriter(Config.get().rabbitmq.host!); + await writer.init(); + } + } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { + if (!process.env.EVENT_SOCKET_PATH) { + throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); + } + + if (!writer) { + writer = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); + await writer.init(); + } + } else await RabbitMQ.init(); // does nothing if rabbitmq is not setup + + // Set up the spacebar event listener (used for config reload, etc.) + const setupSpacebarListener = async () => { + console.log("[Event] Setting up spacebar event listener"); + await listenEvent("spacebar", async (event) => { + console.log("[Event] Received spacebar event:", event); + if ((event.event as string) === "SB_RELOAD_CONFIG") { + console.log("[Event] Reloading config due to RELOAD_CONFIG event"); + await Config.init(true); + } + }); + }; + + // Initial setup + await setupSpacebarListener(); + + // Re-establish listener on reconnection + RabbitMQ.on("reconnected", async () => { + console.log("[Event] RabbitMQ reconnected, re-establishing spacebar listener"); + await setupSpacebarListener(); + }); +} + +export interface EventOpts extends Event { + acknowledge?: () => unknown; + channel?: Channel; + cancel: (id?: string) => unknown; +} + +export interface ListenEventOpts { + channel?: Channel; + acknowledge?: boolean; +} + +export interface ProcessEvent { + type: "event"; + event: Event; + id: string; +} + +export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { + if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { + if (!Config.get().rabbitmq.host) { + throw new Error("[Events] rabbitmq.host is not configured."); + } + if (!listener) { + listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!); + await listener.init(); + } + return await listener.listen(event, callback); + } else if (process.env.EVENT_TRANSMISSION === "unix") { + if (!process.env.EVENT_SOCKET_PATH) { + throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); + } + + if (!listener) { + listener = listener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); + await listener.init(); + } + return await listener.listen(event, callback); + } else if (process.env.EVENT_TRANSMISSION === "process") { + const cancel = async () => { + process.removeListener("message", listener); + process.setMaxListeners(process.getMaxListeners() - 1); + }; + + const listener = (msg: ProcessEvent) => { + // eslint-disable-next-line @typescript-eslint/no-unused-expressions + msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel }); + }; + + // TODO: assert the type is correct? + process.addListener("message", (msg) => listener(msg as ProcessEvent)); + process.setMaxListeners(process.getMaxListeners() + 1); + + return cancel; + } else if (RabbitMQ.connection) { + if (process.env.EVENT_TRANSMISSION !== "rabbitmq-legacy") { + console.warn(yellow("[Events] Warning:"), "RabbitMQ replication without configuring EVENT_TRANSMISSION is deprecated."); + console.warn(yellow("[Events] Warning:"), "Set EVENT_TRANSMISSION to 'rabbitmq-legacy' in environment variables to silence this warning."); + } + const rabbitMQChannel = await RabbitMQ.getSafeChannel(); + const channel = opts?.channel || rabbitMQChannel; + if (!channel) throw new Error("[Events] An event was sent without an associated channel"); + return await rabbitListen(channel, event, callback, { + acknowledge: opts?.acknowledge, + }); + } else { + const listener = (opts: EventOpts) => callback({ ...opts, cancel }); + const cancel = async () => { + events.removeListener(event, listener); + events.setMaxListeners(events.getMaxListeners() - 1); + }; + events.setMaxListeners(events.getMaxListeners() + 1); + events.addListener(event, listener); + + return cancel; + } +} diff --git a/src/util/util/RabbitMQ.ts b/src/util/util/ipc/RabbitMQ.ts similarity index 66% rename from src/util/util/RabbitMQ.ts rename to src/util/util/ipc/RabbitMQ.ts index 328a2df720..e3a1ca4f95 100644 --- a/src/util/util/RabbitMQ.ts +++ b/src/util/util/ipc/RabbitMQ.ts @@ -16,9 +16,12 @@ along with this program. If not, see . */ -import amqp, { Channel, ChannelModel } from "amqplib"; -import { Config } from "./Config"; +import { randomUUID } from "node:crypto"; import EventEmitter from "node:events"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { Event, EVENT } from "../../interfaces"; +import { Config } from "../Config"; +import type { EventOpts } from "./Event"; export class RabbitMQ { public static connection: ChannelModel | null = null; @@ -170,4 +173,78 @@ export class RabbitMQ { static isConnected(): boolean { return this.connection !== null && !this.isReconnecting; } + + static async publishEvent(id: string, payload: Omit, retryCount = 0): Promise { + const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission + const channel = await RabbitMQ.getSafeChannel(); + try { + await channel.assertExchange(id, "fanout", { + durable: false, + }); + + // assertQueue isn't needed, because a queue will automatically created if it doesn't exist + const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); + if (!successful) throw new Error("failed to send event"); + } catch (e) { + // Check if this is a channel closed error and if we should retry + const errorMessage = e instanceof Error ? e.message : String(e); + const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); + + if (isChannelError && retryCount < 1) { + console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); + // Force the cached channel to be discarded by calling getSafeChannel which will create a new one + return RabbitMQ.publishEvent(id, payload, retryCount + 1); + } + + console.log("[RabbitMQ] ", e); + } + } +} + +export async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { + await channel.assertExchange(id, "fanout", { durable: false }); + const q = await channel.assertQueue("", { + exclusive: true, + autoDelete: true, + messageTtl: 5000, + }); + + const consumerTag = randomUUID(); + + const cancel = async () => { + try { + await channel.unbindQueue(q.queue, id, ""); + await channel.cancel(consumerTag); + } catch (e) { + console.log("[RabbitMQ] Error while cancelling channel (may be expected):", e instanceof Error ? e.message : e); + } + }; + + await channel.bindQueue(q.queue, id, ""); + await channel.consume( + q.queue, + (opts) => { + if (!opts) return; + + const data = JSON.parse(opts.content.toString()); + const event = opts.properties.type as EVENT; + + callback({ + event, + data, + acknowledge() { + channel.ack(opts); + }, + channel, + cancel, + }); + // rabbitCh.ack(opts); + }, + { + noAck: !opts?.acknowledge, + consumerTag: consumerTag, + }, + ); + + return cancel; } diff --git a/src/util/util/ipc/listener/BaseEventListener.ts b/src/util/util/ipc/listener/BaseEventListener.ts new file mode 100644 index 0000000000..2df611b2e4 --- /dev/null +++ b/src/util/util/ipc/listener/BaseEventListener.ts @@ -0,0 +1,25 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { EventOpts } from "@spacebar/util"; + +export abstract class BaseEventListener { + abstract init(): Promise; + abstract close(): Promise; + abstract listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise>; +} diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts new file mode 100644 index 0000000000..b5fd5efd50 --- /dev/null +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -0,0 +1,118 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import { BaseEventListener } from "./BaseEventListener"; +import { EVENT, Event, EventOpts, sleep } from "@spacebar/util"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { randomUUID } from "node:crypto"; + +export class RabbitMqSingleListener extends BaseEventListener { + private readonly host: string; + private connection?: ChannelModel; + private channel?: Channel; + eventEmitter: EventEmitter; + + constructor(host: string) { + super(); + this.eventEmitter = new EventEmitter(); + this.host = host; + } + + async init() { + while (!this.connection) { + try { + console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + console.log(`[RabbitMQSingleListener] Connected to: ${this.host}`); + } catch (e) { + console.log(`[RabbitMQSingleListener] Failed to connect to to: ${this.host}: ${e}`); + await sleep(1000); + } + } + this.channel = await this.connection.createChannel(); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + + this.connection.on("error", (err) => { + console.error("[RabbitMQSingleListener] Connection error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQSingleListener] Connection closed"); + sleep(1000).then(() => { + this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + }); + }); + + // actually set up event receiving? + await this.channel.assertExchange("-", "fanout", { durable: false }); + const q = await this.channel.assertQueue("-", { + exclusive: false, + autoDelete: true, + messageTtl: 5000, + }); + + const consumerTag = randomUUID(); + await this.channel.bindQueue(q.queue, "-", ""); + await this.channel.consume( + q.queue, + (opts) => { + if (!opts) return; + const data = JSON.parse(opts.content.toString()) as { id: EVENT; event: Event }; + + this.eventEmitter.emit(data.id, data.event); + }, + { + consumerTag, + }, + ); + } + + async close(): Promise { + await this.channel?.close(); + this.channel = undefined; + await this.connection?.close(); + this.connection = undefined; + } + + async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { + const listener = (data: Event) => { + callback({ + ...data, + cancel, + }); + }; + + this.eventEmitter.addListener(event, listener); + + const cancel = async () => { + this.eventEmitter.removeListener(event, listener); + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + }; + + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); + + return cancel; + } +} diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts new file mode 100644 index 0000000000..8ead85bda5 --- /dev/null +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -0,0 +1,118 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import fs from "node:fs"; +import net, { Server } from "node:net"; +import { BaseEventListener } from "./BaseEventListener"; +import { EVENT, Event, EventOpts } from "@spacebar/util"; + +export class UnixSocketListener extends BaseEventListener { + eventEmitter: EventEmitter; + socketPath: string; + server: Server; + + constructor(socketPath: string) { + super(); + this.eventEmitter = new EventEmitter(); + this.socketPath = socketPath; + } + + async init() { + // remove stale socket file if it exists + // can happen if there's a PID conflict (across containers/PID namespaces) + try { + if (fs.existsSync(this.socketPath)) { + fs.unlinkSync(this.socketPath); + console.log("[UnixSocketListener] Removed stale socket file:", this.socketPath); + } + } catch (e) { + console.error("[UnixSocketListener] Failed to remove stale socket:", e); + } + + this.server = net.createServer((socket) => { + socket.on("connect", () => { + console.log("[UnixSocketListener] Unix socket client connected"); + }); + let buffer = Buffer.alloc(0); + socket.on("data", (data: Buffer) => { + buffer = Buffer.concat([buffer, data]); + while (buffer.length >= 4) { + const msgLen = buffer.readUInt32BE(0); + if (buffer.length < 4 + msgLen) break; + const msgBuf = buffer.subarray(4, 4 + msgLen); + buffer = buffer.subarray(4 + msgLen); + try { + const payload = JSON.parse(msgBuf.toString()) as { id: EVENT; event: Event }; + this.eventEmitter.emit(payload.id, payload.event); + } catch (e) { + console.error("[UnixSocketListener] Failed to parse unix socket data:", e); + } + } + }); + socket.on("error", (err) => { + console.error("[UnixSocketListener] Unix socket error:", err); + }); + socket.on("close", () => { + console.log("[UnixSocketListener] Unix socket client disconnected"); + }); + }); + + this.server.listen(this.socketPath, () => { + console.log(`Unix socket server listening on ${this.socketPath}`); + }); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + } + + async close(): Promise { + console.log("[UnixSocketListener] Closing unix socket server"); + this.server.close(); + + // clean up socket file + try { + fs.unlinkSync(this.socketPath); + } catch (e) { + console.error("[UnixSocketListener] Failed to unlink socket file:", e); + } + + process.exit(0); + } + + async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { + const listener = (data: Event) => { + callback({ + ...data, + cancel, + }); + }; + + this.eventEmitter.addListener(event, listener); + + const cancel = async () => { + this.eventEmitter.removeListener(event, listener); + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + }; + + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); + + return cancel; + } +} diff --git a/src/util/util/ipc/writer/BaseEventWriter.ts b/src/util/util/ipc/writer/BaseEventWriter.ts new file mode 100644 index 0000000000..7349263d45 --- /dev/null +++ b/src/util/util/ipc/writer/BaseEventWriter.ts @@ -0,0 +1,25 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { Event } from "@spacebar/util"; + +export abstract class BaseEventWriter { + abstract init(): Promise; + abstract close(): Promise; + abstract emit(event: Event): Promise; +} diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts new file mode 100644 index 0000000000..28781c6dbd --- /dev/null +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -0,0 +1,103 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { BaseEventWriter } from "./BaseEventWriter"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { Event, sleep } from "@spacebar/util"; + +export class RabbitMqSingleWriter extends BaseEventWriter { + private readonly host: string; + private connection?: ChannelModel; + private channel?: Channel; + + constructor(host: string) { + super(); + this.host = host; + } + + async init(): Promise { + while (!this.connection) { + try { + console.log(`[RabbitMQSingleWriter] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + console.log(`[RabbitMQSingleWriter] Connected to: ${this.host}`); + } catch (e) { + console.log(`[RabbitMQSingleWriter] Failed to connect to to: ${this.host}: ${e}`); + await sleep(1000); + } + } + this.channel = await this.connection.createChannel(); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + + this.connection.on("error", (err) => { + console.error("[RabbitMQSingleWriter] Connection error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQSingleWriter] Connection closed"); + sleep(1000).then(() => { + this.init().catch((e) => console.error("[RabbitMQSingleWriter] Failed to schedule reconnection:", e)); + }); + }); + } + + async close(): Promise { + await this.channel?.close(); + this.channel = undefined; + await this.connection?.close(); + this.connection = undefined; + } + + async emit(event: Event): Promise { + if (!this.connection) { + throw new Error("RabbitMqSingleWriter#emit called without connection being initialised!"); + } + if (!this.channel) { + throw new Error("RabbitMqSingleWriter#emit called without channel being initialised!"); + } + + // todo check if channel is closed + if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); + await this.channel.assertExchange("-", "fanout", { + durable: false, // ensure that messages arent written to disk + }); + + let success = false; + try { + success = this.channel.publish( + "-", + "", + Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })), + {}, + ); + } catch (e) { + console.error("[RabbitMqSingleWriter] Got error while publishing event:", e); + } + + if (!success) { + console.log("[RabbitMqSingleWriter] Publishing message was not successful, retrying..."); + await this.emit(event); + } + } +} diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts new file mode 100644 index 0000000000..53b9564ce2 --- /dev/null +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -0,0 +1,235 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import net, { Socket } from "node:net"; +import fs, { FSWatcher } from "node:fs"; +import path from "node:path"; +import { red } from "picocolors"; +import { BaseEventWriter } from "./BaseEventWriter"; +import { Event, Stopwatch } from "@spacebar/util"; + +export class UnixSocketWriter extends BaseEventWriter { + socketPath: string; + clients: { [key: string]: Socket } = {}; + watcher?: FSWatcher; + backlog: Event[] = []; + broadcastLock: Promise = Promise.resolve(); + replayLock: Promise = Promise.resolve(); + isInitializing = true; + + constructor(socketPath: string) { + super(); + this.socketPath = socketPath; + } + + async init() { + if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath); + + console.log("[UnixSocketWriter] Unix socket writer initializing for", this.socketPath); + + const connect = (file: string) => { + const fullPath = path.join(this.socketPath, file); + const pid = Number(path.basename(file, ".sock")); + console.log("[UnixSocketWriter] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); + + // avoid duplicate connections + if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { + console.log("[UnixSocketWriter] Unix socket client already connected to", fullPath); + return; + } + + // clean up old connection if it exists + if (this.clients[fullPath]) { + console.log("[UnixSocketWriter] Removing stale unix socket client for", fullPath); + try { + this.clients[fullPath].destroy(); + } catch (e) { + // ignore + } + delete this.clients[fullPath]; + } + + // check if it's actually a socket file (not a ghost/regular file) + try { + const stats = fs.statSync(fullPath); + if (!stats.isSocket()) { + console.log("[UnixSocketWriter] Ignoring non-socket file:", fullPath); + return; + } + } catch (e) { + console.log("[UnixSocketWriter] Cannot stat socket file:", fullPath); + return; + } + + try { + this.clients[fullPath] = net.createConnection(fullPath, () => { + console.log("[UnixSocketWriter] Unix socket client connected to", fullPath); + }); + + this.clients[fullPath].on("error", (err) => { + console.error("[UnixSocketWriter] Unix socket client error on", fullPath, ":", err); + // clean up after error + if (this.clients[fullPath]) { + delete this.clients[fullPath]; + } + }); + + // handle clean socket closure + this.clients[fullPath].on("close", () => { + console.log("[UnixSocketWriter] Unix socket client closed:", fullPath); + delete this.clients[fullPath]; + }); + } catch (e) { + console.error("[UnixSocketWriter] Failed to create connection to", fullPath, ":", e); + delete this.clients[fullPath]; + } + }; + + // connect to all sockets, now and in the future + this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { + console.log("[UnixSocketWriter] Unix socket writer received watch sig", eventType, filename); + if (eventType === "rename" && filename?.endsWith(".sock")) { + try { + const fullPath = path.join(this.socketPath, filename!); + if (fs.existsSync(fullPath)) { + connect(filename!); + } else { + if (this.clients[fullPath]) { + console.log("[UnixSocketWriter] Unix socket writer detected removed socket:", fullPath); + try { + this.clients[fullPath].destroy(); + } catch (e) { + // socket may already be destroyed + } + delete this.clients[fullPath]; + } + } + } catch (e) { + // don't + } + } + }); + + this.watcher.on("error", (err) => { + console.error("[UnixSocketWriter] Unix socket watcher error:", err); + }); + + // connect to existing sockets if any + try { + const files = fs.readdirSync(this.socketPath); + console.log("[UnixSocketWriter] Unix socket writer found existing sockets:", files); + files.forEach((file) => { + if (file.endsWith(".sock")) { + connect(file); + } + }); + } catch (err) { + console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err); + } + + this.isInitializing = false; + } + + async emit(event: Event) { + if (!this.clients) throw new Error("UnixSocketWriter not initialized"); + + // check if there are any listeners + const clientCount = Object.entries(this.clients).length; + if (clientCount === 0) { + console.warn("[UnixSocketWriter] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); + this.backlog.push(event); + if (!this.isInitializing) { + this.isInitializing = true; + console.log("[UnixSocketWriter] Re-initializing unix socket writer due to new event with no listeners"); + await this.close(); + await this.init(); + } + return; + } + + await this.replayLock; + await (this.replayLock = Promise.resolve().then(async () => { + if (this.backlog.length > 0) { + console.log(`[UnixSocketWriter] Replaying ${this.backlog.length} backlog events`); + for (const backlogEvent of this.backlog) { + await this.broadcast(backlogEvent); + } + this.backlog = []; + } + })); + + await this.broadcast(event); + } + + private async broadcast(event: Event) { + await this.broadcastLock; + return await (this.broadcastLock = new Promise((res) => { + const tsw = Stopwatch.startNew(); + const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })); + const lenBuf = Buffer.alloc(4); + lenBuf.writeUInt32BE(payloadBuf.length, 0); + const framed = Buffer.concat([lenBuf, payloadBuf]); + + for (const [socketPath, socket] of Object.entries(this.clients)) { + if (socket.destroyed) { + console.log("[UnixSocketWriter] Unix socket writer found destroyed socket, removing:", socketPath); + delete this.clients[socketPath]; + continue; + } + + try { + socket.write(framed); + } catch (e) { + console.error("[UnixSocketWriter] Unix socket writer failed to write to socket", socketPath, ":", e); + } + } + + if (tsw.elapsed().totalMilliseconds > 5) + // else it's too noisy + console.log(`[UnixSocketWriter] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); + res(); + })); + } + + async close() { + console.log("[UnixSocketWriter] Closing Unix socket writer"); + + if (this.watcher) { + this.watcher.close(); + this.watcher = undefined; + } + + for (const [path, socket] of Object.entries(this.clients)) { + try { + socket.destroy(); + } catch (e) { + console.error("[UnixSocketWriter] Error closing socket", path, ":", e); + } + } + this.clients = {}; + } +} + +function getPidCmdline(pid: number): string | null { + try { + const cmdline = fs.readFileSync(`/proc/${pid}/cmdline`, "utf-8"); + return cmdline.replaceAll("\0", " ").trim(); + } catch (e) { + return null; + } +}