diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index aac893075f00..4a6bb95b2f44 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -661,11 +661,12 @@ export const layer = Layer.effect( } ctx.reasoningMap = {} - yield* Effect.forEach( - Object.values(ctx.toolcalls), - (call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore), - { concurrency: "unbounded" }, - ) + if (!aborted) { + yield* Effect.forEach(Object.values(ctx.toolcalls), (call) => Deferred.await(call.done), { + concurrency: "unbounded", + discard: true, + }) + } for (const toolCallID of Object.keys(ctx.toolcalls)) { const match = yield* readToolCall(toolCallID) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 78c7e4c64228..e58a6b7eaf86 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,6 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" -import { Cause, Effect, Exit, Fiber, Layer } from "effect" +import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -195,6 +195,70 @@ const env = Layer.mergeAll( const it = testEffect(env) +const lateToolLlm = Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: () => + Stream.make( + { type: "start" } satisfies LLM.Event, + { type: "start-step", request: {}, warnings: [] } satisfies LLM.Event, + { type: "tool-input-start", id: "call-1", toolName: "slow" } satisfies LLM.Event, + { type: "tool-call", toolCallId: "call-1", toolName: "slow", input: {} } satisfies LLM.Event, + { + type: "finish-step", + finishReason: "tool-calls", + rawFinishReason: "tool_calls", + response: { id: "res", modelId: "test-model", timestamp: new Date() }, + providerMetadata: undefined, + usage: { + inputTokens: 1, + outputTokens: 1, + totalTokens: 2, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + }, + } satisfies LLM.Event, + { + type: "finish", + finishReason: "tool-calls", + rawFinishReason: "tool_calls", + totalUsage: { + inputTokens: 1, + outputTokens: 1, + totalTokens: 2, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + }, + } satisfies LLM.Event, + ), + }), +) +const lateToolEnv = Layer.mergeAll( + TestLLMServer.layer, + SessionProcessor.layer.pipe( + Layer.provide(lateToolLlm), + Layer.provide(summary), + Layer.provide(Image.defaultLayer), + Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), + Layer.provideMerge(deps), + ), +) +const itLateTool = testEffect(lateToolEnv) + const boot = Effect.fn("test.boot")(function* () { const processors = yield* SessionProcessor.Service const session = yield* Session.Service @@ -253,6 +317,64 @@ it.live("session.processor effect tests capture llm input cleanly", () => ), ) +itLateTool.live("session.processor waits for late tool results before cleanup", () => + provideTmpdirServer( + ({ dir }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "run tool") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const run = yield* handle + .process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "run tool" }], + tools: {}, + }) + .pipe(Effect.forkChild) + + yield* waitFor( + Effect.sync(() => + MessageV2.parts(msg.id).find( + (part): part is MessageV2.ToolPart => part.type === "tool" && part.state.status === "running", + ), + ), + "timed out waiting for running tool", + ) + yield* Effect.sleep("350 millis") + yield* handle.completeToolCall("call-1", { title: "slow", metadata: {}, output: "done" }) + + const exit = yield* Fiber.await(run).pipe(Effect.timeout("1 second")) + const tool = MessageV2.parts(msg.id).find((part): part is MessageV2.ToolPart => part.type === "tool") + + expect(Exit.isSuccess(exit)).toBe(true) + expect(tool?.state.status).toBe("completed") + if (tool?.state.status !== "completed") return + expect(tool.state.output).toBe("done") + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + it.live("session.processor effect tests preserve text start time", () => provideTmpdirServer( ({ dir, llm }) =>