From 5776f5c32636bdf5ac0dd2c18435707582c57ce5 Mon Sep 17 00:00:00 2001 From: kiranitor123 Date: Wed, 29 Apr 2026 18:31:05 -0400 Subject: [PATCH] Key Improvements: SOLID Refactor: Logic separated into CallRepository, SlaManager (timers), and CallService. Reliability: Implemented Transactional Outbox (guaranteed event delivery) and Idempotency (prevented duplicates). Real-time UX: Dashboard features automatic live ticking (LiveDuration) and instant SLA notifications. Infrastructure: Database seed script, and morgan logging for observability. Tradeoffs: SLA timers are in-memory for simplicity (cleared on service restart). Next Steps: Migrate timers to persistent queues (BullMQ) and implement E2E testing.processing latency. --- package-lock.json | 68 ++++++ packages/call-service/package.json | 4 +- .../call-service/src/bus/OutboxProcessor.ts | 52 +++++ .../call-service/src/db/CallRepository.ts | 144 ++++++++++++ packages/call-service/src/db/schema.sql | 16 ++ packages/call-service/src/db/seed.ts | 221 ++++++++++++++++++ packages/call-service/src/index.ts | 5 + packages/call-service/src/routes/calls.ts | 16 +- .../call-service/src/routes/events.test.ts | 71 +++++- packages/call-service/src/routes/events.ts | 12 +- .../src/services/CallService.test.ts | 67 +++++- .../call-service/src/services/CallService.ts | 61 ++++- .../call-service/src/services/SlaManager.ts | 75 ++++++ .../frontend/src/components/CallsTable.tsx | 24 +- packages/frontend/src/hooks/useCallEvents.ts | 65 ++++-- packages/frontend/src/hooks/useCalls.ts | 94 ++++++-- packages/frontend/src/lib/api.ts | 2 - packages/frontend/src/lib/socket.ts | 8 + packages/realtime-service/package.json | 2 + .../realtime-service/src/bus/subscriber.ts | 24 +- packages/realtime-service/src/index.ts | 2 + .../realtime-service/src/socket/server.ts | 20 +- 22 files changed, 982 insertions(+), 71 deletions(-) create mode 100644 packages/call-service/src/bus/OutboxProcessor.ts create mode 100644 packages/call-service/src/db/CallRepository.ts create mode 100644 packages/call-service/src/db/seed.ts create mode 100644 packages/call-service/src/services/SlaManager.ts diff --git a/package-lock.json b/package-lock.json index b4d30f3..a105036 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1659,6 +1659,16 @@ "dev": true, "license": "MIT" }, + "node_modules/@types/morgan": { + "version": "1.9.10", + "resolved": "https://registry.npmjs.org/@types/morgan/-/morgan-1.9.10.tgz", + "integrity": "sha512-sS4A1zheMvsADRVfT0lYbJ4S9lmsey8Zo2F7cnbYjWHP67Q0AwMYuuzLlkIM2N8gAbb9cubhIVFwcIN2XyYCkA==", + "dev": true, + "license": "MIT", + "dependencies": { + "@types/node": "*" + } + }, "node_modules/@types/node": { "version": "20.19.39", "resolved": "https://registry.npmjs.org/@types/node/-/node-20.19.39.tgz", @@ -2960,6 +2970,24 @@ "node": ">=6.0.0" } }, + "node_modules/basic-auth": { + "version": "2.0.1", + "resolved": "https://registry.npmjs.org/basic-auth/-/basic-auth-2.0.1.tgz", + "integrity": "sha512-NF+epuEdnUYVlGuhaxbbq+dvJttwLnGY+YixlXlME5KpQ5W3CnXA5cVTneY3SPbPDRkcjMbifrwmFYcClgOZeg==", + "license": "MIT", + "dependencies": { + "safe-buffer": "5.1.2" + }, + "engines": { + "node": ">= 0.8" + } + }, + "node_modules/basic-auth/node_modules/safe-buffer": { + "version": "5.1.2", + "resolved": "https://registry.npmjs.org/safe-buffer/-/safe-buffer-5.1.2.tgz", + "integrity": "sha512-Gd2UZBJDkXlY7GbJxfsE8/nvKkUEU1G38c1siN6QP6a9PT9MmHB8GnpscSmMJSoF8LOIrt8ud/wPtojys4G6+g==", + "license": "MIT" + }, "node_modules/binary-extensions": { "version": "2.3.0", "resolved": "https://registry.npmjs.org/binary-extensions/-/binary-extensions-2.3.0.tgz", @@ -6641,6 +6669,34 @@ "dev": true, "license": "MIT" }, + "node_modules/morgan": { + "version": "1.10.1", + "resolved": "https://registry.npmjs.org/morgan/-/morgan-1.10.1.tgz", + "integrity": "sha512-223dMRJtI/l25dJKWpgij2cMtywuG/WiUKXdvwfbhGKBhy1puASqXwFzmWZ7+K73vUPoR7SS2Qz2cI/g9MKw0A==", + "license": "MIT", + "dependencies": { + "basic-auth": "~2.0.1", + "debug": "2.6.9", + "depd": "~2.0.0", + "on-finished": "~2.3.0", + "on-headers": "~1.1.0" + }, + "engines": { + "node": ">= 0.8.0" + } + }, + "node_modules/morgan/node_modules/on-finished": { + "version": "2.3.0", + "resolved": "https://registry.npmjs.org/on-finished/-/on-finished-2.3.0.tgz", + "integrity": "sha512-ikqdkGAAyf/X/gPhXGvfgAytDZtDbr+bkNUJ0N9h5MI/dmdgCs3l6hoHrcUv41sRKew3jIwrp4qQDXiK99Utww==", + "license": "MIT", + "dependencies": { + "ee-first": "1.1.1" + }, + "engines": { + "node": ">= 0.8" + } + }, "node_modules/ms": { "version": "2.0.0", "resolved": "https://registry.npmjs.org/ms/-/ms-2.0.0.tgz", @@ -7082,6 +7138,15 @@ "node": ">= 0.8" } }, + "node_modules/on-headers": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/on-headers/-/on-headers-1.1.0.tgz", + "integrity": "sha512-737ZY3yNnXy37FHkQxPzt4UZ2UWPWiCZWLvFZ4fu5cueciegX0zGPnrlY6bwRg4FdQOe9YU8MkmJwGhoMybl8A==", + "license": "MIT", + "engines": { + "node": ">= 0.8" + } + }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -10159,6 +10224,7 @@ "dotenv": "^16.4.5", "express": "^4.19.2", "ioredis": "^5.3.2", + "morgan": "^1.10.1", "pg": "^8.11.5", "uuid": "^9.0.1", "zod": "^3.23.8" @@ -10211,11 +10277,13 @@ "dotenv": "^16.4.5", "express": "^4.19.2", "ioredis": "^5.3.2", + "morgan": "^1.10.1", "socket.io": "^4.7.5" }, "devDependencies": { "@types/cors": "^2.8.17", "@types/express": "^4.17.21", + "@types/morgan": "^1.9.10", "@types/node": "^20.12.7", "nodemon": "^3.1.0", "ts-node": "^10.9.2", diff --git a/packages/call-service/package.json b/packages/call-service/package.json index 57ccc9f..7a49fa8 100644 --- a/packages/call-service/package.json +++ b/packages/call-service/package.json @@ -17,6 +17,7 @@ "dotenv": "^16.4.5", "express": "^4.19.2", "ioredis": "^5.3.2", + "morgan": "^1.10.1", "pg": "^8.11.5", "uuid": "^9.0.1", "zod": "^3.23.8" @@ -24,6 +25,7 @@ "devDependencies": { "@types/cors": "^2.8.17", "@types/express": "^4.17.21", + "@types/morgan": "^1.9.10", "@types/node": "^20.12.7", "@types/pg": "^8.11.5", "@types/supertest": "^6.0.2", @@ -34,4 +36,4 @@ "typescript": "^5.4.5", "vitest": "^1.6.0" } -} +} \ No newline at end of file diff --git a/packages/call-service/src/bus/OutboxProcessor.ts b/packages/call-service/src/bus/OutboxProcessor.ts new file mode 100644 index 0000000..d108025 --- /dev/null +++ b/packages/call-service/src/bus/OutboxProcessor.ts @@ -0,0 +1,52 @@ +import { db } from '../db/client'; +import { publishStatusUpdate } from './publisher'; + +export class OutboxProcessor { + private timer: NodeJS.Timeout | null = null; + private isProcessing = false; + + start() { + if (this.timer) return; + this.timer = setInterval(() => this.processOutbox(), 1000); + } + + stop() { + if (this.timer) clearInterval(this.timer); + this.timer = null; + } + + async processOutbox() { + if (this.isProcessing) return; + this.isProcessing = true; + + try { + // Pull up to 50 events + const res = await db.query( + `SELECT * FROM outbox_events ORDER BY id ASC LIMIT 50` + ); + + for (const row of res.rows) { + try { + await publishStatusUpdate({ + callId: row.call_id, + status: row.status, + eventType: row.event_type, + timestamp: row.timestamp.toISOString(), + metadata: row.metadata, + }); + + // Delete on success + await db.query(`DELETE FROM outbox_events WHERE id = $1`, [row.id]); + } catch (publishErr) { + console.error(`Failed to publish outbox event ${row.id}:`, publishErr); + } + } + } catch (e) { + console.error('OutboxProcessor failed to query DB', e); + } finally { + this.isProcessing = false; + } + } +} + +export const outboxProcessor = new OutboxProcessor(); diff --git a/packages/call-service/src/db/CallRepository.ts b/packages/call-service/src/db/CallRepository.ts new file mode 100644 index 0000000..582b3b2 --- /dev/null +++ b/packages/call-service/src/db/CallRepository.ts @@ -0,0 +1,144 @@ +import { db } from './client'; +import { mapCallRow, mapCallEventRow } from './mappers'; +import { Call, CallEvent, CallFilters, CallStatus } from '../domain/call'; +import { EventPayload } from '../domain/call'; + +export class CallRepository { + async getCalls(filters: CallFilters & { limit?: number; offset?: number }): Promise { + let query = 'SELECT * FROM calls WHERE 1=1'; + const params: any[] = []; + let paramIndex = 1; + + if (filters.status) { + query += ` AND status = $${paramIndex++}`; + params.push(filters.status); + } + if (filters.queueId) { + query += ` AND queue_id = $${paramIndex++}`; + params.push(filters.queueId); + } + + query += ` ORDER BY start_time DESC`; + + if (filters.limit !== undefined) { + query += ` LIMIT $${paramIndex++}`; + params.push(filters.limit); + } + if (filters.offset !== undefined) { + query += ` OFFSET $${paramIndex++}`; + params.push(filters.offset); + } + + + const res = await db.query(query, params); + return res.rows.map(mapCallRow); + } + + async getCallEvents(callId: string): Promise { + console.log('callId ----- ', callId); + const res = await db.query( + `SELECT * FROM call_events WHERE call_id = $1 ORDER BY timestamp ASC`, + [callId] + ); + return res.rows.map(mapCallEventRow); + } + + async getEventByIdempotencyKey(key: string): Promise { + const res = await db.query( + `SELECT ce.* FROM call_events ce + JOIN idempotency_keys ik ON ce.id = ik.event_id + WHERE ik.key = $1`, + [key] + ); + if (res.rows.length === 0) return null; + return mapCallEventRow(res.rows[0]); + } + + async processEventTransaction( + payload: EventPayload, + eventId: string, + newStatus: CallStatus, + now: Date, + idempotencyKey?: string + ): Promise { + const { callId, event } = payload; + const client = await db.connect(); + + try { + await client.query('BEGIN'); + + if (event === 'call_initiated') { + const p = payload as Extract; + await client.query( + `INSERT INTO calls (id, type, status, queue_id, start_time) VALUES ($1, $2, $3, $4, $5)`, + [p.callId, p.type, newStatus, p.queueId, now] + ); + } else { + if (event === 'call_ended') { + await client.query( + `UPDATE calls SET status = $1, end_time = $2 WHERE id = $3`, + [newStatus, now, callId] + ); + } else { + await client.query( + `UPDATE calls SET status = $1 WHERE id = $2`, + [newStatus, callId] + ); + } + } + + // 2. Insert event + await client.query( + `INSERT INTO call_events (id, call_id, type, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)`, + [eventId, callId, event, now, JSON.stringify(payload)] + ); + + // 3. Optional: idempotency key + if (idempotencyKey) { + await client.query( + `INSERT INTO idempotency_keys (key, event_id, created_at) VALUES ($1, $2, $3)`, + [idempotencyKey, eventId, now] + ); + } + + // 4. Outbox event for Redis + await client.query( + `INSERT INTO outbox_events (call_id, status, event_type, timestamp, metadata) + VALUES ($1, $2, $3, $4, $5)`, + [callId, newStatus, event, now, JSON.stringify(payload)] + ); + + await client.query('COMMIT'); + } catch (e) { + await client.query('ROLLBACK'); + throw e; + } finally { + client.release(); + } + } + + async insertFlag(callId: string, eventId: string, flagType: string, status: CallStatus): Promise { + const now = new Date(); + const client = await db.connect(); + try { + await client.query('BEGIN'); + await client.query( + `INSERT INTO call_events (id, call_id, type, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)`, + [eventId, callId, 'flagged', now, JSON.stringify({ reason: flagType })] + ); + await client.query( + `INSERT INTO outbox_events (call_id, status, event_type, timestamp, metadata) + VALUES ($1, $2, $3, $4, $5)`, + [callId, status, 'flagged', now, JSON.stringify({ reason: flagType })] + ); + await client.query('COMMIT'); + } catch (e) { + await client.query('ROLLBACK'); + throw e; + } finally { + client.release(); + } + } +} + +export const callRepository = new CallRepository(); diff --git a/packages/call-service/src/db/schema.sql b/packages/call-service/src/db/schema.sql index d3ec308..7635950 100644 --- a/packages/call-service/src/db/schema.sql +++ b/packages/call-service/src/db/schema.sql @@ -21,3 +21,19 @@ CREATE TABLE IF NOT EXISTS call_events ( CREATE INDEX IF NOT EXISTS idx_call_events_call_id ON call_events(call_id); CREATE INDEX IF NOT EXISTS idx_calls_status ON calls(status); CREATE INDEX IF NOT EXISTS idx_calls_queue_id ON calls(queue_id); + +CREATE TABLE IF NOT EXISTS idempotency_keys ( + key VARCHAR(100) PRIMARY KEY, + event_id VARCHAR(36) NOT NULL REFERENCES call_events(id) ON DELETE CASCADE, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); + +CREATE TABLE IF NOT EXISTS outbox_events ( + id SERIAL PRIMARY KEY, + call_id VARCHAR(36) NOT NULL, + status VARCHAR(20) NOT NULL, + event_type VARCHAR(50) NOT NULL, + timestamp TIMESTAMPTZ NOT NULL, + metadata JSONB, + created_at TIMESTAMPTZ NOT NULL DEFAULT NOW() +); diff --git a/packages/call-service/src/db/seed.ts b/packages/call-service/src/db/seed.ts new file mode 100644 index 0000000..2a37035 --- /dev/null +++ b/packages/call-service/src/db/seed.ts @@ -0,0 +1,221 @@ +import 'dotenv/config'; +import { db } from './client'; + +const now = Date.now(); +const minsAgo = (m: number) => new Date(now - m * 60_000).toISOString(); + +const MOCK_CALLS = [ + { + id: 'c-001', + type: 'video', + status: 'active', + queueId: 'medical_spanish', + startTime: minsAgo(5), + endTime: null, + }, + { + id: 'c-002', + type: 'voice', + status: 'waiting', + queueId: 'medical_english', + startTime: minsAgo(2), + endTime: null, + }, + { + id: 'c-003', + type: 'video', + status: 'on_hold', + queueId: 'medical_spanish', + startTime: minsAgo(12), + endTime: null, + }, + { + id: 'c-004', + type: 'voice', + status: 'ended', + queueId: 'medical_english', + startTime: minsAgo(30), + endTime: minsAgo(25), + }, + { + id: 'c-005', + type: 'video', + status: 'active', + queueId: 'legal_spanish', + startTime: minsAgo(8), + endTime: null, + }, + { + id: 'c-006', + type: 'voice', + status: 'waiting', + queueId: 'medical_spanish', + startTime: minsAgo(1), + endTime: null, + }, + { + id: 'c-007', + type: 'video', + status: 'ended', + queueId: 'legal_english', + startTime: minsAgo(60), + endTime: minsAgo(45), + }, + { + id: 'c-008', + type: 'voice', + status: 'active', + queueId: 'medical_english', + startTime: minsAgo(3), + endTime: null, + }, +]; + +const MOCK_EVENTS = [ + { + id: 'e-001-1', + callId: 'c-001', + type: 'call_initiated', + timestamp: minsAgo(5), + metadata: { type: 'video', queueId: 'medical_spanish' }, + }, + { + id: 'e-001-2', + callId: 'c-001', + type: 'call_routed', + timestamp: minsAgo(4), + metadata: { agentId: 'agent_42', routingTime: 15 }, + }, + { + id: 'e-001-3', + callId: 'c-001', + type: 'call_answered', + timestamp: minsAgo(3), + metadata: { waitTime: 25 }, + }, + { + id: 'e-003-1', + callId: 'c-003', + type: 'call_initiated', + timestamp: minsAgo(12), + metadata: { type: 'video', queueId: 'medical_spanish' }, + }, + { + id: 'e-003-2', + callId: 'c-003', + type: 'call_routed', + timestamp: minsAgo(11), + metadata: { agentId: 'agent_18', routingTime: 12 }, + }, + { + id: 'e-003-3', + callId: 'c-003', + type: 'call_answered', + timestamp: minsAgo(10), + metadata: { waitTime: 12 }, + }, + { + id: 'e-003-4', + callId: 'c-003', + type: 'call_hold', + timestamp: minsAgo(5), + metadata: { holdDuration: 45 }, + }, + { + id: 'e-004-1', + callId: 'c-004', + type: 'call_initiated', + timestamp: minsAgo(30), + metadata: { type: 'voice', queueId: 'medical_english' }, + }, + { + id: 'e-004-2', + callId: 'c-004', + type: 'call_routed', + timestamp: minsAgo(29), + metadata: { agentId: 'agent_07', routingTime: 8 }, + }, + { + id: 'e-004-3', + callId: 'c-004', + type: 'call_answered', + timestamp: minsAgo(28), + metadata: { waitTime: 8 }, + }, + { + id: 'e-004-4', + callId: 'c-004', + type: 'call_ended', + timestamp: minsAgo(25), + metadata: { endReason: 'completed', duration: 300 }, + }, + { + id: 'e-007-1', + callId: 'c-007', + type: 'call_initiated', + timestamp: minsAgo(60), + metadata: { type: 'video', queueId: 'legal_english' }, + }, + { + id: 'e-007-2', + callId: 'c-007', + type: 'call_routed', + timestamp: minsAgo(58), + metadata: { agentId: 'agent_33', routingTime: 20 }, + }, + { + id: 'e-007-3', + callId: 'c-007', + type: 'call_answered', + timestamp: minsAgo(57), + metadata: { waitTime: 35 }, + }, + { + id: 'e-007-4', + callId: 'c-007', + type: 'call_ended', + timestamp: minsAgo(45), + metadata: { endReason: 'completed', duration: 720 }, + }, +]; + +async function seed() { + console.log('Seeding database...'); + const client = await db.connect(); + + try { + await client.query('BEGIN'); + + // Clean existing data for a fresh seed + await client.query('DELETE FROM call_events'); + await client.query('DELETE FROM outbox_events'); + await client.query('DELETE FROM calls'); + + // Insert Calls + for (const call of MOCK_CALLS) { + await client.query( + `INSERT INTO calls (id, type, status, queue_id, start_time, end_time) VALUES ($1, $2, $3, $4, $5, $6)`, + [call.id, call.type, call.status, call.queueId, call.startTime, call.endTime] + ); + } + + // Insert Events + for (const event of MOCK_EVENTS) { + await client.query( + `INSERT INTO call_events (id, call_id, type, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)`, + [event.id, event.callId, event.type, event.timestamp, JSON.stringify(event.metadata)] + ); + } + + await client.query('COMMIT'); + console.log('Seed completed successfully.'); + } catch (err) { + await client.query('ROLLBACK'); + console.error('Seed failed:', err); + } finally { + client.release(); + process.exit(0); + } +} + +seed(); diff --git a/packages/call-service/src/index.ts b/packages/call-service/src/index.ts index 1786181..4eb2a2c 100644 --- a/packages/call-service/src/index.ts +++ b/packages/call-service/src/index.ts @@ -1,13 +1,18 @@ import { config } from './config'; import express from 'express'; import cors from 'cors'; +import morgan from 'morgan'; import eventsRouter from './routes/events'; import callsRouter from './routes/calls'; +import { outboxProcessor } from './bus/OutboxProcessor'; const app = express(); app.use(cors()); app.use(express.json()); +app.use(morgan('dev')); + +outboxProcessor.start(); // Start background publisher app.use('/api/events', eventsRouter); app.use('/api/calls', callsRouter); diff --git a/packages/call-service/src/routes/calls.ts b/packages/call-service/src/routes/calls.ts index 762face..05b9350 100644 --- a/packages/call-service/src/routes/calls.ts +++ b/packages/call-service/src/routes/calls.ts @@ -7,20 +7,28 @@ const router = Router(); router.get('/', async (req: Request, res: Response) => { try { - const filters: CallFilters = { + const filters: CallFilters & { limit?: number; offset?: number } = { status: - typeof req.query.status === 'string' + typeof req.query.status === 'string' && req.query.status !== 'all' ? (req.query.status as CallStatus) : undefined, queueId: typeof req.query.queueId === 'string' ? (req.query.queueId as QueueId) : undefined, + limit: + typeof req.query.limit === 'string' + ? parseInt(req.query.limit, 10) + : undefined, + offset: + typeof req.query.offset === 'string' + ? parseInt(req.query.offset, 10) + : undefined, }; - const calls = await callService.getCalls(filters); res.json(calls); - } catch (_error) { + } catch (error) { + console.error('Error fetching calls:', error); res.status(500).json({ message: 'Internal server error' }); } }); diff --git a/packages/call-service/src/routes/events.test.ts b/packages/call-service/src/routes/events.test.ts index 79c8f7e..40a6b94 100644 --- a/packages/call-service/src/routes/events.test.ts +++ b/packages/call-service/src/routes/events.test.ts @@ -1,7 +1,70 @@ -import { describe, it } from 'vitest'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import request from 'supertest'; +import express from 'express'; +import eventsRouter from './events'; +import { callService } from '../services'; + +vi.mock('../services', () => ({ + callService: { + processEvent: vi.fn(), + }, +})); + +vi.mock('../config', () => ({ + config: { + apiKey: 'test-key', + }, +})); + +const app = express(); +app.use(express.json()); +app.use('/api/events', eventsRouter); describe('POST /api/events', () => { - it.todo('returns 201 and persists the event for a valid call_initiated payload'); - it.todo('returns 400 for an invalid payload'); - it.todo('returns 401 when the API key is missing'); + beforeEach(() => { + vi.clearAllMocks(); + }); + + it('returns 201 and persists the event for a valid call_initiated payload', async () => { + const payload = { + event: 'call_initiated', + callId: 'call-1', + type: 'voice', + queueId: 'medical_spanish' + }; + + (callService.processEvent as any).mockResolvedValue({ id: 'event-1', type: 'call_initiated' }); + + const res = await request(app) + .post('/api/events') + .set('x-api-key', 'test-key') + .send(payload); + + expect(res.status).toBe(201); + expect(res.body).toEqual({ id: 'event-1', type: 'call_initiated' }); + expect(callService.processEvent).toHaveBeenCalledWith(payload, undefined); + }); + + it('returns 400 for an invalid payload', async () => { + const payload = { + event: 'call_initiated', + // missing callId, type, queueId + }; + + const res = await request(app) + .post('/api/events') + .set('x-api-key', 'test-key') + .send(payload); + + expect(res.status).toBe(400); + expect(res.body.message).toBe('Invalid event payload'); + }); + + it('returns 401 when the API key is missing', async () => { + const res = await request(app) + .post('/api/events') + .send({}); // No authorization header + + expect(res.status).toBe(401); + }); }); diff --git a/packages/call-service/src/routes/events.ts b/packages/call-service/src/routes/events.ts index 2bb57af..cf70544 100644 --- a/packages/call-service/src/routes/events.ts +++ b/packages/call-service/src/routes/events.ts @@ -1,6 +1,5 @@ import { Router, Request, Response } from 'express'; import { eventPayloadSchema } from '@voycelink/contracts'; -import { ZodError } from 'zod'; import type { EventPayload } from '../domain/call'; import { callService } from '../services'; import { apiKeyAuth } from '../middleware/apiKey'; @@ -10,10 +9,12 @@ const router = Router(); router.post('/', apiKeyAuth, async (req: Request, res: Response) => { try { const payload: EventPayload = eventPayloadSchema.parse(req.body); - const event = await callService.processEvent(payload); + const idempotencyKey = req.headers['x-idempotency-key'] as string | undefined; + const event = await callService.processEvent(payload, idempotencyKey); + res.status(201).json(event); - } catch (error) { - if (error instanceof ZodError) { + } catch (error: any) { + if (error && error.name === 'ZodError') { res.status(400).json({ message: 'Invalid event payload', issues: error.issues, @@ -21,7 +22,8 @@ router.post('/', apiKeyAuth, async (req: Request, res: Response) => { return; } - res.status(500).json({ message: 'Internal server error' }); + console.error('Error processing event:', error); + res.status(500).json({ message: 'Internal server error', error }); } }); diff --git a/packages/call-service/src/services/CallService.test.ts b/packages/call-service/src/services/CallService.test.ts index d4c66f4..308a2d2 100644 --- a/packages/call-service/src/services/CallService.test.ts +++ b/packages/call-service/src/services/CallService.test.ts @@ -1,9 +1,64 @@ -import { describe, it } from 'vitest'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; +import { CallService } from './CallService'; +import { callRepository } from '../db/CallRepository'; +import { slaManager } from './SlaManager'; + +vi.mock('../db/CallRepository', () => ({ + callRepository: { + getEventByIdempotencyKey: vi.fn(), + processEventTransaction: vi.fn(), + }, +})); + +vi.mock('./SlaManager', () => ({ + slaManager: { + handleTimersAndRules: vi.fn(), + }, +})); describe('CallService', () => { - it.todo('processes call_initiated and persists the call'); - it.todo('processes call_answered and updates call status to active'); - it.todo('flags call_answered when waitTime exceeds 30 seconds'); - it.todo('flags call_hold when holdDuration exceeds 60 seconds'); - it.todo('flags call_ended when duration is under 10 seconds'); + let callService: CallService; + + beforeEach(() => { + vi.clearAllMocks(); + callService = new CallService(); + }); + + it('processes call_initiated and delegates to repository', async () => { + const payload = { + event: 'call_initiated' as const, + callId: 'call-1', + type: 'voice' as const, + queueId: 'medical_spanish' as const, + }; + + const result = await callService.processEvent(payload); + + expect(result.type).toBe('call_initiated'); + expect(callRepository.processEventTransaction).toHaveBeenCalledWith( + payload, + expect.any(String), + 'waiting', + expect.any(Date), + undefined + ); + expect(slaManager.handleTimersAndRules).toHaveBeenCalledWith(payload, 'waiting'); + }); + + it('skips processing if idempotency key exists', async () => { + const payload = { + event: 'call_answered' as const, + callId: 'call-1', + waitTime: 10, + }; + + const existingEvent = { id: 'evt-1', type: 'call_answered' } as any; + (callRepository.getEventByIdempotencyKey as any).mockResolvedValue(existingEvent); + + const result = await callService.processEvent(payload, 'idemp-123'); + + expect(result).toBe(existingEvent); + expect(callRepository.processEventTransaction).not.toHaveBeenCalled(); + expect(slaManager.handleTimersAndRules).not.toHaveBeenCalled(); + }); }); diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 3c9b082..c9f665c 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -1,21 +1,70 @@ +import { v4 as uuidv4 } from 'uuid'; +import { callRepository } from '../db/CallRepository'; +import { slaManager } from './SlaManager'; import { Call, CallEvent, CallFilters, CallServiceContract, EventPayload, + CallStatus } from '../domain/call'; export class CallService implements CallServiceContract { - async processEvent(_payload: EventPayload): Promise { - throw new Error('CallService.processEvent not implemented'); + async processEvent(payload: EventPayload, idempotencyKey?: string): Promise { + const { callId, event } = payload; + const now = new Date(); + const eventId = uuidv4(); + + //Check idempotency + if (idempotencyKey) { + const existing = await callRepository.getEventByIdempotencyKey(idempotencyKey); + if (existing) { + return existing; // Return early if already processed + } + } + + let newStatus: CallStatus; + switch (event) { + case 'call_initiated': + newStatus = 'waiting'; + break; + case 'call_routed': + newStatus = 'waiting'; + break; + case 'call_answered': + newStatus = 'active'; + break; + case 'call_hold': + newStatus = 'on_hold'; + break; + case 'call_ended': + newStatus = 'ended'; + break; + default: + throw new Error(`Unknown event type: ${event}`); + } + + //Process the transaction + await callRepository.processEventTransaction( + payload, + eventId, + newStatus, + now, + idempotencyKey + ); + + //Handle SLA Timers + slaManager.handleTimersAndRules(payload, newStatus); + + return new CallEvent(eventId, callId, event, now, payload as unknown as Record); } - async getCalls(_filters: CallFilters): Promise { - throw new Error('CallService.getCalls not implemented'); + async getCalls(filters: CallFilters & { limit?: number; offset?: number }): Promise { + return callRepository.getCalls(filters); } - async getCallEvents(_callId: string): Promise { - throw new Error('CallService.getCallEvents not implemented'); + async getCallEvents(callId: string): Promise { + return callRepository.getCallEvents(callId); } } diff --git a/packages/call-service/src/services/SlaManager.ts b/packages/call-service/src/services/SlaManager.ts new file mode 100644 index 0000000..0c8c8cb --- /dev/null +++ b/packages/call-service/src/services/SlaManager.ts @@ -0,0 +1,75 @@ +import { EventPayload, CallStatus } from '../domain/call'; +import { v4 as uuidv4 } from 'uuid'; +import { callRepository } from '../db/CallRepository'; + +export class SlaManager { + private slaTimers = new Map(); + private callStatuses = new Map(); + + handleTimersAndRules(payload: EventPayload, status: CallStatus) { + const { callId, event } = payload; + this.callStatuses.set(callId, status); + + if (this.slaTimers.has(callId)) { + clearTimeout(this.slaTimers.get(callId)!); + this.slaTimers.delete(callId); + } + + switch (event) { + case 'call_initiated': + this.slaTimers.set( + callId, + setTimeout(() => { + console.warn(`[SLA WARNING] Call ${callId} exceeded 30s max wait time.`); + this.insertFlag(callId, 'sla_wait_exceeded'); + }, 30000) + ); + break; + + case 'call_routed': + this.slaTimers.set( + callId, + setTimeout(() => { + console.warn(`[SLA WARNING] Call ${callId} unanswered after 15s. Re-routing.`); + this.insertFlag(callId, 'unanswered_reroute_required'); + }, 15000) + ); + break; + + case 'call_answered': + const answeredPayload = payload as Extract; + if (answeredPayload.waitTime > 30) { + console.warn(`[SLA WARNING] Call ${callId} answered with waitTime > 30s.`); + } + break; + + case 'call_hold': + this.slaTimers.set( + callId, + setTimeout(() => { + console.warn(`[SLA WARNING] Call ${callId} exceeded 60s max hold time.`); + this.insertFlag(callId, 'sla_hold_exceeded'); + }, 60000) + ); + break; + + case 'call_ended': + const endedPayload = payload as Extract; + if (endedPayload.duration < 10) { + console.warn(`[SLA WARNING] Call ${callId} duration was under 10 seconds.`); + this.insertFlag(callId, 'short_duration_flag'); + } + this.callStatuses.delete(callId); + break; + } + } + + private insertFlag(callId: string, flagType: string) { + const status = this.callStatuses.get(callId) || 'waiting'; + callRepository + .insertFlag(callId, uuidv4(), flagType, status) + .catch((err) => console.error(`Failed to insert flag event for call ${callId}:`, err)); + } +} + +export const slaManager = new SlaManager(); diff --git a/packages/frontend/src/components/CallsTable.tsx b/packages/frontend/src/components/CallsTable.tsx index 4280e61..1690691 100644 --- a/packages/frontend/src/components/CallsTable.tsx +++ b/packages/frontend/src/components/CallsTable.tsx @@ -1,5 +1,6 @@ 'use client'; +import { useState, useEffect } from 'react'; import { Call } from '../types'; import { StatusBadge } from './StatusBadge'; @@ -16,6 +17,22 @@ function elapsed(start: string, end?: string): string { return `${Math.floor(s / 60)}m ${s % 60}s`; } +function LiveDuration({ start, end }: { start: string; end?: string }) { + const [, tick] = useState(0); + + useEffect(() => { + if (end) return; + + const timer = setInterval(() => { + tick(prev => prev + 1); + }, 1000); + + return () => clearInterval(timer); + }, [end]); + + return <>{elapsed(start, end || new Date().toISOString())}; +} + function timeOf(iso: string): string { return new Date(iso).toLocaleTimeString([], { hour: '2-digit', @@ -55,9 +72,8 @@ export function CallsTable({ calls, selectedCallId, onSelectCall, loading }: Pro onSelectCall(call.id)} - className={`cursor-pointer transition-colors hover:bg-blue-50 ${ - selectedCallId === call.id ? 'bg-blue-50' : '' - }`} + className={`cursor-pointer transition-colors hover:bg-blue-50 ${selectedCallId === call.id ? 'bg-blue-50' : '' + }`} > {call.id} {call.type} @@ -66,7 +82,7 @@ export function CallsTable({ calls, selectedCallId, onSelectCall, loading }: Pro - {elapsed(call.startTime, call.endTime)} + {timeOf(call.startTime)} diff --git a/packages/frontend/src/hooks/useCallEvents.ts b/packages/frontend/src/hooks/useCallEvents.ts index c428f4e..afc4ba0 100644 --- a/packages/frontend/src/hooks/useCallEvents.ts +++ b/packages/frontend/src/hooks/useCallEvents.ts @@ -1,13 +1,10 @@ 'use client'; import { useState, useEffect } from 'react'; -import { CallEvent } from '../types'; -import { MOCK_EVENTS } from '../mocks/data'; +import { CallEvent, CallStatusUpdate } from '../types'; +import { getSocket, subscribeToCall, unsubscribeFromCall } from '../lib/socket'; +import { v4 as uuidv4 } from 'uuid'; -/** - * Returns the event history for a specific call. - * TODO: replace mock data with a real API call. - */ export function useCallEvents(callId: string | null) { const [events, setEvents] = useState([]); const [loading, setLoading] = useState(false); @@ -18,14 +15,56 @@ export function useCallEvents(callId: string | null) { return; } - setLoading(true); - // TODO: replace with fetchCallEvents(callId) - const t = setTimeout(() => { - setEvents(MOCK_EVENTS[callId] ?? []); - setLoading(false); - }, 200); + let active = true; + const socket = getSocket(); - return () => clearTimeout(t); + async function init() { + try { + setLoading(true); + socket.connect(); + subscribeToCall(callId!); + + const res = await fetch(`http://localhost:3001/api/calls/${callId}/events`); + if (!res.ok) throw new Error('Failed'); + const data = await res.json(); + + if (active) { + setEvents(prevSocketEvents => { + const existingIds = new Set(data.map((e: CallEvent) => e.id)); + const newSocketEvents = prevSocketEvents.filter(e => !existingIds.has(e.id)); + + return [...data, ...newSocketEvents]; + }); + setLoading(false); + } + } catch (err) { + if (active) setLoading(false); + } + } + + const handleUpdate = (update: CallStatusUpdate) => { + if (update.callId !== callId) return; + + setEvents((prev) => [ + ...prev, + { + id: uuidv4(), + callId: update.callId, + type: update.eventType, + timestamp: update.timestamp, + metadata: update.metadata, + }, + ]); + }; + + socket.on('call_status_update', handleUpdate); + init(); + + return () => { + active = false; + socket.off('call_status_update', handleUpdate); + unsubscribeFromCall(callId); + }; }, [callId]); return { events, loading }; diff --git a/packages/frontend/src/hooks/useCalls.ts b/packages/frontend/src/hooks/useCalls.ts index a35ab33..711a764 100644 --- a/packages/frontend/src/hooks/useCalls.ts +++ b/packages/frontend/src/hooks/useCalls.ts @@ -1,26 +1,88 @@ 'use client'; import { useState, useEffect } from 'react'; -import { Call, CallFilters } from '../types'; -import { MOCK_CALLS } from '../mocks/data'; - -/** - * Returns the live call list and a loading indicator. - * TODO: replace mock data with real API + Socket.io updates. - */ -export function useCalls(_filters: CallFilters) { +import { Call, CallFilters, CallStatusUpdate } from '../types'; +import { getSocket, subscribeToDashboard, unsubscribeFromDashboard } from '../lib/socket'; +import { fetchCalls } from '../lib/api'; + +export function useCalls(filters: CallFilters) { const [calls, setCalls] = useState([]); const [loading, setLoading] = useState(true); useEffect(() => { - // TODO: replace with fetchCalls(_filters) + socket subscription - const t = setTimeout(() => { - setCalls(MOCK_CALLS); - setLoading(false); - }, 300); + let active = true; + const socket = getSocket(); + + async function loadCalls() { + try { + setLoading(true); + const data = await fetchCalls(filters); + + if (active) { + setCalls(data); + setLoading(false); + } + } catch (err) { + console.error("Fetch error:", err); + if (active) setLoading(false); + } + } + + socket.connect(); + subscribeToDashboard(); + loadCalls(); + + const handleUpdate = (update: CallStatusUpdate) => { + setCalls((prev) => { + const idx = prev.findIndex((c) => c.id === update.callId); + + if (idx !== -1) { + const updatedCalls = [...prev]; + updatedCalls[idx] = { + ...updatedCalls[idx], + status: update.status, + ...(update.status === 'ended' ? { endTime: update.timestamp } : {}), + }; + + if (filters.status && update.status !== filters.status) { + return updatedCalls.filter((_, i) => i !== idx); + } + + return updatedCalls; + } + + if (update.eventType === 'call_initiated' && update.metadata) { + const payload = update.metadata as any; + + const matchesStatus = !filters.status || update.status === filters.status; + const matchesQueue = !filters.queueId || payload.queueId === filters.queueId; + + if (matchesStatus && matchesQueue) { + return [ + { + id: update.callId, + type: payload.type, + status: update.status, + queueId: payload.queueId, + startTime: update.timestamp, + }, + ...prev, + ]; + } + } + + return prev; + }); + }; + + socket.on('call_status_update', handleUpdate); - return () => clearTimeout(t); - }, []); + return () => { + active = false; + socket.off('call_status_update', handleUpdate); + unsubscribeFromDashboard(); + }; + }, [filters.status, filters.queueId]); return { calls, loading, setCalls }; -} +} \ No newline at end of file diff --git a/packages/frontend/src/lib/api.ts b/packages/frontend/src/lib/api.ts index a87afa6..f3180e9 100644 --- a/packages/frontend/src/lib/api.ts +++ b/packages/frontend/src/lib/api.ts @@ -5,7 +5,6 @@ const BASE_URL = /** * Fetch the current call list from call-service. - * TODO: call this from the `useCalls` hook. */ export async function fetchCalls(params?: CallFilters): Promise { const queryParams = new URLSearchParams(); @@ -27,7 +26,6 @@ export async function fetchCalls(params?: CallFilters): Promise { /** * Fetch event history for a specific call. - * TODO: call this from the `useCallEvents` hook. */ export async function fetchCallEvents(callId: string): Promise { const res = await fetch(`${BASE_URL}/api/calls/${callId}/events`, { diff --git a/packages/frontend/src/lib/socket.ts b/packages/frontend/src/lib/socket.ts index 2ae8fa9..83327e5 100644 --- a/packages/frontend/src/lib/socket.ts +++ b/packages/frontend/src/lib/socket.ts @@ -24,4 +24,12 @@ export function unsubscribeFromCall(callId: string): void { getSocket().emit('unsubscribe_call', callId); } +export function subscribeToDashboard(): void { + getSocket().emit('subscribe_dashboard'); +} + +export function unsubscribeFromDashboard(): void { + getSocket().emit('unsubscribe_dashboard'); +} + export type { CallStatusUpdate }; diff --git a/packages/realtime-service/package.json b/packages/realtime-service/package.json index edf985e..e755c9a 100644 --- a/packages/realtime-service/package.json +++ b/packages/realtime-service/package.json @@ -14,11 +14,13 @@ "dotenv": "^16.4.5", "express": "^4.19.2", "ioredis": "^5.3.2", + "morgan": "^1.10.1", "socket.io": "^4.7.5" }, "devDependencies": { "@types/cors": "^2.8.17", "@types/express": "^4.17.21", + "@types/morgan": "^1.9.10", "@types/node": "^20.12.7", "nodemon": "^3.1.0", "ts-node": "^10.9.2", diff --git a/packages/realtime-service/src/bus/subscriber.ts b/packages/realtime-service/src/bus/subscriber.ts index 723b278..f4d6938 100644 --- a/packages/realtime-service/src/bus/subscriber.ts +++ b/packages/realtime-service/src/bus/subscriber.ts @@ -2,12 +2,28 @@ import Redis from 'ioredis'; import { CallStatusUpdate } from '../types'; const CHANNEL = 'call-status-updates'; +const redis = new Redis(process.env.REDIS_URL || 'redis://localhost:6379'); export function subscribeToCallUpdates( onUpdate: (update: CallStatusUpdate) => void, ): void { - // TODO: subscribe to CHANNEL on Redis and call onUpdate for each message - void Redis; - void CHANNEL; - void onUpdate; + redis.subscribe(CHANNEL, (err, count) => { + if (err) { + console.error('Failed to subscribe to redis channel:', err); + return; + } + console.info(`Subscribed to ${count} channels. Listening on ${CHANNEL}...`); + }); + + redis.on('message', (channel, message) => { + if (channel === CHANNEL) { + try { + const update: CallStatusUpdate = JSON.parse(message); + console.info('Received call update:', update); + onUpdate(update); + } catch (err) { + console.error('Failed to parse redis message:', err); + } + } + }); } diff --git a/packages/realtime-service/src/index.ts b/packages/realtime-service/src/index.ts index 414423d..7dad39f 100644 --- a/packages/realtime-service/src/index.ts +++ b/packages/realtime-service/src/index.ts @@ -2,6 +2,7 @@ import 'dotenv/config'; import http from 'http'; import express from 'express'; import cors from 'cors'; +import morgan from 'morgan'; import { createSocketServer, broadcastStatusUpdate } from './socket/server'; import { subscribeToCallUpdates } from './bus/subscriber'; @@ -11,6 +12,7 @@ const PORT = process.env.PORT ?? 3002; app.use(cors()); app.use(express.json()); +app.use(morgan('dev')); app.get('/health', (_req, res) => res.json({ status: 'ok' })); diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index d344552..81e91f5 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -14,12 +14,20 @@ export function createSocketServer(httpServer: HttpServer): IoServer { io.on('connection', (socket) => { console.log(`[ws] client connected ${socket.id}`); - socket.on('subscribe_call', (_callId) => { - // TODO: socket.join(_callId); + socket.on('subscribe_dashboard' as any, () => { + socket.join('dashboard'); }); - socket.on('unsubscribe_call', (_callId) => { - // TODO: socket.leave(_callId); + socket.on('unsubscribe_dashboard' as any, () => { + socket.leave('dashboard'); + }); + + socket.on('subscribe_call', (callId) => { + socket.join(callId); + }); + + socket.on('unsubscribe_call', (callId) => { + socket.leave(callId); }); socket.on('disconnect', () => { @@ -32,6 +40,6 @@ export function createSocketServer(httpServer: HttpServer): IoServer { export function broadcastStatusUpdate(update: CallStatusUpdate): void { if (!io) return; - // TODO: io.to(update.callId).emit('call_status_update', update); - io.emit('call_status_update', update); // naive – replace with room-based + io.to(update.callId).emit('call_status_update', update); + io.to('dashboard').emit('call_status_update', update); }