From 7940b566797ec41e44dddb2642c8dd278da71640 Mon Sep 17 00:00:00 2001 From: wangtao Date: Sat, 16 May 2026 17:58:32 +0800 Subject: [PATCH 1/3] fix: wait for tool results before cleanup --- packages/opencode/src/session/processor.ts | 11 +- .../test/session/processor-effect.test.ts | 124 +++++++++++++++++- 2 files changed, 129 insertions(+), 6 deletions(-) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index aac893075f00..4a6bb95b2f44 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -661,11 +661,12 @@ export const layer = Layer.effect( } ctx.reasoningMap = {} - yield* Effect.forEach( - Object.values(ctx.toolcalls), - (call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore), - { concurrency: "unbounded" }, - ) + if (!aborted) { + yield* Effect.forEach(Object.values(ctx.toolcalls), (call) => Deferred.await(call.done), { + concurrency: "unbounded", + discard: true, + }) + } for (const toolCallID of Object.keys(ctx.toolcalls)) { const match = yield* readToolCall(toolCallID) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 78c7e4c64228..e58a6b7eaf86 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -1,6 +1,6 @@ import { NodeFileSystem } from "@effect/platform-node" import { expect } from "bun:test" -import { Cause, Effect, Exit, Fiber, Layer } from "effect" +import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect" import path from "path" import type { Agent } from "../../src/agent/agent" import { Agent as AgentSvc } from "../../src/agent/agent" @@ -195,6 +195,70 @@ const env = Layer.mergeAll( const it = testEffect(env) +const lateToolLlm = Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: () => + Stream.make( + { type: "start" } satisfies LLM.Event, + { type: "start-step", request: {}, warnings: [] } satisfies LLM.Event, + { type: "tool-input-start", id: "call-1", toolName: "slow" } satisfies LLM.Event, + { type: "tool-call", toolCallId: "call-1", toolName: "slow", input: {} } satisfies LLM.Event, + { + type: "finish-step", + finishReason: "tool-calls", + rawFinishReason: "tool_calls", + response: { id: "res", modelId: "test-model", timestamp: new Date() }, + providerMetadata: undefined, + usage: { + inputTokens: 1, + outputTokens: 1, + totalTokens: 2, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + }, + } satisfies LLM.Event, + { + type: "finish", + finishReason: "tool-calls", + rawFinishReason: "tool_calls", + totalUsage: { + inputTokens: 1, + outputTokens: 1, + totalTokens: 2, + inputTokenDetails: { + noCacheTokens: undefined, + cacheReadTokens: undefined, + cacheWriteTokens: undefined, + }, + outputTokenDetails: { + textTokens: undefined, + reasoningTokens: undefined, + }, + }, + } satisfies LLM.Event, + ), + }), +) +const lateToolEnv = Layer.mergeAll( + TestLLMServer.layer, + SessionProcessor.layer.pipe( + Layer.provide(lateToolLlm), + Layer.provide(summary), + Layer.provide(Image.defaultLayer), + Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), + Layer.provideMerge(deps), + ), +) +const itLateTool = testEffect(lateToolEnv) + const boot = Effect.fn("test.boot")(function* () { const processors = yield* SessionProcessor.Service const session = yield* Session.Service @@ -253,6 +317,64 @@ it.live("session.processor effect tests capture llm input cleanly", () => ), ) +itLateTool.live("session.processor waits for late tool results before cleanup", () => + provideTmpdirServer( + ({ dir }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "run tool") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const run = yield* handle + .process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "run tool" }], + tools: {}, + }) + .pipe(Effect.forkChild) + + yield* waitFor( + Effect.sync(() => + MessageV2.parts(msg.id).find( + (part): part is MessageV2.ToolPart => part.type === "tool" && part.state.status === "running", + ), + ), + "timed out waiting for running tool", + ) + yield* Effect.sleep("350 millis") + yield* handle.completeToolCall("call-1", { title: "slow", metadata: {}, output: "done" }) + + const exit = yield* Fiber.await(run).pipe(Effect.timeout("1 second")) + const tool = MessageV2.parts(msg.id).find((part): part is MessageV2.ToolPart => part.type === "tool") + + expect(Exit.isSuccess(exit)).toBe(true) + expect(tool?.state.status).toBe("completed") + if (tool?.state.status !== "completed") return + expect(tool.state.output).toBe("done") + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + it.live("session.processor effect tests preserve text start time", () => provideTmpdirServer( ({ dir, llm }) => From 21fe2d1af9382c883051e34a15cfa8d14f110cb4 Mon Sep 17 00:00:00 2001 From: wangtao Date: Sat, 16 May 2026 23:36:43 +0800 Subject: [PATCH 2/3] fix: recover interrupted tool and lsp warmup states --- packages/opencode/src/session/processor.ts | 19 +++-- packages/opencode/src/tool/read.ts | 20 ++++- .../test/session/processor-effect.test.ts | 73 +++++++++++++++++++ packages/opencode/test/tool/read.test.ts | 34 ++++++++- 4 files changed, 135 insertions(+), 11 deletions(-) diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index 4a6bb95b2f44..5f97ce04ed7b 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -661,11 +661,20 @@ export const layer = Layer.effect( } ctx.reasoningMap = {} - if (!aborted) { - yield* Effect.forEach(Object.values(ctx.toolcalls), (call) => Deferred.await(call.done), { - concurrency: "unbounded", - discard: true, - }) + // Only running tool calls can produce results. Pending calls are still + // streaming input; if the stream fails before `tool-call`, waiting here + // would block cleanup from marking them interrupted. + if (!aborted && !ctx.assistantMessage.error) { + yield* Effect.forEach( + Object.keys(ctx.toolcalls), + (toolCallID) => + Effect.gen(function* () { + const match = yield* readToolCall(toolCallID) + if (match?.part.state.status !== "running") return + yield* Deferred.await(match.call.done) + }), + { concurrency: "unbounded", discard: true }, + ) } for (const toolCallID of Object.keys(ctx.toolcalls)) { diff --git a/packages/opencode/src/tool/read.ts b/packages/opencode/src/tool/read.ts index 33bff77b9f37..602ca1979ef9 100644 --- a/packages/opencode/src/tool/read.ts +++ b/packages/opencode/src/tool/read.ts @@ -1,15 +1,18 @@ -import { Effect, Option, Schema, Scope, Stream } from "effect" +import { Cause, Effect, Option, Schema, Scope, Stream } from "effect" import { NonNegativeInt } from "@opencode-ai/core/schema" import * as path from "path" import * as Tool from "./tool" import { AppFileSystem } from "@opencode-ai/core/filesystem" import { LSP } from "@/lsp/lsp" +import { InstanceRef } from "@/effect/instance-ref" import DESCRIPTION from "./read.txt" import { InstanceState } from "@/effect/instance-state" import { assertExternalDirectoryEffect } from "./external-directory" import { Instruction } from "../session/instruction" import { isPdfAttachment, sniffAttachmentMime } from "@/util/media" import { Reference } from "@/reference/reference" +import * as Log from "@opencode-ai/core/util/log" +import type { InstanceContext } from "@/project/instance-context" const DEFAULT_READ_LIMIT = 2000 const MAX_LINE_LENGTH = 2000 @@ -18,6 +21,7 @@ const MAX_BYTES = 50 * 1024 const MAX_BYTES_LABEL = `${MAX_BYTES / 1024} KB` const SAMPLE_BYTES = 4096 const SUPPORTED_IMAGE_MIMES = new Set(["image/jpeg", "image/png", "image/gif", "image/webp"]) +const log = Log.create({ service: "tool.read" }) class ReadStop extends Schema.TaggedErrorClass()("ReadStop", {}) {} @@ -86,8 +90,16 @@ export const ReadTool = Tool.define( ).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b)))) }) - const warm = Effect.fn("ReadTool.warm")(function* (filepath: string) { - yield* lsp.touchFile(filepath).pipe(Effect.ignore, Effect.forkIn(scope)) + const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, instance: InstanceContext) { + yield* lsp + .touchFile(filepath) + .pipe( + Effect.provideService(InstanceRef, instance), + Effect.catchCause((cause) => + Effect.sync(() => log.warn("lsp warmup failed", { file: filepath, cause: Cause.pretty(cause) })), + ), + Effect.forkIn(scope), + ) }) const readSample = Effect.fn("ReadTool.readSample")(function* ( @@ -314,7 +326,7 @@ export const ReadTool = Tool.define( } output += "\n" - yield* warm(filepath) + yield* warm(filepath, instance) if (loaded.length > 0) { output += `\n\n\n${loaded.map((item) => item.content).join("\n\n")}\n` diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index e58a6b7eaf86..a50d9bd809dd 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -259,6 +259,32 @@ const lateToolEnv = Layer.mergeAll( ) const itLateTool = testEffect(lateToolEnv) +const interruptedToolInputLlm = Layer.succeed( + LLM.Service, + LLM.Service.of({ + stream: () => + Stream.concat( + Stream.make( + { type: "start" } satisfies LLM.Event, + { type: "start-step", request: {}, warnings: [] } satisfies LLM.Event, + { type: "tool-input-start", id: "call-1", toolName: "bash" } satisfies LLM.Event, + ), + Stream.fail(new Error("InstanceRef not provided")), + ), + }), +) +const interruptedToolInputEnv = Layer.mergeAll( + TestLLMServer.layer, + SessionProcessor.layer.pipe( + Layer.provide(interruptedToolInputLlm), + Layer.provide(summary), + Layer.provide(Image.defaultLayer), + Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })), + Layer.provideMerge(deps), + ), +) +const itInterruptedToolInput = testEffect(interruptedToolInputEnv) + const boot = Effect.fn("test.boot")(function* () { const processors = yield* SessionProcessor.Service const session = yield* Session.Service @@ -375,6 +401,53 @@ itLateTool.live("session.processor waits for late tool results before cleanup", ), ) +itInterruptedToolInput.live("session.processor does not wait forever for interrupted pending tool input", () => + provideTmpdirServer( + ({ dir }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "run tool") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle + .process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "run tool" }], + tools: {}, + }) + .pipe(Effect.timeout("1 second")) + const tool = MessageV2.parts(msg.id).find((part): part is MessageV2.ToolPart => part.type === "tool") + + expect(value).toBe("stop") + expect(tool?.state.status).toBe("error") + if (tool?.state.status !== "error") return + expect(tool.state.input).toEqual({}) + expect(tool.state.error).toBe("Tool execution aborted") + expect(tool.state.metadata?.interrupted).toBe(true) + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + it.live("session.processor effect tests preserve text start time", () => provideTmpdirServer( ({ dir, llm }) => diff --git a/packages/opencode/test/tool/read.test.ts b/packages/opencode/test/tool/read.test.ts index 1eba44c6a1ee..4629213bae61 100644 --- a/packages/opencode/test/tool/read.test.ts +++ b/packages/opencode/test/tool/read.test.ts @@ -45,19 +45,39 @@ const referenceLayer = (flags: Partial = {}) => Layer.provide(RuntimeFlags.layer(flags)), ) -const readLayer = (flags: Partial = {}) => +const readLayer = (flags: Partial = {}, lspLayer = LSP.defaultLayer) => Layer.mergeAll( Agent.defaultLayer, AppFileSystem.defaultLayer, CrossSpawnSpawner.defaultLayer, Instruction.defaultLayer, - LSP.defaultLayer, + lspLayer, referenceLayer(flags), Truncate.defaultLayer, ) const it = testEffect(readLayer()) const scout = testEffect(readLayer({ experimentalScout: true })) +const failingLsp = Layer.succeed( + LSP.Service, + LSP.Service.of({ + init: () => Effect.void, + status: () => Effect.succeed([]), + hasClients: () => Effect.succeed(true), + touchFile: () => Effect.die(new Error("InstanceRef not provided")), + diagnostics: () => Effect.succeed({}), + hover: () => Effect.succeed(undefined), + definition: () => Effect.succeed([]), + references: () => Effect.succeed([]), + implementation: () => Effect.succeed([]), + documentSymbol: () => Effect.succeed([]), + workspaceSymbol: () => Effect.succeed([]), + prepareCallHierarchy: () => Effect.succeed([]), + incomingCalls: () => Effect.succeed([]), + outgoingCalls: () => Effect.succeed([]), + }), +) +const lspWarmupFailure = testEffect(readLayer({}, failingLsp)) const init = Effect.fn("ReadToolTest.init")(function* () { const info = yield* ReadTool @@ -149,6 +169,16 @@ const asks = () => { } describe("tool.read external_directory permission", () => { + lspWarmupFailure.live("returns file contents when best-effort LSP warmup defects", () => + Effect.gen(function* () { + const dir = yield* tmpdirScoped() + yield* put(path.join(dir, "test.ts"), "export const value = 1\n") + + const result = yield* exec(dir, { filePath: path.join(dir, "test.ts") }).pipe(Effect.timeout("1 second")) + expect(result.output).toContain("export const value = 1") + }), + ) + it.live("allows reading absolute path inside project directory", () => Effect.gen(function* () { const dir = yield* tmpdirScoped() From f703f0df52824353a6be1852cedac58b31f53f9e Mon Sep 17 00:00:00 2001 From: wangtao Date: Sun, 17 May 2026 18:14:31 +0800 Subject: [PATCH 3/3] fix: publish lsp updates in instance context --- packages/opencode/src/lsp/lsp.ts | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/packages/opencode/src/lsp/lsp.ts b/packages/opencode/src/lsp/lsp.ts index 307e85ae739c..d4775e3692ed 100644 --- a/packages/opencode/src/lsp/lsp.ts +++ b/packages/opencode/src/lsp/lsp.ts @@ -212,9 +212,10 @@ export const layer = Layer.effect( const ctx = yield* InstanceState.context if (!containsPath(file, ctx)) return [] as LSPClient.Info[] const s = yield* InstanceState.get(state) - return yield* Effect.promise(async () => { + const next = yield* Effect.promise(async () => { const extension = path.parse(file).ext || file const result: LSPClient.Info[] = [] + let updated = false async function schedule(server: LSPServer.Info, root: string, key: string) { const handle = await server @@ -281,7 +282,7 @@ export const layer = Layer.effect( const task = schedule(server, root, root + server.id) s.spawning.set(root + server.id, task) - task.finally(() => { + void task.finally(() => { if (s.spawning.get(root + server.id) === task) { s.spawning.delete(root + server.id) } @@ -291,11 +292,14 @@ export const layer = Layer.effect( if (!client) continue result.push(client) - Bus.publish(Event.Updated, {}) + updated = true } - return result + return { result, updated } }) + + if (next.updated) yield* Effect.promise(() => Bus.publish(Event.Updated, {})) + return next.result }) const run = Effect.fnUntraced(function* (file: string, fn: (client: LSPClient.Info) => Promise) {