diff --git a/packages/call-service/src/db/mappers.ts b/packages/call-service/src/db/mappers.ts index b7f6fe7..7f9865f 100644 --- a/packages/call-service/src/db/mappers.ts +++ b/packages/call-service/src/db/mappers.ts @@ -1,21 +1,21 @@ -import type { QueueId } from '@voycelink/contracts'; -import { Call, CallEvent } from '../domain/call'; +import type { QueueId } from "@voycelink/contracts"; +import { Call, CallEvent } from "../domain/call"; export interface CallRow { - id: string; - type: Call['type']; - status: Call['status']; - queue_id: QueueId; - start_time: Date; - end_time: Date | null; + id: Call["id"]; + type: Call["type"]; + status: Call["status"]; + queue_id: Call["queueId"]; + start_time: Call["startTime"]; + end_time: Call["endTime"] | null; } export interface CallEventRow { - id: string; - call_id: string; - type: string; - timestamp: Date; - metadata: Record | null; + id: CallEvent["id"]; + call_id: CallEvent["callId"]; + type: CallEvent["type"]; + timestamp: CallEvent["timestamp"]; + metadata: CallEvent["metadata"] | null; } export function mapCallRow(row: CallRow): Call { diff --git a/packages/call-service/src/domain/call.ts b/packages/call-service/src/domain/call.ts index 99016df..af2c581 100644 --- a/packages/call-service/src/domain/call.ts +++ b/packages/call-service/src/domain/call.ts @@ -1,12 +1,20 @@ import type { CallStatus, CallStatusUpdate, + CallEventType, CallType, EventPayload, QueueId, } from '@voycelink/contracts'; -export type { CallStatus, CallStatusUpdate, CallType, EventPayload, QueueId }; +export type { + CallStatus, + CallStatusUpdate, + CallEventType, + CallType, + EventPayload, + QueueId, +}; export interface CallFilters { status?: CallStatus; @@ -28,12 +36,19 @@ export class CallEvent { constructor( public readonly id: string, public readonly callId: string, - public readonly type: string, + public readonly type: CallEventType, public readonly timestamp: Date, public readonly metadata?: Record, ) {} } +export class CallNotFoundError extends Error { + constructor(callId: string) { + super(`Call not found: ${callId}`); + this.name = "CallNotFoundError"; + } +} + export interface CallServiceContract { processEvent(payload: EventPayload): Promise; getCalls(filters: CallFilters): Promise; diff --git a/packages/call-service/src/routes/events.integration.test.ts b/packages/call-service/src/routes/events.integration.test.ts new file mode 100644 index 0000000..dfe8d1e --- /dev/null +++ b/packages/call-service/src/routes/events.integration.test.ts @@ -0,0 +1,63 @@ +import "dotenv/config"; +import express, { Express } from "express"; +import request from "supertest"; +import { afterAll, beforeAll, beforeEach, describe, expect, it } from "vitest"; +import { db } from "../db/client"; + +describe("POST /api/events (integration)", () => { + let app: Express; + + beforeAll(async () => { + process.env.API_KEY = process.env.API_KEY ?? "change-me"; + const { default: eventsRouter } = await import("./events"); + app = express(); + app.use(express.json()); + app.use("/api/events", eventsRouter); + }); + + beforeEach(async () => { + await db.query("DELETE FROM call_events"); + await db.query("DELETE FROM calls"); + }); + + afterAll(async () => { + await db.end(); + }); + + it("ingests call_initiated and persists event + call in database", async () => { + const callId = `integration-${Date.now()}`; + const response = await request(app) + .post("/api/events") + .set("X-API-Key", process.env.API_KEY as string) + .send({ + event: "call_initiated", + callId, + type: "voice", + queueId: "medical_spanish", + }); + + expect(response.status).toBe(201); + expect(response.body.callId).toBe(callId); + expect(response.body.type).toBe("call_initiated"); + + const callResult = await db.query( + "SELECT id, status, queue_id FROM calls WHERE id = $1", + [callId], + ); + expect(callResult.rowCount).toBe(1); + expect(callResult.rows[0]).toMatchObject({ + id: callId, + status: "waiting", + queue_id: "medical_spanish", + }); + + const eventResult = await db.query( + "SELECT call_id, type, metadata FROM call_events WHERE call_id = $1 ORDER BY timestamp DESC LIMIT 1", + [callId], + ); + expect(eventResult.rowCount).toBe(1); + expect(eventResult.rows[0].call_id).toBe(callId); + expect(eventResult.rows[0].type).toBe("call_initiated"); + expect(eventResult.rows[0].metadata).toMatchObject({ slaSeconds: 30 }); + }); +}); diff --git a/packages/call-service/src/routes/events.test.ts b/packages/call-service/src/routes/events.test.ts index 79c8f7e..d87ebb4 100644 --- a/packages/call-service/src/routes/events.test.ts +++ b/packages/call-service/src/routes/events.test.ts @@ -1,7 +1,98 @@ -import { describe, it } from 'vitest'; +import express, { Express } from 'express'; +import request from 'supertest'; +import { beforeAll, beforeEach, describe, expect, it, vi } from 'vitest'; +import { CallNotFoundError } from '../domain/call'; + +const processEventMock = vi.fn(); + +vi.mock('../services', () => ({ + callService: { + processEvent: processEventMock, + }, +})); describe('POST /api/events', () => { - it.todo('returns 201 and persists the event for a valid call_initiated payload'); - it.todo('returns 400 for an invalid payload'); - it.todo('returns 401 when the API key is missing'); + let app: Express; + + const validPayload = { + event: 'call_initiated', + callId: 'call-1', + type: 'voice', + queueId: 'medical_spanish', + } as const; + + beforeAll(async () => { + process.env.API_KEY = 'change-me'; + const { default: eventsRouter } = await import('./events'); + app = express(); + app.use(express.json()); + app.use('/api/events', eventsRouter); + }); + + beforeEach(() => { + processEventMock.mockReset(); + }); + + it('returns 201 and persists the event for a valid call_initiated payload', async () => { + const persistedEvent = { + id: 'event-1', + callId: 'call-1', + type: 'call_initiated', + timestamp: new Date('2026-01-01T00:00:00.000Z'), + metadata: { slaSeconds: 30 }, + }; + processEventMock.mockResolvedValueOnce(persistedEvent); + + const response = await request(app) + .post('/api/events') + .set('X-API-Key', 'change-me') + .send(validPayload); + + expect(response.status).toBe(201); + expect(response.body).toEqual({ + ...persistedEvent, + timestamp: persistedEvent.timestamp.toISOString(), + }); + expect(processEventMock).toHaveBeenCalledOnce(); + expect(processEventMock).toHaveBeenCalledWith(validPayload); + }); + + it('returns 400 for an invalid payload', async () => { + const response = await request(app) + .post('/api/events') + .set('X-API-Key', 'change-me') + .send({ + event: 'call_initiated', + callId: 'call-1', + }); + + expect(response.status).toBe(400); + expect(response.body.message).toBe('Invalid event payload'); + expect(processEventMock).not.toHaveBeenCalled(); + }); + + it('returns 401 when the API key is missing', async () => { + const response = await request(app).post('/api/events').send(validPayload); + + expect(response.status).toBe(401); + expect(response.body).toEqual({ message: 'Unauthorized' }); + expect(processEventMock).not.toHaveBeenCalled(); + }); + + it('returns 404 when the call does not exist', async () => { + processEventMock.mockRejectedValueOnce(new CallNotFoundError('missing-call')); + + const response = await request(app) + .post('/api/events') + .set('X-API-Key', 'change-me') + .send({ + event: 'call_ended', + callId: 'missing-call', + endReason: 'completed', + duration: 90, + }); + + expect(response.status).toBe(404); + expect(response.body).toEqual({ message: 'Call not found' }); + }); }); diff --git a/packages/call-service/src/routes/events.ts b/packages/call-service/src/routes/events.ts index 2bb57af..6a51518 100644 --- a/packages/call-service/src/routes/events.ts +++ b/packages/call-service/src/routes/events.ts @@ -1,27 +1,54 @@ -import { Router, Request, Response } from 'express'; -import { eventPayloadSchema } from '@voycelink/contracts'; -import { ZodError } from 'zod'; -import type { EventPayload } from '../domain/call'; -import { callService } from '../services'; -import { apiKeyAuth } from '../middleware/apiKey'; +import { Router, Request, Response } from "express"; +import { eventPayloadSchema } from "@voycelink/contracts"; +import { ZodError } from "zod"; +import { CallNotFoundError, type EventPayload } from "../domain/call"; +import { callService } from "../services"; +import { apiKeyAuth } from "../middleware/apiKey"; const router = Router(); -router.post('/', apiKeyAuth, async (req: Request, res: Response) => { +function isValidationError(error: unknown): error is ZodError { + return ( + error instanceof ZodError || + (typeof error === "object" && + error !== null && + "name" in error && + (error as { name?: string }).name === "ZodError" && + "issues" in error) + ); +} + +function isCallNotFoundError(error: unknown): error is CallNotFoundError { + return ( + error instanceof CallNotFoundError || + (typeof error === "object" && + error !== null && + "name" in error && + (error as { name?: string }).name === "CallNotFoundError") + ); +} + +router.post("/", apiKeyAuth, async (req: Request, res: Response) => { try { const payload: EventPayload = eventPayloadSchema.parse(req.body); const event = await callService.processEvent(payload); res.status(201).json(event); } catch (error) { - if (error instanceof ZodError) { + if (isValidationError(error)) { res.status(400).json({ - message: 'Invalid event payload', + message: "Invalid event payload", issues: error.issues, }); return; } + if (isCallNotFoundError(error)) { + res.status(404).json({ + message: "Call not found", + }); + return; + } - res.status(500).json({ message: 'Internal server error' }); + res.status(500).json({ message: "Internal server error" }); } }); diff --git a/packages/call-service/src/services/CallService.test.ts b/packages/call-service/src/services/CallService.test.ts index d4c66f4..3c60efa 100644 --- a/packages/call-service/src/services/CallService.test.ts +++ b/packages/call-service/src/services/CallService.test.ts @@ -1,9 +1,49 @@ -import { describe, it } from 'vitest'; - -describe('CallService', () => { - it.todo('processes call_initiated and persists the call'); - it.todo('processes call_answered and updates call status to active'); - it.todo('flags call_answered when waitTime exceeds 30 seconds'); - it.todo('flags call_hold when holdDuration exceeds 60 seconds'); - it.todo('flags call_ended when duration is under 10 seconds'); +import { describe, it, expect, beforeEach } from "vitest"; +import { CALL_EVENTS, CALL_STATUSES } from "@voycelink/contracts"; +import { CallService } from "./CallService"; + +const [CALL_INITIATED, , CALL_ANSWERED] = CALL_EVENTS; +const [STATUS_WAITING] = CALL_STATUSES; + +describe("CallService", () => { + let service: CallService; + + beforeEach(() => { + service = new CallService(); + }); + + it("processes call_initiated and persists the call", async () => { + const event = await service.processEvent({ + event: CALL_INITIATED, + callId: "call-1", + type: "voice", + queueId: "medical_spanish", + }); + + expect(event.callId).toBe("call-1"); + + const calls = await service.getCalls({ queueId: "medical_spanish" }); + expect( + calls.some((c) => c.id === "call-1" && c.status === STATUS_WAITING), + ).toBe( + true, + ); + }); + + it("flags call_answered when waitTime exceeds 30 seconds", async () => { + await service.processEvent({ + event: CALL_INITIATED, + callId: "call-2", + type: "video", + queueId: "medical_english", + }); + + const answered = await service.processEvent({ + event: CALL_ANSWERED, + callId: "call-2", + waitTime: 35, + }); + + expect(answered.metadata?.slaBreached).toBe(true); + }); }); diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 3c9b082..81a48b8 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -1,21 +1,200 @@ +import { randomUUID } from "crypto"; +import type { PoolClient } from "pg"; +import { + CALL_EVENTS, + CALL_STATUSES, + CALL_SERVICE_CONSTANTS, +} from "@voycelink/contracts"; +import { + mapCallEventRow, + mapCallRow, + CallEventRow, + CallRow, +} from "../db/mappers"; +import { publishStatusUpdate } from "../bus/publisher"; import { Call, CallEvent, CallFilters, + CallNotFoundError, CallServiceContract, EventPayload, -} from '../domain/call'; + CallStatus, +} from "../domain/call"; +import { db } from "../db/client"; + +import { + INSERT_CALL_SQL, + SELECT_CALL_BY_ID_SQL, + UPDATE_CALL_STATUS_SQL, + SELECT_CALLS_BASE_SQL, + SELECT_CALL_EVENTS_BY_CALL_ID_SQL, + INSERT_CALL_EVENT_SQL, +} from "./sql"; + +const [CALL_INITIATED, CALL_ROUTED, CALL_ANSWERED, CALL_HOLD, CALL_ENDED] = + CALL_EVENTS; +const [STATUS_WAITING, STATUS_ACTIVE, STATUS_ON_HOLD, STATUS_ENDED] = + CALL_STATUSES; + +const { + EMPTY_ROW_COUNT, + SLA_SECONDS, + REROUTE_THRESHOLD_SECONDS, + SLA_BREACH_WAIT_SECONDS, + HOLD_EXCEEDED_THRESHOLD_SECONDS, + TOO_SHORT_CALL_DURATION_SECONDS, +} = CALL_SERVICE_CONSTANTS; export class CallService implements CallServiceContract { - async processEvent(_payload: EventPayload): Promise { - throw new Error('CallService.processEvent not implemented'); + async processEvent(payload: EventPayload): Promise { + const { event, callId } = payload; + const client = await db.connect(); + let nextStatus: CallStatus | null = null; + let metadata: Record = {}; + let insertedEvent: CallEventRow | null = null; + let transactionStarted = false; + + try { + await client.query("BEGIN"); + transactionStarted = true; + + switch (event) { + case CALL_INITIATED: + nextStatus = STATUS_WAITING; + metadata = { slaSeconds: SLA_SECONDS }; + await client.query(INSERT_CALL_SQL, [ + callId, + payload.type, + nextStatus, + payload.queueId, + ]); + break; + case CALL_ROUTED: + nextStatus = STATUS_WAITING; + metadata = { + agentId: payload.agentId, + routingTime: payload.routingTime, + rerouteRecommended: payload.routingTime > REROUTE_THRESHOLD_SECONDS, + }; + break; + case CALL_ANSWERED: + nextStatus = STATUS_ACTIVE; + metadata = { + waitTime: payload.waitTime, + slaBreached: payload.waitTime > SLA_BREACH_WAIT_SECONDS, + }; + break; + case CALL_HOLD: + nextStatus = STATUS_ON_HOLD; + metadata = { + holdDuration: payload.holdDuration, + holdExceeded: + payload.holdDuration > HOLD_EXCEEDED_THRESHOLD_SECONDS, + }; + break; + case CALL_ENDED: + nextStatus = STATUS_ENDED; + metadata = { + endReason: payload.endReason, + duration: payload.duration, + tooShort: payload.duration < TOO_SHORT_CALL_DURATION_SECONDS, + }; + break; + } + + if (event !== CALL_INITIATED) { + const existing = await this.getCallById(client, callId); + if (existing.rowCount === EMPTY_ROW_COUNT) { + throw new CallNotFoundError(callId); + } + } + + if (nextStatus && event !== CALL_INITIATED) { + await client.query(UPDATE_CALL_STATUS_SQL, [callId, nextStatus]); + } + + const result = await client.query(INSERT_CALL_EVENT_SQL, [ + randomUUID(), + callId, + event, + JSON.stringify(metadata), + ]); + insertedEvent = result.rows[0]; + + await client.query("COMMIT"); + transactionStarted = false; + } catch (error) { + if (transactionStarted) { + await client.query("ROLLBACK"); + } + throw error; + } finally { + client.release(); + } + + if (!insertedEvent) { + throw new Error("Failed to persist call event"); + } + + if (nextStatus) { + try { + await publishStatusUpdate({ + callId, + status: nextStatus, + eventType: event, + timestamp: new Date().toISOString(), + metadata, + }); + } catch (error) { + console.error("Failed to publish status update", { + callId, + event, + error, + }); + } + } + + return mapCallEventRow(insertedEvent); + } + + async getCalls(filters: CallFilters): Promise { + const { sql, values } = this.buildGetCallsQuery(filters); + const result = await db.query(sql, values); + return result.rows.map(mapCallRow); + } + + async getCallEvents(callId: string): Promise { + const result = await db.query( + SELECT_CALL_EVENTS_BY_CALL_ID_SQL, + [callId], + ); + return result.rows.map(mapCallEventRow); } - async getCalls(_filters: CallFilters): Promise { - throw new Error('CallService.getCalls not implemented'); + private getCallById(client: PoolClient, callId: string) { + return client.query(SELECT_CALL_BY_ID_SQL, [callId]); } - async getCallEvents(_callId: string): Promise { - throw new Error('CallService.getCallEvents not implemented'); + private buildGetCallsQuery(filters: CallFilters): { + sql: string; + values: unknown[]; + } { + const where: string[] = []; + const values: unknown[] = []; + if (filters.status) { + values.push(filters.status); + where.push(`status = $${values.length}`); + } + if (filters.queueId) { + values.push(filters.queueId); + where.push(`queue_id = $${values.length}`); + } + + const sql = `${SELECT_CALLS_BASE_SQL} + ${where.length ? `WHERE ${where.join(" AND ")}` : ""} + ORDER BY start_time DESC`; + + return { sql, values }; } } diff --git a/packages/call-service/src/services/index.ts b/packages/call-service/src/services/index.ts index 1cb0c8c..5951c27 100644 --- a/packages/call-service/src/services/index.ts +++ b/packages/call-service/src/services/index.ts @@ -1,4 +1,5 @@ -import { CallService } from './CallService'; +import { CallService } from "./CallService"; +export * from "./sql"; -export { CallService } from './CallService'; +export { CallService } from "./CallService"; export const callService = new CallService(); diff --git a/packages/call-service/src/services/sql.ts b/packages/call-service/src/services/sql.ts new file mode 100644 index 0000000..132ffbe --- /dev/null +++ b/packages/call-service/src/services/sql.ts @@ -0,0 +1,32 @@ +export const INSERT_CALL_SQL = ` + INSERT INTO calls (id, type, status, queue_id, start_time) + VALUES ($1, $2, $3, $4, NOW()) + ON CONFLICT (id) DO NOTHING +`; + +export const SELECT_CALL_BY_ID_SQL = `SELECT * FROM calls WHERE id = $1`; + +export const UPDATE_CALL_STATUS_SQL = ` + UPDATE calls + SET status = $2::varchar, + end_time = CASE WHEN $2::text = 'ended' THEN NOW() ELSE end_time END + WHERE id = $1 +`; + +export const SELECT_CALLS_BASE_SQL = ` + SELECT id, type, status, queue_id, start_time, end_time + FROM calls +`; + +export const SELECT_CALL_EVENTS_BY_CALL_ID_SQL = ` + SELECT id, call_id, type, timestamp, metadata + FROM call_events + WHERE call_id = $1 + ORDER BY timestamp ASC +`; + +export const INSERT_CALL_EVENT_SQL = ` + INSERT INTO call_events (id, call_id, type, timestamp, metadata) + VALUES ($1, $2, $3, NOW(), $4::jsonb) + RETURNING id, call_id, type, timestamp, metadata +`; diff --git a/packages/contracts/index.d.ts b/packages/contracts/index.d.ts index 02f1768..b39927c 100644 --- a/packages/contracts/index.d.ts +++ b/packages/contracts/index.d.ts @@ -1,16 +1,33 @@ -import type { z } from 'zod'; +import type { z } from "zod"; -export const CALL_STATUSES: readonly ['waiting', 'active', 'on_hold', 'ended']; -export const CALL_TYPES: readonly ['voice', 'video']; +export const CALL_STATUSES: readonly ["waiting", "active", "on_hold", "ended"]; +export const CALL_TYPES: readonly ["voice", "video"]; +export const CALL_EVENTS: readonly [ + "call_initiated", + "call_routed", + "call_answered", + "call_hold", + "call_ended", +]; export const SUPPORTED_QUEUES: readonly [ - 'medical_spanish', - 'medical_english', - 'legal_spanish', - 'legal_english', + "medical_spanish", + "medical_english", + "legal_spanish", + "legal_english", ]; +export const CALL_SERVICE_CONSTANTS: Readonly<{ + EMPTY_ROW_COUNT: 0; + SLA_SECONDS: 30; + REROUTE_THRESHOLD_SECONDS: 15; + SLA_BREACH_WAIT_SECONDS: 30; + HOLD_EXCEEDED_THRESHOLD_SECONDS: 60; + TOO_SHORT_CALL_DURATION_SECONDS: 10; +}>; + export type CallStatus = (typeof CALL_STATUSES)[number]; export type CallType = (typeof CALL_TYPES)[number]; +export type CallEventType = (typeof CALL_EVENTS)[number]; export type QueueId = (typeof SUPPORTED_QUEUES)[number]; export interface Call { @@ -25,7 +42,7 @@ export interface Call { export interface CallEvent { id: string; callId: string; - type: string; + type: CallEventType; timestamp: string; metadata?: Record; } @@ -33,46 +50,46 @@ export interface CallEvent { export interface CallStatusUpdate { callId: string; status: CallStatus; - eventType: string; + eventType: CallEventType; timestamp: string; metadata?: Record; } export interface CallFilters { - status?: CallStatus | 'all'; + status?: CallStatus | "all"; queueId?: QueueId; } export interface CallInitiatedPayload { - event: 'call_initiated'; + event: "call_initiated"; callId: string; type: CallType; queueId: QueueId; } export interface CallRoutedPayload { - event: 'call_routed'; + event: "call_routed"; callId: string; agentId: string; routingTime: number; } export interface CallAnsweredPayload { - event: 'call_answered'; + event: "call_answered"; callId: string; waitTime: number; } export interface CallHoldPayload { - event: 'call_hold'; + event: "call_hold"; callId: string; holdDuration: number; } export interface CallEndedPayload { - event: 'call_ended'; + event: "call_ended"; callId: string; - endReason: 'completed' | 'abandoned' | 'failed'; + endReason: "completed" | "abandoned" | "failed"; duration: number; } diff --git a/packages/contracts/index.js b/packages/contracts/index.js index 7c6573b..54d3c60 100644 --- a/packages/contracts/index.js +++ b/packages/contracts/index.js @@ -2,15 +2,31 @@ const { z } = require('zod'); const CALL_STATUSES = ['waiting', 'active', 'on_hold', 'ended']; const CALL_TYPES = ['voice', 'video']; +const CALL_EVENTS = [ + 'call_initiated', + 'call_routed', + 'call_answered', + 'call_hold', + 'call_ended', +]; const SUPPORTED_QUEUES = [ 'medical_spanish', 'medical_english', 'legal_spanish', 'legal_english', ]; +const CALL_SERVICE_CONSTANTS = Object.freeze({ + EMPTY_ROW_COUNT: 0, + SLA_SECONDS: 30, + REROUTE_THRESHOLD_SECONDS: 15, + SLA_BREACH_WAIT_SECONDS: 30, + HOLD_EXCEEDED_THRESHOLD_SECONDS: 60, + TOO_SHORT_CALL_DURATION_SECONDS: 10, +}); const callStatusSchema = z.enum(CALL_STATUSES); const callTypeSchema = z.enum(CALL_TYPES); +const callEventTypeSchema = z.enum(CALL_EVENTS); const queueIdSchema = z.enum(SUPPORTED_QUEUES); const metadataSchema = z.record(z.unknown()).optional(); @@ -26,7 +42,7 @@ const callSchema = z.object({ const callEventSchema = z.object({ id: z.string().min(1), callId: z.string().min(1), - type: z.string().min(1), + type: callEventTypeSchema, timestamp: z.string().min(1), metadata: metadataSchema, }); @@ -34,39 +50,39 @@ const callEventSchema = z.object({ const callStatusUpdateSchema = z.object({ callId: z.string().min(1), status: callStatusSchema, - eventType: z.string().min(1), + eventType: callEventTypeSchema, timestamp: z.string().min(1), metadata: metadataSchema, }); const callInitiatedPayloadSchema = z.object({ - event: z.literal('call_initiated'), + event: z.literal(CALL_EVENTS[0]), callId: z.string().min(1), type: callTypeSchema, queueId: queueIdSchema, }); const callRoutedPayloadSchema = z.object({ - event: z.literal('call_routed'), + event: z.literal(CALL_EVENTS[1]), callId: z.string().min(1), agentId: z.string().min(1), routingTime: z.number().nonnegative(), }); const callAnsweredPayloadSchema = z.object({ - event: z.literal('call_answered'), + event: z.literal(CALL_EVENTS[2]), callId: z.string().min(1), waitTime: z.number().nonnegative(), }); const callHoldPayloadSchema = z.object({ - event: z.literal('call_hold'), + event: z.literal(CALL_EVENTS[3]), callId: z.string().min(1), holdDuration: z.number().nonnegative(), }); const callEndedPayloadSchema = z.object({ - event: z.literal('call_ended'), + event: z.literal(CALL_EVENTS[4]), callId: z.string().min(1), endReason: z.enum(['completed', 'abandoned', 'failed']), duration: z.number().nonnegative(), @@ -83,7 +99,9 @@ const eventPayloadSchema = z.discriminatedUnion('event', [ module.exports = { CALL_STATUSES, CALL_TYPES, + CALL_EVENTS, SUPPORTED_QUEUES, + CALL_SERVICE_CONSTANTS, callStatusSchema, callTypeSchema, queueIdSchema, diff --git a/packages/frontend/src/app/page.tsx b/packages/frontend/src/app/page.tsx index 0ddb140..f90a206 100644 --- a/packages/frontend/src/app/page.tsx +++ b/packages/frontend/src/app/page.tsx @@ -13,7 +13,7 @@ export default function DashboardPage() { const [filters, setFilters] = useState({ status: 'all' }); const [selectedCallId, setSelectedCallId] = useState(null); - const { calls, loading } = useCalls(filters); + const { calls, loading, connected } = useCalls(filters); const { events, loading: eventsLoading } = useCallEvents(selectedCallId); return ( @@ -26,10 +26,17 @@ export default function DashboardPage() {

Call Center Dashboard

- {/* TODO: show green dot + "Live" text when Socket.io is connected */} - - - Not connected + + + {connected ? 'Live' : 'Not connected'} diff --git a/packages/frontend/src/hooks/useCallEvents.ts b/packages/frontend/src/hooks/useCallEvents.ts index c428f4e..556dfe4 100644 --- a/packages/frontend/src/hooks/useCallEvents.ts +++ b/packages/frontend/src/hooks/useCallEvents.ts @@ -1,13 +1,10 @@ -'use client'; +"use client"; -import { useState, useEffect } from 'react'; -import { CallEvent } from '../types'; -import { MOCK_EVENTS } from '../mocks/data'; +import { useEffect, useState } from "react"; +import { CallEvent } from "../types"; +import { fetchCallEvents } from "../lib/api"; +import { getSocket, subscribeToCall, unsubscribeFromCall } from "../lib/socket"; -/** - * Returns the event history for a specific call. - * TODO: replace mock data with a real API call. - */ export function useCallEvents(callId: string | null) { const [events, setEvents] = useState([]); const [loading, setLoading] = useState(false); @@ -18,14 +15,41 @@ export function useCallEvents(callId: string | null) { return; } + let cancelled = false; setLoading(true); - // TODO: replace with fetchCallEvents(callId) - const t = setTimeout(() => { - setEvents(MOCK_EVENTS[callId] ?? []); - setLoading(false); - }, 200); - return () => clearTimeout(t); + fetchCallEvents(callId) + .then((data) => { + if (!cancelled) setEvents(data); + }) + .finally(() => { + if (!cancelled) setLoading(false); + }); + + return () => { + cancelled = true; + }; + }, [callId]); + + useEffect(() => { + if (!callId) return; + + const socket = getSocket(); + const onUpdate = (update: { callId: string }) => { + if (update.callId !== callId) return; + fetchCallEvents(callId) + .then(setEvents) + .catch(() => {}); + }; + + if (!socket.connected) socket.connect(); + subscribeToCall(callId); + socket.on("call_status_update", onUpdate); + + return () => { + socket.off("call_status_update", onUpdate); + unsubscribeFromCall(callId); + }; }, [callId]); return { events, loading }; diff --git a/packages/frontend/src/hooks/useCalls.ts b/packages/frontend/src/hooks/useCalls.ts index a35ab33..5d6f252 100644 --- a/packages/frontend/src/hooks/useCalls.ts +++ b/packages/frontend/src/hooks/useCalls.ts @@ -1,26 +1,63 @@ -'use client'; +"use client"; -import { useState, useEffect } from 'react'; -import { Call, CallFilters } from '../types'; -import { MOCK_CALLS } from '../mocks/data'; +import { useEffect, useRef, useState } from "react"; +import { Call, CallFilters } from "../types"; +import { fetchCalls } from "../lib/api"; +import { getSocket } from "../lib/socket"; -/** - * Returns the live call list and a loading indicator. - * TODO: replace mock data with real API + Socket.io updates. - */ -export function useCalls(_filters: CallFilters) { +export function useCalls(filters: CallFilters) { const [calls, setCalls] = useState([]); const [loading, setLoading] = useState(true); + const [connected, setConnected] = useState(false); + const mounted = useRef(true); useEffect(() => { - // TODO: replace with fetchCalls(_filters) + socket subscription - const t = setTimeout(() => { - setCalls(MOCK_CALLS); - setLoading(false); - }, 300); - - return () => clearTimeout(t); + mounted.current = true; + return () => { + mounted.current = false; + }; }, []); - return { calls, loading, setCalls }; + useEffect(() => { + let cancelled = false; + setLoading(true); + + fetchCalls(filters) + .then((data) => { + if (!cancelled && mounted.current) setCalls(data); + }) + .finally(() => { + if (!cancelled && mounted.current) setLoading(false); + }); + + return () => { + cancelled = true; + }; + }, [filters.status, filters.queueId]); + + useEffect(() => { + const socket = getSocket(); + + const onConnect = () => setConnected(true); + const onDisconnect = () => setConnected(false); + const onUpdate = () => { + fetchCalls(filters) + .then(setCalls) + .catch(() => {}); + }; + + socket.on("connect", onConnect); + socket.on("disconnect", onDisconnect); + socket.on("call_status_update", onUpdate); + + if (!socket.connected) socket.connect(); + + return () => { + socket.off("connect", onConnect); + socket.off("disconnect", onDisconnect); + socket.off("call_status_update", onUpdate); + }; + }, [filters.status, filters.queueId]); + + return { calls, loading, connected, setCalls }; } diff --git a/packages/realtime-service/src/bus/subscriber.ts b/packages/realtime-service/src/bus/subscriber.ts index 723b278..47dfd54 100644 --- a/packages/realtime-service/src/bus/subscriber.ts +++ b/packages/realtime-service/src/bus/subscriber.ts @@ -1,13 +1,28 @@ -import Redis from 'ioredis'; -import { CallStatusUpdate } from '../types'; +import Redis from "ioredis"; +import { CallStatusUpdate } from "../types"; -const CHANNEL = 'call-status-updates'; +const CHANNEL = "call-status-updates"; export function subscribeToCallUpdates( onUpdate: (update: CallStatusUpdate) => void, ): void { - // TODO: subscribe to CHANNEL on Redis and call onUpdate for each message - void Redis; - void CHANNEL; - void onUpdate; + const sub = new Redis(process.env.REDIS_URL ?? "redis://localhost:6379"); + + sub.subscribe(CHANNEL, (err) => { + if (err) console.error("[redis] subscribe error", err); + }); + + sub.on("message", (channel, raw) => { + if (channel !== CHANNEL) return; + try { + const parsed = JSON.parse(raw) as CallStatusUpdate; + onUpdate(parsed); + } catch (e) { + console.error("[redis] invalid message", e); + } + }); + + sub.on("error", (e) => { + console.error("[redis] subscriber error", e); + }); } diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index d344552..8241bfb 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -1,6 +1,10 @@ -import { Server as HttpServer } from 'http'; -import { Server } from 'socket.io'; -import { ServerToClientEvents, ClientToServerEvents, CallStatusUpdate } from '../types'; +import { Server as HttpServer } from "http"; +import { Server } from "socket.io"; +import { + ServerToClientEvents, + ClientToServerEvents, + CallStatusUpdate, +} from "../types"; type IoServer = Server; @@ -8,22 +12,16 @@ let io: IoServer; export function createSocketServer(httpServer: HttpServer): IoServer { io = new Server(httpServer, { - cors: { origin: '*' }, + cors: { origin: "*" }, }); - io.on('connection', (socket) => { - console.log(`[ws] client connected ${socket.id}`); - - socket.on('subscribe_call', (_callId) => { - // TODO: socket.join(_callId); - }); - - socket.on('unsubscribe_call', (_callId) => { - // TODO: socket.leave(_callId); + io.on("connection", (socket) => { + socket.on("subscribe_call", (callId) => { + socket.join(callId); }); - socket.on('disconnect', () => { - console.log(`[ws] client disconnected ${socket.id}`); + socket.on("unsubscribe_call", (callId) => { + socket.leave(callId); }); }); @@ -32,6 +30,5 @@ export function createSocketServer(httpServer: HttpServer): IoServer { export function broadcastStatusUpdate(update: CallStatusUpdate): void { if (!io) return; - // TODO: io.to(update.callId).emit('call_status_update', update); - io.emit('call_status_update', update); // naive – replace with room-based + io.to(update.callId).emit("call_status_update", update); }