diff --git a/server/routes.ts b/server/routes.ts index e392a5cb..9714e690 100644 --- a/server/routes.ts +++ b/server/routes.ts @@ -27,7 +27,6 @@ export async function registerRoutes(app: Express): Promise { const logger = new Logger("Routes"); const httpServer = createServer(app); - // Initialize SandboxRunnerPool for managing runner instances await initializeSandboxRunnerPool(); // Lightweight health endpoint for backend reachability checks diff --git a/server/routes/simulation.ws.ts b/server/routes/simulation.ws.ts index cd1eb66f..1920bf10 100644 --- a/server/routes/simulation.ws.ts +++ b/server/routes/simulation.ws.ts @@ -14,10 +14,11 @@ export type SimulationDeps = { shouldSendSimulationEndMessage: (compileFailed: boolean) => boolean; getLastCompiledCode: () => string | null; logger: Logger; + runnerPool?: ReturnType; }; // Return type exposes a small API used by other modules (test-reset) -export function registerSimulationWebSocket(httpServer: Server, deps: SimulationDeps & { runnerPool?: ReturnType }) { +export function registerSimulationWebSocket(httpServer: Server, deps: SimulationDeps) { const { SandboxRunner, getSimulationRateLimiter, shouldSendSimulationEndMessage, getLastCompiledCode, logger, runnerPool } = deps; const pool = runnerPool ?? getSandboxRunnerPool(); @@ -56,6 +57,32 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation } } + async function safeReleaseRunner( + state: { runner: InstanceType | null; isRunning: boolean; isPaused: boolean }, + reason: string, + ): Promise { + if (!state.runner) { + return; + } + + const runner = state.runner; + state.runner = null; + state.isRunning = false; + state.isPaused = false; + + try { + await runner.stop(); + } catch (error) { + logger.debug(`[SandboxRunnerPool] runner.stop() failed during ${reason}: ${error}`); + } + + try { + await pool.releaseRunner(runner); + } catch (error) { + logger.warn(`[SandboxRunnerPool] releaseRunner failed during ${reason}: ${error}`); + } + } + wss.on("connection", (ws, req) => { const url = req.url || ""; const urlParams = new URLSearchParams(url.split("?")[1] || ""); @@ -95,9 +122,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation const clientState = clientRunners.get(ws); if (clientState?.runner) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "rate-limit"); } sendMessageToClient(ws, { @@ -114,10 +139,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation const lastCompiledCode = getLastCompiledCode(); if (!lastCompiledCode) { if (clientState.runner) { - await clientState.runner.stop(); - // Release old runner back to pool - await pool.releaseRunner(clientState.runner); - clientState.runner = null; + await safeReleaseRunner(clientState, "missing-compiled-code"); } clientState.isRunning = false; clientState.isPaused = false; @@ -127,26 +149,23 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation break; } - // Release old runner if exists if (clientState.runner) { - await clientState.runner.stop(); - await pool.releaseRunner(clientState.runner); + await safeReleaseRunner(clientState, "start-replace-existing"); } - // Acquire fresh runner from pool (not new instance) try { clientState.runner = await pool.acquireRunner(); logger.debug(`[SandboxRunnerPool] Acquired runner for client. Pool stats: ${JSON.stringify(pool.getStats())}`); - } catch (acquireError) { - logger.error(`[SandboxRunnerPool] Failed to acquire runner: ${acquireError}`); + } catch (error) { + logger.error(`[SandboxRunnerPool] Failed to acquire runner: ${error}`); clientState.runner = null; clientState.isRunning = false; + clientState.isPaused = false; sendMessageToClient(ws, { type: "serial_output", data: "[ERR] Server overloaded. All runners busy. Please try again.\n" }); sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); break; } - // Note: tempDir handling is already configured internally in SandboxRunner clientState.isRunning = true; clientState.isPaused = false; @@ -177,19 +196,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation try { const cs = clientRunners.get(ws); if (cs) { - cs.isRunning = false; - cs.isPaused = false; - - // Release runner back to pool when simulation ends - if (cs.runner) { - try { - await pool.releaseRunner(cs.runner); - logger.debug(`[SandboxRunnerPool] Released runner on exit. Pool stats: ${JSON.stringify(pool.getStats())}`); - } catch (releaseErr) { - logger.warn(`[SandboxRunnerPool] Error releasing runner on exit: ${releaseErr}`); - } - cs.runner = null; - } + await safeReleaseRunner(cs, "onExit"); } if (!shouldSendSimulationEndMessage(compileFailed)) return; @@ -212,17 +219,10 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation sendMessageToClient(ws, { type: "compilation_status", gccStatus: "error" }); sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); const cs = clientRunners.get(ws); - if (cs) { - cs.isRunning = false; - cs.isPaused = false; - - // Release runner back to pool on compile error - if (cs.runner) { - pool.releaseRunner(cs.runner).catch(err => { - logger.warn(`[SandboxRunnerPool] Error releasing runner on compile error: ${err}`); - }); - cs.runner = null; - } + if (cs) { + safeReleaseRunner(cs, "onCompileError").catch((error) => { + logger.warn(`[SandboxRunnerPool] safeReleaseRunner failed in onCompileError: ${error}`); + }); } logger.error(`[Client Compile Error]: ${compileErr}`); }, @@ -288,9 +288,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation case "code_changed": { const clientState = clientRunners.get(ws); if (clientState?.runner && (clientState?.isRunning || clientState?.isPaused)) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "code_changed"); sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); sendMessageToClient(ws, { type: "serial_output", data: "--- Simulation stopped due to code change ---\n" }); } @@ -300,9 +298,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation case "stop_simulation": { const clientState = clientRunners.get(ws); if (clientState?.runner) { - clientState.runner.stop(); - clientState.isRunning = false; - clientState.isPaused = false; + await safeReleaseRunner(clientState, "stop_simulation"); } sendMessageToClient(ws, { type: "simulation_status", status: "stopped" }); sendMessageToClient(ws, { type: "serial_output", data: "--- Simulation stopped ---\n" }); @@ -364,12 +360,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation ws.on("close", async () => { const clientState = clientRunners.get(ws); if (clientState?.runner) { - await clientState.runner.stop(); - // Release runner back to pool when client disconnects - await pool.releaseRunner(clientState.runner).catch(err => { - logger.warn(`[SandboxRunnerPool] Error releasing runner on client close: ${err}`); - }); - clientState.runner = null; + await safeReleaseRunner(clientState, "ws-close"); } clientRunners.delete(ws); const rateLimiter = getSimulationRateLimiter(); @@ -377,7 +368,11 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation logger.info(`Client disconnected. Remaining clients: ${wss.clients.size}`); }); - ws.on("error", (error) => { + ws.on("error", async (error) => { + const clientState = clientRunners.get(ws); + if (clientState?.runner) { + await safeReleaseRunner(clientState, "ws-error"); + } logger.error(`WebSocket error: ${error instanceof Error ? error.message : String(error)}`); }); }); @@ -388,14 +383,7 @@ export function registerSimulationWebSocket(httpServer: Server, deps: Simulation for (const [ws, clientState] of clientRunners.entries()) { if (clientState.runner) { - try { - await clientState.runner.stop(); - // Release runner back to pool during reset - await pool.releaseRunner(clientState.runner); - } catch (err) { - logger.debug(`Failed to stop/release runner during reset: ${err}`); - } - clientState.runner = null; + await safeReleaseRunner(clientState, "test-reset"); } clientState.isRunning = false; clientState.isPaused = false; diff --git a/server/services/sandbox-runner-pool.ts b/server/services/sandbox-runner-pool.ts index 6cc62b96..e1dc4966 100644 --- a/server/services/sandbox-runner-pool.ts +++ b/server/services/sandbox-runner-pool.ts @@ -1,49 +1,25 @@ -/** - * SandboxRunnerPool - * - * Manages a fixed pool of SandboxRunner instances to: - * - Prevent unlimited process spawning (OOM protection) - * - Recycle runner instances (efficiency) - * - Maintain strict isolation between requests (security) - * - * Queue-based management ensures fair access when all runners busy. - */ - import { SandboxRunner } from "./sandbox-runner"; -import { Logger } from "@shared/logger"; import { RegistryManager } from "./registry-manager"; +import { Logger } from "@shared/logger"; -/** - * Internal wrapper tracking runner state - */ interface PooledRunner { runner: SandboxRunner; inUse: boolean; lastReleasedTime: number; } -/** - * Queue entry for waiting acquire requests - */ interface QueueEntry { resolve: (runner: SandboxRunner) => void; reject: (error: Error) => void; timeout: NodeJS.Timeout; } -/** - * SandboxRunnerPool - manages fixed number of reusable sandbox runners - * - * Security: Strict state isolation via complete reset on release - * Performance: No unbounded process creation; queue-based fairness - * Reliability: Timeout protection, error handling, cleanup - */ export class SandboxRunnerPool { private readonly numRunners: number; private readonly runners: PooledRunner[] = []; private readonly queue: QueueEntry[] = []; private readonly logger = new Logger("SandboxRunnerPool"); - private readonly acquireTimeoutMs = 60000; // 60s timeout per acquire request + private readonly acquireTimeoutMs = 60000; private initialized = false; constructor(numRunners: number = 5) { @@ -51,17 +27,12 @@ export class SandboxRunnerPool { this.logger.info(`[SandboxRunnerPool] Initialized with target pool size: ${this.numRunners}`); } - /** - * Initialize all runners in the pool - * Deferred from constructor to allow async setup - */ async initialize(): Promise { if (this.initialized) { return; } this.logger.info(`[SandboxRunnerPool] Initializing ${this.numRunners} runner instances...`); - for (let i = 0; i < this.numRunners; i++) { const runner = new SandboxRunner(); this.runners.push({ @@ -76,57 +47,44 @@ export class SandboxRunnerPool { this.logger.info(`[SandboxRunnerPool] Pool ready with ${this.numRunners} runners`); } - /** - * Acquire a runner from the pool - * Returns immediately if available, otherwise queues request - * - * @throws Error if pool not initialized or timeout reached - */ async acquireRunner(): Promise { if (!this.initialized) { throw new Error("SandboxRunnerPool not initialized. Call initialize() first."); } - // Try to find an available runner const available = this.runners.find((p) => !p.inUse); if (available) { available.inUse = true; this.logger.debug( - `[SandboxRunnerPool] Runner acquired (available: ${this.runners.filter((p) => !p.inUse).length}/${this.numRunners - 1})` + `[SandboxRunnerPool] Runner acquired (available: ${this.runners.filter((p) => !p.inUse).length}/${this.numRunners})`, ); return available.runner; } - // All runners busy - queue the request return new Promise((resolve, reject) => { + let entry: QueueEntry; const timeout = setTimeout(() => { - // Remove from queue if timeout fires const index = this.queue.indexOf(entry); if (index !== -1) { this.queue.splice(index, 1); } - reject(new Error(`SandboxRunnerPool: acquire timeout after ${this.acquireTimeoutMs}ms (queue: ${this.queue.length})`)); + reject( + new Error( + `SandboxRunnerPool: acquire timeout after ${this.acquireTimeoutMs}ms (queue: ${this.queue.length})`, + ), + ); }, this.acquireTimeoutMs); - const entry: QueueEntry = { resolve, reject, timeout }; + entry = { resolve, reject, timeout }; this.queue.push(entry); - this.logger.debug( - `[SandboxRunnerPool] Runner queued (queue length: ${this.queue.length}/${this.numRunners})` + `[SandboxRunnerPool] Runner queued (queue length: ${this.queue.length}/${this.numRunners})`, ); }); } - /** - * Release a runner back to the pool - * CRITICAL: Performs complete state reset for isolation - * - * @param runner The runner to release - * @throws Error if runner not from this pool - */ async releaseRunner(runner: SandboxRunner): Promise { const pooledRunner = this.runners.find((p) => p.runner === runner); - if (!pooledRunner) { this.logger.warn("[SandboxRunnerPool] Attempt to release unknown runner (ignored)"); return; @@ -137,66 +95,42 @@ export class SandboxRunnerPool { return; } - // CRITICAL: Complete state reset before returning to pool await this.resetRunnerState(runner); - // Mark as available pooledRunner.inUse = false; pooledRunner.lastReleasedTime = Date.now(); - this.logger.debug( - `[SandboxRunnerPool] Runner released and reset (available: ${this.runners.filter((p) => !p.inUse).length}/${this.numRunners})` + `[SandboxRunnerPool] Runner released and reset (available: ${this.runners.filter((p) => !p.inUse).length}/${this.numRunners})`, ); - // Process queue if any requests waiting if (this.queue.length > 0) { const entry = this.queue.shift()!; clearTimeout(entry.timeout); - entry.resolve(runner); - - // Mark as immediately in use (for next request) pooledRunner.inUse = true; - - this.logger.debug(`[SandboxRunnerPool] Queued request granted (queue: ${this.queue.length} remaining)`); + entry.resolve(runner); + this.logger.debug( + `[SandboxRunnerPool] Queued request granted (queue: ${this.queue.length} remaining)`, + ); } } - /** - * SECURITY CRITICAL: Complete state reset - * Ensures student A cannot see student B's data - * - * Resets all: - * - Callbacks (onOutput, error, etc.) - * - State machines (simulationState counters) - * - Timing data (pauseStartTime, totalPausedTime) - * - Managers (RegistryManager, TimeoutManager) - * - Buffers (output, error) - * - Process state - */ private async resetRunnerState(runner: SandboxRunner): Promise { try { - // 1. Stop any active simulation to trigger internal cleanup if (runner.isRunning) { - this.logger.debug("[SandboxRunnerPool] Runner still running - stopping..."); await runner.stop(); } - // 2. Access private fields via reflection to reset state - // (TypeScript allows this at runtime) const r = runner as any; - // Reset simulation state - r.state = 0; // SimulationState.STOPPED + r.state = "stopped"; r.processKilled = false; r.pauseStartTime = null; r.totalPausedTime = 0; r.lastPauseTimestamp = null; - // Reset batchers to null (already destroyed in stop()) r.pinStateBatcher = null; r.serialOutputBatcher = null; - // Reset callbacks r.onOutputCallback = null; r.outputCallback = null; r.errorCallback = null; @@ -204,37 +138,32 @@ export class SandboxRunnerPool { r.pinStateCallback = null; r.ioRegistryCallback = null; - // Reset buffers r.outputBuffer = ""; r.errorBuffer = ""; + r.totalOutputBytes = 0; r.isSendingOutput = false; - // Reset pending cleanup flag r.pendingCleanup = false; r.cleanupRetries = new Map(); + r.messageQueue = []; - // Clear flush timer if (r.flushTimer) { clearTimeout(r.flushTimer); r.flushTimer = null; } - // Reset file builder state (clear created sketch directories list) - if (r.fileBuilder && typeof r.fileBuilder.reset === 'function') { + if (r.fileBuilder && typeof r.fileBuilder.reset === "function") { r.fileBuilder.reset(); } - // RegistryManager is recreated fresh (not reused across requests) - // This is the safest approach to avoid any state leakage if (r.registryManager) { try { - r.registryManager.destroy(); // Cleanup existing - } catch (e) { - this.logger.debug(`[SandboxRunnerPool] Error destroying old RegistryManager: ${e}`); + r.registryManager.destroy(); + } catch (error) { + this.logger.debug(`[SandboxRunnerPool] Error destroying old RegistryManager: ${error}`); } } - // Create fresh RegistryManager (same as in constructor) r.registryManager = new RegistryManager({ onUpdate: (registry: any, baudrate: any, reason: any) => { if (r.ioRegistryCallback) { @@ -250,7 +179,6 @@ export class SandboxRunnerPool { enableTelemetry: true, }); - // Reset TimeoutManager if (r.timeoutManager) { r.timeoutManager.clear(); } @@ -258,14 +186,9 @@ export class SandboxRunnerPool { this.logger.debug("[SandboxRunnerPool] Runner state reset complete (isolation verified)"); } catch (error) { this.logger.error(`[SandboxRunnerPool] Error during runner reset: ${error}`); - // Don't throw - mark runner as available anyway (will be in incomplete state if reused) - // Better to return runner than to lose it from pool } } - /** - * Get current pool statistics - */ getStats() { return { totalRunners: this.numRunners, @@ -276,20 +199,15 @@ export class SandboxRunnerPool { }; } - /** - * Graceful shutdown - stop all runners - */ async shutdown(): Promise { this.logger.info("[SandboxRunnerPool] Shutting down..."); - // Reject any pending queue entries for (const entry of this.queue) { clearTimeout(entry.timeout); entry.reject(new Error("SandboxRunnerPool shutting down")); } this.queue.length = 0; - // Stop all runners for (const { runner } of this.runners) { try { if (runner.isRunning) { @@ -304,23 +222,15 @@ export class SandboxRunnerPool { } } -// Singleton instance let poolInstance: SandboxRunnerPool | null = null; -/** - * Get or create the global SandboxRunnerPool - */ export function getSandboxRunnerPool(): SandboxRunnerPool { if (!poolInstance) { - poolInstance = new SandboxRunnerPool(5); // Default: 5 runners + poolInstance = new SandboxRunnerPool(5); } return poolInstance; } -/** - * Initialize the global runner pool - * Must be called at app startup - */ export async function initializeSandboxRunnerPool(): Promise { const pool = getSandboxRunnerPool(); await pool.initialize();