Skip to content

Commit 738e627

Browse files
Add SSE flush and disable nginx buffering to fix last event not received
1 parent a025787 commit 738e627

1 file changed

Lines changed: 23 additions & 11 deletions

File tree

routes/create/services/sseService.js

Lines changed: 23 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ class SSEService extends EventEmitter {
1616
* Add SSE client connection
1717
*/
1818
addClient(sessionId, res, metadata = {}) {
19-
console.log(`📡 SSE client connected: ${sessionId}`);
19+
console.log(`[SSE] Client connected: ${sessionId}`);
2020

2121
// Store client response object
2222
this.clients.set(sessionId, res);
@@ -26,14 +26,20 @@ class SSEService extends EventEmitter {
2626
status: 'connected'
2727
});
2828

29-
// Setup SSE headers
29+
// Setup SSE headers - include X-Accel-Buffering to disable nginx buffering
3030
res.writeHead(200, {
3131
'Content-Type': 'text/event-stream',
32-
'Cache-Control': 'no-cache',
32+
'Cache-Control': 'no-cache, no-transform',
3333
'Connection': 'keep-alive',
34+
'X-Accel-Buffering': 'no',
3435
'Access-Control-Allow-Origin': '*',
3536
'Access-Control-Allow-Headers': 'Cache-Control'
3637
});
38+
39+
// Flush headers immediately
40+
if (res.flushHeaders) {
41+
res.flushHeaders();
42+
}
3743

3844
// Send initial connection message
3945
this.sendToClient(sessionId, 'connected', {
@@ -44,12 +50,12 @@ class SSEService extends EventEmitter {
4450

4551
// Handle client disconnect
4652
res.on('close', () => {
47-
console.log(`📡 SSE client disconnected: ${sessionId}`);
53+
console.log(`[SSE] Client disconnected: ${sessionId}`);
4854
this.removeClient(sessionId);
4955
});
5056

5157
res.on('error', (error) => {
52-
console.error(`📡 SSE client error for ${sessionId}:`, error);
58+
console.error(`[SSE] Client error for ${sessionId}:`, error);
5359
this.removeClient(sessionId);
5460
});
5561

@@ -62,7 +68,7 @@ class SSEService extends EventEmitter {
6268
removeClient(sessionId) {
6369
this.clients.delete(sessionId);
6470
this.sessions.delete(sessionId);
65-
console.log(`📡 Removed SSE client: ${sessionId}`);
71+
console.log(`[SSE] Removed client: ${sessionId}`);
6672
}
6773

6874
/**
@@ -71,7 +77,7 @@ class SSEService extends EventEmitter {
7177
sendToClient(sessionId, eventType, data) {
7278
const client = this.clients.get(sessionId);
7379
if (!client) {
74-
console.warn(`📡 No client found for session: ${sessionId}`);
80+
console.warn(`[SSE] No client found for session: ${sessionId}`);
7581
return false;
7682
}
7783

@@ -80,10 +86,16 @@ class SSEService extends EventEmitter {
8086
const cleanData = JSON.parse(JSON.stringify(data || {}));
8187
const sseData = `event: ${eventType}\ndata: ${JSON.stringify(cleanData)}\n\n`;
8288
client.write(sseData);
89+
90+
// Force flush the response buffer to ensure data is sent immediately
91+
if (client.flush) {
92+
client.flush();
93+
}
94+
8395
return true;
8496
} catch (error) {
85-
console.error(`📡 Failed to send to client ${sessionId}:`, error);
86-
console.error(`📡 Problematic data:`, data);
97+
console.error(`[SSE] Failed to send to client ${sessionId}:`, error);
98+
console.error(`[SSE] Problematic data:`, data);
8799
this.removeClient(sessionId);
88100
return false;
89101
}
@@ -99,7 +111,7 @@ class SSEService extends EventEmitter {
99111
successCount++;
100112
}
101113
}
102-
console.log(`📡 Broadcasted to ${successCount}/${this.clients.size} clients`);
114+
console.log(`[SSE] Broadcasted to ${successCount}/${this.clients.size} clients`);
103115
return successCount;
104116
}
105117

@@ -210,7 +222,7 @@ class SSEService extends EventEmitter {
210222
}
211223
}, intervalMs);
212224

213-
console.log(`💓 SSE heartbeat started (${intervalMs}ms interval)`);
225+
console.log(`[SSE] Heartbeat started (${intervalMs}ms interval)`);
214226
}
215227
}
216228

0 commit comments

Comments
 (0)