From 6756e7c6f69a936f9be64272d25a9423d4d9e03f Mon Sep 17 00:00:00 2001 From: Rory& Date: Tue, 12 May 2026 21:19:22 +0200 Subject: [PATCH 01/10] Split unix socket IPC to separate files, add interfaces --- src/util/util/Event.ts | 529 ------------------ src/util/util/index.ts | 2 +- src/util/util/ipc/Event.ts | 221 ++++++++ .../util/ipc/listener/BaseEventListener.ts | 26 + .../util/ipc/listener/UnixSocketListener.ts | 118 ++++ src/util/util/ipc/writer/BaseEventWriter.ts | 26 + src/util/util/ipc/writer/UnixSocketWriter.ts | 235 ++++++++ 7 files changed, 627 insertions(+), 530 deletions(-) delete mode 100644 src/util/util/Event.ts create mode 100644 src/util/util/ipc/Event.ts create mode 100644 src/util/util/ipc/listener/BaseEventListener.ts create mode 100644 src/util/util/ipc/listener/UnixSocketListener.ts create mode 100644 src/util/util/ipc/writer/BaseEventWriter.ts create mode 100644 src/util/util/ipc/writer/UnixSocketWriter.ts diff --git a/src/util/util/Event.ts b/src/util/util/Event.ts deleted file mode 100644 index 0e66288649..0000000000 --- a/src/util/util/Event.ts +++ /dev/null @@ -1,529 +0,0 @@ -/* - Spacebar: A FOSS re-implementation and extension of the Discord.com backend. - Copyright (C) 2023 Spacebar and Spacebar Contributors - - This program is free software: you can redistribute it and/or modify - it under the terms of the GNU Affero General Public License as published - by the Free Software Foundation, either version 3 of the License, or - (at your option) any later version. - - This program is distributed in the hope that it will be useful, - but WITHOUT ANY WARRANTY; without even the implied warranty of - MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - GNU Affero General Public License for more details. - - You should have received a copy of the GNU Affero General Public License - along with this program. If not, see . -*/ - -import { Channel } from "amqplib"; -import { RabbitMQ } from "./RabbitMQ"; -import EventEmitter from "node:events"; -import { EVENT, Event } from "../interfaces"; -import { randomUUID } from "node:crypto"; -import path from "node:path"; -import net, { Socket } from "node:net"; -import fs, { FSWatcher } from "node:fs"; -import { Stopwatch } from "./Stopwatch"; -import { Config } from "./Config"; -import { red } from "picocolors"; - -export const events = new EventEmitter(); -let unixSocketListener: UnixSocketListener | null = null; -let unixSocketWriter: UnixSocketWriter | null = null; - -export async function emitEvent(payload: Omit) { - const id = (payload.guild_id || payload.channel_id || payload.user_id || payload.session_id) as string; - if (!id) return console.error("event doesn't contain any id", payload); - - if (RabbitMQ.connection) { - const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission - - const publishEvent = async (retryCount = 0): Promise => { - const channel = await RabbitMQ.getSafeChannel(); - try { - await channel.assertExchange(id, "fanout", { - durable: false, - }); - - // assertQueue isn't needed, because a queue will automatically created if it doesn't exist - const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); - if (!successful) throw new Error("failed to send event"); - } catch (e) { - // Check if this is a channel closed error and if we should retry - const errorMessage = e instanceof Error ? e.message : String(e); - const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); - - if (isChannelError && retryCount < 1) { - console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); - // Force the cached channel to be discarded by calling getSafeChannel which will create a new one - return publishEvent(retryCount + 1); - } - - console.log("[RabbitMQ] ", e); - } - }; - - await publishEvent(); - } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketWriter) { - console.error("[Event] Unix socket writer not initialized, cannot emit event!"); - throw new Error("Unix socket writer not initialized"); - } - await unixSocketWriter.emit(payload); - } else if (process.env.EVENT_TRANSMISSION === "process") { - process.send?.({ type: "event", event: payload, id } as ProcessEvent); - } else { - events.emit(id, payload); - } -} - -export async function initEvent() { - await RabbitMQ.init(); // does nothing if rabbitmq is not setup - - if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketWriter) { - unixSocketWriter = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); - await unixSocketWriter.init(); - } - } - - // Set up the spacebar event listener (used for config reload, etc.) - const setupSpacebarListener = async () => { - console.log("[Event] Setting up spacebar event listener"); - await listenEvent("spacebar", async (event) => { - console.log("[Event] Received spacebar event:", event); - if ((event.event as string) === "SB_RELOAD_CONFIG") { - console.log("[Event] Reloading config due to RELOAD_CONFIG event"); - await Config.init(true); - } - }); - }; - - // Initial setup - await setupSpacebarListener(); - - // Re-establish listener on reconnection - RabbitMQ.on("reconnected", async () => { - console.log("[Event] RabbitMQ reconnected, re-establishing spacebar listener"); - await setupSpacebarListener(); - }); -} - -export interface EventOpts extends Event { - acknowledge?: () => unknown; - channel?: Channel; - cancel: (id?: string) => unknown; -} - -export interface ListenEventOpts { - channel?: Channel; - acknowledge?: boolean; -} - -export interface ProcessEvent { - type: "event"; - event: Event; - id: string; -} - -export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { - if (RabbitMQ.connection) { - const rabbitMQChannel = await RabbitMQ.getSafeChannel(); - const channel = opts?.channel || rabbitMQChannel; - if (!channel) throw new Error("[Events] An event was sent without an associated channel"); - return await rabbitListen(channel, event, callback, { - acknowledge: opts?.acknowledge, - }); - } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketListener) { - unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); - await unixSocketListener.init(); - } - return await unixSocketListener.listen(event, callback); - } else if (process.env.EVENT_TRANSMISSION === "process") { - const cancel = async () => { - process.removeListener("message", listener); - process.setMaxListeners(process.getMaxListeners() - 1); - }; - - const listener = (msg: ProcessEvent) => { - // eslint-disable-next-line @typescript-eslint/no-unused-expressions - msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel }); - }; - - // TODO: assert the type is correct? - process.addListener("message", (msg) => listener(msg as ProcessEvent)); - process.setMaxListeners(process.getMaxListeners() + 1); - - return cancel; - } else { - const listener = (opts: EventOpts) => callback({ ...opts, cancel }); - const cancel = async () => { - events.removeListener(event, listener); - events.setMaxListeners(events.getMaxListeners() - 1); - }; - events.setMaxListeners(events.getMaxListeners() + 1); - events.addListener(event, listener); - - return cancel; - } -} - -async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { - await channel.assertExchange(id, "fanout", { durable: false }); - // messageTtl ensures any orphaned messages are cleaned up quickly if the consumer disconnects. - const q = await channel.assertQueue("", { - exclusive: true, - autoDelete: true, - messageTtl: 5000, // Messages expire after 5 seconds if not consumed - }); - - const consumerTag = randomUUID(); - - const cancel = async () => { - try { - // Order matters here to prevent RESOURCE_ERROR, due to potential race condition: - // 1. Unbind first - stops new messages from being routed to this queue - await channel.unbindQueue(q.queue, id, ""); - // 2. Cancel consumer - with autoDelete: true, this triggers queue deletion - // after RabbitMQ ensures no messages are in-flight to this queue - await channel.cancel(consumerTag); - // Don't explicitly delete the queue - let autoDelete handle it safely. - // Explicitly deleting can race with in-flight message delivery. - } catch (e) { - // Channel might already be closed or queue already deleted - that's fine - console.log("[RabbitMQ] Error during consumer cancel (may be expected):", e instanceof Error ? e.message : e); - } - }; - - await channel.bindQueue(q.queue, id, ""); - await channel.consume( - q.queue, - (opts) => { - if (!opts) return; - - const data = JSON.parse(opts.content.toString()); - const event = opts.properties.type as EVENT; - - callback({ - event, - data, - acknowledge() { - channel.ack(opts); - }, - channel, - cancel, - }); - // rabbitCh.ack(opts); - }, - { - noAck: !opts?.acknowledge, - consumerTag: consumerTag, - }, - ); - - return cancel; -} - -class UnixSocketListener { - eventEmitter: EventEmitter; - socketPath: string; - - constructor(socketPath: string) { - this.eventEmitter = new EventEmitter(); - this.socketPath = socketPath; - } - - async init() { - // remove stale socket file if it exists - // can happen if there's a PID conflict (across containers/PID namespaces) - try { - if (fs.existsSync(this.socketPath)) { - fs.unlinkSync(this.socketPath); - console.log("[Events] Removed stale socket file:", this.socketPath); - } - } catch (e) { - console.error("[Events] Failed to remove stale socket:", e); - } - - const server = net.createServer((socket) => { - socket.on("connect", () => { - console.log("[Events] Unix socket client connected"); - }); - let buffer = Buffer.alloc(0); - socket.on("data", (data: Buffer) => { - buffer = Buffer.concat([buffer, data]); - while (buffer.length >= 4) { - const msgLen = buffer.readUInt32BE(0); - if (buffer.length < 4 + msgLen) break; - const msgBuf = buffer.subarray(4, 4 + msgLen); - buffer = buffer.subarray(4 + msgLen); - try { - const payload = JSON.parse(msgBuf.toString()); - this.eventEmitter.emit(payload.id, payload.event); - } catch (e) { - console.error("[Events] Failed to parse unix socket data:", e); - } - } - }); - socket.on("error", (err) => { - console.error("[Events] Unix socket error:", err); - }); - socket.on("close", () => { - console.log("[Events] Unix socket client disconnected"); - }); - }); - - server.listen(this.socketPath, () => { - console.log(`Unix socket server listening on ${this.socketPath}`); - }); - - const shutdown = () => { - console.log("[Events] Closing unix socket server"); - server.close(); - - // clean up socket file - try { - fs.unlinkSync(this.socketPath); - } catch (e) { - console.error("[Events] Failed to unlink socket file:", e); - } - - process.exit(0); - }; - for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { - process.on(sig, shutdown); - } - } - - async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { - const listener = (data: Event) => { - callback({ - ...data, - cancel, - }); - }; - - this.eventEmitter.addListener(event, listener); - - const cancel = async () => { - this.eventEmitter.removeListener(event, listener); - this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); - }; - - this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); - - return cancel; - } -} - -function getPidCmdline(pid: number): string | null { - try { - const cmdline = fs.readFileSync(`/proc/${pid}/cmdline`, "utf-8"); - return cmdline.replaceAll("\0", " ").trim(); - } catch (e) { - return null; - } -} - -class UnixSocketWriter { - socketPath: string; - clients: { [key: string]: Socket } = {}; - watcher?: FSWatcher; - backlog: Event[] = []; - broadcastLock: Promise = Promise.resolve(); - replayLock: Promise = Promise.resolve(); - isInitializing = true; - - constructor(socketPath: string) { - this.socketPath = socketPath; - } - - async init() { - if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath); - - console.log("[Events] Unix socket writer initializing for", this.socketPath); - - const connect = (file: string) => { - const fullPath = path.join(this.socketPath, file); - const pid = Number(path.basename(file, ".sock")); - console.log("[Events] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); - - // avoid duplicate connections - if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { - console.log("[Events] Unix socket client already connected to", fullPath); - return; - } - - // clean up old connection if it exists - if (this.clients[fullPath]) { - console.log("[Events] Removing stale unix socket client for", fullPath); - try { - this.clients[fullPath].destroy(); - } catch (e) { - // ignore - } - delete this.clients[fullPath]; - } - - // check if it's actually a socket file (not a ghost/regular file) - try { - const stats = fs.statSync(fullPath); - if (!stats.isSocket()) { - console.log("[Events] Ignoring non-socket file:", fullPath); - return; - } - } catch (e) { - console.log("[Events] Cannot stat socket file:", fullPath); - return; - } - - try { - this.clients[fullPath] = net.createConnection(fullPath, () => { - console.log("[Events] Unix socket client connected to", fullPath); - }); - - this.clients[fullPath].on("error", (err) => { - console.error("[Events] Unix socket client error on", fullPath, ":", err); - // clean up after error - if (this.clients[fullPath]) { - delete this.clients[fullPath]; - } - }); - - // handle clean socket closure - this.clients[fullPath].on("close", () => { - console.log("[Events] Unix socket client closed:", fullPath); - delete this.clients[fullPath]; - }); - } catch (e) { - console.error("[Events] Failed to create connection to", fullPath, ":", e); - delete this.clients[fullPath]; - } - }; - - // connect to all sockets, now and in the future - this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { - console.log("[Events] Unix socket writer received watch sig", eventType, filename); - if (eventType === "rename" && filename?.endsWith(".sock")) { - try { - const fullPath = path.join(this.socketPath, filename!); - if (fs.existsSync(fullPath)) { - connect(filename!); - } else { - if (this.clients[fullPath]) { - console.log("[Events] Unix socket writer detected removed socket:", fullPath); - try { - this.clients[fullPath].destroy(); - } catch (e) { - // socket may already be destroyed - } - delete this.clients[fullPath]; - } - } - } catch (e) { - // don't - } - } - }); - - this.watcher.on("error", (err) => { - console.error("[Events] Unix socket watcher error:", err); - }); - - // connect to existing sockets if any - try { - const files = fs.readdirSync(this.socketPath); - console.log("[Events] Unix socket writer found existing sockets:", files); - files.forEach((file) => { - if (file.endsWith(".sock")) { - connect(file); - } - }); - } catch (err) { - console.error("[Events] Unix socket writer failed to read directory:", err); - } - - this.isInitializing = false; - } - - async emit(event: Event) { - if (!this.clients) throw new Error("UnixSocketWriter not initialized"); - - // check if there are any listeners - const clientCount = Object.entries(this.clients).length; - if (clientCount === 0) { - console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); - this.backlog.push(event); - if (!this.isInitializing) { - this.isInitializing = true; - console.log("[Events] Re-initializing unix socket writer due to new event with no listeners"); - await this.close(); - await this.init(); - } - return; - } - - await this.replayLock; - await (this.replayLock = Promise.resolve().then(async () => { - if (this.backlog.length > 0) { - console.log(`[Events] Replaying ${this.backlog.length} backlog events`); - for (const backlogEvent of this.backlog) { - await this.broadcast(backlogEvent); - } - this.backlog = []; - } - })); - - await this.broadcast(event); - } - - private async broadcast(event: Event) { - await this.broadcastLock; - return await (this.broadcastLock = new Promise((res) => { - const tsw = Stopwatch.startNew(); - const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })); - const lenBuf = Buffer.alloc(4); - lenBuf.writeUInt32BE(payloadBuf.length, 0); - const framed = Buffer.concat([lenBuf, payloadBuf]); - - for (const [socketPath, socket] of Object.entries(this.clients)) { - if (socket.destroyed) { - console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); - delete this.clients[socketPath]; - continue; - } - - try { - socket.write(framed); - } catch (e) { - console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); - } - } - - if (tsw.elapsed().totalMilliseconds > 5) - // else it's too noisy - console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); - res(); - })); - } - - async close() { - console.log("[Events] Closing Unix socket writer"); - - if (this.watcher) { - this.watcher.close(); - this.watcher = undefined; - } - - for (const [path, socket] of Object.entries(this.clients)) { - try { - socket.destroy(); - } catch (e) { - console.error("[Events] Error closing socket", path, ":", e); - } - } - this.clients = {}; - } -} diff --git a/src/util/util/index.ts b/src/util/util/index.ts index 368fcc509d..9daef50b13 100644 --- a/src/util/util/index.ts +++ b/src/util/util/index.ts @@ -27,7 +27,7 @@ export * from "./Database"; export * from "./DateBuilder"; export * from "./email"; export * from "./ElapsedTime"; -export * from "./Event"; +export * from "./ipc/Event"; export * from "./FieldError"; export * from "./Intents"; export * from "./InvisibleCharacters"; diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts new file mode 100644 index 0000000000..a332d6e0c6 --- /dev/null +++ b/src/util/util/ipc/Event.ts @@ -0,0 +1,221 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2023 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { Channel } from "amqplib"; +import { randomUUID } from "node:crypto"; +import EventEmitter from "node:events"; +import path from "node:path"; +import { RabbitMQ } from "../RabbitMQ"; +import { EVENT, Event } from "../../interfaces"; +import { Config } from "../Config"; +import { BaseEventListener } from "./listener/BaseEventListener"; +import { BaseEventWriter } from "./writer/BaseEventWriter"; +import { UnixSocketWriter } from "./writer/UnixSocketWriter"; +import { UnixSocketListener } from "./listener/UnixSocketListener"; + +export const events = new EventEmitter(); +let listener: BaseEventListener | null = null; +let writer: BaseEventWriter | null = null; +let unixSocketListener: UnixSocketListener | null = null; +let unixSocketWriter: UnixSocketWriter | null = null; + +export async function emitEvent(payload: Omit) { + const id = (payload.guild_id || payload.channel_id || payload.user_id || payload.session_id) as string; + if (!id) return console.error("event doesn't contain any id", payload); + + if (RabbitMQ.connection) { + const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission + + const publishEvent = async (retryCount = 0): Promise => { + const channel = await RabbitMQ.getSafeChannel(); + try { + await channel.assertExchange(id, "fanout", { + durable: false, + }); + + // assertQueue isn't needed, because a queue will automatically created if it doesn't exist + const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); + if (!successful) throw new Error("failed to send event"); + } catch (e) { + // Check if this is a channel closed error and if we should retry + const errorMessage = e instanceof Error ? e.message : String(e); + const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); + + if (isChannelError && retryCount < 1) { + console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); + // Force the cached channel to be discarded by calling getSafeChannel which will create a new one + return publishEvent(retryCount + 1); + } + + console.log("[RabbitMQ] ", e); + } + }; + + await publishEvent(); + } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { + if (!unixSocketWriter) { + console.error("[Event] Unix socket writer not initialized, cannot emit event!"); + throw new Error("Unix socket writer not initialized"); + } + await unixSocketWriter.emit(payload); + } else if (process.env.EVENT_TRANSMISSION === "process") { + process.send?.({ type: "event", event: payload, id } as ProcessEvent); + } else { + events.emit(id, payload); + } +} + +export async function initEvent() { + await RabbitMQ.init(); // does nothing if rabbitmq is not setup + + if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { + if (!unixSocketWriter) { + writer = unixSocketWriter = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); + await unixSocketWriter.init(); + } + } + + // Set up the spacebar event listener (used for config reload, etc.) + const setupSpacebarListener = async () => { + console.log("[Event] Setting up spacebar event listener"); + await listenEvent("spacebar", async (event) => { + console.log("[Event] Received spacebar event:", event); + if ((event.event as string) === "SB_RELOAD_CONFIG") { + console.log("[Event] Reloading config due to RELOAD_CONFIG event"); + await Config.init(true); + } + }); + }; + + // Initial setup + await setupSpacebarListener(); + + // Re-establish listener on reconnection + RabbitMQ.on("reconnected", async () => { + console.log("[Event] RabbitMQ reconnected, re-establishing spacebar listener"); + await setupSpacebarListener(); + }); +} + +export interface EventOpts extends Event { + acknowledge?: () => unknown; + channel?: Channel; + cancel: (id?: string) => unknown; +} + +export interface ListenEventOpts { + channel?: Channel; + acknowledge?: boolean; +} + +export interface ProcessEvent { + type: "event"; + event: Event; + id: string; +} + +export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { + if (RabbitMQ.connection) { + const rabbitMQChannel = await RabbitMQ.getSafeChannel(); + const channel = opts?.channel || rabbitMQChannel; + if (!channel) throw new Error("[Events] An event was sent without an associated channel"); + return await rabbitListen(channel, event, callback, { + acknowledge: opts?.acknowledge, + }); + } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { + if (!unixSocketListener) { + listener = unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); + await unixSocketListener.init(); + } + return await unixSocketListener.listen(event, callback); + } else if (process.env.EVENT_TRANSMISSION === "process") { + const cancel = async () => { + process.removeListener("message", listener); + process.setMaxListeners(process.getMaxListeners() - 1); + }; + + const listener = (msg: ProcessEvent) => { + // eslint-disable-next-line @typescript-eslint/no-unused-expressions + msg.type === "event" && msg.id === event && callback({ ...msg.event, cancel }); + }; + + // TODO: assert the type is correct? + process.addListener("message", (msg) => listener(msg as ProcessEvent)); + process.setMaxListeners(process.getMaxListeners() + 1); + + return cancel; + } else { + const listener = (opts: EventOpts) => callback({ ...opts, cancel }); + const cancel = async () => { + events.removeListener(event, listener); + events.setMaxListeners(events.getMaxListeners() - 1); + }; + events.setMaxListeners(events.getMaxListeners() + 1); + events.addListener(event, listener); + + return cancel; + } +} + +async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { + await channel.assertExchange(id, "fanout", { durable: false }); + const q = await channel.assertQueue("", { + exclusive: true, + autoDelete: true, + messageTtl: 5000, + }); + + const consumerTag = randomUUID(); + + const cancel = async () => { + try { + await channel.unbindQueue(q.queue, id, ""); + await channel.cancel(consumerTag); + } catch (e) { + console.log("[RabbitMQ] Error while cancelling channel (may be expected):", e instanceof Error ? e.message : e); + } + }; + + await channel.bindQueue(q.queue, id, ""); + await channel.consume( + q.queue, + (opts) => { + if (!opts) return; + + const data = JSON.parse(opts.content.toString()); + const event = opts.properties.type as EVENT; + + callback({ + event, + data, + acknowledge() { + channel.ack(opts); + }, + channel, + cancel, + }); + // rabbitCh.ack(opts); + }, + { + noAck: !opts?.acknowledge, + consumerTag: consumerTag, + }, + ); + + return cancel; +} diff --git a/src/util/util/ipc/listener/BaseEventListener.ts b/src/util/util/ipc/listener/BaseEventListener.ts new file mode 100644 index 0000000000..989e6aba5d --- /dev/null +++ b/src/util/util/ipc/listener/BaseEventListener.ts @@ -0,0 +1,26 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import { EventOpts } from "@spacebar/util"; + +export abstract class BaseEventListener { + abstract init(): Promise; + abstract close(): Promise; + abstract listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise>; +} diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts new file mode 100644 index 0000000000..14d07d3873 --- /dev/null +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -0,0 +1,118 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import fs from "node:fs"; +import net, { Server } from "node:net"; +import { BaseEventListener } from "./BaseEventListener"; +import { Event, EventOpts } from "@spacebar/util"; + +export class UnixSocketListener extends BaseEventListener { + eventEmitter: EventEmitter; + socketPath: string; + server: Server; + + constructor(socketPath: string) { + super(); + this.eventEmitter = new EventEmitter(); + this.socketPath = socketPath; + } + + async init() { + // remove stale socket file if it exists + // can happen if there's a PID conflict (across containers/PID namespaces) + try { + if (fs.existsSync(this.socketPath)) { + fs.unlinkSync(this.socketPath); + console.log("[Events] Removed stale socket file:", this.socketPath); + } + } catch (e) { + console.error("[Events] Failed to remove stale socket:", e); + } + + this.server = net.createServer((socket) => { + socket.on("connect", () => { + console.log("[Events] Unix socket client connected"); + }); + let buffer = Buffer.alloc(0); + socket.on("data", (data: Buffer) => { + buffer = Buffer.concat([buffer, data]); + while (buffer.length >= 4) { + const msgLen = buffer.readUInt32BE(0); + if (buffer.length < 4 + msgLen) break; + const msgBuf = buffer.subarray(4, 4 + msgLen); + buffer = buffer.subarray(4 + msgLen); + try { + const payload = JSON.parse(msgBuf.toString()); + this.eventEmitter.emit(payload.id, payload.event); + } catch (e) { + console.error("[Events] Failed to parse unix socket data:", e); + } + } + }); + socket.on("error", (err) => { + console.error("[Events] Unix socket error:", err); + }); + socket.on("close", () => { + console.log("[Events] Unix socket client disconnected"); + }); + }); + + this.server.listen(this.socketPath, () => { + console.log(`Unix socket server listening on ${this.socketPath}`); + }); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + } + + async close(): Promise { + console.log("[Events] Closing unix socket server"); + this.server.close(); + + // clean up socket file + try { + fs.unlinkSync(this.socketPath); + } catch (e) { + console.error("[Events] Failed to unlink socket file:", e); + } + + process.exit(0); + } + + async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { + const listener = (data: Event) => { + callback({ + ...data, + cancel, + }); + }; + + this.eventEmitter.addListener(event, listener); + + const cancel = async () => { + this.eventEmitter.removeListener(event, listener); + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + }; + + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); + + return cancel; + } +} diff --git a/src/util/util/ipc/writer/BaseEventWriter.ts b/src/util/util/ipc/writer/BaseEventWriter.ts new file mode 100644 index 0000000000..878408a5ab --- /dev/null +++ b/src/util/util/ipc/writer/BaseEventWriter.ts @@ -0,0 +1,26 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import { Event, EventOpts } from "@spacebar/util"; + +export abstract class BaseEventWriter { + abstract init(): Promise; + abstract close(): Promise; + abstract emit(event: Event): Promise; +} diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts new file mode 100644 index 0000000000..84839593ec --- /dev/null +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -0,0 +1,235 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import net, { Socket } from "node:net"; +import fs, { FSWatcher } from "node:fs"; +import path from "node:path"; +import { red } from "picocolors"; +import { BaseEventWriter } from "./BaseEventWriter"; +import { Event, Stopwatch } from "@spacebar/util"; + +export class UnixSocketWriter extends BaseEventWriter { + socketPath: string; + clients: { [key: string]: Socket } = {}; + watcher?: FSWatcher; + backlog: Event[] = []; + broadcastLock: Promise = Promise.resolve(); + replayLock: Promise = Promise.resolve(); + isInitializing = true; + + constructor(socketPath: string) { + super(); + this.socketPath = socketPath; + } + + async init() { + if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath); + + console.log("[Events] Unix socket writer initializing for", this.socketPath); + + const connect = (file: string) => { + const fullPath = path.join(this.socketPath, file); + const pid = Number(path.basename(file, ".sock")); + console.log("[Events] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); + + // avoid duplicate connections + if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { + console.log("[Events] Unix socket client already connected to", fullPath); + return; + } + + // clean up old connection if it exists + if (this.clients[fullPath]) { + console.log("[Events] Removing stale unix socket client for", fullPath); + try { + this.clients[fullPath].destroy(); + } catch (e) { + // ignore + } + delete this.clients[fullPath]; + } + + // check if it's actually a socket file (not a ghost/regular file) + try { + const stats = fs.statSync(fullPath); + if (!stats.isSocket()) { + console.log("[Events] Ignoring non-socket file:", fullPath); + return; + } + } catch (e) { + console.log("[Events] Cannot stat socket file:", fullPath); + return; + } + + try { + this.clients[fullPath] = net.createConnection(fullPath, () => { + console.log("[Events] Unix socket client connected to", fullPath); + }); + + this.clients[fullPath].on("error", (err) => { + console.error("[Events] Unix socket client error on", fullPath, ":", err); + // clean up after error + if (this.clients[fullPath]) { + delete this.clients[fullPath]; + } + }); + + // handle clean socket closure + this.clients[fullPath].on("close", () => { + console.log("[Events] Unix socket client closed:", fullPath); + delete this.clients[fullPath]; + }); + } catch (e) { + console.error("[Events] Failed to create connection to", fullPath, ":", e); + delete this.clients[fullPath]; + } + }; + + // connect to all sockets, now and in the future + this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { + console.log("[Events] Unix socket writer received watch sig", eventType, filename); + if (eventType === "rename" && filename?.endsWith(".sock")) { + try { + const fullPath = path.join(this.socketPath, filename!); + if (fs.existsSync(fullPath)) { + connect(filename!); + } else { + if (this.clients[fullPath]) { + console.log("[Events] Unix socket writer detected removed socket:", fullPath); + try { + this.clients[fullPath].destroy(); + } catch (e) { + // socket may already be destroyed + } + delete this.clients[fullPath]; + } + } + } catch (e) { + // don't + } + } + }); + + this.watcher.on("error", (err) => { + console.error("[Events] Unix socket watcher error:", err); + }); + + // connect to existing sockets if any + try { + const files = fs.readdirSync(this.socketPath); + console.log("[Events] Unix socket writer found existing sockets:", files); + files.forEach((file) => { + if (file.endsWith(".sock")) { + connect(file); + } + }); + } catch (err) { + console.error("[Events] Unix socket writer failed to read directory:", err); + } + + this.isInitializing = false; + } + + async emit(event: Event) { + if (!this.clients) throw new Error("UnixSocketWriter not initialized"); + + // check if there are any listeners + const clientCount = Object.entries(this.clients).length; + if (clientCount === 0) { + console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); + this.backlog.push(event); + if (!this.isInitializing) { + this.isInitializing = true; + console.log("[Events] Re-initializing unix socket writer due to new event with no listeners"); + await this.close(); + await this.init(); + } + return; + } + + await this.replayLock; + await (this.replayLock = Promise.resolve().then(async () => { + if (this.backlog.length > 0) { + console.log(`[Events] Replaying ${this.backlog.length} backlog events`); + for (const backlogEvent of this.backlog) { + await this.broadcast(backlogEvent); + } + this.backlog = []; + } + })); + + await this.broadcast(event); + } + + private async broadcast(event: Event) { + await this.broadcastLock; + return await (this.broadcastLock = new Promise((res) => { + const tsw = Stopwatch.startNew(); + const payloadBuf = Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })); + const lenBuf = Buffer.alloc(4); + lenBuf.writeUInt32BE(payloadBuf.length, 0); + const framed = Buffer.concat([lenBuf, payloadBuf]); + + for (const [socketPath, socket] of Object.entries(this.clients)) { + if (socket.destroyed) { + console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); + delete this.clients[socketPath]; + continue; + } + + try { + socket.write(framed); + } catch (e) { + console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); + } + } + + if (tsw.elapsed().totalMilliseconds > 5) + // else it's too noisy + console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); + res(); + })); + } + + async close() { + console.log("[Events] Closing Unix socket writer"); + + if (this.watcher) { + this.watcher.close(); + this.watcher = undefined; + } + + for (const [path, socket] of Object.entries(this.clients)) { + try { + socket.destroy(); + } catch (e) { + console.error("[Events] Error closing socket", path, ":", e); + } + } + this.clients = {}; + } +} + +function getPidCmdline(pid: number): string | null { + try { + const cmdline = fs.readFileSync(`/proc/${pid}/cmdline`, "utf-8"); + return cmdline.replaceAll("\0", " ").trim(); + } catch (e) { + return null; + } +} From f86d13c6a102cbce0db9310cc0bffa5ce8367e84 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 20:36:15 +0200 Subject: [PATCH 02/10] Add untested rabbitmq single-channel IPC --- src/util/util/index.ts | 2 +- src/util/util/ipc/Event.ts | 17 ++- src/util/util/{ => ipc}/RabbitMQ.ts | 4 +- .../ipc/listener/RabbitMqSingleListener.ts | 106 ++++++++++++++++++ .../util/ipc/listener/UnixSocketListener.ts | 20 ++-- .../util/ipc/writer/RabbitMqSingleWriter.ts | 70 ++++++++++++ src/util/util/ipc/writer/UnixSocketWriter.ts | 46 ++++---- 7 files changed, 228 insertions(+), 37 deletions(-) rename src/util/util/{ => ipc}/RabbitMQ.ts (99%) create mode 100644 src/util/util/ipc/listener/RabbitMqSingleListener.ts create mode 100644 src/util/util/ipc/writer/RabbitMqSingleWriter.ts diff --git a/src/util/util/index.ts b/src/util/util/index.ts index 9daef50b13..f6de6a9554 100644 --- a/src/util/util/index.ts +++ b/src/util/util/index.ts @@ -37,7 +37,7 @@ export * from "./Logo"; export * from "./MessageFlags"; export * from "./networking"; export * from "./Permissions"; -export * from "./RabbitMQ"; +export * from "./ipc/RabbitMQ"; export * from "./Regex"; export * from "./Rights"; export * from "./Snowflake"; diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index a332d6e0c6..4899b5d83f 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -20,13 +20,15 @@ import { Channel } from "amqplib"; import { randomUUID } from "node:crypto"; import EventEmitter from "node:events"; import path from "node:path"; -import { RabbitMQ } from "../RabbitMQ"; +import { RabbitMQ } from "./RabbitMQ"; import { EVENT, Event } from "../../interfaces"; import { Config } from "../Config"; import { BaseEventListener } from "./listener/BaseEventListener"; import { BaseEventWriter } from "./writer/BaseEventWriter"; import { UnixSocketWriter } from "./writer/UnixSocketWriter"; import { UnixSocketListener } from "./listener/UnixSocketListener"; +import { RabbitMqSingleListener } from "./listener/RabbitMqSingleListener"; +import { yellow } from "picocolors"; export const events = new EventEmitter(); let listener: BaseEventListener | null = null; @@ -131,12 +133,25 @@ export interface ProcessEvent { export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { if (RabbitMQ.connection) { + if (process.env.EVENT_TRANSMISSION !== "rabbitmq-legacy") { + console.warn(yellow("[Events] Warning:"), "RabbitMQ replication without configuring EVENT_TRANSMISSION is deprecated."); + console.warn(yellow("[Events] Warning:"), "Set EVENT_TRANSMISSION to 'rabbitmq-legacy' in environment variables to silence this warning."); + } const rabbitMQChannel = await RabbitMQ.getSafeChannel(); const channel = opts?.channel || rabbitMQChannel; if (!channel) throw new Error("[Events] An event was sent without an associated channel"); return await rabbitListen(channel, event, callback, { acknowledge: opts?.acknowledge, }); + } else if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { + if (!Config.get().rabbitmq.host) { + console.error("[Events] RabbitMQ is not configured."); + } + if (!listener) { + listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!); + await listener.init(); + } + return await listener.listen(event, callback); } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { if (!unixSocketListener) { listener = unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); diff --git a/src/util/util/RabbitMQ.ts b/src/util/util/ipc/RabbitMQ.ts similarity index 99% rename from src/util/util/RabbitMQ.ts rename to src/util/util/ipc/RabbitMQ.ts index 328a2df720..fe6e6d1ba7 100644 --- a/src/util/util/RabbitMQ.ts +++ b/src/util/util/ipc/RabbitMQ.ts @@ -16,9 +16,9 @@ along with this program. If not, see . */ -import amqp, { Channel, ChannelModel } from "amqplib"; -import { Config } from "./Config"; import EventEmitter from "node:events"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { Config } from "../Config"; export class RabbitMQ { public static connection: ChannelModel | null = null; diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts new file mode 100644 index 0000000000..bff8f94024 --- /dev/null +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -0,0 +1,106 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import EventEmitter from "node:events"; +import { BaseEventListener } from "./BaseEventListener"; +import { EVENT, Event, EventOpts, RabbitMQ } from "@spacebar/util"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { randomUUID } from "node:crypto"; + +export class RabbitMqSingleListener extends BaseEventListener { + private readonly host: string; + private connection: ChannelModel; + private channel: Channel; + eventEmitter: EventEmitter; + + constructor(host: string) { + super(); + this.eventEmitter = new EventEmitter(); + this.host = host; + } + + async init() { + console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + this.channel = await this.connection.createChannel(); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + + this.connection.on("error", (err) => { + console.error("[RabbitMQSingleListener] Connection error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQSingleListener] Connection closed"); + this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + }); + + // actually set up event receiving? + await this.channel.assertExchange("", "fanout", { durable: false }); + const q = await this.channel.assertQueue("", { + exclusive: true, + autoDelete: true, + messageTtl: 5000, + }); + + const consumerTag = randomUUID(); + await this.channel.bindQueue(q.queue, "", ""); + await this.channel.consume( + q.queue, + (opts) => { + if (!opts) return; + const data = JSON.parse(opts.content.toString()) as { id: EVENT; event: Event }; + + this.eventEmitter.emit(data.id, data.event); + }, + { + consumerTag, + }, + ); + } + + async close(): Promise { + await this.channel.close(); + await this.connection.close(); + } + + async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { + const listener = (data: Event) => { + callback({ + ...data, + cancel, + }); + }; + + this.eventEmitter.addListener(event, listener); + + const cancel = async () => { + this.eventEmitter.removeListener(event, listener); + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() - 1); + }; + + this.eventEmitter.setMaxListeners(this.eventEmitter.getMaxListeners() + 1); + + return cancel; + } +} diff --git a/src/util/util/ipc/listener/UnixSocketListener.ts b/src/util/util/ipc/listener/UnixSocketListener.ts index 14d07d3873..8ead85bda5 100644 --- a/src/util/util/ipc/listener/UnixSocketListener.ts +++ b/src/util/util/ipc/listener/UnixSocketListener.ts @@ -20,7 +20,7 @@ import EventEmitter from "node:events"; import fs from "node:fs"; import net, { Server } from "node:net"; import { BaseEventListener } from "./BaseEventListener"; -import { Event, EventOpts } from "@spacebar/util"; +import { EVENT, Event, EventOpts } from "@spacebar/util"; export class UnixSocketListener extends BaseEventListener { eventEmitter: EventEmitter; @@ -39,15 +39,15 @@ export class UnixSocketListener extends BaseEventListener { try { if (fs.existsSync(this.socketPath)) { fs.unlinkSync(this.socketPath); - console.log("[Events] Removed stale socket file:", this.socketPath); + console.log("[UnixSocketListener] Removed stale socket file:", this.socketPath); } } catch (e) { - console.error("[Events] Failed to remove stale socket:", e); + console.error("[UnixSocketListener] Failed to remove stale socket:", e); } this.server = net.createServer((socket) => { socket.on("connect", () => { - console.log("[Events] Unix socket client connected"); + console.log("[UnixSocketListener] Unix socket client connected"); }); let buffer = Buffer.alloc(0); socket.on("data", (data: Buffer) => { @@ -58,18 +58,18 @@ export class UnixSocketListener extends BaseEventListener { const msgBuf = buffer.subarray(4, 4 + msgLen); buffer = buffer.subarray(4 + msgLen); try { - const payload = JSON.parse(msgBuf.toString()); + const payload = JSON.parse(msgBuf.toString()) as { id: EVENT; event: Event }; this.eventEmitter.emit(payload.id, payload.event); } catch (e) { - console.error("[Events] Failed to parse unix socket data:", e); + console.error("[UnixSocketListener] Failed to parse unix socket data:", e); } } }); socket.on("error", (err) => { - console.error("[Events] Unix socket error:", err); + console.error("[UnixSocketListener] Unix socket error:", err); }); socket.on("close", () => { - console.log("[Events] Unix socket client disconnected"); + console.log("[UnixSocketListener] Unix socket client disconnected"); }); }); @@ -83,14 +83,14 @@ export class UnixSocketListener extends BaseEventListener { } async close(): Promise { - console.log("[Events] Closing unix socket server"); + console.log("[UnixSocketListener] Closing unix socket server"); this.server.close(); // clean up socket file try { fs.unlinkSync(this.socketPath); } catch (e) { - console.error("[Events] Failed to unlink socket file:", e); + console.error("[UnixSocketListener] Failed to unlink socket file:", e); } process.exit(0); diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts new file mode 100644 index 0000000000..5bad330762 --- /dev/null +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -0,0 +1,70 @@ +/* + Spacebar: A FOSS re-implementation and extension of the Discord.com backend. + Copyright (C) 2026 Spacebar and Spacebar Contributors + + This program is free software: you can redistribute it and/or modify + it under the terms of the GNU Affero General Public License as published + by the Free Software Foundation, either version 3 of the License, or + (at your option) any later version. + + This program is distributed in the hope that it will be useful, + but WITHOUT ANY WARRANTY; without even the implied warranty of + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + GNU Affero General Public License for more details. + + You should have received a copy of the GNU Affero General Public License + along with this program. If not, see . +*/ + +import { BaseEventWriter } from "./BaseEventWriter"; +import amqp, { Channel, ChannelModel } from "amqplib"; +import { Event } from "@spacebar/util"; + +export class RabbitMqSingleWriter extends BaseEventWriter { + private readonly host: string; + private connection: ChannelModel; + private channel: Channel; + + constructor(host: string) { + super(); + this.host = host; + } + + async init(): Promise { + console.log(`[RabbitMQ] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + this.channel = await this.connection.createChannel(); + } + async close(): Promise { + await this.channel.close(); + await this.connection.close(); + } + + async emit(event: Event): Promise { + // todo check if channel is closed + if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); + await this.channel.assertExchange("", "fanout", { + durable: false, // ensure that messages arent written to disk + }); + + let success = false; + try { + success = this.channel.publish( + "", + "", + Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })), + {}, + ); + } catch (e) { + console.error("[RabbitMqSingleWriter] Got error while publishing event:", e); + } + + if (!success) { + console.log("[RabbitMqSingleWriter] Publishing message was not successful, retrying..."); + await this.emit(event); + } + } +} diff --git a/src/util/util/ipc/writer/UnixSocketWriter.ts b/src/util/util/ipc/writer/UnixSocketWriter.ts index 84839593ec..53b9564ce2 100644 --- a/src/util/util/ipc/writer/UnixSocketWriter.ts +++ b/src/util/util/ipc/writer/UnixSocketWriter.ts @@ -40,22 +40,22 @@ export class UnixSocketWriter extends BaseEventWriter { async init() { if (!fs.opendirSync(this.socketPath)) throw new Error("Unix socket path does not exist or is not a directory: " + this.socketPath); - console.log("[Events] Unix socket writer initializing for", this.socketPath); + console.log("[UnixSocketWriter] Unix socket writer initializing for", this.socketPath); const connect = (file: string) => { const fullPath = path.join(this.socketPath, file); const pid = Number(path.basename(file, ".sock")); - console.log("[Events] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); + console.log("[UnixSocketWriter] Attempting to connect to unix socket:", fullPath, "| proc:", getPidCmdline(pid) ?? red("No such pid: " + pid)); // avoid duplicate connections if (this.clients[fullPath] && !this.clients[fullPath].destroyed) { - console.log("[Events] Unix socket client already connected to", fullPath); + console.log("[UnixSocketWriter] Unix socket client already connected to", fullPath); return; } // clean up old connection if it exists if (this.clients[fullPath]) { - console.log("[Events] Removing stale unix socket client for", fullPath); + console.log("[UnixSocketWriter] Removing stale unix socket client for", fullPath); try { this.clients[fullPath].destroy(); } catch (e) { @@ -68,21 +68,21 @@ export class UnixSocketWriter extends BaseEventWriter { try { const stats = fs.statSync(fullPath); if (!stats.isSocket()) { - console.log("[Events] Ignoring non-socket file:", fullPath); + console.log("[UnixSocketWriter] Ignoring non-socket file:", fullPath); return; } } catch (e) { - console.log("[Events] Cannot stat socket file:", fullPath); + console.log("[UnixSocketWriter] Cannot stat socket file:", fullPath); return; } try { this.clients[fullPath] = net.createConnection(fullPath, () => { - console.log("[Events] Unix socket client connected to", fullPath); + console.log("[UnixSocketWriter] Unix socket client connected to", fullPath); }); this.clients[fullPath].on("error", (err) => { - console.error("[Events] Unix socket client error on", fullPath, ":", err); + console.error("[UnixSocketWriter] Unix socket client error on", fullPath, ":", err); // clean up after error if (this.clients[fullPath]) { delete this.clients[fullPath]; @@ -91,18 +91,18 @@ export class UnixSocketWriter extends BaseEventWriter { // handle clean socket closure this.clients[fullPath].on("close", () => { - console.log("[Events] Unix socket client closed:", fullPath); + console.log("[UnixSocketWriter] Unix socket client closed:", fullPath); delete this.clients[fullPath]; }); } catch (e) { - console.error("[Events] Failed to create connection to", fullPath, ":", e); + console.error("[UnixSocketWriter] Failed to create connection to", fullPath, ":", e); delete this.clients[fullPath]; } }; // connect to all sockets, now and in the future this.watcher = fs.watch(this.socketPath, {}, (eventType, filename) => { - console.log("[Events] Unix socket writer received watch sig", eventType, filename); + console.log("[UnixSocketWriter] Unix socket writer received watch sig", eventType, filename); if (eventType === "rename" && filename?.endsWith(".sock")) { try { const fullPath = path.join(this.socketPath, filename!); @@ -110,7 +110,7 @@ export class UnixSocketWriter extends BaseEventWriter { connect(filename!); } else { if (this.clients[fullPath]) { - console.log("[Events] Unix socket writer detected removed socket:", fullPath); + console.log("[UnixSocketWriter] Unix socket writer detected removed socket:", fullPath); try { this.clients[fullPath].destroy(); } catch (e) { @@ -126,20 +126,20 @@ export class UnixSocketWriter extends BaseEventWriter { }); this.watcher.on("error", (err) => { - console.error("[Events] Unix socket watcher error:", err); + console.error("[UnixSocketWriter] Unix socket watcher error:", err); }); // connect to existing sockets if any try { const files = fs.readdirSync(this.socketPath); - console.log("[Events] Unix socket writer found existing sockets:", files); + console.log("[UnixSocketWriter] Unix socket writer found existing sockets:", files); files.forEach((file) => { if (file.endsWith(".sock")) { connect(file); } }); } catch (err) { - console.error("[Events] Unix socket writer failed to read directory:", err); + console.error("[UnixSocketWriter] Unix socket writer failed to read directory:", err); } this.isInitializing = false; @@ -151,11 +151,11 @@ export class UnixSocketWriter extends BaseEventWriter { // check if there are any listeners const clientCount = Object.entries(this.clients).length; if (clientCount === 0) { - console.warn("[Events] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); + console.warn("[UnixSocketWriter] Unix socket writer has no connected clients to emit to, backlog size:", this.backlog.length + 1); this.backlog.push(event); if (!this.isInitializing) { this.isInitializing = true; - console.log("[Events] Re-initializing unix socket writer due to new event with no listeners"); + console.log("[UnixSocketWriter] Re-initializing unix socket writer due to new event with no listeners"); await this.close(); await this.init(); } @@ -165,7 +165,7 @@ export class UnixSocketWriter extends BaseEventWriter { await this.replayLock; await (this.replayLock = Promise.resolve().then(async () => { if (this.backlog.length > 0) { - console.log(`[Events] Replaying ${this.backlog.length} backlog events`); + console.log(`[UnixSocketWriter] Replaying ${this.backlog.length} backlog events`); for (const backlogEvent of this.backlog) { await this.broadcast(backlogEvent); } @@ -187,7 +187,7 @@ export class UnixSocketWriter extends BaseEventWriter { for (const [socketPath, socket] of Object.entries(this.clients)) { if (socket.destroyed) { - console.log("[Events] Unix socket writer found destroyed socket, removing:", socketPath); + console.log("[UnixSocketWriter] Unix socket writer found destroyed socket, removing:", socketPath); delete this.clients[socketPath]; continue; } @@ -195,19 +195,19 @@ export class UnixSocketWriter extends BaseEventWriter { try { socket.write(framed); } catch (e) { - console.error("[Events] Unix socket writer failed to write to socket", socketPath, ":", e); + console.error("[UnixSocketWriter] Unix socket writer failed to write to socket", socketPath, ":", e); } } if (tsw.elapsed().totalMilliseconds > 5) // else it's too noisy - console.log(`[Events] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); + console.log(`[UnixSocketWriter] Unix socket writer emitted to ${Object.entries(this.clients).length} sockets in ${tsw.elapsed().totalMilliseconds}ms`); res(); })); } async close() { - console.log("[Events] Closing Unix socket writer"); + console.log("[UnixSocketWriter] Closing Unix socket writer"); if (this.watcher) { this.watcher.close(); @@ -218,7 +218,7 @@ export class UnixSocketWriter extends BaseEventWriter { try { socket.destroy(); } catch (e) { - console.error("[Events] Error closing socket", path, ":", e); + console.error("[UnixSocketWriter] Error closing socket", path, ":", e); } } this.clients = {}; From bd61d3c6a586092ccfa1fb19c22e0379ec1e449b Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 20:39:36 +0200 Subject: [PATCH 03/10] Clean up unused imports in IPC code --- src/util/util/ipc/listener/BaseEventListener.ts | 1 - src/util/util/ipc/listener/RabbitMqSingleListener.ts | 2 +- src/util/util/ipc/writer/BaseEventWriter.ts | 3 +-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/src/util/util/ipc/listener/BaseEventListener.ts b/src/util/util/ipc/listener/BaseEventListener.ts index 989e6aba5d..2df611b2e4 100644 --- a/src/util/util/ipc/listener/BaseEventListener.ts +++ b/src/util/util/ipc/listener/BaseEventListener.ts @@ -16,7 +16,6 @@ along with this program. If not, see . */ -import EventEmitter from "node:events"; import { EventOpts } from "@spacebar/util"; export abstract class BaseEventListener { diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index bff8f94024..b323db4bee 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -18,7 +18,7 @@ import EventEmitter from "node:events"; import { BaseEventListener } from "./BaseEventListener"; -import { EVENT, Event, EventOpts, RabbitMQ } from "@spacebar/util"; +import { EVENT, Event, EventOpts } from "@spacebar/util"; import amqp, { Channel, ChannelModel } from "amqplib"; import { randomUUID } from "node:crypto"; diff --git a/src/util/util/ipc/writer/BaseEventWriter.ts b/src/util/util/ipc/writer/BaseEventWriter.ts index 878408a5ab..7349263d45 100644 --- a/src/util/util/ipc/writer/BaseEventWriter.ts +++ b/src/util/util/ipc/writer/BaseEventWriter.ts @@ -16,8 +16,7 @@ along with this program. If not, see . */ -import EventEmitter from "node:events"; -import { Event, EventOpts } from "@spacebar/util"; +import { Event } from "@spacebar/util"; export abstract class BaseEventWriter { abstract init(): Promise; From 31961be027ef50ca4f71eac7cd4179f34e27c110 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 20:44:41 +0200 Subject: [PATCH 04/10] Move rabbitListen to RabbitMQ.ts --- src/util/util/ipc/Event.ts | 57 +++-------------------------------- src/util/util/ipc/RabbitMQ.ts | 51 +++++++++++++++++++++++++++++++ 2 files changed, 55 insertions(+), 53 deletions(-) diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index 4899b5d83f..3eb052b96b 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -16,19 +16,18 @@ along with this program. If not, see . */ -import { Channel } from "amqplib"; -import { randomUUID } from "node:crypto"; import EventEmitter from "node:events"; import path from "node:path"; -import { RabbitMQ } from "./RabbitMQ"; -import { EVENT, Event } from "../../interfaces"; +import { Channel } from "amqplib"; +import { yellow } from "picocolors"; +import { Event } from "../../interfaces"; import { Config } from "../Config"; +import { rabbitListen, RabbitMQ } from "./RabbitMQ"; import { BaseEventListener } from "./listener/BaseEventListener"; import { BaseEventWriter } from "./writer/BaseEventWriter"; import { UnixSocketWriter } from "./writer/UnixSocketWriter"; import { UnixSocketListener } from "./listener/UnixSocketListener"; import { RabbitMqSingleListener } from "./listener/RabbitMqSingleListener"; -import { yellow } from "picocolors"; export const events = new EventEmitter(); let listener: BaseEventListener | null = null; @@ -186,51 +185,3 @@ export async function listenEvent(event: string, callback: (event: EventOpts) => return cancel; } } - -async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { - await channel.assertExchange(id, "fanout", { durable: false }); - const q = await channel.assertQueue("", { - exclusive: true, - autoDelete: true, - messageTtl: 5000, - }); - - const consumerTag = randomUUID(); - - const cancel = async () => { - try { - await channel.unbindQueue(q.queue, id, ""); - await channel.cancel(consumerTag); - } catch (e) { - console.log("[RabbitMQ] Error while cancelling channel (may be expected):", e instanceof Error ? e.message : e); - } - }; - - await channel.bindQueue(q.queue, id, ""); - await channel.consume( - q.queue, - (opts) => { - if (!opts) return; - - const data = JSON.parse(opts.content.toString()); - const event = opts.properties.type as EVENT; - - callback({ - event, - data, - acknowledge() { - channel.ack(opts); - }, - channel, - cancel, - }); - // rabbitCh.ack(opts); - }, - { - noAck: !opts?.acknowledge, - consumerTag: consumerTag, - }, - ); - - return cancel; -} diff --git a/src/util/util/ipc/RabbitMQ.ts b/src/util/util/ipc/RabbitMQ.ts index fe6e6d1ba7..47d1660dae 100644 --- a/src/util/util/ipc/RabbitMQ.ts +++ b/src/util/util/ipc/RabbitMQ.ts @@ -16,9 +16,12 @@ along with this program. If not, see . */ +import { randomUUID } from "node:crypto"; import EventEmitter from "node:events"; import amqp, { Channel, ChannelModel } from "amqplib"; +import { EVENT } from "../../interfaces"; import { Config } from "../Config"; +import type { EventOpts } from "./Event"; export class RabbitMQ { public static connection: ChannelModel | null = null; @@ -171,3 +174,51 @@ export class RabbitMQ { return this.connection !== null && !this.isReconnecting; } } + +export async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { + await channel.assertExchange(id, "fanout", { durable: false }); + const q = await channel.assertQueue("", { + exclusive: true, + autoDelete: true, + messageTtl: 5000, + }); + + const consumerTag = randomUUID(); + + const cancel = async () => { + try { + await channel.unbindQueue(q.queue, id, ""); + await channel.cancel(consumerTag); + } catch (e) { + console.log("[RabbitMQ] Error while cancelling channel (may be expected):", e instanceof Error ? e.message : e); + } + }; + + await channel.bindQueue(q.queue, id, ""); + await channel.consume( + q.queue, + (opts) => { + if (!opts) return; + + const data = JSON.parse(opts.content.toString()); + const event = opts.properties.type as EVENT; + + callback({ + event, + data, + acknowledge() { + channel.ack(opts); + }, + channel, + cancel, + }); + // rabbitCh.ack(opts); + }, + { + noAck: !opts?.acknowledge, + consumerTag: consumerTag, + }, + ); + + return cancel; +} From 7a5d0d0d761be8eb9c39de6683577563dd29da6e Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 20:52:24 +0200 Subject: [PATCH 05/10] Move publishEvent to RabbitMQ.ts --- src/util/util/ipc/Event.ts | 29 +---------------------------- src/util/util/ipc/RabbitMQ.ts | 28 +++++++++++++++++++++++++++- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index 3eb052b96b..8a2c55d881 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -40,34 +40,7 @@ export async function emitEvent(payload: Omit) { if (!id) return console.error("event doesn't contain any id", payload); if (RabbitMQ.connection) { - const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission - - const publishEvent = async (retryCount = 0): Promise => { - const channel = await RabbitMQ.getSafeChannel(); - try { - await channel.assertExchange(id, "fanout", { - durable: false, - }); - - // assertQueue isn't needed, because a queue will automatically created if it doesn't exist - const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); - if (!successful) throw new Error("failed to send event"); - } catch (e) { - // Check if this is a channel closed error and if we should retry - const errorMessage = e instanceof Error ? e.message : String(e); - const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); - - if (isChannelError && retryCount < 1) { - console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); - // Force the cached channel to be discarded by calling getSafeChannel which will create a new one - return publishEvent(retryCount + 1); - } - - console.log("[RabbitMQ] ", e); - } - }; - - await publishEvent(); + await RabbitMQ.publishEvent(id, payload); } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { if (!unixSocketWriter) { console.error("[Event] Unix socket writer not initialized, cannot emit event!"); diff --git a/src/util/util/ipc/RabbitMQ.ts b/src/util/util/ipc/RabbitMQ.ts index 47d1660dae..e3a1ca4f95 100644 --- a/src/util/util/ipc/RabbitMQ.ts +++ b/src/util/util/ipc/RabbitMQ.ts @@ -19,7 +19,7 @@ import { randomUUID } from "node:crypto"; import EventEmitter from "node:events"; import amqp, { Channel, ChannelModel } from "amqplib"; -import { EVENT } from "../../interfaces"; +import { Event, EVENT } from "../../interfaces"; import { Config } from "../Config"; import type { EventOpts } from "./Event"; @@ -173,6 +173,32 @@ export class RabbitMQ { static isConnected(): boolean { return this.connection !== null && !this.isReconnecting; } + + static async publishEvent(id: string, payload: Omit, retryCount = 0): Promise { + const data = typeof payload.data === "object" ? JSON.stringify(payload.data) : payload.data; // use rabbitmq for event transmission + const channel = await RabbitMQ.getSafeChannel(); + try { + await channel.assertExchange(id, "fanout", { + durable: false, + }); + + // assertQueue isn't needed, because a queue will automatically created if it doesn't exist + const successful = channel.publish(id, "", Buffer.from(`${data}`), { type: payload.event }); + if (!successful) throw new Error("failed to send event"); + } catch (e) { + // Check if this is a channel closed error and if we should retry + const errorMessage = e instanceof Error ? e.message : String(e); + const isChannelError = errorMessage.includes("Channel closed") || errorMessage.includes("IllegalOperationError") || errorMessage.includes("RESOURCE_ERROR"); + + if (isChannelError && retryCount < 1) { + console.log("[RabbitMQ] Channel error detected, retrying with new channel..."); + // Force the cached channel to be discarded by calling getSafeChannel which will create a new one + return RabbitMQ.publishEvent(id, payload, retryCount + 1); + } + + console.log("[RabbitMQ] ", e); + } + } } export async function rabbitListen(channel: Channel, id: string, callback: (event: EventOpts) => unknown, opts?: { acknowledge?: boolean }): Promise<() => Promise> { From e44d2423cde89db6c1db4bc9d12448981230f3dd Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 21:06:58 +0200 Subject: [PATCH 06/10] Demote default rabbitmq behavior, implement init for all BaseEventWriter/Listener abstracted implementations --- src/util/util/ipc/Event.ts | 78 +++++++++++++++++++++----------------- 1 file changed, 44 insertions(+), 34 deletions(-) diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index 8a2c55d881..9e523ce6c7 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -28,41 +28,47 @@ import { BaseEventWriter } from "./writer/BaseEventWriter"; import { UnixSocketWriter } from "./writer/UnixSocketWriter"; import { UnixSocketListener } from "./listener/UnixSocketListener"; import { RabbitMqSingleListener } from "./listener/RabbitMqSingleListener"; +import { RabbitMqSingleWriter } from "./writer/RabbitMqSingleWriter"; export const events = new EventEmitter(); let listener: BaseEventListener | null = null; let writer: BaseEventWriter | null = null; -let unixSocketListener: UnixSocketListener | null = null; -let unixSocketWriter: UnixSocketWriter | null = null; export async function emitEvent(payload: Omit) { const id = (payload.guild_id || payload.channel_id || payload.user_id || payload.session_id) as string; if (!id) return console.error("event doesn't contain any id", payload); - if (RabbitMQ.connection) { - await RabbitMQ.publishEvent(id, payload); - } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketWriter) { - console.error("[Event] Unix socket writer not initialized, cannot emit event!"); - throw new Error("Unix socket writer not initialized"); - } - await unixSocketWriter.emit(payload); + if (writer) { + await writer.emit(payload); } else if (process.env.EVENT_TRANSMISSION === "process") { process.send?.({ type: "event", event: payload, id } as ProcessEvent); + } else if (RabbitMQ.connection) { + await RabbitMQ.publishEvent(id, payload); } else { events.emit(id, payload); } } export async function initEvent() { - await RabbitMQ.init(); // does nothing if rabbitmq is not setup + if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { + if (!Config.get().rabbitmq.host!) { + throw new Error("[Events] RabbitMQ is not configured."); + } - if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketWriter) { - writer = unixSocketWriter = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); - await unixSocketWriter.init(); + if (!writer) { + writer = new RabbitMqSingleWriter(Config.get().rabbitmq.host!); + await writer.init(); } - } + } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { + if (!process.env.EVENT_SOCKET_PATH) { + throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); + } + + if (!writer) { + writer = new UnixSocketWriter(process.env.EVENT_SOCKET_PATH); + await writer.init(); + } + } else await RabbitMQ.init(); // does nothing if rabbitmq is not setup // Set up the spacebar event listener (used for config reload, etc.) const setupSpacebarListener = async () => { @@ -104,32 +110,25 @@ export interface ProcessEvent { } export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { - if (RabbitMQ.connection) { - if (process.env.EVENT_TRANSMISSION !== "rabbitmq-legacy") { - console.warn(yellow("[Events] Warning:"), "RabbitMQ replication without configuring EVENT_TRANSMISSION is deprecated."); - console.warn(yellow("[Events] Warning:"), "Set EVENT_TRANSMISSION to 'rabbitmq-legacy' in environment variables to silence this warning."); - } - const rabbitMQChannel = await RabbitMQ.getSafeChannel(); - const channel = opts?.channel || rabbitMQChannel; - if (!channel) throw new Error("[Events] An event was sent without an associated channel"); - return await rabbitListen(channel, event, callback, { - acknowledge: opts?.acknowledge, - }); - } else if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { + if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { if (!Config.get().rabbitmq.host) { - console.error("[Events] RabbitMQ is not configured."); + throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); } if (!listener) { listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!); await listener.init(); } return await listener.listen(event, callback); - } else if (process.env.EVENT_TRANSMISSION === "unix" && process.env.EVENT_SOCKET_PATH) { - if (!unixSocketListener) { - listener = unixSocketListener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); - await unixSocketListener.init(); + } else if (process.env.EVENT_TRANSMISSION === "unix") { + if (!process.env.EVENT_SOCKET_PATH) { + throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); } - return await unixSocketListener.listen(event, callback); + + if (!listener) { + listener = listener = new UnixSocketListener(path.join(process.env.EVENT_SOCKET_PATH, `${process.pid}.sock`)); + await listener.init(); + } + return await listener.listen(event, callback); } else if (process.env.EVENT_TRANSMISSION === "process") { const cancel = async () => { process.removeListener("message", listener); @@ -146,6 +145,17 @@ export async function listenEvent(event: string, callback: (event: EventOpts) => process.setMaxListeners(process.getMaxListeners() + 1); return cancel; + } else if (RabbitMQ.connection) { + if (process.env.EVENT_TRANSMISSION !== "rabbitmq-legacy") { + console.warn(yellow("[Events] Warning:"), "RabbitMQ replication without configuring EVENT_TRANSMISSION is deprecated."); + console.warn(yellow("[Events] Warning:"), "Set EVENT_TRANSMISSION to 'rabbitmq-legacy' in environment variables to silence this warning."); + } + const rabbitMQChannel = await RabbitMQ.getSafeChannel(); + const channel = opts?.channel || rabbitMQChannel; + if (!channel) throw new Error("[Events] An event was sent without an associated channel"); + return await rabbitListen(channel, event, callback, { + acknowledge: opts?.acknowledge, + }); } else { const listener = (opts: EventOpts) => callback({ ...opts, cancel }); const cancel = async () => { From 3bb13bdf76a3fde4dfe95b8dd1098ee9f0aa9657 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sat, 16 May 2026 21:25:24 +0200 Subject: [PATCH 07/10] Nix: expose EVENT_TRANSMISSION as an option --- nix/modules/default/default.nix | 20 +++++++++++++++++--- 1 file changed, 17 insertions(+), 3 deletions(-) diff --git a/nix/modules/default/default.nix b/nix/modules/default/default.nix index 5602c25895..876ad941ae 100644 --- a/nix/modules/default/default.nix +++ b/nix/modules/default/default.nix @@ -55,6 +55,20 @@ in description = "Path to store CDN files."; }; + ipcMethod = lib.mkOption { + type = lib.types.enum [ + "unix" + "rabbitmq-single" + "rabbitmq-legacy" + ]; + default = "unix"; + description = '' + How messages should be passed between services. + Note that the C# services (eg. Admin API) currently only supports the "unix" method! + Read more at https://docs.spacebar.chat/setup/server/installation/generic/ipc/. + ''; + }; + extraEnvironment = lib.mkOption { default = { }; description = '' @@ -98,7 +112,6 @@ in environment = builtins.mapAttrs (_: val: builtins.toString val) ( { # things we set by default... - EVENT_TRANSMISSION = "unix"; EVENT_SOCKET_PATH = "/run/spacebar/"; } // cfg.extraEnvironment @@ -108,6 +121,7 @@ in CONFIG_READONLY = 1; PORT = toString cfg.apiEndpoint.localPort; STORAGE_LOCATION = cfg.cdnPath; + EVENT_TRANSMISSION = cfg.ipcMethod; } ); serviceConfig = { @@ -121,7 +135,6 @@ in environment = builtins.mapAttrs (_: val: builtins.toString val) ( { # things we set by default... - EVENT_TRANSMISSION = "unix"; EVENT_SOCKET_PATH = "/run/spacebar/"; } // cfg.extraEnvironment @@ -131,6 +144,7 @@ in CONFIG_READONLY = 1; PORT = toString cfg.gatewayEndpoint.localPort; STORAGE_LOCATION = cfg.cdnPath; + EVENT_TRANSMISSION = cfg.ipcMethod; APPLY_DB_MIGRATIONS = "false"; } ); @@ -144,7 +158,6 @@ in environment = builtins.mapAttrs (_: val: builtins.toString val) ( { # things we set by default... - EVENT_TRANSMISSION = "unix"; EVENT_SOCKET_PATH = "/run/spacebar/"; } // cfg.extraEnvironment @@ -154,6 +167,7 @@ in CONFIG_READONLY = 1; PORT = toString cfg.cdnEndpoint.localPort; STORAGE_LOCATION = cfg.cdnPath; + EVENT_TRANSMISSION = cfg.ipcMethod; APPLY_DB_MIGRATIONS = "false"; } ); From 165885226628a300f5799f2722d39210a44d9757 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sun, 17 May 2026 02:14:27 +0200 Subject: [PATCH 08/10] Add NixOS test for starting with rabbitmq-single ipc --- default.nix | 17 ++++++++-- nix/tests/test-bundle-starts.nix | 17 ++++++++-- src/util/util/ipc/Event.ts | 4 +-- .../ipc/listener/RabbitMqSingleListener.ts | 32 ++++++++++++------- .../util/ipc/writer/RabbitMqSingleWriter.ts | 4 +-- 5 files changed, 54 insertions(+), 20 deletions(-) diff --git a/default.nix b/default.nix index 6af2bae2a1..d47a30d63b 100644 --- a/default.nix +++ b/default.nix @@ -82,8 +82,6 @@ let # set +x runHook postInstall ''; - - passthru.tests = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix self); }; in pkgs.stdenv.mkDerivation { @@ -122,5 +120,18 @@ pkgs.stdenv.mkDerivation { makeWrapper ${pkgs.nodejs_24}/bin/node $out/bin/apply-migrations --prefix NODE_PATH : $out/node_modules --add-flags --enable-source-maps --add-flags $out/dist/apply-migrations.js ''; - passthru.tests = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix self); + passthru.tests = pkgs.runCommand "spacebar-server-ts-all-tests" rec { + bundleStarts = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix { inherit self; }); + bundleStartsRabbitMq = pkgs.testers.runNixOSTest ( + import ./nix/tests/test-bundle-starts.nix { + inherit self; + withIpc = "rabbitmq-single"; + } + ); + + nativeBuildInputs = [ + bundleStarts + bundleStartsRabbitMq + ]; + } "touch $out"; } diff --git a/nix/tests/test-bundle-starts.nix b/nix/tests/test-bundle-starts.nix index 86b998cf76..2e088f6de9 100644 --- a/nix/tests/test-bundle-starts.nix +++ b/nix/tests/test-bundle-starts.nix @@ -1,4 +1,7 @@ -self: +{ + self, + withIpc ? "unix", +}: { config, lib, @@ -8,11 +11,13 @@ self: let sb = import ../lib/mkEndpoint.nix; + isRabbitMqTest = lib.strings.hasPrefix "rabbitmq" withIpc; in { - name = "test-bundle-starts"; + name = "test-bundle-starts" + lib.optionalString (withIpc != "unix") ("_ipc=" + withIpc); skipTypeCheck = true; skipLint = true; + globalTimeout = 120; nodes.machine = { imports = [ self.nixosModules.default ]; @@ -30,12 +35,20 @@ in LOG_REQUESTS = "-"; # Log all requests LOG_VALIDATION_ERRORS = true; }; + ipcMethod = withIpc; + + settings = { + rabbitmq = { + host = lib.mkIf isRabbitMqTest "amqp://guest:guest@127.0.0.1:5672"; + }; + }; nginx.enable = true; }; in lib.trace ("Testing with config: " + builtins.toJSON cfg) cfg; services.nginx.enable = true; + services.rabbitmq.enable = isRabbitMqTest; services.postgresql = { enable = true; initdbArgs = [ diff --git a/src/util/util/ipc/Event.ts b/src/util/util/ipc/Event.ts index 9e523ce6c7..e836c269ea 100644 --- a/src/util/util/ipc/Event.ts +++ b/src/util/util/ipc/Event.ts @@ -52,7 +52,7 @@ export async function emitEvent(payload: Omit) { export async function initEvent() { if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { if (!Config.get().rabbitmq.host!) { - throw new Error("[Events] RabbitMQ is not configured."); + throw new Error("[Events] rabbitmq.host is not configured."); } if (!writer) { @@ -112,7 +112,7 @@ export interface ProcessEvent { export async function listenEvent(event: string, callback: (event: EventOpts) => unknown, opts?: ListenEventOpts): Promise<() => Promise> { if (process.env.EVENT_TRANSMISSION === "rabbitmq-single") { if (!Config.get().rabbitmq.host) { - throw new Error("[Events] EVENT_SOCKET_PATH is not configured."); + throw new Error("[Events] rabbitmq.host is not configured."); } if (!listener) { listener = new RabbitMqSingleListener(Config.get().rabbitmq.host!); diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index b323db4bee..127d79cc21 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -18,7 +18,7 @@ import EventEmitter from "node:events"; import { BaseEventListener } from "./BaseEventListener"; -import { EVENT, Event, EventOpts } from "@spacebar/util"; +import { EVENT, Event, EventOpts, sleep } from "@spacebar/util"; import amqp, { Channel, ChannelModel } from "amqplib"; import { randomUUID } from "node:crypto"; @@ -35,11 +35,19 @@ export class RabbitMqSingleListener extends BaseEventListener { } async init() { - console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); - this.connection = await amqp.connect(this.host, { - timeout: 1000 * 60, - noDelay: true, - }); + while (!this.connection) { + try { + console.log(`[RabbitMQSingleListener] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + console.log(`[RabbitMQSingleListener] Connected to: ${this.host}`); + } catch (e) { + console.log(`[RabbitMQSingleListener] Failed to connect to to: ${this.host}: ${e}`); + await sleep(1000); + } + } this.channel = await this.connection.createChannel(); for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { @@ -52,19 +60,21 @@ export class RabbitMqSingleListener extends BaseEventListener { this.connection.on("close", () => { console.error("[RabbitMQSingleListener] Connection closed"); - this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + sleep(1000).then(() => { + this.init().catch((e) => console.error("[RabbitMQSingleListener] Failed to schedule reconnection:", e)); + }); }); // actually set up event receiving? - await this.channel.assertExchange("", "fanout", { durable: false }); - const q = await this.channel.assertQueue("", { - exclusive: true, + await this.channel.assertExchange("-", "fanout", { durable: false }); + const q = await this.channel.assertQueue("-", { + exclusive: false, autoDelete: true, messageTtl: 5000, }); const consumerTag = randomUUID(); - await this.channel.bindQueue(q.queue, "", ""); + await this.channel.bindQueue(q.queue, "-", ""); await this.channel.consume( q.queue, (opts) => { diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 5bad330762..9be6299b5e 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -46,14 +46,14 @@ export class RabbitMqSingleWriter extends BaseEventWriter { async emit(event: Event): Promise { // todo check if channel is closed if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); - await this.channel.assertExchange("", "fanout", { + await this.channel.assertExchange("-", "fanout", { durable: false, // ensure that messages arent written to disk }); let success = false; try { success = this.channel.publish( - "", + "-", "", Buffer.from(JSON.stringify({ id: (event.guild_id || event.channel_id || event.user_id || event.session_id) as string, event })), {}, From b053758d1cd7d450562a1358228e605909f7a39a Mon Sep 17 00:00:00 2001 From: Rory& Date: Sun, 17 May 2026 02:25:14 +0200 Subject: [PATCH 09/10] Add startup test for legacy rabbitmq setup --- default.nix | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) diff --git a/default.nix b/default.nix index d47a30d63b..4b9dd756c8 100644 --- a/default.nix +++ b/default.nix @@ -122,16 +122,23 @@ pkgs.stdenv.mkDerivation { passthru.tests = pkgs.runCommand "spacebar-server-ts-all-tests" rec { bundleStarts = pkgs.testers.runNixOSTest (import ./nix/tests/test-bundle-starts.nix { inherit self; }); - bundleStartsRabbitMq = pkgs.testers.runNixOSTest ( + bundleStartsRabbitMqSingle = pkgs.testers.runNixOSTest ( import ./nix/tests/test-bundle-starts.nix { inherit self; withIpc = "rabbitmq-single"; } ); + bundleStartsRabbitMqLegacy = pkgs.testers.runNixOSTest ( + import ./nix/tests/test-bundle-starts.nix { + inherit self; + withIpc = "rabbitmq-legacy"; + } + ); nativeBuildInputs = [ bundleStarts - bundleStartsRabbitMq + bundleStartsRabbitMqSingle + bundleStartsRabbitMqLegacy ]; } "touch $out"; } From fc60a9be8ffabbb87a2e64150adbd49ed0688cc2 Mon Sep 17 00:00:00 2001 From: Rory& Date: Sun, 17 May 2026 02:46:48 +0200 Subject: [PATCH 10/10] Better error handling in rabbitmqsinglelistener/writer --- .../ipc/listener/RabbitMqSingleListener.ts | 10 ++-- .../util/ipc/writer/RabbitMqSingleWriter.ts | 53 +++++++++++++++---- 2 files changed, 49 insertions(+), 14 deletions(-) diff --git a/src/util/util/ipc/listener/RabbitMqSingleListener.ts b/src/util/util/ipc/listener/RabbitMqSingleListener.ts index 127d79cc21..b5fd5efd50 100644 --- a/src/util/util/ipc/listener/RabbitMqSingleListener.ts +++ b/src/util/util/ipc/listener/RabbitMqSingleListener.ts @@ -24,8 +24,8 @@ import { randomUUID } from "node:crypto"; export class RabbitMqSingleListener extends BaseEventListener { private readonly host: string; - private connection: ChannelModel; - private channel: Channel; + private connection?: ChannelModel; + private channel?: Channel; eventEmitter: EventEmitter; constructor(host: string) { @@ -90,8 +90,10 @@ export class RabbitMqSingleListener extends BaseEventListener { } async close(): Promise { - await this.channel.close(); - await this.connection.close(); + await this.channel?.close(); + this.channel = undefined; + await this.connection?.close(); + this.connection = undefined; } async listen(event: string, callback: (event: EventOpts) => unknown): Promise<() => Promise> { diff --git a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts index 9be6299b5e..28781c6dbd 100644 --- a/src/util/util/ipc/writer/RabbitMqSingleWriter.ts +++ b/src/util/util/ipc/writer/RabbitMqSingleWriter.ts @@ -18,12 +18,12 @@ import { BaseEventWriter } from "./BaseEventWriter"; import amqp, { Channel, ChannelModel } from "amqplib"; -import { Event } from "@spacebar/util"; +import { Event, sleep } from "@spacebar/util"; export class RabbitMqSingleWriter extends BaseEventWriter { private readonly host: string; - private connection: ChannelModel; - private channel: Channel; + private connection?: ChannelModel; + private channel?: Channel; constructor(host: string) { super(); @@ -31,19 +31,52 @@ export class RabbitMqSingleWriter extends BaseEventWriter { } async init(): Promise { - console.log(`[RabbitMQ] Connecting to: ${this.host}`); - this.connection = await amqp.connect(this.host, { - timeout: 1000 * 60, - noDelay: true, - }); + while (!this.connection) { + try { + console.log(`[RabbitMQSingleWriter] Connecting to: ${this.host}`); + this.connection = await amqp.connect(this.host, { + timeout: 1000 * 60, + noDelay: true, + }); + console.log(`[RabbitMQSingleWriter] Connected to: ${this.host}`); + } catch (e) { + console.log(`[RabbitMQSingleWriter] Failed to connect to to: ${this.host}: ${e}`); + await sleep(1000); + } + } this.channel = await this.connection.createChannel(); + + for (const sig of ["SIGINT", "SIGTERM", "SIGQUIT"] as const) { + process.on(sig, this.close); + } + + this.connection.on("error", (err) => { + console.error("[RabbitMQSingleWriter] Connection error:", err); + }); + + this.connection.on("close", () => { + console.error("[RabbitMQSingleWriter] Connection closed"); + sleep(1000).then(() => { + this.init().catch((e) => console.error("[RabbitMQSingleWriter] Failed to schedule reconnection:", e)); + }); + }); } + async close(): Promise { - await this.channel.close(); - await this.connection.close(); + await this.channel?.close(); + this.channel = undefined; + await this.connection?.close(); + this.connection = undefined; } async emit(event: Event): Promise { + if (!this.connection) { + throw new Error("RabbitMqSingleWriter#emit called without connection being initialised!"); + } + if (!this.channel) { + throw new Error("RabbitMqSingleWriter#emit called without channel being initialised!"); + } + // todo check if channel is closed if ((this.channel as unknown as { closed?: boolean }).closed) this.channel = await this.connection.createChannel(); await this.channel.assertExchange("-", "fanout", {