Skip to content

Commit 690b47a

Browse files
authored
chore(monitoring): remove SSE connection tracking and Bun.gc debug instrumentation (#3472)
1 parent 158d523 commit 690b47a

File tree

7 files changed

+4
-108
lines changed

7 files changed

+4
-108
lines changed

apps/sim/app/api/a2a/serve/[agentId]/route.ts

Lines changed: 1 addition & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import { validateUrlWithDNS } from '@/lib/core/security/input-validation.server'
1919
import { SSE_HEADERS } from '@/lib/core/utils/sse'
2020
import { getBaseUrl } from '@/lib/core/utils/urls'
2121
import { markExecutionCancelled } from '@/lib/execution/cancellation'
22-
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
2322
import { checkWorkspaceAccess } from '@/lib/workspaces/permissions/utils'
2423
import { getWorkspaceBilledAccountUserId } from '@/lib/workspaces/utils'
2524
import {
@@ -631,11 +630,9 @@ async function handleMessageStream(
631630
}
632631

633632
const encoder = new TextEncoder()
634-
let messageStreamDecremented = false
635633

636634
const stream = new ReadableStream({
637635
async start(controller) {
638-
incrementSSEConnections('a2a-message')
639636
const sendEvent = (event: string, data: unknown) => {
640637
try {
641638
const jsonRpcResponse = {
@@ -845,19 +842,10 @@ async function handleMessageStream(
845842
})
846843
} finally {
847844
await releaseLock(lockKey, lockValue)
848-
if (!messageStreamDecremented) {
849-
messageStreamDecremented = true
850-
decrementSSEConnections('a2a-message')
851-
}
852845
controller.close()
853846
}
854847
},
855-
cancel() {
856-
if (!messageStreamDecremented) {
857-
messageStreamDecremented = true
858-
decrementSSEConnections('a2a-message')
859-
}
860-
},
848+
cancel() {},
861849
})
862850

863851
return new NextResponse(stream, {
@@ -1042,22 +1030,16 @@ async function handleTaskResubscribe(
10421030
{ once: true }
10431031
)
10441032

1045-
let sseDecremented = false
10461033
const cleanup = () => {
10471034
isCancelled = true
10481035
if (pollTimeoutId) {
10491036
clearTimeout(pollTimeoutId)
10501037
pollTimeoutId = null
10511038
}
1052-
if (!sseDecremented) {
1053-
sseDecremented = true
1054-
decrementSSEConnections('a2a-resubscribe')
1055-
}
10561039
}
10571040

10581041
const stream = new ReadableStream({
10591042
async start(controller) {
1060-
incrementSSEConnections('a2a-resubscribe')
10611043
const sendEvent = (event: string, data: unknown): boolean => {
10621044
if (isCancelled || abortSignal.aborted) return false
10631045
try {

apps/sim/app/api/mcp/events/route.ts

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import { getSession } from '@/lib/auth'
1414
import { SSE_HEADERS } from '@/lib/core/utils/sse'
1515
import { mcpConnectionManager } from '@/lib/mcp/connection-manager'
1616
import { mcpPubSub } from '@/lib/mcp/pubsub'
17-
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
1817
import { getUserEntityPermissions } from '@/lib/workspaces/permissions/utils'
1918

2019
const logger = createLogger('McpEventsSSE')
@@ -50,14 +49,11 @@ export async function GET(request: NextRequest) {
5049
for (const unsub of unsubscribers) {
5150
unsub()
5251
}
53-
decrementSSEConnections('mcp-events')
5452
logger.info(`SSE connection closed for workspace ${workspaceId}`)
5553
}
5654

5755
const stream = new ReadableStream({
5856
start(controller) {
59-
incrementSSEConnections('mcp-events')
60-
6157
const send = (eventName: string, data: Record<string, unknown>) => {
6258
if (cleaned) return
6359
try {

apps/sim/app/api/wand/route.ts

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@ import { checkAndBillOverageThreshold } from '@/lib/billing/threshold-billing'
1010
import { env } from '@/lib/core/config/env'
1111
import { getCostMultiplier, isBillingEnabled } from '@/lib/core/config/feature-flags'
1212
import { generateRequestId } from '@/lib/core/utils/request'
13-
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
1413
import { enrichTableSchema } from '@/lib/table/llm/wand'
1514
import { verifyWorkspaceMembership } from '@/app/api/workflows/utils'
1615
import { extractResponseText, parseResponsesUsage } from '@/providers/openai/utils'
@@ -331,14 +330,10 @@ export async function POST(req: NextRequest) {
331330
const encoder = new TextEncoder()
332331
const decoder = new TextDecoder()
333332

334-
let wandStreamClosed = false
335333
const readable = new ReadableStream({
336334
async start(controller) {
337-
incrementSSEConnections('wand')
338335
const reader = response.body?.getReader()
339336
if (!reader) {
340-
wandStreamClosed = true
341-
decrementSSEConnections('wand')
342337
controller.close()
343338
return
344339
}
@@ -483,18 +478,9 @@ export async function POST(req: NextRequest) {
483478
controller.close()
484479
} finally {
485480
reader.releaseLock()
486-
if (!wandStreamClosed) {
487-
wandStreamClosed = true
488-
decrementSSEConnections('wand')
489-
}
490-
}
491-
},
492-
cancel() {
493-
if (!wandStreamClosed) {
494-
wandStreamClosed = true
495-
decrementSSEConnections('wand')
496481
}
497482
},
483+
cancel() {},
498484
})
499485

500486
return new Response(readable, {

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ import { createExecutionEventWriter, setExecutionMeta } from '@/lib/execution/ev
2222
import { processInputFileFields } from '@/lib/execution/files'
2323
import { preprocessExecution } from '@/lib/execution/preprocessing'
2424
import { LoggingSession } from '@/lib/logs/execution/logging-session'
25-
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
2625
import {
2726
cleanupExecutionBase64Cache,
2827
hydrateUserFilesWithBase64,
@@ -764,7 +763,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
764763
const encoder = new TextEncoder()
765764
const timeoutController = createTimeoutAbortController(preprocessResult.executionTimeout?.sync)
766765
let isStreamClosed = false
767-
let sseDecremented = false
768766

769767
const eventWriter = createExecutionEventWriter(executionId)
770768
setExecutionMeta(executionId, {
@@ -775,7 +773,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
775773

776774
const stream = new ReadableStream<Uint8Array>({
777775
async start(controller) {
778-
incrementSSEConnections('workflow-execute')
779776
let finalMetaStatus: 'complete' | 'error' | 'cancelled' | null = null
780777

781778
const sendEvent = (event: ExecutionEvent) => {
@@ -1159,10 +1156,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
11591156
if (executionId) {
11601157
await cleanupExecutionBase64Cache(executionId)
11611158
}
1162-
if (!sseDecremented) {
1163-
sseDecremented = true
1164-
decrementSSEConnections('workflow-execute')
1165-
}
11661159
if (!isStreamClosed) {
11671160
try {
11681161
controller.enqueue(encoder.encode('data: [DONE]\n\n'))
@@ -1174,10 +1167,6 @@ export async function POST(req: NextRequest, { params }: { params: Promise<{ id:
11741167
cancel() {
11751168
isStreamClosed = true
11761169
logger.info(`[${requestId}] Client disconnected from SSE stream`)
1177-
if (!sseDecremented) {
1178-
sseDecremented = true
1179-
decrementSSEConnections('workflow-execute')
1180-
}
11811170
},
11821171
})
11831172

apps/sim/app/api/workflows/[id]/executions/[executionId]/stream/route.ts

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@ import {
77
getExecutionMeta,
88
readExecutionEvents,
99
} from '@/lib/execution/event-buffer'
10-
import { decrementSSEConnections, incrementSSEConnections } from '@/lib/monitoring/sse-connections'
1110
import { formatSSEEvent } from '@/lib/workflows/executor/execution-events'
1211
import { authorizeWorkflowByWorkspacePermission } from '@/lib/workflows/utils'
1312

@@ -74,10 +73,8 @@ export async function GET(
7473

7574
let closed = false
7675

77-
let sseDecremented = false
7876
const stream = new ReadableStream<Uint8Array>({
7977
async start(controller) {
80-
incrementSSEConnections('execution-stream-reconnect')
8178
let lastEventId = fromEventId
8279
const pollDeadline = Date.now() + MAX_POLL_DURATION_MS
8380

@@ -145,20 +142,11 @@ export async function GET(
145142
controller.close()
146143
} catch {}
147144
}
148-
} finally {
149-
if (!sseDecremented) {
150-
sseDecremented = true
151-
decrementSSEConnections('execution-stream-reconnect')
152-
}
153145
}
154146
},
155147
cancel() {
156148
closed = true
157149
logger.info('Client disconnected from reconnection stream', { executionId })
158-
if (!sseDecremented) {
159-
sseDecremented = true
160-
decrementSSEConnections('execution-stream-reconnect')
161-
}
162150
},
163151
})
164152

apps/sim/lib/monitoring/memory-telemetry.ts

Lines changed: 2 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,10 @@
11
/**
2-
* Periodic memory telemetry for diagnosing heap growth in production.
3-
* Logs process.memoryUsage(), V8 heap stats, and active SSE connection
4-
* counts every 60s, enabling correlation between connection leaks and
5-
* memory spikes.
2+
* Periodic memory telemetry for monitoring heap growth in production.
3+
* Logs process.memoryUsage() and V8 heap stats every 60s.
64
*/
75

86
import v8 from 'node:v8'
97
import { createLogger } from '@sim/logger'
10-
import {
11-
getActiveSSEConnectionCount,
12-
getActiveSSEConnectionsByRoute,
13-
} from '@/lib/monitoring/sse-connections'
148

159
const logger = createLogger('MemoryTelemetry', { logLevel: 'INFO' })
1610

@@ -23,16 +17,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
2317
started = true
2418

2519
const timer = setInterval(() => {
26-
// Trigger opportunistic (non-blocking) garbage collection if running on Bun.
27-
// This signals JSC GC + mimalloc page purge without blocking the event loop,
28-
// helping reclaim RSS that mimalloc otherwise retains under sustained load.
29-
const bunGlobal = (globalThis as Record<string, unknown>).Bun as
30-
| { gc?: (force: boolean) => void }
31-
| undefined
32-
if (typeof bunGlobal?.gc === 'function') {
33-
bunGlobal.gc(false)
34-
}
35-
3620
const mem = process.memoryUsage()
3721
const heap = v8.getHeapStatistics()
3822

@@ -49,8 +33,6 @@ export function startMemoryTelemetry(intervalMs = 60_000) {
4933
? process.getActiveResourcesInfo().length
5034
: -1,
5135
uptimeMin: Math.round(process.uptime() / 60),
52-
activeSSEConnections: getActiveSSEConnectionCount(),
53-
sseByRoute: getActiveSSEConnectionsByRoute(),
5436
})
5537
}, intervalMs)
5638
timer.unref()

apps/sim/lib/monitoring/sse-connections.ts

Lines changed: 0 additions & 27 deletions
This file was deleted.

0 commit comments

Comments
 (0)