Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 68 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 3 additions & 1 deletion packages/call-service/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@
"dotenv": "^16.4.5",
"express": "^4.19.2",
"ioredis": "^5.3.2",
"morgan": "^1.10.1",
"pg": "^8.11.5",
"uuid": "^9.0.1",
"zod": "^3.23.8"
},
"devDependencies": {
"@types/cors": "^2.8.17",
"@types/express": "^4.17.21",
"@types/morgan": "^1.9.10",
"@types/node": "^20.12.7",
"@types/pg": "^8.11.5",
"@types/supertest": "^6.0.2",
Expand All @@ -34,4 +36,4 @@
"typescript": "^5.4.5",
"vitest": "^1.6.0"
}
}
}
52 changes: 52 additions & 0 deletions packages/call-service/src/bus/OutboxProcessor.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
import { db } from '../db/client';
import { publishStatusUpdate } from './publisher';

export class OutboxProcessor {
private timer: NodeJS.Timeout | null = null;
private isProcessing = false;

start() {
if (this.timer) return;
this.timer = setInterval(() => this.processOutbox(), 1000);
}

stop() {
if (this.timer) clearInterval(this.timer);
this.timer = null;
}

async processOutbox() {
if (this.isProcessing) return;
this.isProcessing = true;

try {
// Pull up to 50 events
const res = await db.query(
`SELECT * FROM outbox_events ORDER BY id ASC LIMIT 50`
);

for (const row of res.rows) {
try {
await publishStatusUpdate({
callId: row.call_id,
status: row.status,
eventType: row.event_type,
timestamp: row.timestamp.toISOString(),
metadata: row.metadata,
});

// Delete on success
await db.query(`DELETE FROM outbox_events WHERE id = $1`, [row.id]);
} catch (publishErr) {
console.error(`Failed to publish outbox event ${row.id}:`, publishErr);
}
}
} catch (e) {
console.error('OutboxProcessor failed to query DB', e);
} finally {
this.isProcessing = false;
}
}
}

export const outboxProcessor = new OutboxProcessor();
144 changes: 144 additions & 0 deletions packages/call-service/src/db/CallRepository.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
import { db } from './client';
import { mapCallRow, mapCallEventRow } from './mappers';
import { Call, CallEvent, CallFilters, CallStatus } from '../domain/call';
import { EventPayload } from '../domain/call';

export class CallRepository {
async getCalls(filters: CallFilters & { limit?: number; offset?: number }): Promise<Call[]> {
let query = 'SELECT * FROM calls WHERE 1=1';
const params: any[] = [];
let paramIndex = 1;

if (filters.status) {
query += ` AND status = $${paramIndex++}`;
params.push(filters.status);
}
if (filters.queueId) {
query += ` AND queue_id = $${paramIndex++}`;
params.push(filters.queueId);
}

query += ` ORDER BY start_time DESC`;

if (filters.limit !== undefined) {
query += ` LIMIT $${paramIndex++}`;
params.push(filters.limit);
}
if (filters.offset !== undefined) {
query += ` OFFSET $${paramIndex++}`;
params.push(filters.offset);
}


const res = await db.query(query, params);
return res.rows.map(mapCallRow);
}

async getCallEvents(callId: string): Promise<CallEvent[]> {
console.log('callId ----- ', callId);
const res = await db.query(
`SELECT * FROM call_events WHERE call_id = $1 ORDER BY timestamp ASC`,
[callId]
);
return res.rows.map(mapCallEventRow);
}

async getEventByIdempotencyKey(key: string): Promise<CallEvent | null> {
const res = await db.query(
`SELECT ce.* FROM call_events ce
JOIN idempotency_keys ik ON ce.id = ik.event_id
WHERE ik.key = $1`,
[key]
);
if (res.rows.length === 0) return null;
return mapCallEventRow(res.rows[0]);
}

async processEventTransaction(
payload: EventPayload,
eventId: string,
newStatus: CallStatus,
now: Date,
idempotencyKey?: string
): Promise<void> {
const { callId, event } = payload;
const client = await db.connect();

try {
await client.query('BEGIN');

if (event === 'call_initiated') {
const p = payload as Extract<EventPayload, { event: 'call_initiated' }>;
await client.query(
`INSERT INTO calls (id, type, status, queue_id, start_time) VALUES ($1, $2, $3, $4, $5)`,
[p.callId, p.type, newStatus, p.queueId, now]
);
} else {
if (event === 'call_ended') {
await client.query(
`UPDATE calls SET status = $1, end_time = $2 WHERE id = $3`,
[newStatus, now, callId]
);
} else {
await client.query(
`UPDATE calls SET status = $1 WHERE id = $2`,
[newStatus, callId]
);
}
}

// 2. Insert event
await client.query(
`INSERT INTO call_events (id, call_id, type, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)`,
[eventId, callId, event, now, JSON.stringify(payload)]
);

// 3. Optional: idempotency key
if (idempotencyKey) {
await client.query(
`INSERT INTO idempotency_keys (key, event_id, created_at) VALUES ($1, $2, $3)`,
[idempotencyKey, eventId, now]
);
}

// 4. Outbox event for Redis
await client.query(
`INSERT INTO outbox_events (call_id, status, event_type, timestamp, metadata)
VALUES ($1, $2, $3, $4, $5)`,
[callId, newStatus, event, now, JSON.stringify(payload)]
);

await client.query('COMMIT');
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}
}

async insertFlag(callId: string, eventId: string, flagType: string, status: CallStatus): Promise<void> {
const now = new Date();
const client = await db.connect();
try {
await client.query('BEGIN');
await client.query(
`INSERT INTO call_events (id, call_id, type, timestamp, metadata) VALUES ($1, $2, $3, $4, $5)`,
[eventId, callId, 'flagged', now, JSON.stringify({ reason: flagType })]
);
await client.query(
`INSERT INTO outbox_events (call_id, status, event_type, timestamp, metadata)
VALUES ($1, $2, $3, $4, $5)`,
[callId, status, 'flagged', now, JSON.stringify({ reason: flagType })]
);
await client.query('COMMIT');
} catch (e) {
await client.query('ROLLBACK');
throw e;
} finally {
client.release();
}
}
}

export const callRepository = new CallRepository();
16 changes: 16 additions & 0 deletions packages/call-service/src/db/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -21,3 +21,19 @@ CREATE TABLE IF NOT EXISTS call_events (
CREATE INDEX IF NOT EXISTS idx_call_events_call_id ON call_events(call_id);
CREATE INDEX IF NOT EXISTS idx_calls_status ON calls(status);
CREATE INDEX IF NOT EXISTS idx_calls_queue_id ON calls(queue_id);

CREATE TABLE IF NOT EXISTS idempotency_keys (
key VARCHAR(100) PRIMARY KEY,
event_id VARCHAR(36) NOT NULL REFERENCES call_events(id) ON DELETE CASCADE,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);

CREATE TABLE IF NOT EXISTS outbox_events (
id SERIAL PRIMARY KEY,
call_id VARCHAR(36) NOT NULL,
status VARCHAR(20) NOT NULL,
event_type VARCHAR(50) NOT NULL,
timestamp TIMESTAMPTZ NOT NULL,
metadata JSONB,
created_at TIMESTAMPTZ NOT NULL DEFAULT NOW()
);
Loading