diff --git a/packages/bcode-laminar/src/plugin.ts b/packages/bcode-laminar/src/plugin.ts index f6db1656b..967aaa1da 100644 --- a/packages/bcode-laminar/src/plugin.ts +++ b/packages/bcode-laminar/src/plugin.ts @@ -69,6 +69,21 @@ export const LaminarPlugin: Plugin = ({ client }) => { sdk.start() log("info", `Laminar tracing initialized → ${otlpEndpoint ?? baseUrl}`) + // Track forceFlush() Promises kicked off by bus event handlers + // (session.idle, session.deleted). Each is fire-started by the handler + // but the host doesn't await them — they're orphan microtasks. If + // `process.exit()` fires before they resolve, in-flight OTLP HTTP + // requests die and the span is dropped server-side. + // + // The sync shutdown hook awaits this set before returning so the host's + // `Promise.race([hooks, 3s])` race can let the export actually finish. + const pendingFlushes = new Set>() + const trackFlush = (p: Promise | undefined): void => { + if (!p) return + const wrapped = p.catch(() => {}).finally(() => pendingFlushes.delete(wrapped)) + pendingFlushes.add(wrapped) + } + return Promise.resolve({ config: async (config) => { if (!config.experimental?.openTelemetry) { @@ -82,22 +97,24 @@ export const LaminarPlugin: Plugin = ({ client }) => { // this hook from its top-level finally before forceFlush, so span.end() // here gets its export drained by the host's existing forceFlush race. shutdown: async () => { - // End any still-open turn spans synchronously, then drain the inner - // BatchSpanProcessor. The host awaits this Promise with a bounded - // race so a wedged exporter cannot hang `process.exit()`. + // End any still-open turn spans, then drain. Awaits both an explicit + // forceFlush AND any pendingFlushes kicked off by the bus event + // handlers (session.idle, session.deleted) that the host doesn't + // otherwise wait for. The host's `Promise.race([hooks, 3000ms])` + // bounds this so a wedged exporter cannot hang `process.exit()`. // - // This runs AFTER the Effect runtime has torn down. The bus-based - // session.idle / server.instance.disposed handlers may have already - // emptied `sessionCurrentTurnSpan` and unregistered the global - // TracerProvider via `sdk.shutdown()`. Either way, the BSP itself - // still has its queue intact and can drain — we hold a direct ref - // to `processor` via closure. + // The pendingFlushes set is the critical fix: session.idle's await + // on processor.forceFlush() is an orphan microtask from the host's + // perspective — it kicks off the OTLP HTTP export but `process.exit()` + // would kill the request mid-flight without us tracking it here. // // stderr writes go to v4-worker's bcode-output-.log so cloud // verification can see whether this path executed. Temporary, will // be removed once headless V4 telemetry is settled. const sessionIds = Object.keys(sessionCurrentTurnSpan) - process.stderr.write(`[bcode-laminar] shutdown: ending ${sessionIds.length} open turn span(s)\n`) + process.stderr.write( + `[bcode-laminar] shutdown: ending ${sessionIds.length} open turn span(s), waiting on ${pendingFlushes.size} pending flush(es)\n`, + ) for (const sessionId of sessionIds) { const span = sessionCurrentTurnSpan[sessionId] if (!span) continue @@ -110,14 +127,17 @@ export const LaminarPlugin: Plugin = ({ client }) => { } delete sessionCurrentTurnSpan[sessionId] } - process.stderr.write(`[bcode-laminar] shutdown: forceFlush start\n`) const start = Date.now() + // Kick a final flush AND wait for any in-flight ones from bus handlers. + trackFlush(processor.forceFlush()) try { - await processor.forceFlush() - process.stderr.write(`[bcode-laminar] shutdown: forceFlush done in ${Date.now() - start}ms\n`) + await Promise.all(Array.from(pendingFlushes)) + process.stderr.write( + `[bcode-laminar] shutdown: all flushes done in ${Date.now() - start}ms\n`, + ) } catch (err) { process.stderr.write( - `[bcode-laminar] shutdown: forceFlush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`, + `[bcode-laminar] shutdown: flush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`, ) } }, @@ -128,22 +148,19 @@ export const LaminarPlugin: Plugin = ({ client }) => { const span = sessionCurrentTurnSpan[sessionId] if (span) { const sid = span.spanContext().spanId - process.stderr.write(`[bcode-laminar] session.idle: ending turn span ${sid} session=${sessionId}\n`) + process.stderr.write( + `[bcode-laminar] session.idle: ending turn span ${sid} session=${sessionId}\n`, + ) span.end() delete sessionCurrentTurnSpan[sessionId] - const start = Date.now() - try { - await processor.forceFlush() - process.stderr.write(`[bcode-laminar] session.idle: forceFlush done in ${Date.now() - start}ms\n`) - } catch (err) { - process.stderr.write( - `[bcode-laminar] session.idle: forceFlush threw after ${Date.now() - start}ms: ${(err as Error).message}\n`, - ) - } } else { - process.stderr.write(`[bcode-laminar] session.idle: no turn span for session=${sessionId}\n`) - await processor.forceFlush() + process.stderr.write( + `[bcode-laminar] session.idle: no turn span for session=${sessionId}\n`, + ) } + // Track the flush Promise so the sync shutdown hook can await it + // before process.exit. Fire-and-forget from this fiber's POV. + trackFlush(processor.forceFlush()) break } case "server.instance.disposed": { @@ -182,7 +199,7 @@ export const LaminarPlugin: Plugin = ({ client }) => { } delete subagentSessionIds[sessionId] for (const children of Object.values(subagentSessionIds)) children.delete(sessionId) - await processor.forceFlush() + trackFlush(processor.forceFlush()) break } }