diff --git a/apps/backend/utils/serverEvents.ts b/apps/backend/utils/serverEvents.ts index db80e023..b81ecc0d 100644 --- a/apps/backend/utils/serverEvents.ts +++ b/apps/backend/utils/serverEvents.ts @@ -48,16 +48,34 @@ export type EventData = { timestamp?: Date; }; -class ServerEventsManager { +export class ServerEventsManager { constructor(...topicNames: string[]) { this.topics = new Set(topicNames); } + private static readonly INACTIVITY_TIMEOUT_MS = 5 * 60 * 1000; + private topics: Set = new Set(); // each map key is a topic e.g. primary_dj, show_dj, live_fs, private clients: Map = new Map(); private clientTopics: Map> = new Map(); // clientId -> topicIds private topicClients: Map> = new Map(); // topicId -> clientIds + private clientTimeouts: Map> = new Map(); + + private resetTimeout = (clientId: string) => { + const existing = this.clientTimeouts.get(clientId); + if (existing) { + clearTimeout(existing); + } + + const timeoutId = setTimeout(() => { + if (this.clients.has(clientId)) { + this.disconnect(clientId, 'Connection terminated due to inactivity'); + } + }, ServerEventsManager.INACTIVITY_TIMEOUT_MS); + + this.clientTimeouts.set(clientId, timeoutId); + }; registerClient = (res: Response): EventClient => { const client: EventClient = { @@ -65,18 +83,14 @@ class ServerEventsManager { res: res, }; - // Add timeout for stale connections - const timeout = setTimeout( - () => { - if (this.clients.has(client.id)) { - this.disconnect(client.id, 'Connection terminated due to inactivity'); - } - }, - 5 * 60 * 1000 // 5 minutes - ); + this.resetTimeout(client.id); client.res.on('close', () => { - clearTimeout(timeout); + const timeoutId = this.clientTimeouts.get(client.id); + if (timeoutId) { + clearTimeout(timeoutId); + this.clientTimeouts.delete(client.id); + } this.unsubAll(client.id); }); @@ -183,6 +197,12 @@ class ServerEventsManager { this.clientTopics.delete(clientId); this.clients.delete(clientId); + + const timeoutId = this.clientTimeouts.get(clientId); + if (timeoutId) { + clearTimeout(timeoutId); + this.clientTimeouts.delete(clientId); + } }; broadcast = (topicId: string, data: EventData) => { @@ -199,6 +219,7 @@ class ServerEventsManager { if (client) { try { client.res.write(message); + this.resetTimeout(clientId); } catch (error) { this.unsubAll(client.id); } @@ -222,6 +243,7 @@ class ServerEventsManager { if (client) { try { client.res.write(message); + this.resetTimeout(clientId); } catch (error) { this.unsubAll(client.id); } diff --git a/eslint.config.mjs b/eslint.config.mjs index 8ebbdb64..3686fdda 100644 --- a/eslint.config.mjs +++ b/eslint.config.mjs @@ -93,6 +93,8 @@ export default tseslint.config( '@typescript-eslint/no-unsafe-call': 'off', '@typescript-eslint/no-unsafe-member-access': 'off', '@typescript-eslint/no-unsafe-return': 'off', + '@typescript-eslint/no-unused-vars': ['warn', { argsIgnorePattern: '^_', varsIgnorePattern: '^_' }], + '@typescript-eslint/unbound-method': 'off', }, } ); diff --git a/tests/unit/utils/serverEvents.test.ts b/tests/unit/utils/serverEvents.test.ts new file mode 100644 index 00000000..b7c052dc --- /dev/null +++ b/tests/unit/utils/serverEvents.test.ts @@ -0,0 +1,89 @@ +import { EventEmitter } from 'events'; +import { ServerEventsManager } from '../../../apps/backend/utils/serverEvents'; +import { Response } from 'express'; + +function createMockResponse(): Response { + const emitter = new EventEmitter(); + const res = { + writeHead: jest.fn(), + write: jest.fn().mockReturnValue(true), + end: jest.fn(), + on: emitter.on.bind(emitter), + emit: emitter.emit.bind(emitter), + } as unknown as Response; + return res; +} + +describe('ServerEventsManager', () => { + beforeEach(() => { + jest.useFakeTimers(); + }); + + afterEach(() => { + jest.useRealTimers(); + }); + + describe('inactivity timeout', () => { + it('disconnects an idle client after 5 minutes', () => { + const mgr = new ServerEventsManager('topic-a'); + const res = createMockResponse(); + const _client = mgr.registerClient(res); + + jest.advanceTimersByTime(5 * 60 * 1000); + + expect(res.end).toHaveBeenCalled(); + }); + + it('resets the timeout when a broadcast is sent to the client', () => { + const mgr = new ServerEventsManager('topic-a'); + const res = createMockResponse(); + const client = mgr.registerClient(res); + mgr.subscribe(['topic-a'], client.id); + + // Advance 4.5 minutes (not yet at 5-min threshold) + jest.advanceTimersByTime(4.5 * 60 * 1000); + + // Broadcast — this should reset the inactivity timer + mgr.broadcast('topic-a', { type: 'update', payload: { value: 1 } }); + + // Advance another 4.5 minutes (9 min total from registration, + // but only 4.5 min since last activity) + jest.advanceTimersByTime(4.5 * 60 * 1000); + + // Client should still be connected because the timer was reset + expect(res.end).not.toHaveBeenCalled(); + expect(mgr.getSubs(client.id)).toEqual(['topic-a']); + }); + + it('resets the timeout when a dispatch is sent to the client', () => { + const mgr = new ServerEventsManager('topic-a'); + const res = createMockResponse(); + const client = mgr.registerClient(res); + mgr.subscribe(['topic-a'], client.id); + + jest.advanceTimersByTime(4.5 * 60 * 1000); + + mgr.dispatch('topic-a', client.id, { type: 'ping', payload: {} }); + + jest.advanceTimersByTime(4.5 * 60 * 1000); + + expect(res.end).not.toHaveBeenCalled(); + expect(mgr.getSubs(client.id)).toEqual(['topic-a']); + }); + + it('still disconnects if no activity occurs after the reset window', () => { + const mgr = new ServerEventsManager('topic-a'); + const res = createMockResponse(); + const client = mgr.registerClient(res); + mgr.subscribe(['topic-a'], client.id); + + jest.advanceTimersByTime(2 * 60 * 1000); + mgr.broadcast('topic-a', { type: 'update', payload: {} }); + + // Full 5 minutes after the broadcast with no further activity + jest.advanceTimersByTime(5 * 60 * 1000); + + expect(res.end).toHaveBeenCalled(); + }); + }); +});