diff --git a/.changeset/fix-ipc-shutdown-entrypoint.md b/.changeset/fix-ipc-shutdown-entrypoint.md new file mode 100644 index 000000000..74f5407e0 --- /dev/null +++ b/.changeset/fix-ipc-shutdown-entrypoint.md @@ -0,0 +1,5 @@ +--- +'@livekit/agents': patch +--- + +fix(ipc): run shutdown callbacks when entrypoint raises during shutdown diff --git a/agents/src/ipc/job_proc_lazy_main.ts b/agents/src/ipc/job_proc_lazy_main.ts index dba451ada..6ae57314e 100644 --- a/agents/src/ipc/job_proc_lazy_main.ts +++ b/agents/src/ipc/job_proc_lazy_main.ts @@ -15,6 +15,7 @@ import type { InferenceExecutor } from './inference_executor.js'; import type { IPCMessage } from './message.js'; const ORPHANED_TIMEOUT = 15 * 1000; +const ENTRYPOINT_SHUTDOWN_TIMEOUT = 15 * 1000; const safeSend = (msg: IPCMessage): boolean => { try { @@ -128,41 +129,59 @@ const startJob = ( } }, 10000); - try { - const closePromise = once(closeEvent, 'close').then((close) => { - logger.debug('shutting down'); - shutdown = true; - safeSend({ case: 'exiting', value: { reason: close[1] } }); - }); + const closePromise = once(closeEvent, 'close').then((close) => { + logger.debug('shutting down'); + shutdown = true; + safeSend({ case: 'exiting', value: { reason: close[1] } }); + }); - // Run the job function within the AsyncLocalStorage context - await runWithJobContextAsync(ctx, async () => { - const { tracer, traceTypes } = await import('../telemetry/index.js'); - return tracer.startActiveSpan( - async (span) => { - span.setAttribute(traceTypes.ATTR_JOB_ID, info.job.id); - span.setAttribute(traceTypes.ATTR_AGENT_NAME, info.job.agentName); - span.setAttribute(traceTypes.ATTR_ROOM_NAME, info.job.room?.name ?? ''); - return func(ctx); - }, - { name: 'job_entrypoint' }, - ); - }) - .then(async () => { - if (!shutdown) { - await closePromise; - } - }) - .finally(async () => { - clearTimeout(unconnectedTimeout); + let entrypointDone = false; + const entrypointPromise = runWithJobContextAsync(ctx, async () => { + const { tracer, traceTypes } = await import('../telemetry/index.js'); + return tracer.startActiveSpan( + async (span) => { + span.setAttribute(traceTypes.ATTR_JOB_ID, info.job.id); + span.setAttribute(traceTypes.ATTR_AGENT_NAME, info.job.agentName); + span.setAttribute(traceTypes.ATTR_ROOM_NAME, info.job.room?.name ?? ''); + return func(ctx); + }, + { name: 'job_entrypoint' }, + ); + }) + .catch((error) => { + if (shutdown) { + logger.debug({ error }, 'entrypoint raised during shutdown'); + return; + } + + logger.error({ error }, 'error in entry function'); + shutdown = true; + safeSend({ + case: 'exiting', + value: { reason: error instanceof Error ? error.message : String(error) }, }); - } catch (error) { - logger.error({ error }, 'error in entry function'); - shutdown = true; - safeSend({ - case: 'exiting', - value: { reason: error instanceof Error ? error.message : String(error) }, + }) + .finally(() => { + entrypointDone = true; + clearTimeout(unconnectedTimeout); }); + + await Promise.race([entrypointPromise, closePromise]); + + if (!shutdown) { + await closePromise; + } else if (!entrypointDone) { + await Promise.race([ + entrypointPromise, + new Promise((resolve) => { + const timeout = setTimeout(() => { + logger.warn('entrypoint did not exit in time'); + resolve(); + }, ENTRYPOINT_SHUTDOWN_TIMEOUT); + + void entrypointPromise.finally(() => clearTimeout(timeout)); + }), + ]); } // Close the primary agent session if it exists