diff --git a/README.md b/README.md index 5d19099..4bdb1b1 100644 --- a/README.md +++ b/README.md @@ -143,3 +143,169 @@ Placeholder test files are in `src/services/CallService.test.ts` and `src/routes - Dockerfiles per service - stronger contract sharing between packages - better observability for local debugging + +## Implementation Notes (Luis Noriega) + +# VoyceLink Call Center Dashboard + +This project implements a real-time call monitoring system using a microservices-based architecture. It processes call events, persists them in a relational database, and propagates updates through a real-time streaming layer to a web-based dashboard. + +The system is composed of three main services: + +- A backend service responsible for business logic and persistence +- A real-time service responsible for event distribution via WebSockets +- A frontend application responsible for data visualization and user interaction + +## Architecture + +The system follows an event-driven architecture in which the backend service publishes domain events to a message broker, and a separate real-time service consumes those events and distributes them to connected clients. + +Data flow: + +External clients send call events to the backend API +- The backend processes and persists the event +- The event is published to Redis using a Pub/Sub channel +- The real-time service subscribes to that channel +- Incoming events are broadcast to frontend clients via WebSockets +- The frontend reacts to these updates and refreshes its state +- Backend Service (call-service) + +## Responsibilities + +The backend service handles: + +Validation and processing of incoming call events +Enforcement of business rules and state transitions +Persistence of calls and events in PostgreSQL +Exposure of REST endpoints for querying data +Publication of events to Redis for real-time propagation +Event Processing + +The system supports the following event types: + +- call_initiated +- call_answered +- call_hold +- call_ended + +Each event triggers state transitions and may generate derived metadata based on defined business rules. + +## Business Rules + +The following conditions are enforced during event processing: + +A call must exist before transitioning to subsequent states +Valid state transitions are required (e.g., waiting → active) +Additional flags are generated when thresholds are exceeded: +WAIT_TIME_EXCEEDED when waitTime is greater than 30 seconds +HOLD_TIME_EXCEEDED when holdDuration is greater than 60 seconds +SHORT_CALL when call duration is less than 10 seconds +Persistence Layer + +Two primary tables are used: + +- calls: stores the current state and metadata of each call +- call_events: stores the historical sequence of events per call +All events are stored with associated metadata, allowing reconstruction of call history. + +## API Endpoints + +The backend exposes the following endpoints: + +- POST /api/events +Accepts and processes incoming call events +- GET /api/calls +Retrieves calls with optional filtering by status and queue +- GET /api/calls//events +Retrieves the event history for a specific call + +## Redis Integration + +A Redis Pub/Sub mechanism is used to propagate events to the real-time service. +After persisting an event, the backend publishes it to a Redis channel: + +await redis.publish('call_events', JSON.stringify(event)); +This decouples the backend from the real-time layer and allows for horizontal scalability. + +## Real-time Service + +The real-time service is responsible for: + +- Subscribing to Redis channels +- Receiving and parsing incoming events +- Broadcasting updates to connected clients via WebSockets + +## Redis Subscription + +The service subscribes to the call_events channel and invokes a handler for each message received. The message payload is parsed and transformed into a format suitable for client consumption. + +## WebSocket Layer + +Socket.io is used to maintain persistent connections with frontend clients. +Clients can subscribe to specific call identifiers, allowing the system to emit updates only to relevant consumers. + +socket.join(callId); +Event Broadcasting + +# Frontend Application + +The frontend application is responsible for: + +- Fetching and displaying call data +- Allowing filtering and selection of calls +- Displaying event history for individual calls +- Reacting to real-time updates via WebSockets + +## Data Integration + +All mock data has been replaced with real API calls. +The application interacts with the backend using HTTP requests and maintains synchronization through WebSocket events. +Call List Management + +## Event History Management + +The event history for a selected call is handled by a dedicated hook that: + + Fetches the full event history on selection + Subscribes to real-time updates for that specific call + Appends new events as they are received + WebSocket Client + +A singleton Socket.io client is implemented to manage the connection lifecycle and subscriptions. Clients subscribe and unsubscribe from call-specific channels dynamically. + +## Testing + +Unit and integration tests were implemented for the backend using Vitest. + +Test coverage includes: + Event processing and persistence + State transitions and validation rules + Flag generation logic + API response validation + Error handling scenarios + Design Considerations + +The system was designed with the following principles: + +- Separation of concerns between services +- Event-driven communication using Redis +- Backend as the single source of truth +- Scalable real-time communication via WebSockets +- Minimal coupling between components +- Trade-offs + +Some trade-offs were made to balance simplicity and correctness: + +The frontend performs a full refetch on real-time updates rather than partial state mutation +Authentication is limited to API key validation +State management in the frontend is kept simple without a global store +Running the System + Start infrastructure services (PostgreSQL and Redis) using Docker + Start the backend service + Start the real-time service + Start the frontend application + +Events can be sent to the backend via HTTP requests, and the system will process and propagate updates in real time. + +## Path video +https://drive.google.com/file/d/1usn9aCGOl-cZBBaeA1mr-SUPlYW9G_Ne/view?usp=drive_link \ No newline at end of file diff --git a/package-lock.json b/package-lock.json index b4d30f3..65f8b75 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10158,7 +10158,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", - "ioredis": "^5.3.2", + "ioredis": "^5.10.1", "pg": "^8.11.5", "uuid": "^9.0.1", "zod": "^3.23.8" diff --git a/packages/call-service/package.json b/packages/call-service/package.json index 57ccc9f..59cb0af 100644 --- a/packages/call-service/package.json +++ b/packages/call-service/package.json @@ -16,7 +16,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", - "ioredis": "^5.3.2", + "ioredis": "^5.10.1", "pg": "^8.11.5", "uuid": "^9.0.1", "zod": "^3.23.8" diff --git a/packages/call-service/src/app.ts b/packages/call-service/src/app.ts new file mode 100644 index 0000000..116d12c --- /dev/null +++ b/packages/call-service/src/app.ts @@ -0,0 +1,13 @@ +import express from 'express'; +import cors from 'cors'; +import eventsRouter from './routes/events'; +import callsRouter from './routes/calls'; + +const app = express(); + +app.use(cors()); +app.use(express.json()); +app.use('/api/events', eventsRouter); +app.use('/api/calls', callsRouter); +app.get('/health', (_req, res) => res.json({ status: 'ok' })); +export default app; \ No newline at end of file diff --git a/packages/call-service/src/bus/publisher.ts b/packages/call-service/src/bus/publisher.ts index 7068339..e0487b6 100644 --- a/packages/call-service/src/bus/publisher.ts +++ b/packages/call-service/src/bus/publisher.ts @@ -1,13 +1,11 @@ -import Redis from 'ioredis'; -import { config } from '../config'; import type { CallStatusUpdate } from '../domain/call'; -const redis = new Redis(config.redisUrl); - export const CHANNEL = 'call-status-updates'; export async function publishStatusUpdate( update: CallStatusUpdate, ): Promise { - await redis.publish(CHANNEL, JSON.stringify(update)); + // Disabled to avoid duplicate Redis publishing. + // CallService now handles event publishing directly. + return; } diff --git a/packages/call-service/src/lib/redis.ts b/packages/call-service/src/lib/redis.ts new file mode 100644 index 0000000..ad38fa9 --- /dev/null +++ b/packages/call-service/src/lib/redis.ts @@ -0,0 +1,12 @@ +import Redis from 'ioredis'; +import { config } from '../config'; + +export const redis = new Redis(config.redisUrl); + +redis.on('connect', () => { + console.log('Redis connected'); +}); + +redis.on('error', (err) => { + console.error('Redis error', err); +}); \ No newline at end of file diff --git a/packages/call-service/src/routes/events.test.ts b/packages/call-service/src/routes/events.test.ts index 79c8f7e..05950aa 100644 --- a/packages/call-service/src/routes/events.test.ts +++ b/packages/call-service/src/routes/events.test.ts @@ -1,7 +1,61 @@ -import { describe, it } from 'vitest'; +import { describe, it, expect, beforeEach } from 'vitest'; +import request from 'supertest'; +import app from '../app'; +import { db } from '../db/client'; describe('POST /api/events', () => { - it.todo('returns 201 and persists the event for a valid call_initiated payload'); - it.todo('returns 400 for an invalid payload'); - it.todo('returns 401 when the API key is missing'); + beforeEach(async () => { + await db.query('DELETE FROM call_events'); + await db.query('DELETE FROM calls'); + }); + const API_KEY = 'change-me'; + it('returns 201 and persists the event for a valid call_initiated payload', async () => { + const res = await request(app) + .post('/api/events') + .set('x-api-key', API_KEY) + .send({ + event: 'call_initiated', + callId: 'int-1', + type: 'voice', + queueId: 'medical_spanish', + }); + + expect(res.status).toBe(201); + expect(res.body.callId).toBe('int-1'); + + const result = await db.query( + 'SELECT * FROM calls WHERE id = $1', + ['int-1'] + ); + + expect(result.rowCount).toBe(1); + }); + + it('returns 400 for an invalid payload', async () => { + const res = await request(app) + .post('/api/events') + .set('x-api-key', 'change-me') + .send({ + event: 'call_initiated', + callId: 'int-2', + type: 'voice', + }); + + expect(res.status).toBe(400); + expect(res.body.message).toBe('Invalid event payload'); + expect(res.body.issues).toBeDefined(); + }); + + it('returns 401 when the API key is missing', async () => { + const res = await request(app) + .post('/api/events') + .send({ + event: 'call_initiated', + callId: 'int-3', + type: 'voice', + queueId: 'medical_spanish', + }); + expect(res.status).toBe(401); + expect(res.body.message).toBeDefined(); + }); }); diff --git a/packages/call-service/src/routes/events.ts b/packages/call-service/src/routes/events.ts index 2bb57af..d1dc2f6 100644 --- a/packages/call-service/src/routes/events.ts +++ b/packages/call-service/src/routes/events.ts @@ -12,15 +12,14 @@ router.post('/', apiKeyAuth, async (req: Request, res: Response) => { const payload: EventPayload = eventPayloadSchema.parse(req.body); const event = await callService.processEvent(payload); res.status(201).json(event); - } catch (error) { - if (error instanceof ZodError) { - res.status(400).json({ + } catch (error: any) { + if (error?.name === 'ZodError') { + return res.status(400).json({ message: 'Invalid event payload', issues: error.issues, }); - return; } - + console.error(error); res.status(500).json({ message: 'Internal server error' }); } }); diff --git a/packages/call-service/src/services/CallService.test.ts b/packages/call-service/src/services/CallService.test.ts index d4c66f4..9ef7336 100644 --- a/packages/call-service/src/services/CallService.test.ts +++ b/packages/call-service/src/services/CallService.test.ts @@ -1,9 +1,124 @@ -import { describe, it } from 'vitest'; +import { describe, it, expect, beforeEach} from 'vitest'; +import { CallService } from './CallService'; +import { db } from '../db/client'; describe('CallService', () => { - it.todo('processes call_initiated and persists the call'); - it.todo('processes call_answered and updates call status to active'); - it.todo('flags call_answered when waitTime exceeds 30 seconds'); - it.todo('flags call_hold when holdDuration exceeds 60 seconds'); - it.todo('flags call_ended when duration is under 10 seconds'); + const service = new CallService(); + + beforeEach(async () => { + await db.query('DELETE FROM call_events'); + await db.query('DELETE FROM calls'); + }); + + it('processes call_initiated and persists the call', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-1', + type: 'voice', + queueId: 'medical_spanish', + }); + + const result = await db.query('SELECT * FROM calls WHERE id = $1',['test-1']); + + expect(result.rowCount).toBe(1); + expect(result.rows[0].status).toBe('waiting'); + }); + + it('processes call_answered and updates call status to active', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-2', + type: 'voice', + queueId: 'medical_spanish', + }); + + await service.processEvent({ + event: 'call_answered', + callId: 'test-2', + waitTime: 10, + }); + const result = await db.query('SELECT status FROM calls WHERE id = $1',['test-2']); + + expect(result.rows[0].status).toBe('active'); + }); + + it('flags call_answered when waitTime exceeds 30 seconds', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-3', + type: 'voice', + queueId: 'medical_spanish', + }); + + const event = await service.processEvent({ + event: 'call_answered', + callId: 'test-3', + waitTime: 40, + }); + + expect(event.metadata?.flag).toBe('WAIT_TIME_EXCEEDED'); + }); + + it('flags call_hold when holdDuration exceeds 60 seconds', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-4', + type: 'voice', + queueId: 'medical_spanish', + }); + await service.processEvent({ + event: 'call_answered', + callId: 'test-4', + waitTime: 10, + }); + const event = await service.processEvent({ + event: 'call_hold', + callId: 'test-4', + holdDuration: 70, + }); + + expect(event.metadata?.flag).toBe('HOLD_TIME_EXCEEDED'); + }); + + it('flags call_ended when duration is under 10 seconds', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-5', + type: 'voice', + queueId: 'medical_spanish', + }); + + await service.processEvent({ + event: 'call_answered', + callId: 'test-5', + waitTime: 5, + }); + + const event = await service.processEvent({ + event: 'call_ended', + callId: 'test-5', + duration: 5, + endReason: 'completed', + }); + + expect(event.metadata?.flag).toBe('SHORT_CALL'); + }); + + it('returns ordered events for a call', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-events', + type: 'voice', + queueId: 'medical_spanish', + }); + await service.processEvent({ + event: 'call_answered', + callId: 'test-events', + waitTime: 5, + }); + const events = await service.getCallEvents('test-events'); + expect(events.length).toBe(2); + expect(events[0].type).toBe('call_initiated'); + expect(events[1].type).toBe('call_answered'); +}); }); diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 3c9b082..146613c 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -5,17 +5,130 @@ import { CallServiceContract, EventPayload, } from '../domain/call'; +import { db } from '../db/client'; +import {v4 as uuidv4} from 'uuid'; +import { redis } from '../lib/redis'; +import { callHoldPayloadSchema } from '@voycelink/contracts'; + +function mapCallEvent(row: any) { + return { + id: row.id, + callId: row.call_id, + type: row.type, + timestamp: row.timestamp, + metadata: row.metadata, + }; +} export class CallService implements CallServiceContract { async processEvent(_payload: EventPayload): Promise { - throw new Error('CallService.processEvent not implemented'); + const { callId } = _payload; + let metadata: Record | undefined = { ..._payload }; + if (_payload.event === 'call_initiated') { + const { type, queueId } = _payload; + await db.query( + `INSERT INTO calls (id, type, status, queue_id) + VALUES ($1, $2, $3, $4) + ON CONFLICT (id) DO NOTHING`, + [callId, type, 'waiting', queueId] + ); + } + + if(_payload.event === 'call_answered'){ + const {waitTime} = _payload; + const callResult = await db.query( + 'SELECT * FROM calls WHERE ID = $1', [callId] + ); + + if(callResult.rowCount === 0) {throw new Error(`Call with ID ${callId} not found`);} + const call = callResult.rows[0]; + if(call.status !== 'waiting') {throw new Error(`Invalid state transition from ${call.status} to active`);} + await db.query(`UPDATE calls SET status = 'active' WHERE id = $1`, [callId]); + if(waitTime > 30) { + metadata = {..._payload, flag: 'WAIT_TIME_EXCEEDED'}; + } + } + + if(_payload.event === 'call_hold'){ + const { holdDuration } = _payload; + const callResult = await db.query('SELECT * FROM calls WHERE id = $1', [callId]); + if(callResult.rowCount === 0) throw new Error(`call ${callId} not found`); + const call = callResult.rows[0]; + if(call.status !== 'active') throw new Error(`Invalid state transmition from ${call.status}`); + await db.query("UPDATE calls SET status = 'on_hold' WHERE id = $1", [callId]); + if(holdDuration > 60){ + metadata ={..._payload, flag: 'HOLD_TIME_EXCEEDED'}; + } + } + + if(_payload.event === 'call_ended'){ + const {duration} = _payload; + + const callResult = await db.query( + `SELECT * FROM calls WHERE id = $1`, [callId] + ); + if(callResult.rowCount === 0) throw new Error(`Call with ID ${callId} not found`); + const call = callResult.rows[0]; + if(call.status !== 'active' && call.status !== 'on_hold') throw new Error(`Invalid state transition from ${call.status}`); + await db.query(`UPDATE calls set status = 'ended', end_time = NOW() WHERE id = $1`, [callId]); + if(duration < 10){ + metadata ={..._payload, flag:'SHORT_CALL'}; + } + } + + const result = await db.query( + `INSERT INTO call_events (id, call_id, type, metadata) + VALUES ($1, $2, $3, $4) RETURNING *`, + [ + uuidv4(), + callId, + _payload.event, + metadata + ] + ); + + const event = mapCallEvent(result.rows[0]); + + const enrichedEvent = { + ...event, + event: _payload.event + }; + + try { + await redis.publish('call_events', JSON.stringify(enrichedEvent)); + } catch (error) { + console.error('Redis publish failed:', error); + } + + return event; } + async getCalls(_filters: CallFilters): Promise { - throw new Error('CallService.getCalls not implemented'); + let query = 'SELECT * FROM calls'; + const conditions: string[] =[]; + const values: any[] = []; + if(_filters.status){ + values.push(_filters.status); + conditions.push(`status = ${values.length}`); + } + + if(_filters.queueId){ + values.push(_filters.queueId); + conditions.push(`queue_id = ${values.length}`); + } + + if(conditions.length > 0){ + query += ' WHERE ' + conditions.join(' AND '); + } + query += ' ORDER BY start_time DESC'; + const result = await db.query(query, values); + return result.rows; } - async getCallEvents(_callId: string): Promise { - throw new Error('CallService.getCallEvents not implemented'); + async getCallEvents(callId: string): Promise { + const result = await db.query( + `SELECT * FROM call_events WHERE call_id = $1 ORDER BY timestamp ASC`,[callId]); + return result.rows; } } diff --git a/packages/frontend/src/hooks/useCallEvents.ts b/packages/frontend/src/hooks/useCallEvents.ts index c428f4e..794dc82 100644 --- a/packages/frontend/src/hooks/useCallEvents.ts +++ b/packages/frontend/src/hooks/useCallEvents.ts @@ -2,8 +2,10 @@ import { useState, useEffect } from 'react'; import { CallEvent } from '../types'; -import { MOCK_EVENTS } from '../mocks/data'; +//import { MOCK_EVENTS } from '../mocks/data'; +import { getSocket, subscribeToCall, unsubscribeFromCall } from '../lib/socket'; +const API_URL = 'http://localhost:3001/api'; /** * Returns the event history for a specific call. * TODO: replace mock data with a real API call. @@ -18,14 +20,39 @@ export function useCallEvents(callId: string | null) { return; } + const socket = getSocket(); + socket.connect(); setLoading(true); + + const fetchEvents = async () => { + const res = await fetch(`${API_URL}/calls/${callId}/events`); + const data = await res.json(); + setEvents(data); + setLoading(false); + }; + + fetchEvents(); + + + subscribeToCall(callId); // TODO: replace with fetchCallEvents(callId) - const t = setTimeout(() => { - setEvents(MOCK_EVENTS[callId] ?? []); + const handler = (update: any) => { + console.log('EVENT RECEIVED:', update); + + setEvents((prev) => [...prev, update]); setLoading(false); - }, 200); + }; + socket.on('call_status_update', handler); + // const t = setTimeout(() => { + // setEvents(MOCK_EVENTS[callId] ?? []); + // setLoading(false); + // }, 200); - return () => clearTimeout(t); + return () => { + socket.off('call_status_update', handler); + unsubscribeFromCall(callId); + // clearTimeout(t); + }; }, [callId]); return { events, loading }; diff --git a/packages/frontend/src/hooks/useCalls.ts b/packages/frontend/src/hooks/useCalls.ts index a35ab33..ca2329a 100644 --- a/packages/frontend/src/hooks/useCalls.ts +++ b/packages/frontend/src/hooks/useCalls.ts @@ -2,25 +2,72 @@ import { useState, useEffect } from 'react'; import { Call, CallFilters } from '../types'; -import { MOCK_CALLS } from '../mocks/data'; +import { getSocket } from '../lib/socket'; +//import { MOCK_CALLS } from '../mocks/data'; /** * Returns the live call list and a loading indicator. * TODO: replace mock data with real API + Socket.io updates. */ +const API_URL = 'http://localhost:3001/api/calls'; + export function useCalls(_filters: CallFilters) { const [calls, setCalls] = useState([]); const [loading, setLoading] = useState(true); useEffect(() => { // TODO: replace with fetchCalls(_filters) + socket subscription - const t = setTimeout(() => { - setCalls(MOCK_CALLS); + // const t = setTimeout(() => { + // setCalls(MOCK_CALLS); + // setLoading(false); + // }, 300); + //return () => clearTimeout(t); + const fetchCalls = async () => { + setLoading(true); + + const params = new URLSearchParams(); + + if (_filters.status && _filters.status !== 'all') { + params.append('status', _filters.status); + } + + if (_filters.queueId) { + params.append('queueId', _filters.queueId); + } + + const res = await fetch(`${API_URL}?${params.toString()}`); + const data = await res.json(); + + setCalls(data); setLoading(false); - }, 300); + }; + + fetchCalls(); + + }, [_filters]); + + useEffect(() => { + const socket = getSocket(); + socket.connect(); + const handler = async () => { + console.log('refetch calls'); + const params = new URLSearchParams(); + if (_filters.status && _filters.status !== 'all') { + params.append('status', _filters.status); + } + if (_filters.queueId) { + params.append('queueId', _filters.queueId); + } + const res = await fetch(`${API_URL}?${params}`); + const data = await res.json(); + setCalls(data); + }; - return () => clearTimeout(t); - }, []); + socket.on('call_status_update', handler); + return () => { + socket.off('call_status_update', handler); + }; +}, [_filters]); return { calls, loading, setCalls }; } diff --git a/packages/frontend/src/lib/socket.ts b/packages/frontend/src/lib/socket.ts index 2ae8fa9..53e0a98 100644 --- a/packages/frontend/src/lib/socket.ts +++ b/packages/frontend/src/lib/socket.ts @@ -10,6 +10,13 @@ let socket: Socket | null = null; export function getSocket(): Socket { if (!socket) { socket = io(REALTIME_URL, { autoConnect: false }); + socket.connect(); + socket.on('connect', () => { + console.log('[ws] connected to realtime-service'); + }); + socket.on('disconnect', () => { + console.log('[ws] disconnected from realtime-service'); + } ); } return socket; } diff --git a/packages/realtime-service/src/bus/subscriber.ts b/packages/realtime-service/src/bus/subscriber.ts index 723b278..4bdf2bf 100644 --- a/packages/realtime-service/src/bus/subscriber.ts +++ b/packages/realtime-service/src/bus/subscriber.ts @@ -1,13 +1,24 @@ import Redis from 'ioredis'; import { CallStatusUpdate } from '../types'; -const CHANNEL = 'call-status-updates'; +const CHANNEL = 'call_events'; +const subscriber = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379'); export function subscribeToCallUpdates( onUpdate: (update: CallStatusUpdate) => void, -): void { - // TODO: subscribe to CHANNEL on Redis and call onUpdate for each message - void Redis; - void CHANNEL; - void onUpdate; +): void {subscriber.subscribe(CHANNEL, + (err) => {if (err) { + console.error('Redis subscribe error:', err);} + else { + console.log(`Subscribed to ${CHANNEL}`);} + }); + + subscriber.on('message', (_channel, message) => { + try { + const parsed = JSON.parse(message); + onUpdate(parsed); + } catch (err) { + console.error('Invalid message from Redis:', err); + } + }); } diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index d344552..736213b 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -12,18 +12,16 @@ export function createSocketServer(httpServer: HttpServer): IoServer { }); io.on('connection', (socket) => { - console.log(`[ws] client connected ${socket.id}`); socket.on('subscribe_call', (_callId) => { - // TODO: socket.join(_callId); + socket.join(_callId); }); socket.on('unsubscribe_call', (_callId) => { - // TODO: socket.leave(_callId); + socket.leave(_callId); }); socket.on('disconnect', () => { - console.log(`[ws] client disconnected ${socket.id}`); }); }); @@ -32,6 +30,5 @@ export function createSocketServer(httpServer: HttpServer): IoServer { export function broadcastStatusUpdate(update: CallStatusUpdate): void { if (!io) return; - // TODO: io.to(update.callId).emit('call_status_update', update); - io.emit('call_status_update', update); // naive – replace with room-based + io.to(update.callId).emit('call_status_update', update); }