From 93da0ce8260b6c8fdf00052296c5992a22db4da1 Mon Sep 17 00:00:00 2001 From: Ben Bachem <10088265+bezbac@users.noreply.github.com> Date: Wed, 1 Jul 2026 18:31:32 +0200 Subject: [PATCH] Trace reasoning messages in Langfuse --- src/index.ts | 14 ++-- src/langfuse.ts | 178 +++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 184 insertions(+), 8 deletions(-) diff --git a/src/index.ts b/src/index.ts index 6338fc4..d840940 100644 --- a/src/index.ts +++ b/src/index.ts @@ -58,6 +58,7 @@ type SessionNextEvent = properties: { sessionID: string; timestamp: number; + assistantMessageID: string; reasoningID: string; text: string; }; @@ -165,6 +166,7 @@ const eventHook = (event: OpencodeEvent, shutdown?: () => Promise) => if (event.type === "message.part.updated") { langfuse.rememberAssistantPart(event.properties.part); + langfuse.traceReasoningPart(event.properties.part); } if (event.type === "session.next.step.started") { @@ -200,15 +202,13 @@ const eventHook = (event: OpencodeEvent, shutdown?: () => Promise) => } if (event.type === "session.next.reasoning.ended") { - langfuse.traceEvent({ - id: event.id, + langfuse.traceReasoning({ + reasoningID: event.properties.reasoningID, sessionID: event.properties.sessionID, - name: "opencode.generation.reasoning", timestamp: event.properties.timestamp, - output: { text: event.properties.text }, - metadata: { - reasoningID: event.properties.reasoningID, - }, + text: event.properties.text, + messageID: event.properties.assistantMessageID, + source: "session.next.reasoning.ended", }); } diff --git a/src/langfuse.ts b/src/langfuse.ts index 382ca4a..7c05609 100644 --- a/src/langfuse.ts +++ b/src/langfuse.ts @@ -34,6 +34,9 @@ export class LangfuseClient { this.traceState.assistantParts.clear(); this.traceState.abortedSessions.clear(); this.traceState.tracedEventIds.clear(); + this.traceState.tracedReasoningIds.clear(); + this.traceState.pendingReasoningPartsByMessageId.clear(); + this.traceState.generationSpansByMessageId.clear(); this.traceState.generationParentSpans.clear(); this.traceState.turnObservationsByMessageId.clear(); this.traceState.latestTurnObservationsBySession.clear(); @@ -103,6 +106,7 @@ export class LangfuseClient { input?: unknown; output?: unknown; metadata?: unknown; + parentSpan?: ApiSpan; }) { if (this.traceState.tracedEventIds.has(input.id)) { return; @@ -110,7 +114,7 @@ export class LangfuseClient { this.traceState.tracedEventIds.add(input.id); - this.withObservationParent(input.sessionID, () => { + const startEvent = () => { const span = this.traceState.tracer.startSpan(input.name, { attributes: { "langfuse.observation.type": "event", @@ -127,6 +131,98 @@ export class LangfuseClient { }); span.end(new Date(input.timestamp)); + }; + + if (input.parentSpan) { + context.with( + trace.setSpan(context.active(), input.parentSpan), + startEvent, + ); + return; + } + + this.withObservationParent(input.sessionID, startEvent); + } + + traceReasoning(input: { + reasoningID: string; + sessionID: string; + timestamp: number; + text: string; + messageID?: string; + source: string; + parentSpan?: ApiSpan; + }) { + if (!input.text.trim()) { + return; + } + + const reasoningTraceKey = `${input.sessionID}:${input.reasoningID}`; + + if (this.traceState.tracedReasoningIds.has(reasoningTraceKey)) { + return; + } + + this.traceState.tracedReasoningIds.add(reasoningTraceKey); + + const parentSpan = + input.parentSpan ?? + (input.messageID + ? this.traceState.generationSpansByMessageId.get(input.messageID) + : undefined); + + const generationParentSpan = + parentSpan ?? + this.traceState.activeGenerationSteps.get(input.sessionID)?.span ?? + this.traceState.generationParentSpans.get(input.sessionID); + + this.traceEvent({ + id: `reasoning:${reasoningTraceKey}`, + sessionID: input.sessionID, + name: "opencode.generation.reasoning", + timestamp: input.timestamp, + output: { text: input.text }, + metadata: { + reasoningID: input.reasoningID, + messageID: input.messageID, + source: input.source, + }, + parentSpan: generationParentSpan, + }); + } + + traceReasoningPart(part: MessagePart) { + const completed = getCompletedReasoningTimestamp(part); + + if (!isCompletedReasoningPart(part) || completed === undefined) { + return; + } + + const generationSpan = + this.traceState.generationSpansByMessageId.get(part.messageID) ?? + this.traceState.activeGenerationSteps.get(part.sessionID)?.span ?? + this.traceState.generationParentSpans.get(part.sessionID); + + if (!generationSpan) { + const pending = + this.traceState.pendingReasoningPartsByMessageId.get(part.messageID) ?? + new Map(); + pending.set(part.id, part); + this.traceState.pendingReasoningPartsByMessageId.set( + part.messageID, + pending, + ); + return; + } + + this.traceReasoning({ + reasoningID: part.id, + sessionID: part.sessionID, + timestamp: completed, + text: part.text, + messageID: part.messageID, + source: "message.part.updated", + parentSpan: generationSpan, }); } @@ -415,6 +511,12 @@ export class LangfuseClient { }), ); + this.traceState.generationSpansByMessageId.set( + input.messageID, + step.span, + ); + this.flushPendingReasoning(input.messageID, step.span); + step.span.end(new Date(input.completed)); this.traceState.activeGenerationSteps.delete(input.sessionID); @@ -458,10 +560,37 @@ export class LangfuseClient { }); this.traceState.generationParentSpans.set(input.sessionID, span); + this.traceState.generationSpansByMessageId.set(input.messageID, span); + this.flushPendingReasoning(input.messageID, span); span.end(new Date(input.completed)); }); } + private flushPendingReasoning(messageID: string, parentSpan: ApiSpan) { + const pending = + this.traceState.pendingReasoningPartsByMessageId.get(messageID) ?? + new Map(); + this.traceState.pendingReasoningPartsByMessageId.delete(messageID); + + for (const part of pending.values()) { + const completed = getCompletedReasoningTimestamp(part); + + if (completed === undefined) { + continue; + } + + this.traceReasoning({ + reasoningID: part.id, + sessionID: part.sessionID, + timestamp: completed, + text: part.text, + messageID: part.messageID, + source: "message.part.updated", + parentSpan, + }); + } + } + traceFailedGenerationStep(input: { id: string; sessionID: string; @@ -736,6 +865,12 @@ export type LangfuseTraceState = { tracedMessageIds: Set; tracedGenerationIds: Set; tracedEventIds: Set; + tracedReasoningIds: Set; + pendingReasoningPartsByMessageId: Map< + string, + Map + >; + generationSpansByMessageId: Map; assistantParts: Map>; turnObservationsByMessageId: Map; latestTurnObservationsBySession: Map; @@ -749,6 +884,41 @@ export type MessagePart = Extract< { type: "message.part.updated" } >["properties"]["part"]; +type CompletedReasoningPart = MessagePart & { + id: string; + sessionID: string; + text: string; + messageID: string; + time: { completed?: number; end?: number }; +}; + +function isCompletedReasoningPart( + part: MessagePart, +): part is CompletedReasoningPart { + return ( + part.type === "reasoning" && + typeof part.id === "string" && + typeof part.sessionID === "string" && + typeof part.messageID === "string" && + typeof part.text === "string" && + typeof getCompletedReasoningTimestamp(part) === "number" + ); +} + +function getCompletedReasoningTimestamp(part: MessagePart) { + const time = (part as { time?: { completed?: unknown; end?: unknown } }).time; + + if (typeof time?.completed === "number") { + return time.completed; + } + + if (typeof time?.end === "number") { + return time.end; + } + + return undefined; +} + export type FormattedMessagePart = | { type: string; text: string } | { type: string; filename?: string; url?: string } @@ -851,6 +1021,12 @@ export const createLangfuseClient = (input: { tracedMessageIds: new Set(), tracedGenerationIds: new Set(), tracedEventIds: new Set(), + tracedReasoningIds: new Set(), + pendingReasoningPartsByMessageId: new Map< + string, + Map + >(), + generationSpansByMessageId: new Map(), assistantParts: new Map>(), turnObservationsByMessageId: new Map(), latestTurnObservationsBySession: new Map(),