diff --git a/packages/call-service/src/routes/calls.ts b/packages/call-service/src/routes/calls.ts index 762face..9753934 100644 --- a/packages/call-service/src/routes/calls.ts +++ b/packages/call-service/src/routes/calls.ts @@ -21,6 +21,7 @@ router.get('/', async (req: Request, res: Response) => { const calls = await callService.getCalls(filters); res.json(calls); } catch (_error) { + console.log('Error fetching calls:', _error); res.status(500).json({ message: 'Internal server error' }); } }); diff --git a/packages/call-service/src/routes/events.ts b/packages/call-service/src/routes/events.ts index 2bb57af..36d0bb2 100644 --- a/packages/call-service/src/routes/events.ts +++ b/packages/call-service/src/routes/events.ts @@ -21,6 +21,8 @@ router.post('/', apiKeyAuth, async (req: Request, res: Response) => { return; } + console.log('Error processing event:', error); + res.status(500).json({ message: 'Internal server error' }); } }); diff --git a/packages/call-service/src/services/CallService.ts b/packages/call-service/src/services/CallService.ts index 3c9b082..ed0a4cb 100644 --- a/packages/call-service/src/services/CallService.ts +++ b/packages/call-service/src/services/CallService.ts @@ -1,21 +1,271 @@ +import type { PoolClient } from 'pg'; +import { randomUUID } from 'crypto'; +import { db } from '../db/client'; +// import { publishStatusUpdate } from '../bus/publisher'; import { Call, + CallStatus, CallEvent, CallFilters, CallServiceContract, EventPayload, } from '../domain/call'; +import { + CallAnsweredPayload, + CallEndedPayload, + CallHoldPayload, + CallInitiatedPayload, + CallRoutedPayload, +} from '@voycelink/contracts'; + +type ProcessEventResult = { + callEvent: CallEvent; + status: CallStatus; +}; export class CallService implements CallServiceContract { - async processEvent(_payload: EventPayload): Promise { - throw new Error('CallService.processEvent not implemented'); + private async insertEvent( + client: PoolClient, + callId: string, + type: EventPayload['event'], + timestamp: Date, + metadata: Record, + ): Promise { + const event = await client.query( + 'INSERT INTO call_events (id, call_id, type, timestamp, metadata) VALUES ($1, $2, $3, $4, $5) RETURNING *', + [randomUUID(), callId, type, timestamp, metadata], + ); + + return new CallEvent( + event.rows[0].id, + event.rows[0].call_id, + event.rows[0].type, + new Date(event.rows[0].timestamp), + event.rows[0].metadata, + ); + } + + private async callInitiated( + client: PoolClient, + now: Date, + payload: CallInitiatedPayload): Promise { + + await client.query( + 'INSERT INTO calls (id, type, status, queue_id, start_time) VALUES ($1, $2, $3, $4, $5) RETURNING *', + [payload.callId, payload.type, 'waiting', payload.queueId, now], + ); + + const callEvent = await this.insertEvent(client, payload.callId, payload.event, now, { + ...payload, + slaMax: 30, + }); + + return { + callEvent, + status: 'waiting', + }; + } + + private async callRouted( + client: PoolClient, + now: Date, + payload: CallRoutedPayload): Promise { + + const query = 'SELECT status FROM calls WHERE id = $1'; + const routedCallResult = await client.query(query, [payload.callId]); + + if (routedCallResult.rowCount === 0) { + throw new Error(`Call with id ${payload.callId} not found`); + } + + await client.query( + 'UPDATE calls SET status = $1 WHERE id = $2', + ['waiting', payload.callId], + ); + + const callEvent = await this.insertEvent(client, payload.callId, payload.event, now, { + agentId: payload.agentId, + routingTime: payload.routingTime, + rerouteAfterSeconds: 15, + rerouteRequired: payload.routingTime > 15, + }); + + return { + callEvent, + status: 'waiting', + }; + } + + private async callAnswered( + client: PoolClient, + now: Date, + payload: CallAnsweredPayload, + ): Promise { + const query = 'SELECT status FROM calls WHERE id = $1'; + const answeredCallResult = await client.query(query, [payload.callId]); + + if (answeredCallResult.rowCount === 0) { + throw new Error(`Call with id ${payload.callId} not found`); + } + + await client.query('UPDATE calls SET status = $1 WHERE id = $2', ['active', payload.callId]); + + const callEvent = await this.insertEvent(client, payload.callId, payload.event, now, { + waitTime: payload.waitTime, + slaMax: 30, + }); + + return { + callEvent, + status: 'active', + }; + } + + private async callHold( + client: PoolClient, + now: Date, + payload: CallHoldPayload, + ): Promise { + const query = 'SELECT status FROM calls WHERE id = $1'; + const holdCallResult = await client.query(query, [payload.callId]); + if (holdCallResult.rowCount === 0) { + throw new Error(`Call with id ${payload.callId} not found`); + } + + await client.query('UPDATE calls SET status = $1 WHERE id = $2', ['on_hold', payload.callId]); + + const callEvent = await this.insertEvent(client, payload.callId, payload.event, now, { + holdDuration: payload.holdDuration, + maxHoldSeconds: 60, + }); + + return { + callEvent, + status: 'on_hold', + }; } - async getCalls(_filters: CallFilters): Promise { - throw new Error('CallService.getCalls not implemented'); + private async callEnded( + client: PoolClient, + now: Date, + payload: CallEndedPayload, + ): Promise { + const query = 'SELECT status FROM calls WHERE id = $1'; + const endedCallResult = await client.query(query, [payload.callId]); + if (endedCallResult.rowCount === 0) { + throw new Error(`Call with id ${payload.callId} not found`); + } + + await client.query( + 'UPDATE calls SET status = $1, end_time = $2 WHERE id = $3', + ['ended', now, payload.callId], + ); + + const callEvent = await this.insertEvent(client, payload.callId, payload.event, now, { + endReason: payload.endReason, + duration: payload.duration, + }); + + return { + callEvent, + status: 'ended', + }; + } + + async processEvent(payload: EventPayload): Promise { + const client = await db.connect(); + + try { + const now = new Date(); + await client.query('BEGIN'); + + let result: ProcessEventResult; + + if (payload.event === 'call_initiated') { + result = await this.callInitiated(client, now, payload); + } else if (payload.event === 'call_routed') { + result = await this.callRouted(client, now, payload); + } else if (payload.event === 'call_answered') { + result = await this.callAnswered(client, now, payload); + } else if (payload.event === 'call_hold') { + result = await this.callHold(client, now, payload); + } else if (payload.event === 'call_ended') { + result = await this.callEnded(client, now, payload); + } else { + throw new Error('Unsupported event type'); + } + + await client.query('COMMIT'); + + try { + // TODO: Publicar en redis + // await publishStatusUpdate({..}); + } catch (publishError) { + console.error({ publishError }); + } + + return result.callEvent; + } catch (error) { + await client.query('ROLLBACK'); + console.log('Error in processEvent:', error); + throw error; + } finally { + client.release(); + } + } + + async getCalls(filters: CallFilters): Promise { + const queryParts: string[] = ['SELECT * FROM calls']; + const whereClauses: string[] = []; + const values: Array = []; + + if (filters.status) { + values.push(filters.status); + whereClauses.push(`status = $${values.length}`); + } + + if (filters.queueId) { + values.push(filters.queueId); + whereClauses.push(`queue_id = $${values.length}`); + } + + if (whereClauses.length > 0) { + // Seria más facil usar un ORM o query builder para esto + queryParts.push(`WHERE ${whereClauses.join(' AND ')}`); + } + + queryParts.push('ORDER BY start_time DESC'); + + const query = queryParts.join(' '); + + const calls = await db.query(query, values); + return calls.rows.map( + (row) => + new Call( + row.id, + row.type, + row.status, + row.queue_id, + new Date(row.start_time), + row.end_time ? new Date(row.end_time) : undefined, + ), + ); } async getCallEvents(_callId: string): Promise { - throw new Error('CallService.getCallEvents not implemented'); + const query = 'SELECT * FROM call_events where call_id = $1 ORDER BY timestamp DESC'; + + const events = await db.query(query, [_callId]); + return events.rows.map( + (row) => + new CallEvent( + row.id, + row.call_id, + row.type, + new Date(row.timestamp), + row.metadata, + ), + ); + // throw new Error('CallService.getCallEvents not implemented'); } } diff --git a/packages/frontend/src/app/api/calls/route.ts b/packages/frontend/src/app/api/calls/route.ts new file mode 100644 index 0000000..c35803b --- /dev/null +++ b/packages/frontend/src/app/api/calls/route.ts @@ -0,0 +1,23 @@ +import { NextResponse } from 'next/server'; + +const CALL_SERVICE_URL = + process.env.NEXT_PUBLIC_CALL_SERVICE_URL ?? 'http://localhost:3001'; + +export async function GET(_request: Request) { + // Hago este step para que no se exponga el CALL_SERVICE_URL directamente al cliente + const response = await fetch(`${CALL_SERVICE_URL}/api/calls`, { + method: 'GET', + headers: { + 'Content-Type': 'application/json', + }, + cache: 'no-store', + }); + + const body = await response.text(); + return new NextResponse(body, { + status: response.status, + headers: { + 'Content-Type': response.headers.get('content-type') ?? 'application/json', + }, + }); +} diff --git a/packages/frontend/src/app/page.tsx b/packages/frontend/src/app/page.tsx index 0ddb140..6f35929 100644 --- a/packages/frontend/src/app/page.tsx +++ b/packages/frontend/src/app/page.tsx @@ -13,9 +13,11 @@ export default function DashboardPage() { const [filters, setFilters] = useState({ status: 'all' }); const [selectedCallId, setSelectedCallId] = useState(null); - const { calls, loading } = useCalls(filters); + const { calls, loading, isConnected } = useCalls(filters); const { events, loading: eventsLoading } = useCallEvents(selectedCallId); + // console.log({ events, eventsLoading }) + return (
{/* ── Header ─────────────────────────────────────────── */} @@ -26,10 +28,9 @@ export default function DashboardPage() {

Call Center Dashboard

- {/* TODO: show green dot + "Live" text when Socket.io is connected */} - - - Not connected + + + {isConnected ? 'Live' : 'Not connected'} diff --git a/packages/frontend/src/hooks/useCallEvents.ts b/packages/frontend/src/hooks/useCallEvents.ts index c428f4e..8bb144e 100644 --- a/packages/frontend/src/hooks/useCallEvents.ts +++ b/packages/frontend/src/hooks/useCallEvents.ts @@ -2,7 +2,8 @@ import { useState, useEffect } from 'react'; import { CallEvent } from '../types'; -import { MOCK_EVENTS } from '../mocks/data'; +import { fetchCallEvents } from '@/lib/api'; +import { getSocket, subscribeToCall, unsubscribeFromCall } from '@/lib/socket'; /** * Returns the event history for a specific call. @@ -19,13 +20,41 @@ export function useCallEvents(callId: string | null) { } setLoading(true); - // TODO: replace with fetchCallEvents(callId) - const t = setTimeout(() => { - setEvents(MOCK_EVENTS[callId] ?? []); - setLoading(false); - }, 200); - return () => clearTimeout(t); + async function loadCallEvents() { + setLoading(true); + try { + const data = await fetchCallEvents(callId as string); + setEvents(data); + } catch (error) { + console.error('Error fetching call events:', error); + // Optionally set an error state here + } finally { + setLoading(false); + } + } + + loadCallEvents(); + }, [callId]); + + useEffect(() => { + if (!callId) { + return; + } + + const socket = getSocket(); + + socket.on('call_status_update', console.log); + subscribeToCall(callId); + + if (!socket.connected) { + socket.connect(); + } + + return () => { + socket.off('call_status_update', console.log); + unsubscribeFromCall(callId); + }; }, [callId]); return { events, loading }; diff --git a/packages/frontend/src/hooks/useCalls.ts b/packages/frontend/src/hooks/useCalls.ts index a35ab33..c5f5c83 100644 --- a/packages/frontend/src/hooks/useCalls.ts +++ b/packages/frontend/src/hooks/useCalls.ts @@ -2,25 +2,55 @@ import { useState, useEffect } from 'react'; import { Call, CallFilters } from '../types'; -import { MOCK_CALLS } from '../mocks/data'; +import { fetchCalls } from '../lib/api'; +import { getSocket } from '../lib/socket'; /** * Returns the live call list and a loading indicator. * TODO: replace mock data with real API + Socket.io updates. */ -export function useCalls(_filters: CallFilters) { +export function useCalls(filters: CallFilters) { const [calls, setCalls] = useState([]); const [loading, setLoading] = useState(true); + const [isConnected, setIsConnected] = useState(false); useEffect(() => { - // TODO: replace with fetchCalls(_filters) + socket subscription - const t = setTimeout(() => { - setCalls(MOCK_CALLS); - setLoading(false); - }, 300); + async function loadCalls() { + setLoading(true); + try { + const data = await fetchCalls(filters); + setCalls(data); + } catch (error) { + console.error('Error fetching calls:', error); + // Optionally set an error state here + } finally { + setLoading(false); + } + } - return () => clearTimeout(t); - }, []); + loadCalls(); + }, [filters]); - return { calls, loading, setCalls }; + useEffect(() => { + const socket = getSocket(); + + const onConnect = () => setIsConnected(true); + const onDisconnect = () => setIsConnected(false); + + socket.on('connect', onConnect); + socket.on('disconnect', onDisconnect); + + if (!socket.connected) { + socket.connect(); + } else { + setIsConnected(true); + } + + return () => { + socket.off('connect', onConnect); + socket.off('disconnect', onDisconnect); + }; + }, [filters]); + + return { calls, loading, isConnected }; } diff --git a/packages/realtime-service/src/bus/subscriber.ts b/packages/realtime-service/src/bus/subscriber.ts index 723b278..f97f585 100644 --- a/packages/realtime-service/src/bus/subscriber.ts +++ b/packages/realtime-service/src/bus/subscriber.ts @@ -6,8 +6,21 @@ const CHANNEL = 'call-status-updates'; export function subscribeToCallUpdates( onUpdate: (update: CallStatusUpdate) => void, ): void { - // TODO: subscribe to CHANNEL on Redis and call onUpdate for each message - void Redis; - void CHANNEL; - void onUpdate; + const redisUrl = process.env.REDIS_URL; + const subscriber = new Redis(redisUrl as string); + + subscriber.on('message', (channel, message) => { + if (channel !== CHANNEL) return; + + try { + const payload = JSON.parse(message) as CallStatusUpdate; + onUpdate(payload); + } catch (error) { + console.error('[redis] failed to parse call update:', error); + } + }); + + subscriber.subscribe(CHANNEL).catch((error) => { + console.error(`[redis] failed to subscribe to ${CHANNEL}:`, error); + }); } diff --git a/packages/realtime-service/src/socket/server.ts b/packages/realtime-service/src/socket/server.ts index d344552..4dd7f51 100644 --- a/packages/realtime-service/src/socket/server.ts +++ b/packages/realtime-service/src/socket/server.ts @@ -14,12 +14,12 @@ export function createSocketServer(httpServer: HttpServer): IoServer { io.on('connection', (socket) => { console.log(`[ws] client connected ${socket.id}`); - socket.on('subscribe_call', (_callId) => { - // TODO: socket.join(_callId); + socket.on('subscribe_call', (callId) => { + socket.join(callId); }); - socket.on('unsubscribe_call', (_callId) => { - // TODO: socket.leave(_callId); + socket.on('unsubscribe_call', (callId) => { + socket.leave(callId); }); socket.on('disconnect', () => { @@ -32,6 +32,6 @@ export function createSocketServer(httpServer: HttpServer): IoServer { export function broadcastStatusUpdate(update: CallStatusUpdate): void { if (!io) return; - // TODO: io.to(update.callId).emit('call_status_update', update); - io.emit('call_status_update', update); // naive – replace with room-based + io.to(update.callId).emit('call_status_update', update); + io.to('dashboard').emit('call_status_update', update); }