From 28c5a070a704cf487473ca5d5b06413080956ed4 Mon Sep 17 00:00:00 2001 From: Bersabel Tadesse Date: Wed, 3 Jun 2026 11:50:01 -0700 Subject: [PATCH] fix provider restart stop exit handling --- .../src/runtime-provider-process.ts | 85 ++++- .../src/runtime.lifecycle.test.ts | 355 ++++++++++++++++++ packages/agent-runtime/src/runtime.ts | 60 ++- 3 files changed, 479 insertions(+), 21 deletions(-) diff --git a/packages/agent-runtime/src/runtime-provider-process.ts b/packages/agent-runtime/src/runtime-provider-process.ts index 78db02851..221eeb3a5 100644 --- a/packages/agent-runtime/src/runtime-provider-process.ts +++ b/packages/agent-runtime/src/runtime-provider-process.ts @@ -20,10 +20,26 @@ import { import type { RuntimeProviderIdentityState } from "./runtime-thread-identity.js"; import type { AgentRuntimeOptions, AgentRuntimeSkillRoot } from "./types.js"; +/** + * Opaque handle identifying a single caller's expected-shutdown mark. Each + * `markProviderShutdownExpected` call mints a fresh token so overlapping callers + * can each clear only their own mark without disturbing another caller's. + */ +export type ProviderShutdownToken = symbol; + +function createProviderShutdownToken(): ProviderShutdownToken { + return Symbol("provider-shutdown-expected"); +} + export interface RuntimeProviderProcess { adapter: ProviderAdapter; child: ChildProcess; - expectedShutdown: boolean; + /** + * Outstanding expected-shutdown marks. A process exit is treated as expected + * when this set is non-empty; it is consumed (cleared) on exit. A set rather + * than a boolean so concurrent callers cannot clear each other's mark. 
+ */ + expectedShutdownTokens: Set<ProviderShutdownToken>; identity: RuntimeProviderIdentityState; interactiveRequestScope: string; pending: Map; @@ -65,6 +81,15 @@ export interface ShutdownRuntimeProviderArgs { timeoutMs?: number; } +export interface ProviderShutdownExpectedArgs { + providerId: string; +} + +export interface ClearProviderShutdownExpectedArgs { + providerId: string; + token: ProviderShutdownToken; +} + interface CleanupFailedStartupArgs { providerId: string; providerProcess: RuntimeProviderProcess; @@ -188,13 +213,48 @@ export class RuntimeProviderProcessManager { return; } - providerProcess.expectedShutdown = true; + providerProcess.expectedShutdownTokens.add(createProviderShutdownToken()); await this.terminateProviderProcess({ providerProcess, timeoutMs: args.timeoutMs, }); } + /** + * Marks the provider's next process exit as an expected shutdown without + * terminating it, returning a token the caller passes to + * {@link clearProviderShutdownExpected} to undo only its own mark. Callers that + * are about to tear a provider down (for example a restart-provider stop) use + * this so an exit while their request is still in flight is treated as the + * intended shutdown rather than an unexpected crash. Returns `null` when the + * provider is gone or already exited (nothing to mark or later clear). + */ + markProviderShutdownExpected( + args: ProviderShutdownExpectedArgs, + ): ProviderShutdownToken | null { + const providerProcess = this.processes.get(args.providerId); + if (!providerProcess || hasChildProcessExited(providerProcess.child)) { + return null; + } + const token = createProviderShutdownToken(); + providerProcess.expectedShutdownTokens.add(token); + return token; + } + + /** + * Undoes a single {@link markProviderShutdownExpected} mark when the + * anticipated teardown did not happen, so a later unrelated exit is not + * silently treated as an expected shutdown. Only the matching token is + * removed, leaving any concurrent caller's mark intact. 
+ */ + clearProviderShutdownExpected(args: ClearProviderShutdownExpectedArgs): void { + const providerProcess = this.processes.get(args.providerId); + if (!providerProcess) { + return; + } + providerProcess.expectedShutdownTokens.delete(args.token); + } + async shutdown(): Promise<void> { this.shuttingDown = true; const shutdownPromises: Promise<void>[] = []; @@ -263,7 +323,7 @@ export class RuntimeProviderProcessManager { const providerProcess: RuntimeProviderProcess = { child, adapter, - expectedShutdown: false, + expectedShutdownTokens: new Set(), interactiveRequestScope: randomUUID(), identity: this.args.createProviderIdentityState(providerId), pending: new Map(), @@ -324,7 +384,9 @@ export class RuntimeProviderProcessManager { } this.processes.delete(args.providerId); - args.providerProcess.expectedShutdown = true; + args.providerProcess.expectedShutdownTokens.add( + createProviderShutdownToken(), + ); for (const [, pending] of args.providerProcess.pending) { pending.reject(args.startupError); } @@ -445,6 +507,15 @@ export class RuntimeProviderProcessManager { } } +/** + * Whether a child process has terminated, covering both normal exits + * (`exitCode`) and signal terminations (`signalCode`). Node reports a + * signal-killed child with a null `exitCode` and a set `signalCode`. 
+ */ +export function hasChildProcessExited(child: ChildProcess): boolean { + return child.exitCode !== null || child.signalCode !== null; +} + function formatProviderStderr(stderrChunks: readonly string[]): string | null { const stderr = stderrChunks.join("\n").trim(); if (stderr.length === 0) { @@ -456,8 +527,10 @@ function formatProviderStderr(stderrChunks: readonly string[]): string | null { function consumeExpectedProviderProcessShutdown( providerProcess: RuntimeProviderProcess, ): boolean { - const expected = providerProcess.expectedShutdown; - providerProcess.expectedShutdown = false; + // The process exits once, so any outstanding mark (from this or a concurrent + // caller) makes the exit expected; clear them all. + const expected = providerProcess.expectedShutdownTokens.size > 0; + providerProcess.expectedShutdownTokens.clear(); return expected; } diff --git a/packages/agent-runtime/src/runtime.lifecycle.test.ts b/packages/agent-runtime/src/runtime.lifecycle.test.ts index a937bd042..559ddccda 100644 --- a/packages/agent-runtime/src/runtime.lifecycle.test.ts +++ b/packages/agent-runtime/src/runtime.lifecycle.test.ts @@ -16,10 +16,12 @@ import { findLastRecordedCommand, fullRuntimeOptions, wait, + waitForRuntimeState, waitForThreadAgentMessageText, waitForThreadTurnCompleted, waitForThreadTurnStarted, } from "./test/runtime-test-harness.js"; +import type { AgentRuntimeProcessExitInfo } from "./types.js"; describe("createAgentRuntime lifecycle", () => { let tmpDir: string; @@ -986,6 +988,359 @@ rl.on("line", (line) => { await runtime.shutdown(); }); + it("treats a provider exit during a restart-provider stop as an expected shutdown", async () => { + // Mirror codex's app-server exiting while it handles turn/interrupt: the + // provider acknowledges nothing and exits while the stop request is still + // in flight. 
+ const stopExitScript = join(tmpDir, "stop-exit-provider.cjs"); + writeFileSync( + stopExitScript, + `const rl = require("readline").createInterface({ input: process.stdin }); + rl.on("line", (line) => { + const msg = JSON.parse(line); + if (msg.method === "initialize") { + process.stdout.write(JSON.stringify({ jsonrpc: "2.0", id: msg.id, result: {} }) + "\\n"); + } else if (msg.method === "thread/start") { + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", id: msg.id, + result: { providerThreadId: "prov-stop" } + }) + "\\n"); + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", method: "thread/identity", + params: { threadId: msg.params?.threadId, providerThreadId: "prov-stop" } + }) + "\\n"); + } else if (msg.method === "thread/stop") { + // Exit while the stop request is in flight, never responding. + setTimeout(() => process.exit(0), 10); + } + });`, + ); + + const exits: AgentRuntimeProcessExitInfo[] = []; + const adapter = createFakeAdapter(stopExitScript); + const runtime = createAgentRuntimeWithAdapters({ + workspacePath: tmpDir, + onEvent: () => {}, + onToolCall: async () => ({ + contentItems: [{ type: "inputText", text: "ok" }], + success: true, + }), + onProcessExit: (info) => { + exits.push(info); + }, + adapterFactory: () => ({ + ...adapter, + buildCommandPlan(command): ProviderCommandPlan { + const plan = adapter.buildCommandPlan(command); + if (command.type === "thread/stop" && plan.kind === "request") { + const processEffect: ProviderCommandProcessEffect = + "restart-provider"; + return { ...plan, processEffect }; + } + return plan; + }, + }), + }); + + await runtime.startThread({ + environmentId: "env-1", + threadId: "t1", + projectId: "p1", + providerId: "fake", + options: fullRuntimeOptions, + }); + + // The stop must resolve even though the provider exits mid-interrupt + // instead of replying — the restart is the intended outcome, not a crash. 
+ await expect( + runtime.stopThread({ threadId: "t1" }), + ).resolves.toBeUndefined(); + expect(runtime.listRunningProviders()).toEqual([]); + + await waitForRuntimeState({ + label: "provider process exit callback", + predicate: () => exits.length === 1, + }); + // The exit is reported as expected, so the runtime manager does not raise a + // provider-crash error for an intentional stop. + expect(exits[0]).toEqual( + expect.objectContaining({ providerId: "fake", expected: true }), + ); + + await runtime.shutdown(); + }); + + it("treats a signal-killed provider during a restart-provider stop as expected", async () => { + // Codex can also die by signal while handling the interrupt. A signal exit + // leaves exitCode null but sets signalCode, so the exited check must cover + // both. + const stopSignalScript = join(tmpDir, "stop-signal-provider.cjs"); + writeFileSync( + stopSignalScript, + `const rl = require("readline").createInterface({ input: process.stdin }); + rl.on("line", (line) => { + const msg = JSON.parse(line); + if (msg.method === "initialize") { + process.stdout.write(JSON.stringify({ jsonrpc: "2.0", id: msg.id, result: {} }) + "\\n"); + } else if (msg.method === "thread/start") { + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", id: msg.id, + result: { providerThreadId: "prov-stop" } + }) + "\\n"); + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", method: "thread/identity", + params: { threadId: msg.params?.threadId, providerThreadId: "prov-stop" } + }) + "\\n"); + } else if (msg.method === "thread/stop") { + // Die by signal while the stop request is in flight (exitCode null). 
+ setTimeout(() => process.kill(process.pid, "SIGKILL"), 10); + } + });`, + ); + + const exits: AgentRuntimeProcessExitInfo[] = []; + const adapter = createFakeAdapter(stopSignalScript); + const runtime = createAgentRuntimeWithAdapters({ + workspacePath: tmpDir, + onEvent: () => {}, + onToolCall: async () => ({ + contentItems: [{ type: "inputText", text: "ok" }], + success: true, + }), + onProcessExit: (info) => { + exits.push(info); + }, + adapterFactory: () => ({ + ...adapter, + buildCommandPlan(command): ProviderCommandPlan { + const plan = adapter.buildCommandPlan(command); + if (command.type === "thread/stop" && plan.kind === "request") { + const processEffect: ProviderCommandProcessEffect = + "restart-provider"; + return { ...plan, processEffect }; + } + return plan; + }, + }), + }); + + await runtime.startThread({ + environmentId: "env-1", + threadId: "t1", + projectId: "p1", + providerId: "fake", + options: fullRuntimeOptions, + }); + + await expect( + runtime.stopThread({ threadId: "t1" }), + ).resolves.toBeUndefined(); + expect(runtime.listRunningProviders()).toEqual([]); + + await waitForRuntimeState({ + label: "provider process exit callback", + predicate: () => exits.length === 1, + }); + expect(exits[0]).toEqual( + expect.objectContaining({ + providerId: "fake", + expected: true, + signal: "SIGKILL", + }), + ); + + await runtime.shutdown(); + }); + + it("clears the expected-shutdown mark when a restart-provider stop fails while the provider stays alive", async () => { + // The interrupt is rejected by a protocol error, not a process exit, and + // the provider keeps running. The stop must surface the error AND must not + // leave the provider poisoned so a later unrelated crash is suppressed. 
+ const stopRejectScript = join(tmpDir, "stop-reject-provider.cjs"); + writeFileSync( + stopRejectScript, + `const rl = require("readline").createInterface({ input: process.stdin }); + rl.on("line", (line) => { + const msg = JSON.parse(line); + if (msg.method === "initialize") { + process.stdout.write(JSON.stringify({ jsonrpc: "2.0", id: msg.id, result: {} }) + "\\n"); + } else if (msg.method === "thread/start") { + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", id: msg.id, + result: { providerThreadId: "prov-stop" } + }) + "\\n"); + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", method: "thread/identity", + params: { threadId: msg.params?.threadId, providerThreadId: "prov-stop" } + }) + "\\n"); + } else if (msg.method === "thread/stop") { + // Reject the interrupt while staying alive... + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", id: msg.id, + error: { code: -32000, message: "interrupt failed" } + }) + "\\n"); + // ...then crash later for an unrelated reason. 
+ setTimeout(() => process.exit(7), 100); + } + });`, + ); + + const exits: AgentRuntimeProcessExitInfo[] = []; + const adapter = createFakeAdapter(stopRejectScript); + const runtime = createAgentRuntimeWithAdapters({ + workspacePath: tmpDir, + onEvent: () => {}, + onToolCall: async () => ({ + contentItems: [{ type: "inputText", text: "ok" }], + success: true, + }), + onProcessExit: (info) => { + exits.push(info); + }, + adapterFactory: () => ({ + ...adapter, + buildCommandPlan(command): ProviderCommandPlan { + const plan = adapter.buildCommandPlan(command); + if (command.type === "thread/stop" && plan.kind === "request") { + const processEffect: ProviderCommandProcessEffect = + "restart-provider"; + return { ...plan, processEffect }; + } + return plan; + }, + }), + }); + + await runtime.startThread({ + environmentId: "env-1", + threadId: "t1", + projectId: "p1", + providerId: "fake", + options: fullRuntimeOptions, + }); + + // A non-exit interrupt failure must be surfaced, not swallowed. + await expect(runtime.stopThread({ threadId: "t1" })).rejects.toThrow( + /interrupt failed/, + ); + + // The later, unrelated exit must NOT be suppressed as an expected shutdown. + await waitForRuntimeState({ + label: "unrelated provider exit", + predicate: () => exits.length === 1, + }); + expect(exits[0]).toEqual( + expect.objectContaining({ + providerId: "fake", + expected: false, + code: 7, + }), + ); + + await runtime.shutdown(); + }); + + it("keeps a concurrent stop's expected-shutdown mark when an overlapping restart-provider stop fails", async () => { + // Two restart-provider stops overlap on the same provider process. Stop B + // (t2) fails with a protocol error while the process is alive and clears + // its own mark; Stop A (t1) then causes the intended process exit. A's + // mark must survive B's failure so the exit is still reported as expected. 
+ const overlapScript = join(tmpDir, "stop-overlap-provider.cjs"); + writeFileSync( + overlapScript, + `const rl = require("readline").createInterface({ input: process.stdin }); + let started = 0; + rl.on("line", (line) => { + const msg = JSON.parse(line); + if (msg.method === "initialize") { + process.stdout.write(JSON.stringify({ jsonrpc: "2.0", id: msg.id, result: {} }) + "\\n"); + } else if (msg.method === "thread/start") { + started += 1; + const providerThreadId = "prov-" + started; + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", id: msg.id, result: { providerThreadId } + }) + "\\n"); + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", method: "thread/identity", + params: { threadId: msg.params?.threadId, providerThreadId } + }) + "\\n"); + } else if (msg.method === "thread/stop") { + if (msg.params?.threadId === "t2") { + // Stop B fails without exiting, then the process exits for Stop A. + process.stdout.write(JSON.stringify({ + jsonrpc: "2.0", id: msg.id, + error: { code: -32000, message: "interrupt failed" } + }) + "\\n"); + setTimeout(() => process.exit(0), 50); + } + // Stop A (t1): held open; the scheduled exit interrupts it. 
+ } + });`, + ); + + const exits: AgentRuntimeProcessExitInfo[] = []; + const adapter = createFakeAdapter(overlapScript); + const runtime = createAgentRuntimeWithAdapters({ + workspacePath: tmpDir, + onEvent: () => {}, + onToolCall: async () => ({ + contentItems: [{ type: "inputText", text: "ok" }], + success: true, + }), + onProcessExit: (info) => { + exits.push(info); + }, + adapterFactory: () => ({ + ...adapter, + buildCommandPlan(command): ProviderCommandPlan { + const plan = adapter.buildCommandPlan(command); + if (command.type === "thread/stop" && plan.kind === "request") { + const processEffect: ProviderCommandProcessEffect = + "restart-provider"; + return { ...plan, processEffect }; + } + return plan; + }, + }), + }); + + await runtime.startThread({ + environmentId: "env-1", + threadId: "t1", + projectId: "p1", + providerId: "fake", + options: fullRuntimeOptions, + }); + await runtime.startThread({ + environmentId: "env-1", + threadId: "t2", + projectId: "p1", + providerId: "fake", + options: fullRuntimeOptions, + }); + // Both threads share one provider process. + expect(runtime.listRunningProviders()).toEqual(["fake"]); + + const stopA = runtime.stopThread({ threadId: "t1" }); + const stopB = runtime.stopThread({ threadId: "t2" }); + + // Stop B fails (protocol error) and clears only its own mark. + await expect(stopB).rejects.toThrow(/interrupt failed/); + // Stop A's intended exit must still be expected — B must not have erased it. 
+ await expect(stopA).resolves.toBeUndefined(); + expect(runtime.listRunningProviders()).toEqual([]); + + await waitForRuntimeState({ + label: "provider process exit callback", + predicate: () => exits.length === 1, + }); + expect(exits[0]).toEqual( + expect.objectContaining({ providerId: "fake", expected: true }), + ); + + await runtime.shutdown(); + }); + it("steers an active turn", async () => { const events: ThreadEvent[] = []; const runtime = createAgentRuntimeWithAdapters({ diff --git a/packages/agent-runtime/src/runtime.ts b/packages/agent-runtime/src/runtime.ts index efdd1fd0e..f2ad648d0 100644 --- a/packages/agent-runtime/src/runtime.ts +++ b/packages/agent-runtime/src/runtime.ts @@ -32,7 +32,9 @@ import { type RuntimeProviderRequestKind, } from "./runtime-provider-requests.js"; import { + hasChildProcessExited, RuntimeProviderProcessManager, + type ProviderShutdownToken, type RuntimeProviderProcess, } from "./runtime-provider-process.js"; import { @@ -1021,23 +1023,51 @@ function createAgentRuntimeInternal( turnReplayFilter.clearThread(threadId); return; } - await sendJsonRpcRequest({ - child: proc.child, - message: cmd, - pending: proc.pending, - getNextId: () => nextRequestId++, - resultSchema: ignoredJsonRpcResultSchema, - }); - emitAcceptedCommandEvents({ - command: adapterCommand, - proc, - providerId: pid, - rawMethod: cmd.method, - sourceThreadId: threadId, - }); + + // A restart-provider stop tears the provider process down immediately + // after the interrupt, so the process exiting *while the interrupt is in + // flight* is the restart we asked for — not a crash. Mark the shutdown + // expected up front so the exit is not surfaced as a provider-crash error, + // and tolerate the in-flight request rejecting because the process is + // already gone. + const restartsProvider = cmd.processEffect === "restart-provider"; + const shutdownToken: ProviderShutdownToken | null = restartsProvider + ? 
providerProcesses.markProviderShutdownExpected({ providerId: pid }) + : null; + try { + await sendJsonRpcRequest({ + child: proc.child, + message: cmd, + pending: proc.pending, + getNextId: () => nextRequestId++, + resultSchema: ignoredJsonRpcResultSchema, + }); + emitAcceptedCommandEvents({ + command: adapterCommand, + proc, + providerId: pid, + rawMethod: cmd.method, + sourceThreadId: threadId, + }); + } catch (error) { + if (!(restartsProvider && hasChildProcessExited(proc.child))) { + // The interrupt failed for a reason other than the provider exiting + // (e.g. a protocol/JSON-RPC error) while the process is still live, so + // undo only this stop's expected-shutdown mark — otherwise a later + // unrelated exit would be silently treated as expected. Concurrent + // stops keep their own marks. Then surface the error. + if (shutdownToken) { + providerProcesses.clearProviderShutdownExpected({ + providerId: pid, + token: shutdownToken, + }); + } + throw error; + } + } turnState.clearThread(threadId); turnReplayFilter.clearThread(threadId); - if (cmd.processEffect === "restart-provider") { + if (restartsProvider) { await providerProcesses.shutdownProvider({ providerId: pid }); } },