diff --git a/packages/bcode-laminar/src/plugin.ts b/packages/bcode-laminar/src/plugin.ts index fa502401b..20a750145 100644 --- a/packages/bcode-laminar/src/plugin.ts +++ b/packages/bcode-laminar/src/plugin.ts @@ -75,6 +75,18 @@ export const LaminarPlugin: Plugin = ({ client }) => { config.experimental = { ...(config.experimental ?? {}), openTelemetry: true } } }, + // Synchronous end-of-turn drain. The bus-based session.idle / + // server.instance.disposed events race with Effect scope teardown in + // headless `bcode run` mode and don't reliably deliver, so the turn span + // was historically being left un-ended and never exported. The host calls + // this hook from its top-level finally before forceFlush, so span.end() + // here gets its export drained by the host's existing forceFlush race. + shutdown: () => { + for (const [sessionId, span] of Object.entries(sessionCurrentTurnSpan)) { + span.end() + delete sessionCurrentTurnSpan[sessionId] + } + }, event: async ({ event }) => { switch (event.type) { case "session.idle": { diff --git a/packages/opencode/src/index.ts b/packages/opencode/src/index.ts index 6261c5a14..c487778ef 100644 --- a/packages/opencode/src/index.ts +++ b/packages/opencode/src/index.ts @@ -250,13 +250,29 @@ try { } process.exitCode = 1 } finally { + // Give plugins a synchronous chance to end any open OTel spans before the + // exporter drain below. The bus-based session.idle / server.instance.disposed + // events race with Effect scope teardown and don't reliably reach plugin + // subscribers in headless `bcode run` mode, so we expose a direct sync hook + // (see packages/opencode/src/plugin/index.ts pluginShutdownHooks). + // The import is wrapped so a module-load failure can't strand the process + // before forceFlush + process.exit() below. + try { + const { pluginShutdownHooks } = await import("./plugin") + for (const hook of pluginShutdownHooks) { + try { + hook() + } catch (err) { + Log.Default.error("plugin shutdown hook failed", { error: err }) + } + } + } catch (err) { + Log.Default.error("plugin shutdown import failed", { error: err }) + } // Drain any registered OTel span processors (e.g. bcode-laminar) before - // exiting. The plugin's `session.idle` event handler is invoked - // fire-and-forget (`packages/opencode/src/plugin/index.ts:249`), so its - // `processor.forceFlush()` Promise was never awaited — without this drain, - // `process.exit()` kills any in-flight gRPC export and the final agent - // span is lost. Bounded with a 3 s race so a wedged exporter cannot hang - // bcode on exit. Generic to any OTel-based plugin, not laminar-specific. + // exiting so the just-ended turn spans actually hit the wire. Bounded with + // a 3 s race so a wedged exporter cannot hang bcode on exit. Generic to any + // OTel-based plugin, not laminar-specific. const provider = trace.getTracerProvider() as { forceFlush?: () => Promise } if (provider.forceFlush) { await Promise.race([ diff --git a/packages/opencode/src/plugin/index.ts b/packages/opencode/src/plugin/index.ts index d15b9433c..004627806 100644 --- a/packages/opencode/src/plugin/index.ts +++ b/packages/opencode/src/plugin/index.ts @@ -32,6 +32,12 @@ import { RuntimeFlags } from "@/effect/runtime-flags" const log = Log.create({ service: "plugin" }) +// Synchronous shutdown hooks invoked from src/index.ts's top-level finally +// before forceFlush. Plugins register here when loaded; runs once per +// process before process.exit(). Module-level intentionally — needs to be +// reachable outside the Effect runtime. +export const pluginShutdownHooks = new Set<() => void>() + type State = { hooks: Hooks[] } @@ -245,29 +251,21 @@ export const layer = Layer.effect( } // Subscribe to bus events, fiber interrupted when scope closes. - // session.idle and server.instance.disposed are plugins' only chance to - // drain async work (e.g. OTel span exporters) before src/index.ts's - // top-level finally runs forceFlush and calls process.exit() — await - // those handlers; keep the rest fire-and-forget for throughput. + // Isolate per-hook failures (sync throw or async rejection) so one bad + // plugin can't kill the subscription fiber and silently disable every + // other plugin's event handler for the rest of the process. yield* bus.subscribeAll().pipe( Stream.runForEach((input) => - Effect.promise(async () => { - const awaitHook = input.type === "server.instance.disposed" || input.type === "session.idle" + Effect.sync(() => { for (const hook of hooks) { try { const ret = hook["event"]?.({ event: input as any }) - if (awaitHook && ret) { - await ret - } else if (ret) { - // Fire-and-forget path: surface async failures to logs instead of letting them - // become unhandledRejections that hide which plugin/event broke. + if (ret) { void Promise.resolve(ret).catch((err) => log.error("plugin event hook failed", { error: err }), ) } } catch (err) { - // Catches sync throws + awaited async rejections so one bad plugin can't kill - // the subscription fiber and silently disable every other plugin. log.error("plugin event hook failed", { error: err }) } } @@ -276,6 +274,25 @@ export const layer = Layer.effect( Effect.forkScoped, ) + // Register synchronous shutdown hooks for the top-level finally in + // src/index.ts. Runs before forceFlush so plugins can end any open + // OTel spans (e.g. bcode-laminar's turn span) — the bus-based + // session.idle / server.instance.disposed paths race with scope + // teardown and don't reliably deliver, so plugins need a direct sync + // entry point. Deregister on instance disposal so multi-instance TUI + // mode doesn't accumulate stale closures across reopens. + const registered: Array<() => void> = [] + for (const hook of hooks) { + if (!hook.shutdown) continue + pluginShutdownHooks.add(hook.shutdown) + registered.push(hook.shutdown) + } + yield* Effect.addFinalizer(() => + Effect.sync(() => { + for (const fn of registered) pluginShutdownHooks.delete(fn) + }), + ) + return { hooks } }), ) diff --git a/packages/plugin/src/index.ts b/packages/plugin/src/index.ts index 6156477be..576427ac9 100644 --- a/packages/plugin/src/index.ts +++ b/packages/plugin/src/index.ts @@ -222,6 +222,15 @@ export type AuthOuathResult = AuthOAuthResult export interface Hooks { event?: (input: { event: Event }) => Promise config?: (input: Config) => Promise + /** + * Synchronous shutdown hook invoked once per process before + * `process.exit()`, after the event loop has finished its last task and + * before the host's OTel span exporter drain. Use this to end any + * still-open OTel spans your plugin created — async work is not honored + * here, but ending a span (`span.end()`) is synchronous and the host's + * `forceFlush` runs right after this hook. + */ + shutdown?: () => void tool?: { [key: string]: ToolDefinition }