Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 56 additions & 9 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import {
LangfuseClientService,
createLangfuseClient,
type ActiveGenerationStep,
type LangfuseClient,
} from "./langfuse.js";
import { OpencodeClientService } from "./opencode.js";
import { log } from "./utils.js";
Expand Down Expand Up @@ -126,27 +127,40 @@ const loadLangfuseCredentials = Effect.gen(function* () {
return credentials;
});

const eventHook = (event: OpencodeEvent) =>
const eventHook = (event: OpencodeEvent, shutdown?: () => Promise<void>) =>
Effect.gen(function* () {
const langfuse = yield* LangfuseClientService;

if (event.type === "session.idle") {
yield* log("info", "Flushing spans");
const finalizeSessionTracing = () => {
langfuse.endActiveToolObservations();
langfuse.endActiveGenerationSteps();
langfuse.endActiveTurnObservations();
langfuse.clearTraceState();
};

if (event.type === "session.idle") {
yield* log("info", "Flushing spans");
finalizeSessionTracing();

yield* langfuse.forceFlush;
}

if (event.type === "server.instance.disposed") {
langfuse.endActiveToolObservations();
langfuse.endActiveGenerationSteps();
langfuse.endActiveTurnObservations();
langfuse.clearTraceState();
finalizeSessionTracing();

yield* langfuse.shutdown;
if (shutdown) {
yield* Effect.tryPromise({
try: () => shutdown(),
catch: (error) => error,
});
}
}

if (event.type === "session.error" && event.properties.sessionID) {
langfuse.traceSessionError({
sessionID: event.properties.sessionID,
error: event.properties.error,
});
}

if (event.type === "message.part.updated") {
Expand Down Expand Up @@ -247,6 +261,18 @@ const formatHookError = (error: unknown) => {
}
};

const createShutdownOnce = (langfuse: LangfuseClient) => {
let shutdownPromise: Promise<void> | undefined;

return () => {
if (!shutdownPromise) {
shutdownPromise = Effect.runPromise(langfuse.shutdown);
}

return shutdownPromise;
};
};

const main = Effect.gen(function* () {
const opencode = yield* OpencodeClientService;

Expand Down Expand Up @@ -290,6 +316,14 @@ const main = Effect.gen(function* () {
Layer.succeed(LangfuseClientService, langfuse),
);

const finalizeTracing = Effect.sync(() => {
langfuse.endActiveToolObservations();
langfuse.endActiveGenerationSteps();
langfuse.endActiveTurnObservations();
langfuse.clearTraceState();
});
const shutdownOnce = createShutdownOnce(langfuse);

const runHook = (
hookName: string,
effect: Effect.Effect<
Expand Down Expand Up @@ -318,6 +352,19 @@ const main = Effect.gen(function* () {
);

const hooks: Hooks = {
dispose: () =>
runHook(
"dispose",
finalizeTracing.pipe(
Effect.zipRight(
Effect.tryPromise({
try: () => shutdownOnce(),
catch: (error) => error,
}),
),
),
),

config: (config) =>
runHook(
"config",
Expand All @@ -331,7 +378,7 @@ const main = Effect.gen(function* () {
}),
),

event: ({ event }) => runHook("event", eventHook(event)),
event: ({ event }) => runHook("event", eventHook(event, shutdownOnce)),

"chat.message": (input, output) =>
runHook(
Expand Down
141 changes: 132 additions & 9 deletions src/langfuse.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,26 +32,56 @@ export class LangfuseClient {

clearTraceState() {
this.traceState.assistantParts.clear();
this.traceState.abortedSessions.clear();
this.traceState.tracedEventIds.clear();
this.traceState.generationParentSpans.clear();
this.traceState.turnObservationsByMessageId.clear();
this.traceState.latestTurnObservationsBySession.clear();
}

endActiveToolObservations() {
for (const observation of this.traceState.activeToolObservations.values()) {
endActiveToolObservations(sessionID?: string, error?: SessionErrorInfo) {
for (const [callID, observation] of this.traceState
.activeToolObservations) {
if (sessionID && observation.sessionID !== sessionID) {
continue;
}

if (error && error.name !== "MessageAbortedError") {
const message = this.getSessionErrorMessage(error);

observation.span.setStatus({
code: SpanStatusCode.ERROR,
message,
});
observation.span.recordException({ message, name: error.name });
}

observation.span.end();
this.traceState.activeToolObservations.delete(callID);
}

this.traceState.activeToolObservations.clear();
}

endActiveGenerationSteps() {
for (const step of this.traceState.activeGenerationSteps.values()) {
endActiveGenerationSteps(sessionID?: string, error?: SessionErrorInfo) {
for (const [activeSessionID, step] of this.traceState
.activeGenerationSteps) {
if (sessionID && activeSessionID !== sessionID) {
continue;
}

if (error && error.name !== "MessageAbortedError") {
const message = this.getSessionErrorMessage(error);

step.span.setStatus({
code: SpanStatusCode.ERROR,
message,
});
step.span.recordException({ message, name: error.name });
}

step.span.end();
this.traceState.activeGenerationSteps.delete(activeSessionID);
this.traceState.generationParentSpans.delete(activeSessionID);
}

this.traceState.activeGenerationSteps.clear();
}

endActiveTurnObservations() {
Expand Down Expand Up @@ -138,6 +168,10 @@ export class LangfuseClient {

existingStep?.span.end(new Date(input.started));

if (!this.getTurnObservation(input.sessionID, undefined)) {
return;
}

this.withTurnParent(input.sessionID, undefined, () => {
const span = this.traceState.tracer.startSpan("opencode.generation", {
attributes: {
Expand Down Expand Up @@ -179,6 +213,8 @@ export class LangfuseClient {
return;
}

this.traceState.abortedSessions.delete(input.sessionID);

const formattedInput = {
role: "user" as const,
parts: input.parts.map((part) => {
Expand Down Expand Up @@ -318,6 +354,10 @@ export class LangfuseClient {
cache: { read: number; write: number };
};
}) {
if (this.traceState.abortedSessions.has(input.sessionID)) {
return;
}

if (this.traceState.tracedGenerationIds.has(input.messageID)) {
return;
}
Expand All @@ -328,8 +368,9 @@ export class LangfuseClient {
role: "assistant" as const,
content: this.getAssistantText(input.messageID),
};
const turn = this.getTurnObservation(input.sessionID, input.parentID);

if (input.mode !== "compaction") {
const turn = this.getTurnObservation(input.sessionID, input.parentID);
turn?.span.setAttribute(
"langfuse.observation.output",
JSON.stringify(output),
Expand Down Expand Up @@ -380,6 +421,10 @@ export class LangfuseClient {
return;
}

if (!turn) {
return;
}

this.withTurnParent(input.sessionID, input.parentID, () => {
const span = this.traceState.tracer.startSpan("opencode.generation", {
attributes: {
Expand Down Expand Up @@ -456,6 +501,10 @@ export class LangfuseClient {
return;
}

if (!this.getTurnObservation(input.sessionID, undefined)) {
return;
}

this.withTurnParent(input.sessionID, undefined, () => {
const span = this.traceState.tracer.startSpan(
"opencode.generation.failed",
Expand All @@ -481,6 +530,49 @@ export class LangfuseClient {
});
}

traceSessionError(input: { sessionID: string; error?: SessionErrorInfo }) {
this.endActiveToolObservations(input.sessionID, input.error);
this.endActiveGenerationSteps(input.sessionID, input.error);

if (input.error?.name === "MessageAbortedError") {
this.traceState.abortedSessions.add(input.sessionID);
}

const turn = this.getTurnObservation(input.sessionID, undefined);

if (!turn) {
this.traceState.generationParentSpans.delete(input.sessionID);

return;
}

if (input.error) {
turn.span.setAttribute(
"langfuse.observation.output",
JSON.stringify({ error: input.error }),
);

if (input.error.name !== "MessageAbortedError") {
const message = this.getSessionErrorMessage(input.error);

turn.span.setStatus({
code: SpanStatusCode.ERROR,
message,
});
turn.span.recordException({ message, name: input.error.name });
}
}

turn.span.end();

if (turn.messageID) {
this.traceState.turnObservationsByMessageId.delete(turn.messageID);
}

this.traceState.latestTurnObservationsBySession.delete(input.sessionID);
this.traceState.generationParentSpans.delete(input.sessionID);
}

traceToolStart(input: {
sessionID: string;
callID: string;
Expand Down Expand Up @@ -558,6 +650,10 @@ export class LangfuseClient {
return;
}

if (!this.getTurnObservation(sessionID, undefined)) {
return;
}

this.withTurnParent(sessionID, undefined, () => {
const span = this.traceState.tracer.startSpan("opencode.generation", {
attributes: {
Expand Down Expand Up @@ -613,11 +709,30 @@ export class LangfuseClient {
.map((part) => part.text)
.join("");
}

private getSessionErrorMessage(error: SessionErrorInfo) {
if ("message" in error && typeof error.message === "string") {
return error.message;
}

if (
"data" in error &&
error.data &&
typeof error.data === "object" &&
"message" in error.data &&
typeof error.data.message === "string"
) {
return error.data.message;
}

return error.name;
}
}

export type LangfuseTraceState = {
tracerName: string;
tracer: Tracer;
abortedSessions: Set<string>;
tracedMessageIds: Set<string>;
tracedGenerationIds: Set<string>;
tracedEventIds: Set<string>;
Expand All @@ -642,6 +757,13 @@ export type FormattedMessagePart =
| { type: string; tool?: string; title?: string }
| { type: string };

export type SessionError = Extract<
Parameters<NonNullable<Hooks["event"]>>[0]["event"],
{ type: "session.error" }
>["properties"]["error"];

export type SessionErrorInfo = NonNullable<SessionError>;

export type UserMessageInput = {
role: "user";
parts: FormattedMessagePart[];
Expand Down Expand Up @@ -725,6 +847,7 @@ export const createLangfuseClient = (input: {
const traceState: LangfuseTraceState = {
tracerName,
tracer: trace.getTracer(tracerName, PLUGIN_VERSION),
abortedSessions: new Set<string>(),
tracedMessageIds: new Set<string>(),
tracedGenerationIds: new Set<string>(),
tracedEventIds: new Set<string>(),
Expand Down