From 30af6ebda6b1a92df3e611eec635b7a04886bb9e Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 15:37:32 -0400 Subject: [PATCH 01/12] feat: implement event ingestion endpoint * added processEvent logic in Callservice * persist calls and call_events in PostgreSQl * handle call_answwered --- .../call-service/src/services/CallService.ts | 28 ++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 3c9b082..4f67516 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -5,10 +5,36 @@ import { CallServiceContract, EventPayload, } from '../domain/call'; +import { db } from '../db/client'; +import {v4 as uuidv4} from 'uuid'; +import { Result } from 'pg'; export class CallService implements CallServiceContract { async processEvent(_payload: EventPayload): Promise { - throw new Error('CallService.processEvent not implemented'); + const { callId } = _payload; + + if (_payload.event === 'call_initiated') { + const { type, queueId } = _payload; + await db.query( + `INSERT INTO calls (id, type, status, queue_id) + VALUES ($1, $2, $3, $4) + ON CONFLICT (id) DO NOTHING`, + [callId, type, 'waiting', queueId] + ); + } + + const result = await db.query( + `INSERT INTO call_events (id, call_id, type, metadata) + VALUES ($1, $2, $3, $4) RETURNING *`, + [ + uuidv4(), + callId, + _payload.event, + _payload + ] + ); + + return result.rows[0]; } async getCalls(_filters: CallFilters): Promise { From 5ddfed62c0f2f80459fed01840bcfc88953dc68e Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 16:22:54 -0400 Subject: [PATCH 02/12] feat: implemnt call_answered state and validation * validate call existence before processing events * update call status from waiting to active on call_answered * flag events when waitTime exceeds 30s * persist enriched event metadata in call_events table --- .../call-service/src/services/CallService.ts | 18 +++++++++++++++++- 1 file changed, 17 insertions(+), 1 deletion(-) diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 4f67516..bb0b375 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -12,7 +12,7 @@ import { Result } from 'pg'; export class CallService implements CallServiceContract { async processEvent(_payload: EventPayload): Promise { const { callId } = _payload; - + let metadata: Record = {..._payload} if (_payload.event === 'call_initiated') { const { type, queueId } = _payload; await db.query( @@ -23,6 +23,22 @@ export class CallService implements CallServiceContract { ); } + if(_payload.event === 'call_answered'){ + const {waitTime} = _payload; + const callResult = await db.query( + 'SELECT * FROM calls WHERE ID = $1', [callId] + ); + + if(callResult.rowCount === 0) throw new Error(`Call with ID ${callId} not found`); + const call = callResult.rows[0]; + if(call.status !== 'waiting') throw new Error(`Invalid state transition from ${call.status} to active`); + await db.query(`UPDATE calls SET status = 'active' WHERE ID = $1`, [callId]); + if(waitTime > 30) { + metadata = {..._payload, flag: 'WAIT_TIME_EXCEEDED'}; + const result = await db.query(`INSERT INTO call_events (id, call_id, type, metadata) VALUES ($1,$2,$3,$4) RETURNING *`, [uuidv4(), callId, _payload.event, metadata]); + } + } + const result = await db.query( `INSERT INTO call_events (id, call_id, type, metadata) VALUES ($1, $2, $3, $4) RETURNING *`, From 3ac2ce28e964481639654c7c55347e8f8c5b6df0 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 16:41:51 -0400 Subject: [PATCH 03/12] feat: implemnt call_ended state and validation * validate call existence before processing events * update call status to ended --- packages/call-service/src/services/CallService.ts | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index bb0b375..4aa7a88 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -39,6 +39,21 @@ export class CallService implements CallServiceContract { } } + if(_payload.event === 'call_ended'){ + const {duration} = _payload; + + const callResult = await db.query( + `SELECT * FROM calls WHERE id = $1`, [callId] + ); + if(callResult.rowCount === 0) throw new Error(`Call with ID ${callId} not found`); + const call = callResult.rows[0]; + if(call.status !== 'active' && call.status !== 'on_hold') throw new Error(`Invalid state transition from ${call.status}`); + await db.query(`UPDATE calls set status = 'ended', end_time = NOW() WHERE id = $1`, [callId]); + if(duration < 10){ + metadata ={..._payload, flag:'SHORT_CALL'}; + } + } + const result = await db.query( `INSERT INTO call_events (id, call_id, type, metadata) VALUES ($1, $2, $3, $4) RETURNING *`, From 76873a850a360bf1e634154292b81ffe95349bda Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 17:05:34 -0400 Subject: [PATCH 04/12] feat: implement call_hold state with duration validation * validate call existence before processing hold events * update call status to on_hold * flags events when holdDuration exceeds 60 seconds * persist enriched metadata in calla_events --- packages/call-service/src/services/CallService.ts | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 4aa7a88..1cafe47 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -8,6 +8,7 @@ import { import { db } from '../db/client'; import {v4 as uuidv4} from 'uuid'; import { Result } from 'pg'; +import { callHoldPayloadSchema } from '@voycelink/contracts'; export class CallService implements CallServiceContract { async processEvent(_payload: EventPayload): Promise { @@ -39,6 +40,18 @@ export class CallService implements CallServiceContract { } } + if(_payload.event === 'call_hold'){ + const { holdDuration } = _payload; + const callResult = await db.query('SELECT * FROM calls WHERE id = $1', [callId]); + if(callResult.rowCount === 0) throw new Error(`call ${callId} not found`); + const call = callResult.rows[0]; + if(call.status !== 'active') throw new Error(`Invalid state transmition from ${call.status}`); + await db.query("UPDATE calls SET status = 'on_hold' WHERE id = $1", [callId]); + if(holdDuration > 60){ + metadata ={..._payload, flag: 'HOLD_ON_EXCCCEDED'}; + } + } + if(_payload.event === 'call_ended'){ const {duration} = _payload; From 507f35898e94a92e23594bcbb0d01a944a1162a3 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 17:31:20 -0400 Subject: [PATCH 05/12] feat: implement dynamic filtering for getCalls * Add support for optional status and queueId filters * Use parameterized queries to prevent SQL injection or another attack's * introduce dynamic where clause generation base on provided filters --- .../call-service/src/services/CallService.ts | 20 ++++++++++++++++++- 1 file changed, 19 insertions(+), 1 deletion(-) diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 1cafe47..3554a77 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -82,7 +82,25 @@ export class CallService implements CallServiceContract { } async getCalls(_filters: CallFilters): Promise { - throw new Error('CallService.getCalls not implemented'); + let query = 'SELECT * FROM calls'; + const conditions: string[] =[]; + const values: any[] = []; + if(_filters.status){ + values.push(_filters.status); + conditions.push(`status = ${values.length}`); + } + + if(_filters.queueId){ + values.push(_filters.queueId); + conditions.push(`queue_id = ${values.length}`); + } + + if(conditions.length > 0){ + query += ' WHERE ' + conditions.join(' AND '); + } + query += ' ORDER BY start_time DESC'; + const result = await db.query(query, values); + return result.rows; } async getCallEvents(_callId: string): Promise { From 29f9732df632c2b7694b8ccfd5a085d29cdd4088 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 18:07:06 -0400 Subject: [PATCH 06/12] test: add unit test for call lifecycle * add unit tests for CallService covering core call lifecycle, this for all cases required --- .../src/services/CallService.test.ts | 111 +++++++++++++++++- .../call-service/src/services/CallService.ts | 18 ++- 2 files changed, 113 insertions(+), 16 deletions(-) diff --git a/packages/call-service/src/services/CallService.test.ts b/packages/call-service/src/services/CallService.test.ts index d4c66f4..7850202 100644 --- a/packages/call-service/src/services/CallService.test.ts +++ b/packages/call-service/src/services/CallService.test.ts @@ -1,9 +1,108 @@ -import { describe, it } from 'vitest'; +import { describe, it, expect, beforeEach} from 'vitest'; +import { CallService } from './CallService'; +import { db } from '../db/client'; describe('CallService', () => { - it.todo('processes call_initiated and persists the call'); - it.todo('processes call_answered and updates call status to active'); - it.todo('flags call_answered when waitTime exceeds 30 seconds'); - it.todo('flags call_hold when holdDuration exceeds 60 seconds'); - it.todo('flags call_ended when duration is under 10 seconds'); + const service = new CallService(); + + beforeEach(async () => { + await db.query('DELETE FROM call_events'); + await db.query('DELETE FROM calls'); + }); + + it('processes call_initiated and persists the call', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-1', + type: 'voice', + queueId: 'medical_spanish', + }); + + const result = await db.query('SELECT * FROM calls WHERE id = $1',['test-1']); + + expect(result.rowCount).toBe(1); + expect(result.rows[0].status).toBe('waiting'); + }); + + + it('processes call_answered and updates call status to active', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-2', + type: 'voice', + queueId: 'medical_spanish', + }); + + await service.processEvent({ + event: 'call_answered', + callId: 'test-2', + waitTime: 10, + }); + const result = await db.query('SELECT status FROM calls WHERE id = $1',['test-2']); + + expect(result.rows[0].status).toBe('active'); + }); + + + it('flags call_answered when waitTime exceeds 30 seconds', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-3', + type: 'voice', + queueId: 'medical_spanish', + }); + + const event = await service.processEvent({ + event: 'call_answered', + callId: 'test-3', + waitTime: 40, + }); + + expect(event.metadata?.flag).toBe('WAIT_TIME_EXCEEDED'); + }); + + it('flags call_hold when holdDuration exceeds 60 seconds', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-4', + type: 'voice', + queueId: 'medical_spanish', + }); + await service.processEvent({ + event: 'call_answered', + callId: 'test-4', + waitTime: 10, + }); + const event = await service.processEvent({ + event: 'call_hold', + callId: 'test-4', + holdDuration: 70, + }); + + expect(event.metadata?.flag).toBe('HOLD_TIME_EXCEEDED'); + }); + + it('flags call_ended when duration is under 10 seconds', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-5', + type: 'voice', + queueId: 'medical_spanish', + }); + + await service.processEvent({ + event: 'call_answered', + callId: 'test-5', + waitTime: 5, + }); + + const event = await service.processEvent({ + event: 'call_ended', + callId: 'test-5', + duration: 5, + endReason: 'completed', + }); + + expect(event.metadata?.flag).toBe('SHORT_CALL'); + }); }); diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 3554a77..7bc21e5 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -6,14 +6,13 @@ import { EventPayload, } from '../domain/call'; import { db } from '../db/client'; -import {v4 as uuidv4} from 'uuid'; -import { Result } from 'pg'; +import {v4 as uuidv4} from 'uuid'; import { callHoldPayloadSchema } from '@voycelink/contracts'; export class CallService implements CallServiceContract { async processEvent(_payload: EventPayload): Promise { const { callId } = _payload; - let metadata: Record = {..._payload} + let metadata: Record | undefined = { ..._payload }; if (_payload.event === 'call_initiated') { const { type, queueId } = _payload; await db.query( @@ -30,13 +29,12 @@ export class CallService implements CallServiceContract { 'SELECT * FROM calls WHERE ID = $1', [callId] ); - if(callResult.rowCount === 0) throw new Error(`Call with ID ${callId} not found`); + if(callResult.rowCount === 0) {throw new Error(`Call with ID ${callId} not found`);} const call = callResult.rows[0]; - if(call.status !== 'waiting') throw new Error(`Invalid state transition from ${call.status} to active`); - await db.query(`UPDATE calls SET status = 'active' WHERE ID = $1`, [callId]); + if(call.status !== 'waiting') {throw new Error(`Invalid state transition from ${call.status} to active`);} + await db.query(`UPDATE calls SET status = 'active' WHERE id = $1`, [callId]); if(waitTime > 30) { - metadata = {..._payload, flag: 'WAIT_TIME_EXCEEDED'}; - const result = await db.query(`INSERT INTO call_events (id, call_id, type, metadata) VALUES ($1,$2,$3,$4) RETURNING *`, [uuidv4(), callId, _payload.event, metadata]); + metadata = {..._payload, flag: 'WAIT_TIME_EXCEEDED'}; } } @@ -48,7 +46,7 @@ export class CallService implements CallServiceContract { if(call.status !== 'active') throw new Error(`Invalid state transmition from ${call.status}`); await db.query("UPDATE calls SET status = 'on_hold' WHERE id = $1", [callId]); if(holdDuration > 60){ - metadata ={..._payload, flag: 'HOLD_ON_EXCCCEDED'}; + metadata ={..._payload, flag: 'HOLD_TIME_EXCEEDED'}; } } @@ -74,7 +72,7 @@ export class CallService implements CallServiceContract { uuidv4(), callId, _payload.event, - _payload + metadata ] ); From 5b3f59857e7d9bb9e089b29feabf7c3a98210b43 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 18:53:36 -0400 Subject: [PATCH 07/12] feat: implement getCallEvents and complete backend test coverage * Implements getCallEvents to retrieve ordered event history per call *Adds and completes unit and integration tests for call lifecycle events * enforcement, and business rules like answered, hold, ended * Ensures consistent event persistence --- packages/call-service/src/app.ts | 13 ++++ .../call-service/src/routes/events.test.ts | 62 +++++++++++++++++-- packages/call-service/src/routes/events.ts | 9 ++- .../src/services/CallService.test.ts | 22 ++++++- .../call-service/src/services/CallService.ts | 19 +++++- 5 files changed, 110 insertions(+), 15 deletions(-) create mode 100644 packages/call-service/src/app.ts diff --git a/packages/call-service/src/app.ts b/packages/call-service/src/app.ts new file mode 100644 index 0000000..116d12c --- /dev/null +++ b/packages/call-service/src/app.ts @@ -0,0 +1,13 @@ +import express from 'express'; +import cors from 'cors'; +import eventsRouter from './routes/events'; +import callsRouter from './routes/calls'; + +const app = express(); + +app.use(cors()); +app.use(express.json()); +app.use('/api/events', eventsRouter); +app.use('/api/calls', callsRouter); +app.get('/health', (_req, res) => res.json({ status: 'ok' })); +export default app; \ No newline at end of file diff --git a/packages/call-service/src/routes/events.test.ts b/packages/call-service/src/routes/events.test.ts index 79c8f7e..05950aa 100644 --- a/packages/call-service/src/routes/events.test.ts +++ b/packages/call-service/src/routes/events.test.ts @@ -1,7 +1,61 @@ -import { describe, it } from 'vitest'; +import { describe, it, expect, beforeEach } from 'vitest'; +import request from 'supertest'; +import app from '../app'; +import { db } from '../db/client'; describe('POST /api/events', () => { - it.todo('returns 201 and persists the event for a valid call_initiated payload'); - it.todo('returns 400 for an invalid payload'); - it.todo('returns 401 when the API key is missing'); + beforeEach(async () => { + await db.query('DELETE FROM call_events'); + await db.query('DELETE FROM calls'); + }); + const API_KEY = 'change-me'; + it('returns 201 and persists the event for a valid call_initiated payload', async () => { + const res = await request(app) + .post('/api/events') + .set('x-api-key', API_KEY) + .send({ + event: 'call_initiated', + callId: 'int-1', + type: 'voice', + queueId: 'medical_spanish', + }); + + expect(res.status).toBe(201); + expect(res.body.callId).toBe('int-1'); + + const result = await db.query( + 'SELECT * FROM calls WHERE id = $1', + ['int-1'] + ); + + expect(result.rowCount).toBe(1); + }); + + it('returns 400 for an invalid payload', async () => { + const res = await request(app) + .post('/api/events') + .set('x-api-key', 'change-me') + .send({ + event: 'call_initiated', + callId: 'int-2', + type: 'voice', + }); + + expect(res.status).toBe(400); + expect(res.body.message).toBe('Invalid event payload'); + expect(res.body.issues).toBeDefined(); + }); + + it('returns 401 when the API key is missing', async () => { + const res = await request(app) + .post('/api/events') + .send({ + event: 'call_initiated', + callId: 'int-3', + type: 'voice', + queueId: 'medical_spanish', + }); + expect(res.status).toBe(401); + expect(res.body.message).toBeDefined(); + }); }); diff --git a/packages/call-service/src/routes/events.ts b/packages/call-service/src/routes/events.ts index 2bb57af..d1dc2f6 100644 --- a/packages/call-service/src/routes/events.ts +++ b/packages/call-service/src/routes/events.ts @@ -12,15 +12,14 @@ router.post('/', apiKeyAuth, async (req: Request, res: Response) => { const payload: EventPayload = eventPayloadSchema.parse(req.body); const event = await callService.processEvent(payload); res.status(201).json(event); - } catch (error) { - if (error instanceof ZodError) { - res.status(400).json({ + } catch (error: any) { + if (error?.name === 'ZodError') { + return res.status(400).json({ message: 'Invalid event payload', issues: error.issues, }); - return; } - + console.error(error); res.status(500).json({ message: 'Internal server error' }); } }); diff --git a/packages/call-service/src/services/CallService.test.ts b/packages/call-service/src/services/CallService.test.ts index 7850202..9ef7336 100644 --- a/packages/call-service/src/services/CallService.test.ts +++ b/packages/call-service/src/services/CallService.test.ts @@ -3,7 +3,7 @@ import { CallService } from './CallService'; import { db } from '../db/client'; describe('CallService', () => { - const service = new CallService(); + const service = new CallService(); beforeEach(async () => { await db.query('DELETE FROM call_events'); @@ -24,7 +24,6 @@ describe('CallService', () => { expect(result.rows[0].status).toBe('waiting'); }); - it('processes call_answered and updates call status to active', async () => { await service.processEvent({ event: 'call_initiated', @@ -43,7 +42,6 @@ describe('CallService', () => { expect(result.rows[0].status).toBe('active'); }); - it('flags call_answered when waitTime exceeds 30 seconds', async () => { await service.processEvent({ event: 'call_initiated', @@ -105,4 +103,22 @@ describe('CallService', () => { expect(event.metadata?.flag).toBe('SHORT_CALL'); }); + + it('returns ordered events for a call', async () => { + await service.processEvent({ + event: 'call_initiated', + callId: 'test-events', + type: 'voice', + queueId: 'medical_spanish', + }); + await service.processEvent({ + event: 'call_answered', + callId: 'test-events', + waitTime: 5, + }); + const events = await service.getCallEvents('test-events'); + expect(events.length).toBe(2); + expect(events[0].type).toBe('call_initiated'); + expect(events[1].type).toBe('call_answered'); +}); }); diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 7bc21e5..62a1914 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -9,6 +9,16 @@ import { db } from '../db/client'; import {v4 as uuidv4} from 'uuid'; import { callHoldPayloadSchema } from '@voycelink/contracts'; +function mapCallEvent(row: any) { + return { + id: row.id, + callId: row.call_id, + type: row.type, + timestamp: row.timestamp, + metadata: row.metadata, + }; +} + export class CallService implements CallServiceContract { async processEvent(_payload: EventPayload): Promise { const { callId } = _payload; @@ -76,8 +86,9 @@ export class CallService implements CallServiceContract { ] ); - return result.rows[0]; + return mapCallEvent(result.rows[0]); } + async getCalls(_filters: CallFilters): Promise { let query = 'SELECT * FROM calls'; @@ -101,7 +112,9 @@ export class CallService implements CallServiceContract { return result.rows; } - async getCallEvents(_callId: string): Promise { - throw new Error('CallService.getCallEvents not implemented'); + async getCallEvents(callId: string): Promise { + const result = await db.query( + `SELECT * FROM call_events WHERE call_id = $1 ORDER BY timestamp ASC`,[callId]); + return result.rows; } } From 5ab9cac234baec9c00768b059521a8a291cc1f4e Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Sun, 3 May 2026 19:28:11 -0400 Subject: [PATCH 08/12] feat: integrate Redis pub/sub for call events * Added Redis publish integration in CallService * Standardized event publishing using 'call_events' channel * Verified real-time event propagation via Redis --- package-lock.json | 2 +- packages/call-service/package.json | 2 +- packages/call-service/src/bus/publisher.ts | 8 +++----- packages/call-service/src/lib/redis.ts | 12 ++++++++++++ packages/call-service/src/services/CallService.ts | 10 +++++++++- 5 files changed, 26 insertions(+), 8 deletions(-) create mode 100644 packages/call-service/src/lib/redis.ts diff --git a/package-lock.json b/package-lock.json index b4d30f3..65f8b75 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10158,7 +10158,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", - "ioredis": "^5.3.2", + "ioredis": "^5.10.1", "pg": "^8.11.5", "uuid": "^9.0.1", "zod": "^3.23.8" diff --git a/packages/call-service/package.json b/packages/call-service/package.json index 57ccc9f..59cb0af 100644 --- a/packages/call-service/package.json +++ b/packages/call-service/package.json @@ -16,7 +16,7 @@ "cors": "^2.8.5", "dotenv": "^16.4.5", "express": "^4.19.2", - "ioredis": "^5.3.2", + "ioredis": "^5.10.1", "pg": "^8.11.5", "uuid": "^9.0.1", "zod": "^3.23.8" diff --git a/packages/call-service/src/bus/publisher.ts b/packages/call-service/src/bus/publisher.ts index 7068339..e0487b6 100644 --- a/packages/call-service/src/bus/publisher.ts +++ b/packages/call-service/src/bus/publisher.ts @@ -1,13 +1,11 @@ -import Redis from 'ioredis'; -import { config } from '../config'; import type { CallStatusUpdate } from '../domain/call'; -const redis = new Redis(config.redisUrl); - export const CHANNEL = 'call-status-updates'; export async function publishStatusUpdate( update: CallStatusUpdate, ): Promise { - await redis.publish(CHANNEL, JSON.stringify(update)); + // Disabled to avoid duplicate Redis publishing. + // CallService now handles event publishing directly. + return; } diff --git a/packages/call-service/src/lib/redis.ts b/packages/call-service/src/lib/redis.ts new file mode 100644 index 0000000..ad38fa9 --- /dev/null +++ b/packages/call-service/src/lib/redis.ts @@ -0,0 +1,12 @@ +import Redis from 'ioredis'; +import { config } from '../config'; + +export const redis = new Redis(config.redisUrl); + +redis.on('connect', () => { + console.log('Redis connected'); +}); + +redis.on('error', (err) => { + console.error('Redis error', err); +}); \ No newline at end of file diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 62a1914..46904c0 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -7,6 +7,7 @@ import { } from '../domain/call'; import { db } from '../db/client'; import {v4 as uuidv4} from 'uuid'; +import { redis } from '../lib/redis'; import { callHoldPayloadSchema } from '@voycelink/contracts'; function mapCallEvent(row: any) { @@ -86,7 +87,14 @@ export class CallService implements CallServiceContract { ] ); - return mapCallEvent(result.rows[0]); + const event = mapCallEvent(result.rows[0]); + try { + await redis.publish('call_events', JSON.stringify(event)); + } catch (error) { + console.error('Redis publish failed:', error); + } + + return event; } From cf1ff4cb67deae17b26c62f601a79e3ce9aac98d Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Mon, 4 May 2026 20:55:29 -0400 Subject: [PATCH 09/12] feat: implement room-based socket communication for call updates * Implement 'subscribe_call' handler to join a specific call room * Implement 'unsubscribe_call' handler to leave a call room * Remove naive global io.emit implementation --- packages/call-service/src/services/CallService.ts | 8 +++++++- packages/realtime-service/src/socket/server.ts | 9 +++++---- 2 files changed, 12 insertions(+), 5 deletions(-) diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 46904c0..146613c 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -88,8 +88,14 @@ export class CallService implements CallServiceContract { ); const event = mapCallEvent(result.rows[0]); + + const enrichedEvent = { + ...event, + event: _payload.event + }; + try { - await redis.publish('call_events', JSON.stringify(event)); + await redis.publish('call_events', JSON.stringify(enrichedEvent)); } catch (error) { console.error('Redis publish failed:', error); } diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index d344552..b57a661 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -15,11 +15,13 @@ export function createSocketServer(httpServer: HttpServer): IoServer { console.log(`[ws] client connected ${socket.id}`); socket.on('subscribe_call', (_callId) => { - // TODO: socket.join(_callId); + socket.join(_callId); + console.log(`[ws] joined room ${_callId}`); }); socket.on('unsubscribe_call', (_callId) => { - // TODO: socket.leave(_callId); + socket.leave(_callId); + console.log(`[ws] left room ${_callId}`); }); socket.on('disconnect', () => { @@ -32,6 +34,5 @@ export function createSocketServer(httpServer: HttpServer): IoServer { export function broadcastStatusUpdate(update: CallStatusUpdate): void { if (!io) return; - // TODO: io.to(update.callId).emit('call_status_update', update); - io.emit('call_status_update', update); // naive – replace with room-based + io.to(update.callId).emit('call_status_update', update); } From dd4e396b427d703a984dd4b37491e94eef081407 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Mon, 4 May 2026 21:42:28 -0400 Subject: [PATCH 10/12] feat: implement Redis subscriber and broadcast call updates via WebSocket * Implement Redis subscriber to listen to 'call_events' channel * Broadcast updates to clients using Socket.io rooms * Align Redis channel naming between call-service and realtime-service * Enable real-time propagation of call lifecycle events to frontend --- packages/frontend/src/lib/socket.ts | 7 ++++++ .../realtime-service/src/bus/subscriber.ts | 24 ++++++++++++++----- packages/realtime-service/src/index.ts | 1 + .../realtime-service/src/socket/server.ts | 2 +- 4 files changed, 27 insertions(+), 7 deletions(-) diff --git a/packages/frontend/src/lib/socket.ts b/packages/frontend/src/lib/socket.ts index 2ae8fa9..53e0a98 100644 --- a/packages/frontend/src/lib/socket.ts +++ b/packages/frontend/src/lib/socket.ts @@ -10,6 +10,13 @@ let socket: Socket | null = null; export function getSocket(): Socket { if (!socket) { socket = io(REALTIME_URL, { autoConnect: false }); + socket.connect(); + socket.on('connect', () => { + console.log('[ws] connected to realtime-service'); + }); + socket.on('disconnect', () => { + console.log('[ws] disconnected from realtime-service'); + } ); } return socket; } diff --git a/packages/realtime-service/src/bus/subscriber.ts b/packages/realtime-service/src/bus/subscriber.ts index 723b278..8400906 100644 --- a/packages/realtime-service/src/bus/subscriber.ts +++ b/packages/realtime-service/src/bus/subscriber.ts @@ -1,13 +1,25 @@ import Redis from 'ioredis'; import { CallStatusUpdate } from '../types'; -const CHANNEL = 'call-status-updates'; +const CHANNEL = 'call_events'; +const subscriber = new Redis(process.env.REDIS_URL ?? 'redis://localhost:6379'); export function subscribeToCallUpdates( onUpdate: (update: CallStatusUpdate) => void, -): void { - // TODO: subscribe to CHANNEL on Redis and call onUpdate for each message - void Redis; - void CHANNEL; - void onUpdate; +): void {subscriber.subscribe(CHANNEL, + (err) => {if (err) { + console.error('Redis subscribe error:', err);} + else { + console.log(`Subscribed to ${CHANNEL}`);} + }); + + subscriber.on('message', (_channel, message) => { + console.log('Redis event received:', message); + try { + const parsed = JSON.parse(message); + onUpdate(parsed); + } catch (err) { + console.error('Invalid message from Redis:', err); + } + }); } diff --git a/packages/realtime-service/src/index.ts b/packages/realtime-service/src/index.ts index 414423d..a9db96a 100644 --- a/packages/realtime-service/src/index.ts +++ b/packages/realtime-service/src/index.ts @@ -19,6 +19,7 @@ createSocketServer(httpServer); // Wire Redis → Socket.io subscribeToCallUpdates((update) => { + console.log('Broadcasting update:', update); broadcastStatusUpdate(update); }); diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index b57a661..c0f7f57 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -12,7 +12,7 @@ export function createSocketServer(httpServer: HttpServer): IoServer { }); io.on('connection', (socket) => { - console.log(`[ws] client connected ${socket.id}`); + console.log(`[ws] client connected ${socket.id}`); socket.on('subscribe_call', (_callId) => { socket.join(_callId); From 42eaee1e9e15b7091f6f8c9d77e4d15a420451d4 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Mon, 4 May 2026 23:44:23 -0400 Subject: [PATCH 11/12] feat: integrate real-time updates across frontend, backend, and realtime service * Implement Redis Pub/Sub in call-service to publish call events * Add Redis subscriber in realtime-service to consume events and broadcast via Socket.io * Replace mock data in frontend with real API integration * Implement useCallEvents hook with initial fetch and live event streaming * Ensure backend remains source of truth by using refetch strategy on updates * Add missing endpoint for fetching call events history --- packages/frontend/src/hooks/useCallEvents.ts | 37 ++++++++++-- packages/frontend/src/hooks/useCalls.ts | 59 +++++++++++++++++-- .../realtime-service/src/bus/subscriber.ts | 3 +- packages/realtime-service/src/index.ts | 1 - .../realtime-service/src/socket/server.ts | 4 -- 5 files changed, 86 insertions(+), 18 deletions(-) diff --git a/packages/frontend/src/hooks/useCallEvents.ts b/packages/frontend/src/hooks/useCallEvents.ts index c428f4e..794dc82 100644 --- a/packages/frontend/src/hooks/useCallEvents.ts +++ b/packages/frontend/src/hooks/useCallEvents.ts @@ -2,8 +2,10 @@ import { useState, useEffect } from 'react'; import { CallEvent } from '../types'; -import { MOCK_EVENTS } from '../mocks/data'; +//import { MOCK_EVENTS } from '../mocks/data'; +import { getSocket, subscribeToCall, unsubscribeFromCall } from '../lib/socket'; +const API_URL = 'http://localhost:3001/api'; /** * Returns the event history for a specific call. * TODO: replace mock data with a real API call. @@ -18,14 +20,39 @@ export function useCallEvents(callId: string | null) { return; } + const socket = getSocket(); + socket.connect(); setLoading(true); + + const fetchEvents = async () => { + const res = await fetch(`${API_URL}/calls/${callId}/events`); + const data = await res.json(); + setEvents(data); + setLoading(false); + }; + + fetchEvents(); + + + subscribeToCall(callId); // TODO: replace with fetchCallEvents(callId) - const t = setTimeout(() => { - setEvents(MOCK_EVENTS[callId] ?? []); + const handler = (update: any) => { + console.log('EVENT RECEIVED:', update); + + setEvents((prev) => [...prev, update]); setLoading(false); - }, 200); + }; + socket.on('call_status_update', handler); + // const t = setTimeout(() => { + // setEvents(MOCK_EVENTS[callId] ?? []); + // setLoading(false); + // }, 200); - return () => clearTimeout(t); + return () => { + socket.off('call_status_update', handler); + unsubscribeFromCall(callId); + // clearTimeout(t); + }; }, [callId]); return { events, loading }; diff --git a/packages/frontend/src/hooks/useCalls.ts b/packages/frontend/src/hooks/useCalls.ts index a35ab33..ca2329a 100644 --- a/packages/frontend/src/hooks/useCalls.ts +++ b/packages/frontend/src/hooks/useCalls.ts @@ -2,25 +2,72 @@ import { useState, useEffect } from 'react'; import { Call, CallFilters } from '../types'; -import { MOCK_CALLS } from '../mocks/data'; +import { getSocket } from '../lib/socket'; +//import { MOCK_CALLS } from '../mocks/data'; /** * Returns the live call list and a loading indicator. * TODO: replace mock data with real API + Socket.io updates. */ +const API_URL = 'http://localhost:3001/api/calls'; + export function useCalls(_filters: CallFilters) { const [calls, setCalls] = useState([]); const [loading, setLoading] = useState(true); useEffect(() => { // TODO: replace with fetchCalls(_filters) + socket subscription - const t = setTimeout(() => { - setCalls(MOCK_CALLS); + // const t = setTimeout(() => { + // setCalls(MOCK_CALLS); + // setLoading(false); + // }, 300); + //return () => clearTimeout(t); + const fetchCalls = async () => { + setLoading(true); + + const params = new URLSearchParams(); + + if (_filters.status && _filters.status !== 'all') { + params.append('status', _filters.status); + } + + if (_filters.queueId) { + params.append('queueId', _filters.queueId); + } + + const res = await fetch(`${API_URL}?${params.toString()}`); + const data = await res.json(); + + setCalls(data); setLoading(false); - }, 300); + }; + + fetchCalls(); + + }, [_filters]); + + useEffect(() => { + const socket = getSocket(); + socket.connect(); + const handler = async () => { + console.log('refetch calls'); + const params = new URLSearchParams(); + if (_filters.status && _filters.status !== 'all') { + params.append('status', _filters.status); + } + if (_filters.queueId) { + params.append('queueId', _filters.queueId); + } + const res = await fetch(`${API_URL}?${params}`); + const data = await res.json(); + setCalls(data); + }; - return () => clearTimeout(t); - }, []); + socket.on('call_status_update', handler); + return () => { + socket.off('call_status_update', handler); + }; +}, [_filters]); return { calls, loading, setCalls }; } diff --git a/packages/realtime-service/src/bus/subscriber.ts b/packages/realtime-service/src/bus/subscriber.ts index 8400906..4bdf2bf 100644 --- a/packages/realtime-service/src/bus/subscriber.ts +++ b/packages/realtime-service/src/bus/subscriber.ts @@ -13,8 +13,7 @@ export function subscribeToCallUpdates( console.log(`Subscribed to ${CHANNEL}`);} }); - subscriber.on('message', (_channel, message) => { - console.log('Redis event received:', message); + subscriber.on('message', (_channel, message) => { try { const parsed = JSON.parse(message); onUpdate(parsed); diff --git a/packages/realtime-service/src/index.ts b/packages/realtime-service/src/index.ts index a9db96a..414423d 100644 --- a/packages/realtime-service/src/index.ts +++ b/packages/realtime-service/src/index.ts @@ -19,7 +19,6 @@ createSocketServer(httpServer); // Wire Redis → Socket.io subscribeToCallUpdates((update) => { - console.log('Broadcasting update:', update); broadcastStatusUpdate(update); }); diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index c0f7f57..736213b 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -12,20 +12,16 @@ export function createSocketServer(httpServer: HttpServer): IoServer { }); io.on('connection', (socket) => { - console.log(`[ws] client connected ${socket.id}`); socket.on('subscribe_call', (_callId) => { socket.join(_callId); - console.log(`[ws] joined room ${_callId}`); }); socket.on('unsubscribe_call', (_callId) => { socket.leave(_callId); - console.log(`[ws] left room ${_callId}`); }); socket.on('disconnect', () => { - console.log(`[ws] client disconnected ${socket.id}`); }); }); From 12bfb5fb416347642b7ba2322e1c08f3a2d3ea49 Mon Sep 17 00:00:00 2001 From: LuisNoriega <81337079+LuisNoriega2021@users.noreply.github.com> Date: Tue, 5 May 2026 00:04:37 -0400 Subject: [PATCH 12/12] docs: enhance README with technical decisions and implementation notes --- README.md | 166 ++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 166 insertions(+) diff --git a/README.md b/README.md index 5d19099..4bdb1b1 100644 --- a/README.md +++ b/README.md @@ -143,3 +143,169 @@ Placeholder test files are in `src/services/CallService.test.ts` and `src/routes - Dockerfiles per service - stronger contract sharing between packages - better observability for local debugging + +## Implementation Notes (Luis Noriega) + +# VoyceLink Call Center Dashboard + +This project implements a real-time call monitoring system using a microservices-based architecture. It processes call events, persists them in a relational database, and propagates updates through a real-time streaming layer to a web-based dashboard. + +The system is composed of three main services: + +- A backend service responsible for business logic and persistence +- A real-time service responsible for event distribution via WebSockets +- A frontend application responsible for data visualization and user interaction + +## Architecture + +The system follows an event-driven architecture in which the backend service publishes domain events to a message broker, and a separate real-time service consumes those events and distributes them to connected clients. + +Data flow: + +External clients send call events to the backend API +- The backend processes and persists the event +- The event is published to Redis using a Pub/Sub channel +- The real-time service subscribes to that channel +- Incoming events are broadcast to frontend clients via WebSockets +- The frontend reacts to these updates and refreshes its state +- Backend Service (call-service) + +## Responsibilities + +The backend service handles: + +Validation and processing of incoming call events +Enforcement of business rules and state transitions +Persistence of calls and events in PostgreSQL +Exposure of REST endpoints for querying data +Publication of events to Redis for real-time propagation +Event Processing + +The system supports the following event types: + +- call_initiated +- call_answered +- call_hold +- call_ended + +Each event triggers state transitions and may generate derived metadata based on defined business rules. + +## Business Rules + +The following conditions are enforced during event processing: + +A call must exist before transitioning to subsequent states +Valid state transitions are required (e.g., waiting → active) +Additional flags are generated when thresholds are exceeded: +WAIT_TIME_EXCEEDED when waitTime is greater than 30 seconds +HOLD_TIME_EXCEEDED when holdDuration is greater than 60 seconds +SHORT_CALL when call duration is less than 10 seconds +Persistence Layer + +Two primary tables are used: + +- calls: stores the current state and metadata of each call +- call_events: stores the historical sequence of events per call +All events are stored with associated metadata, allowing reconstruction of call history. + +## API Endpoints + +The backend exposes the following endpoints: + +- POST /api/events +Accepts and processes incoming call events +- GET /api/calls +Retrieves calls with optional filtering by status and queue +- GET /api/calls//events +Retrieves the event history for a specific call + +## Redis Integration + +A Redis Pub/Sub mechanism is used to propagate events to the real-time service. +After persisting an event, the backend publishes it to a Redis channel: + +await redis.publish('call_events', JSON.stringify(event)); +This decouples the backend from the real-time layer and allows for horizontal scalability. + +## Real-time Service + +The real-time service is responsible for: + +- Subscribing to Redis channels +- Receiving and parsing incoming events +- Broadcasting updates to connected clients via WebSockets + +## Redis Subscription + +The service subscribes to the call_events channel and invokes a handler for each message received. The message payload is parsed and transformed into a format suitable for client consumption. + +## WebSocket Layer + +Socket.io is used to maintain persistent connections with frontend clients. +Clients can subscribe to specific call identifiers, allowing the system to emit updates only to relevant consumers. + +socket.join(callId); +Event Broadcasting + +# Frontend Application + +The frontend application is responsible for: + +- Fetching and displaying call data +- Allowing filtering and selection of calls +- Displaying event history for individual calls +- Reacting to real-time updates via WebSockets + +## Data Integration + +All mock data has been replaced with real API calls. +The application interacts with the backend using HTTP requests and maintains synchronization through WebSocket events. +Call List Management + +## Event History Management + +The event history for a selected call is handled by a dedicated hook that: + + Fetches the full event history on selection + Subscribes to real-time updates for that specific call + Appends new events as they are received + WebSocket Client + +A singleton Socket.io client is implemented to manage the connection lifecycle and subscriptions. Clients subscribe and unsubscribe from call-specific channels dynamically. + +## Testing + +Unit and integration tests were implemented for the backend using Vitest. + +Test coverage includes: + Event processing and persistence + State transitions and validation rules + Flag generation logic + API response validation + Error handling scenarios + Design Considerations + +The system was designed with the following principles: + +- Separation of concerns between services +- Event-driven communication using Redis +- Backend as the single source of truth +- Scalable real-time communication via WebSockets +- Minimal coupling between components +- Trade-offs + +Some trade-offs were made to balance simplicity and correctness: + +The frontend performs a full refetch on real-time updates rather than partial state mutation +Authentication is limited to API key validation +State management in the frontend is kept simple without a global store +Running the System + Start infrastructure services (PostgreSQL and Redis) using Docker + Start the backend service + Start the real-time service + Start the frontend application + +Events can be sent to the backend via HTTP requests, and the system will process and propagate updates in real time. + +## Path video +https://drive.google.com/file/d/1usn9aCGOl-cZBBaeA1mr-SUPlYW9G_Ne/view?usp=drive_link \ No newline at end of file