From 1a1c99c63203ca14465ecf18bf79c2bac26e3b02 Mon Sep 17 00:00:00 2001 From: ttbombadil Date: Mon, 2 Mar 2026 17:08:09 +0100 Subject: [PATCH] fix(runners): resolve zombie-state and ensure consistent pool release --- server/routes.ts | 11 +- server/routes/simulation.ws.ts | 92 +++++++--- server/services/sandbox-runner-pool.ts | 237 +++++++++++++++++++++++++ 3 files changed, 312 insertions(+), 28 deletions(-) create mode 100644 server/services/sandbox-runner-pool.ts diff --git a/server/routes.ts b/server/routes.ts index 79c87674..9714e690 100644 --- a/server/routes.ts +++ b/server/routes.ts @@ -8,6 +8,7 @@ import { getPooledCompiler } from "./services/pooled-compiler"; import { SandboxRunner } from "./services/sandbox-runner"; import { getSimulationRateLimiter } from "./services/rate-limiter"; import { shouldSendSimulationEndMessage } from "./services/simulation-end"; +import { getSandboxRunnerPool, initializeSandboxRunnerPool } from "./services/sandbox-runner-pool"; import { insertSketchSchema } from "@shared/schema"; import fs from "fs"; import path from "path"; @@ -26,6 +27,8 @@ export async function registerRoutes(app: Express): Promise { const logger = new Logger("Routes"); const httpServer = createServer(app); + await initializeSandboxRunnerPool(); + // Lightweight health endpoint for backend reachability checks app.get("/api/health", (_req, res) => { res.json({ status: "ok" }); @@ -33,7 +36,7 @@ export async function registerRoutes(app: Express): Promise { // Test Reset Endpoint: Cleanup all running simulations for idempotent test isolation // Each E2E test can call this before starting to ensure a clean backend state - app.post("/api/test-reset", (_req, res) => { + app.post("/api/test-reset", async (_req, res) => { try { // Delegate cleanup to the WebSocket module which owns runner state if (!simulationApi) { @@ -41,7 +44,7 @@ export async function registerRoutes(app: Express): Promise { return res.json({ status: "reset", message: "No active runners", cleanedTestRunIds: [], timestamp: new Date().toISOString() }); } - const { cleanedUpCount, cleanedTestRunIds } = simulationApi.stopAllRunnersAndNotify(); + const { cleanedUpCount, cleanedTestRunIds } = await simulationApi.stopAllRunnersAndNotify(); logger.info(`[Test Reset] Cleaned up ${cleanedUpCount} client runner(s). TestRunIds: ${cleanedTestRunIds.join(", ") || "none"}`); res.json({ status: "reset", message: `Backend reset complete. Cleaned up ${cleanedUpCount} runner(s).`, cleanedTestRunIds, timestamp: new Date().toISOString() }); @@ -63,7 +66,7 @@ export async function registerRoutes(app: Express): Promise { const CACHE_TTL = 5 * 60 * 1000; // 5 minutes // Placeholder for simulation websocket API (populated when WS module is registered) - let simulationApi: { stopAllRunnersAndNotify: () => { cleanedUpCount: number; cleanedTestRunIds: string[] } } | null = null; + let simulationApi: { stopAllRunnersAndNotify: () => Promise<{ cleanedUpCount: number; cleanedTestRunIds: string[] }> } | null = null; // Helper function to generate code hash function hashCode( @@ -191,12 +194,14 @@ export async function registerRoutes(app: Express): Promise { // --- WebSocket handler (moved to modular WS file) --- // Register WS handlers and receive a small API back so other routes // (e.g. /api/test-reset) can operate on the same runner state. + const runnerPool = getSandboxRunnerPool(); simulationApi = registerSimulationWebSocket(httpServer, { SandboxRunner, getSimulationRateLimiter, shouldSendSimulationEndMessage, getLastCompiledCode: () => lastCompiledCode, logger, + runnerPool, }); // (WS implementation moved to server/routes/simulation.ws.ts) diff --git a/server/routes/simulation.ws.ts b/server/routes/simulation.ws.ts index ed6f5420..16136d8e 100644 --- a/server/routes/simulation.ws.ts +++ b/server/routes/simulation.ws.ts @@ -3,6 +3,7 @@ import type { Server } from "http"; import type { SandboxRunner } from "../services/sandbox-runner"; import type { IOPinRecord } from "@shared/schema"; import type { Logger } from "@shared/logger"; +import { getSandboxRunnerPool } from "../services/sandbox-runner-pool"; import fs from "fs"; import path from "path"; import { constants as zlibConstants } from "zlib"; @@ -13,11 +14,13 @@ export type SimulationDeps = { shouldSendSimulationEndMessage: (compileFailed: boolean) => boolean; getLastCompiledCode: () => string | null; logger: Logger; + runnerPool?: ReturnType; }; // Return type exposes a small API used by other modules (test-reset) export function registerSimulationWebSocket(httpServer: Server, deps: SimulationDeps) { - const { SandboxRunner, getSimulationRateLimiter, shouldSendSimulationEndMessage, getLastCompiledCode, logger } = deps; + const { SandboxRunner, getSimulationRateLimiter, shouldSendSimulationEndMessage, getLastCompiledCode, logger, runnerPool } = deps; + const pool = runnerPool ?? getSandboxRunnerPool(); const wss = new WebSocketServer({ server: httpServer, @@ -54,6 +57,32 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation } } + async function safeReleaseRunner( + state: { runner: InstanceType | null; isRunning: boolean; isPaused: boolean }, + reason: string, + ): Promise { + if (!state.runner) { + return; + } + + const runner = state.runner; + state.runner = null; + state.isRunning = false; + state.isPaused = false; + + try { + await runner.stop(); + } catch (error) { + logger.debug(`[SandboxRunnerPool] runner.stop() failed during ${reason}: ${error}`); + } + + try { + await pool.releaseRunner(runner); + } catch (error) { + logger.warn(`[SandboxRunnerPool] releaseRunner failed during ${reason}: ${error}`); + } + } + wss.on("connection", (ws, req) => { const url = req.url || ""; const urlParams = new URLSearchParams(url.split("?")[1] || ""); @@ -93,9 +122,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation const clientState = clientRunners.get(ws); if (clientState?.runner) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "rate-limit"); } sendMessageToClient(ws, { @@ -112,9 +139,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation const lastCompiledCode = getLastCompiledCode(); if (!lastCompiledCode) { if (clientState.runner) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "missing-compiled-code"); } sendMessageToClient(ws, { type: "serial_output", data: "[ERR] No compiled code available. Please compile first.\n" }); @@ -122,11 +147,23 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation break; } - if (clientState.runner) clientState.runner.stop(); + if (clientState.runner) { + await safeReleaseRunner(clientState, "start-replace-existing"); + } - const runnerTempDir = clientState.testRunId ? path.join(process.cwd(), "temp", clientState.testRunId) : undefined; + try { + clientState.runner = await pool.acquireRunner(); + logger.debug(`[SandboxRunnerPool] Acquired runner for client. Pool stats: ${JSON.stringify(pool.getStats())}`); + } catch (error) { + logger.error(`[SandboxRunnerPool] Failed to acquire runner: ${error}`); + clientState.runner = null; + clientState.isRunning = false; + clientState.isPaused = false; + sendMessageToClient(ws, { type: "serial_output", data: "[ERR] Server overloaded. All runners busy. Please try again.\n" }); + sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); + break; + } - clientState.runner = new SandboxRunner({ tempDir: runnerTempDir }); clientState.isRunning = true; clientState.isPaused = false; @@ -153,12 +190,11 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation sendMessageToClient(ws, { type: "serial_output", data: "[ERR] " + err }); }, onExit: (exitCode: number | null) => { - setTimeout(() => { + setTimeout(async () => { try { const cs = clientRunners.get(ws); if (cs) { - cs.isRunning = false; - cs.isPaused = false; + await safeReleaseRunner(cs, "onExit"); } if (!shouldSendSimulationEndMessage(compileFailed)) return; @@ -181,7 +217,11 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation sendMessageToClient(ws, { type: "compilation_status", gccStatus: "error" }); sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); const cs = clientRunners.get(ws); - if (cs) { cs.isRunning = false; cs.isPaused = false; } + if (cs) { + safeReleaseRunner(cs, "onCompileError").catch((error) => { + logger.warn(`[SandboxRunnerPool] safeReleaseRunner failed in onCompileError: ${error}`); + }); + } logger.error(`[Client Compile Error]: ${compileErr}`); }, onCompileSuccess: () => { @@ -246,9 +286,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation case "code_changed": { const clientState = clientRunners.get(ws); if (clientState?.runner && (clientState?.isRunning || clientState?.isPaused)) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "code_changed"); sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); sendMessageToClient(ws, { type: "serial_output", data: "--- Simulation stopped due to code change ---\n" }); } @@ -258,9 +296,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation case "stop_simulation": { const clientState = clientRunners.get(ws); if (clientState?.runner) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "stop_simulation"); } sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); sendMessageToClient(ws, { type: "serial_output", data: "--- Simulation stopped ---\n" }); @@ -319,27 +355,33 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation } }); - ws.on("close", () => { + ws.on("close", async () => { const clientState = clientRunners.get(ws); - if (clientState?.runner) clientState.runner.stop(); + if (clientState?.runner) { + await safeReleaseRunner(clientState, "ws-close"); + } clientRunners.delete(ws); const rateLimiter = getSimulationRateLimiter(); rateLimiter.removeClient(ws); logger.info(`Client disconnected. Remaining clients: ${wss.clients.size}`); }); - ws.on("error", (error) => { + ws.on("error", async (error) => { + const clientState = clientRunners.get(ws); + if (clientState?.runner) { + await safeReleaseRunner(clientState, "ws-error"); + } logger.error(`WebSocket error: ${error instanceof Error ? error.message : String(error)}`); }); }); - function stopAllRunnersAndNotify() { + async function stopAllRunnersAndNotify() { const cleanedUpCount = clientRunners.size; const cleanedTestRunIds: (string | undefined)[] = []; for (const [ws, clientState] of clientRunners.entries()) { if (clientState.runner) { - try { clientState.runner.stop(); } catch (err) { logger.debug(`Failed to stop runner during reset: ${err}`); } + await safeReleaseRunner(clientState, "test-reset"); } clientState.isRunning = false; clientState.isPaused = false; diff --git a/server/services/sandbox-runner-pool.ts b/server/services/sandbox-runner-pool.ts new file mode 100644 index 00000000..64d81263 --- /dev/null +++ b/server/services/sandbox-runner-pool.ts @@ -0,0 +1,237 @@ +import { SandboxRunner } from "./sandbox-runner"; +import { RegistryManager } from "./registry-manager"; +import { Logger } from "@shared/logger"; + +interface PooledRunner { + runner: SandboxRunner; + inUse: boolean; + lastReleasedTime: number; +} + +interface QueueEntry { + resolve: (runner: SandboxRunner) => void; + reject: (error: Error) => void; + timeout: NodeJS.Timeout; +} + +export class SandboxRunnerPool { + private readonly numRunners: number; + private readonly runners: PooledRunner[] = []; + private readonly queue: QueueEntry[] = []; + private readonly logger = new Logger("SandboxRunnerPool"); + private readonly acquireTimeoutMs = 60000; + private initialized = false; + + constructor(numRunners: number = 5) { + this.numRunners = numRunners; + this.logger.info(`[SandboxRunnerPool] Initialized with target pool size: ${this.numRunners}`); + } + + async initialize(): Promise { + if (this.initialized) { + return; + } + + this.logger.info(`[SandboxRunnerPool] Initializing ${this.numRunners} runner instances...`); + for (let i = 0; i < this.numRunners; i++) { + const runner = new SandboxRunner(); + this.runners.push({ + runner, + inUse: false, + lastReleasedTime: Date.now(), + }); + this.logger.debug(`[SandboxRunnerPool] Created runner [${i}]`); + } + + this.initialized = true; + this.logger.info(`[SandboxRunnerPool] Pool ready with ${this.numRunners} runners`); + } + + async acquireRunner(): Promise { + if (!this.initialized) { + throw new Error("SandboxRunnerPool not initialized. Call initialize() first."); + } + + const available = this.runners.find((p) => !p.inUse); + if (available) { + available.inUse = true; + this.logger.debug( + `[SandboxRunnerPool] Runner acquired (available: ${this.runners.filter((p) => !p.inUse).length}/${this.numRunners})`, + ); + return available.runner; + } + + return new Promise((resolve, reject) => { + let entry: QueueEntry; + const timeout = setTimeout(() => { + const index = this.queue.indexOf(entry); + if (index !== -1) { + this.queue.splice(index, 1); + } + reject( + new Error( + `SandboxRunnerPool: acquire timeout after ${this.acquireTimeoutMs}ms (queue: ${this.queue.length})`, + ), + ); + }, this.acquireTimeoutMs); + + entry = { resolve, reject, timeout }; + this.queue.push(entry); + this.logger.debug( + `[SandboxRunnerPool] Runner queued (queue length: ${this.queue.length}/${this.numRunners})`, + ); + }); + } + + async releaseRunner(runner: SandboxRunner): Promise { + const pooledRunner = this.runners.find((p) => p.runner === runner); + if (!pooledRunner) { + this.logger.warn("[SandboxRunnerPool] Attempt to release unknown runner (ignored)"); + return; + } + + if (!pooledRunner.inUse) { + this.logger.warn("[SandboxRunnerPool] Attempt to release already-released runner (ignored)"); + return; + } + + await this.resetRunnerState(runner); + + pooledRunner.inUse = false; + pooledRunner.lastReleasedTime = Date.now(); + this.logger.debug( + `[SandboxRunnerPool] Runner released and reset (available: ${this.runners.filter((p) => !p.inUse).length}/${this.numRunners})`, + ); + + if (this.queue.length > 0) { + const entry = this.queue.shift()!; + clearTimeout(entry.timeout); + pooledRunner.inUse = true; + entry.resolve(runner); + this.logger.debug( + `[SandboxRunnerPool] Queued request granted (queue: ${this.queue.length} remaining)`, + ); + } + } + + private async resetRunnerState(runner: SandboxRunner): Promise { + try { + if (runner.isRunning) { + await runner.stop(); + } + + const r = runner as any; + + r.state = "stopped"; + r.processKilled = false; + r.pauseStartTime = null; + r.totalPausedTime = 0; + r.lastPauseTimestamp = null; + + r.pinStateBatcher = null; + r.serialOutputBatcher = null; + + r.onOutputCallback = null; + r.outputCallback = null; + r.errorCallback = null; + r.telemetryCallback = null; + r.pinStateCallback = null; + r.ioRegistryCallback = null; + + r.outputBuffer = ""; + r.errorBuffer = ""; + r.totalOutputBytes = 0; + r.isSendingOutput = false; + + r.pendingCleanup = false; + r.cleanupRetries = new Map(); + r.messageQueue = []; + + if (r.flushTimer) { + clearTimeout(r.flushTimer); + r.flushTimer = null; + } + + if (r.fileBuilder && typeof r.fileBuilder.reset === "function") { + r.fileBuilder.reset(); + } + + if (r.registryManager) { + try { + r.registryManager.destroy(); + } catch (error) { + this.logger.debug(`[SandboxRunnerPool] Error destroying old RegistryManager: ${error}`); + } + } + + r.registryManager = new RegistryManager({ + onUpdate: (registry: any, baudrate: any, reason: any) => { + if (r.ioRegistryCallback) { + r.ioRegistryCallback(registry, baudrate, reason); + } + r.flushMessageQueue?.(); + }, + onTelemetry: (metrics: any) => { + if (r.telemetryCallback) { + r.telemetryCallback(metrics); + } + }, + enableTelemetry: true, + }); + + if (r.timeoutManager) { + r.timeoutManager.clear(); + } + + this.logger.debug("[SandboxRunnerPool] Runner state reset complete (isolation verified)"); + } catch (error) { + this.logger.error(`[SandboxRunnerPool] Error during runner reset: ${error}`); + } + } + + getStats() { + return { + totalRunners: this.numRunners, + availableRunners: this.runners.filter((p) => !p.inUse).length, + inUseRunners: this.runners.filter((p) => p.inUse).length, + queuedRequests: this.queue.length, + initialized: this.initialized, + }; + } + + async shutdown(): Promise { + this.logger.info("[SandboxRunnerPool] Shutting down..."); + + for (const entry of this.queue) { + clearTimeout(entry.timeout); + entry.reject(new Error("SandboxRunnerPool shutting down")); + } + this.queue.length = 0; + + for (const { runner } of this.runners) { + try { + if (runner.isRunning) { + await runner.stop(); + } + } catch (error) { + this.logger.warn(`[SandboxRunnerPool] Error stopping runner during shutdown: ${error}`); + } + } + + this.logger.info("[SandboxRunnerPool] Shutdown complete"); + } +} + +let poolInstance: SandboxRunnerPool | null = null; + +export function getSandboxRunnerPool(): SandboxRunnerPool { + if (!poolInstance) { + poolInstance = new SandboxRunnerPool(5); + } + return poolInstance; +} + +export async function initializeSandboxRunnerPool(): Promise { + const pool = getSandboxRunnerPool(); + await pool.initialize(); +} \ No newline at end of file