From c9a3121ad519c29882239dcf94896dc54bde8756 Mon Sep 17 00:00:00 2001 From: Ben Bachem <10088265+bezbac@users.noreply.github.com> Date: Wed, 1 Jul 2026 14:29:30 +0200 Subject: [PATCH] Prevent dangling observations when a turn is interrupted --- src/index.ts | 65 ++++++++++++++++++---- src/langfuse.ts | 141 ++++++++++++++++++++++++++++++++++++++++++++---- 2 files changed, 188 insertions(+), 18 deletions(-) diff --git a/src/index.ts b/src/index.ts index f7adb33..6338fc4 100644 --- a/src/index.ts +++ b/src/index.ts @@ -9,6 +9,7 @@ import { LangfuseClientService, createLangfuseClient, type ActiveGenerationStep, + type LangfuseClient, } from "./langfuse.js"; import { OpencodeClientService } from "./opencode.js"; import { log } from "./utils.js"; @@ -126,27 +127,40 @@ const loadLangfuseCredentials = Effect.gen(function* () { return credentials; }); -const eventHook = (event: OpencodeEvent) => +const eventHook = (event: OpencodeEvent, shutdown?: () => Promise) => Effect.gen(function* () { const langfuse = yield* LangfuseClientService; - if (event.type === "session.idle") { - yield* log("info", "Flushing spans"); + const finalizeSessionTracing = () => { langfuse.endActiveToolObservations(); langfuse.endActiveGenerationSteps(); langfuse.endActiveTurnObservations(); langfuse.clearTraceState(); + }; + + if (event.type === "session.idle") { + yield* log("info", "Flushing spans"); + finalizeSessionTracing(); yield* langfuse.forceFlush; } if (event.type === "server.instance.disposed") { - langfuse.endActiveToolObservations(); - langfuse.endActiveGenerationSteps(); - langfuse.endActiveTurnObservations(); - langfuse.clearTraceState(); + finalizeSessionTracing(); - yield* langfuse.shutdown; + if (shutdown) { + yield* Effect.tryPromise({ + try: () => shutdown(), + catch: (error) => error, + }); + } + } + + if (event.type === "session.error" && event.properties.sessionID) { + langfuse.traceSessionError({ + sessionID: event.properties.sessionID, + error: event.properties.error, + }); } if (event.type === "message.part.updated") { @@ -247,6 +261,18 @@ const formatHookError = (error: unknown) => { } }; +const createShutdownOnce = (langfuse: LangfuseClient) => { + let shutdownPromise: Promise | undefined; + + return () => { + if (!shutdownPromise) { + shutdownPromise = Effect.runPromise(langfuse.shutdown); + } + + return shutdownPromise; + }; +}; + const main = Effect.gen(function* () { const opencode = yield* OpencodeClientService; @@ -290,6 +316,14 @@ const main = Effect.gen(function* () { Layer.succeed(LangfuseClientService, langfuse), ); + const finalizeTracing = Effect.sync(() => { + langfuse.endActiveToolObservations(); + langfuse.endActiveGenerationSteps(); + langfuse.endActiveTurnObservations(); + langfuse.clearTraceState(); + }); + const shutdownOnce = createShutdownOnce(langfuse); + const runHook = ( hookName: string, effect: Effect.Effect< @@ -318,6 +352,19 @@ const main = Effect.gen(function* () { ); const hooks: Hooks = { + dispose: () => + runHook( + "dispose", + finalizeTracing.pipe( + Effect.zipRight( + Effect.tryPromise({ + try: () => shutdownOnce(), + catch: (error) => error, + }), + ), + ), + ), + config: (config) => runHook( "config", @@ -331,7 +378,7 @@ const main = Effect.gen(function* () { }), ), - event: ({ event }) => runHook("event", eventHook(event)), + event: ({ event }) => runHook("event", eventHook(event, shutdownOnce)), "chat.message": (input, output) => runHook( diff --git a/src/langfuse.ts b/src/langfuse.ts index d287559..382ca4a 100644 --- a/src/langfuse.ts +++ b/src/langfuse.ts @@ -32,26 +32,56 @@ export class LangfuseClient { clearTraceState() { this.traceState.assistantParts.clear(); + this.traceState.abortedSessions.clear(); this.traceState.tracedEventIds.clear(); this.traceState.generationParentSpans.clear(); this.traceState.turnObservationsByMessageId.clear(); this.traceState.latestTurnObservationsBySession.clear(); } - endActiveToolObservations() { - for (const observation of this.traceState.activeToolObservations.values()) { + endActiveToolObservations(sessionID?: string, error?: SessionErrorInfo) { + for (const [callID, observation] of this.traceState + .activeToolObservations) { + if (sessionID && observation.sessionID !== sessionID) { + continue; + } + + if (error && error.name !== "MessageAbortedError") { + const message = this.getSessionErrorMessage(error); + + observation.span.setStatus({ + code: SpanStatusCode.ERROR, + message, + }); + observation.span.recordException({ message, name: error.name }); + } + observation.span.end(); + this.traceState.activeToolObservations.delete(callID); } - - this.traceState.activeToolObservations.clear(); } - endActiveGenerationSteps() { - for (const step of this.traceState.activeGenerationSteps.values()) { + endActiveGenerationSteps(sessionID?: string, error?: SessionErrorInfo) { + for (const [activeSessionID, step] of this.traceState + .activeGenerationSteps) { + if (sessionID && activeSessionID !== sessionID) { + continue; + } + + if (error && error.name !== "MessageAbortedError") { + const message = this.getSessionErrorMessage(error); + + step.span.setStatus({ + code: SpanStatusCode.ERROR, + message, + }); + step.span.recordException({ message, name: error.name }); + } + step.span.end(); + this.traceState.activeGenerationSteps.delete(activeSessionID); + this.traceState.generationParentSpans.delete(activeSessionID); } - - this.traceState.activeGenerationSteps.clear(); } endActiveTurnObservations() { @@ -138,6 +168,10 @@ export class LangfuseClient { existingStep?.span.end(new Date(input.started)); + if (!this.getTurnObservation(input.sessionID, undefined)) { + return; + } + this.withTurnParent(input.sessionID, undefined, () => { const span = this.traceState.tracer.startSpan("opencode.generation", { attributes: { @@ -179,6 +213,8 @@ export class LangfuseClient { return; } + this.traceState.abortedSessions.delete(input.sessionID); + const formattedInput = { role: "user" as const, parts: input.parts.map((part) => { @@ -318,6 +354,10 @@ export class LangfuseClient { cache: { read: number; write: number }; }; }) { + if (this.traceState.abortedSessions.has(input.sessionID)) { + return; + } + if (this.traceState.tracedGenerationIds.has(input.messageID)) { return; } @@ -328,8 +368,9 @@ export class LangfuseClient { role: "assistant" as const, content: this.getAssistantText(input.messageID), }; + const turn = this.getTurnObservation(input.sessionID, input.parentID); + if (input.mode !== "compaction") { - const turn = this.getTurnObservation(input.sessionID, input.parentID); turn?.span.setAttribute( "langfuse.observation.output", JSON.stringify(output), @@ -380,6 +421,10 @@ export class LangfuseClient { return; } + if (!turn) { + return; + } + this.withTurnParent(input.sessionID, input.parentID, () => { const span = this.traceState.tracer.startSpan("opencode.generation", { attributes: { @@ -456,6 +501,10 @@ export class LangfuseClient { return; } + if (!this.getTurnObservation(input.sessionID, undefined)) { + return; + } + this.withTurnParent(input.sessionID, undefined, () => { const span = this.traceState.tracer.startSpan( "opencode.generation.failed", @@ -481,6 +530,49 @@ export class LangfuseClient { }); } + traceSessionError(input: { sessionID: string; error?: SessionErrorInfo }) { + this.endActiveToolObservations(input.sessionID, input.error); + this.endActiveGenerationSteps(input.sessionID, input.error); + + if (input.error?.name === "MessageAbortedError") { + this.traceState.abortedSessions.add(input.sessionID); + } + + const turn = this.getTurnObservation(input.sessionID, undefined); + + if (!turn) { + this.traceState.generationParentSpans.delete(input.sessionID); + + return; + } + + if (input.error) { + turn.span.setAttribute( + "langfuse.observation.output", + JSON.stringify({ error: input.error }), + ); + + if (input.error.name !== "MessageAbortedError") { + const message = this.getSessionErrorMessage(input.error); + + turn.span.setStatus({ + code: SpanStatusCode.ERROR, + message, + }); + turn.span.recordException({ message, name: input.error.name }); + } + } + + turn.span.end(); + + if (turn.messageID) { + this.traceState.turnObservationsByMessageId.delete(turn.messageID); + } + + this.traceState.latestTurnObservationsBySession.delete(input.sessionID); + this.traceState.generationParentSpans.delete(input.sessionID); + } + traceToolStart(input: { sessionID: string; callID: string; @@ -558,6 +650,10 @@ export class LangfuseClient { return; } + if (!this.getTurnObservation(sessionID, undefined)) { + return; + } + this.withTurnParent(sessionID, undefined, () => { const span = this.traceState.tracer.startSpan("opencode.generation", { attributes: { @@ -613,11 +709,30 @@ export class LangfuseClient { .map((part) => part.text) .join(""); } + + private getSessionErrorMessage(error: SessionErrorInfo) { + if ("message" in error && typeof error.message === "string") { + return error.message; + } + + if ( + "data" in error && + error.data && + typeof error.data === "object" && + "message" in error.data && + typeof error.data.message === "string" + ) { + return error.data.message; + } + + return error.name; + } } export type LangfuseTraceState = { tracerName: string; tracer: Tracer; + abortedSessions: Set; tracedMessageIds: Set; tracedGenerationIds: Set; tracedEventIds: Set; @@ -642,6 +757,13 @@ export type FormattedMessagePart = | { type: string; tool?: string; title?: string } | { type: string }; +export type SessionError = Extract< + Parameters>[0]["event"], + { type: "session.error" } +>["properties"]["error"]; + +export type SessionErrorInfo = NonNullable; + export type UserMessageInput = { role: "user"; parts: FormattedMessagePart[]; @@ -725,6 +847,7 @@ export const createLangfuseClient = (input: { const traceState: LangfuseTraceState = { tracerName, tracer: trace.getTracer(tracerName, PLUGIN_VERSION), + abortedSessions: new Set(), tracedMessageIds: new Set(), tracedGenerationIds: new Set(), tracedEventIds: new Set(),