Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions packages/bcode-laminar/src/plugin.ts
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,18 @@ export const LaminarPlugin: Plugin = ({ client }) => {
config.experimental = { ...(config.experimental ?? {}), openTelemetry: true }
}
},
// Synchronous end-of-turn drain. The bus-based session.idle /
// server.instance.disposed events race with Effect scope teardown in
// headless `bcode run` mode and don't reliably deliver, so the turn span
// was historically being left un-ended and never exported. The host calls
// this hook from its top-level finally before forceFlush, so span.end()
// here gets its export drained by the host's existing forceFlush race.
shutdown: () => {
for (const [sessionId, span] of Object.entries(sessionCurrentTurnSpan)) {
span.end()
delete sessionCurrentTurnSpan[sessionId]
}
},
event: async ({ event }) => {
switch (event.type) {
case "session.idle": {
Expand Down
28 changes: 22 additions & 6 deletions packages/opencode/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,29 @@ try {
}
process.exitCode = 1
} finally {
// Give plugins a synchronous chance to end any open OTel spans before the
// exporter drain below. The bus-based session.idle / server.instance.disposed
// events race with Effect scope teardown and don't reliably reach plugin
// subscribers in headless `bcode run` mode, so we expose a direct sync hook
// (see packages/opencode/src/plugin/index.ts pluginShutdownHooks).
// The import is wrapped so a module-load failure can't strand the process
// before forceFlush + process.exit() below.
try {
const { pluginShutdownHooks } = await import("./plugin")
for (const hook of pluginShutdownHooks) {
try {
hook()
} catch (err) {
Log.Default.error("plugin shutdown hook failed", { error: err })
}
}
} catch (err) {
Log.Default.error("plugin shutdown import failed", { error: err })
}
// Drain any registered OTel span processors (e.g. bcode-laminar) before
// exiting. The plugin's `session.idle` event handler is invoked
// fire-and-forget (`packages/opencode/src/plugin/index.ts:249`), so its
// `processor.forceFlush()` Promise was never awaited — without this drain,
// `process.exit()` kills any in-flight gRPC export and the final agent
// span is lost. Bounded with a 3 s race so a wedged exporter cannot hang
// bcode on exit. Generic to any OTel-based plugin, not laminar-specific.
// exiting so the just-ended turn spans actually hit the wire. Bounded with
// a 3 s race so a wedged exporter cannot hang bcode on exit. Generic to any
// OTel-based plugin, not laminar-specific.
const provider = trace.getTracerProvider() as { forceFlush?: () => Promise<void> }
if (provider.forceFlush) {
await Promise.race([
Expand Down
43 changes: 30 additions & 13 deletions packages/opencode/src/plugin/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,12 @@ import { RuntimeFlags } from "@/effect/runtime-flags"

const log = Log.create({ service: "plugin" })

// Synchronous shutdown hooks invoked from src/index.ts's top-level finally
// before forceFlush. Plugins register here when loaded; runs once per
// process before process.exit(). Module-level intentionally — needs to be
// reachable outside the Effect runtime.
export const pluginShutdownHooks = new Set<() => void>()

type State = {
hooks: Hooks[]
}
Expand Down Expand Up @@ -245,29 +251,21 @@ export const layer = Layer.effect(
}

// Subscribe to bus events, fiber interrupted when scope closes.
// session.idle and server.instance.disposed are plugins' only chance to
// drain async work (e.g. OTel span exporters) before src/index.ts's
// top-level finally runs forceFlush and calls process.exit() — await
// those handlers; keep the rest fire-and-forget for throughput.
// Isolate per-hook failures (sync throw or async rejection) so one bad
// plugin can't kill the subscription fiber and silently disable every
// other plugin's event handler for the rest of the process.
yield* bus.subscribeAll().pipe(
Stream.runForEach((input) =>
Effect.promise(async () => {
const awaitHook = input.type === "server.instance.disposed" || input.type === "session.idle"
Effect.sync(() => {
for (const hook of hooks) {
try {
const ret = hook["event"]?.({ event: input as any })
if (awaitHook && ret) {
await ret
} else if (ret) {
// Fire-and-forget path: surface async failures to logs instead of letting them
// become unhandledRejections that hide which plugin/event broke.
if (ret) {
void Promise.resolve(ret).catch((err) =>
log.error("plugin event hook failed", { error: err }),
)
}
} catch (err) {
// Catches sync throws + awaited async rejections so one bad plugin can't kill
// the subscription fiber and silently disable every other plugin.
log.error("plugin event hook failed", { error: err })
}
}
Expand All @@ -276,6 +274,25 @@ export const layer = Layer.effect(
Effect.forkScoped,
)

// Register synchronous shutdown hooks for the top-level finally in
// src/index.ts. Runs before forceFlush so plugins can end any open
// OTel spans (e.g. bcode-laminar's turn span) — the bus-based
// session.idle / server.instance.disposed paths race with scope
// teardown and don't reliably deliver, so plugins need a direct sync
// entry point. Deregister on instance disposal so multi-instance TUI
// mode doesn't accumulate stale closures across reopens.
const registered: Array<() => void> = []
for (const hook of hooks) {
if (!hook.shutdown) continue
pluginShutdownHooks.add(hook.shutdown)
registered.push(hook.shutdown)
}
Comment thread
cubic-dev-ai[bot] marked this conversation as resolved.
yield* Effect.addFinalizer(() =>
Effect.sync(() => {
for (const fn of registered) pluginShutdownHooks.delete(fn)
}),
)

return { hooks }
}),
)
Expand Down
9 changes: 9 additions & 0 deletions packages/plugin/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,15 @@ export type AuthOuathResult = AuthOAuthResult
export interface Hooks {
event?: (input: { event: Event }) => Promise<void>
config?: (input: Config) => Promise<void>
/**
* Synchronous shutdown hook invoked once per process before
* `process.exit()`, after the event loop has finished its last task and
* before the host's OTel span exporter drain. Use this to end any
* still-open OTel spans your plugin created — async work is not honored
* here, but ending a span (`span.end()`) is synchronous and the host's
* `forceFlush` runs right after this hook.
*/
shutdown?: () => void
tool?: {
[key: string]: ToolDefinition
}
Expand Down
Loading