Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 33 additions & 11 deletions apps/backend/utils/serverEvents.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,35 +48,49 @@ export type EventData<T = unknown> = {
timestamp?: Date;
};

class ServerEventsManager {
export class ServerEventsManager {
constructor(...topicNames: string[]) {
this.topics = new Set(topicNames);
}

private static readonly INACTIVITY_TIMEOUT_MS = 5 * 60 * 1000;

private topics: Set<string> = new Set();
// each map key is a topic e.g. primary_dj, show_dj, live_fs,
private clients: Map<string, EventClient> = new Map();
private clientTopics: Map<string, Set<string>> = new Map(); // clientId -> topicIds
private topicClients: Map<string, Set<string>> = new Map(); // topicId -> clientIds
private clientTimeouts: Map<string, ReturnType<typeof setTimeout>> = new Map();

private resetTimeout = (clientId: string) => {
const existing = this.clientTimeouts.get(clientId);
if (existing) {
clearTimeout(existing);
}

const timeoutId = setTimeout(() => {
if (this.clients.has(clientId)) {
this.disconnect(clientId, 'Connection terminated due to inactivity');
}
}, ServerEventsManager.INACTIVITY_TIMEOUT_MS);

this.clientTimeouts.set(clientId, timeoutId);
};

registerClient = (res: Response): EventClient => {
const client: EventClient = {
id: crypto.randomUUID(),
res: res,
};

// Add timeout for stale connections
const timeout = setTimeout(
() => {
if (this.clients.has(client.id)) {
this.disconnect(client.id, 'Connection terminated due to inactivity');
}
},
5 * 60 * 1000 // 5 minutes
);
this.resetTimeout(client.id);

client.res.on('close', () => {
clearTimeout(timeout);
const timeoutId = this.clientTimeouts.get(client.id);
if (timeoutId) {
clearTimeout(timeoutId);
this.clientTimeouts.delete(client.id);
}
this.unsubAll(client.id);
});

Expand Down Expand Up @@ -183,6 +197,12 @@ class ServerEventsManager {

this.clientTopics.delete(clientId);
this.clients.delete(clientId);

const timeoutId = this.clientTimeouts.get(clientId);
if (timeoutId) {
clearTimeout(timeoutId);
this.clientTimeouts.delete(clientId);
}
};

broadcast = (topicId: string, data: EventData) => {
Expand All @@ -199,6 +219,7 @@ class ServerEventsManager {
if (client) {
try {
client.res.write(message);
this.resetTimeout(clientId);
} catch (error) {
this.unsubAll(client.id);
}
Expand All @@ -222,6 +243,7 @@ class ServerEventsManager {
if (client) {
try {
client.res.write(message);
this.resetTimeout(clientId);
} catch (error) {
this.unsubAll(client.id);
}
Expand Down
2 changes: 2 additions & 0 deletions eslint.config.mjs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,8 @@ export default tseslint.config(
'@typescript-eslint/no-unsafe-call': 'off',
'@typescript-eslint/no-unsafe-member-access': 'off',
'@typescript-eslint/no-unsafe-return': 'off',
'@typescript-eslint/no-unused-vars': ['warn', { argsIgnorePattern: '^_', varsIgnorePattern: '^_' }],
'@typescript-eslint/unbound-method': 'off',
},
}
);
89 changes: 89 additions & 0 deletions tests/unit/utils/serverEvents.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
import { EventEmitter } from 'events';
import { ServerEventsManager } from '../../../apps/backend/utils/serverEvents';
import { Response } from 'express';

function createMockResponse(): Response {
const emitter = new EventEmitter();
const res = {
writeHead: jest.fn(),
write: jest.fn().mockReturnValue(true),
end: jest.fn(),
on: emitter.on.bind(emitter),
emit: emitter.emit.bind(emitter),
} as unknown as Response;
return res;
}

describe('ServerEventsManager', () => {
beforeEach(() => {
jest.useFakeTimers();
});

afterEach(() => {
jest.useRealTimers();
});

describe('inactivity timeout', () => {
it('disconnects an idle client after 5 minutes', () => {
const mgr = new ServerEventsManager('topic-a');
const res = createMockResponse();
const _client = mgr.registerClient(res);

jest.advanceTimersByTime(5 * 60 * 1000);

expect(res.end).toHaveBeenCalled();
});

it('resets the timeout when a broadcast is sent to the client', () => {
const mgr = new ServerEventsManager('topic-a');
const res = createMockResponse();
const client = mgr.registerClient(res);
mgr.subscribe(['topic-a'], client.id);

// Advance 4.5 minutes (not yet at 5-min threshold)
jest.advanceTimersByTime(4.5 * 60 * 1000);

// Broadcast — this should reset the inactivity timer
mgr.broadcast('topic-a', { type: 'update', payload: { value: 1 } });

// Advance another 4.5 minutes (9 min total from registration,
// but only 4.5 min since last activity)
jest.advanceTimersByTime(4.5 * 60 * 1000);

// Client should still be connected because the timer was reset
expect(res.end).not.toHaveBeenCalled();
expect(mgr.getSubs(client.id)).toEqual(['topic-a']);
});

it('resets the timeout when a dispatch is sent to the client', () => {
const mgr = new ServerEventsManager('topic-a');
const res = createMockResponse();
const client = mgr.registerClient(res);
mgr.subscribe(['topic-a'], client.id);

jest.advanceTimersByTime(4.5 * 60 * 1000);

mgr.dispatch('topic-a', client.id, { type: 'ping', payload: {} });

jest.advanceTimersByTime(4.5 * 60 * 1000);

expect(res.end).not.toHaveBeenCalled();
expect(mgr.getSubs(client.id)).toEqual(['topic-a']);
});

it('still disconnects if no activity occurs after the reset window', () => {
const mgr = new ServerEventsManager('topic-a');
const res = createMockResponse();
const client = mgr.registerClient(res);
mgr.subscribe(['topic-a'], client.id);

jest.advanceTimersByTime(2 * 60 * 1000);
mgr.broadcast('topic-a', { type: 'update', payload: {} });

// Full 5 minutes after the broadcast with no further activity
jest.advanceTimersByTime(5 * 60 * 1000);

expect(res.end).toHaveBeenCalled();
});
});
});
Loading