Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 15 additions & 5 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -661,11 +661,21 @@ export const layer = Layer.effect(
}
ctx.reasoningMap = {}

yield* Effect.forEach(
Object.values(ctx.toolcalls),
(call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore),
{ concurrency: "unbounded" },
)
// Only running tool calls can produce results. Pending calls are still
// streaming input; if the stream fails before `tool-call`, waiting here
// would block cleanup from marking them interrupted.
if (!aborted && !ctx.assistantMessage.error) {
yield* Effect.forEach(
Object.keys(ctx.toolcalls),
(toolCallID) =>
Effect.gen(function* () {
const match = yield* readToolCall(toolCallID)
if (match?.part.state.status !== "running") return
yield* Deferred.await(match.call.done)
}),
{ concurrency: "unbounded", discard: true },
)
}

for (const toolCallID of Object.keys(ctx.toolcalls)) {
const match = yield* readToolCall(toolCallID)
Expand Down
20 changes: 16 additions & 4 deletions packages/opencode/src/tool/read.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import { Effect, Option, Schema, Scope, Stream } from "effect"
import { Cause, Effect, Option, Schema, Scope, Stream } from "effect"
import { NonNegativeInt } from "@opencode-ai/core/schema"
import * as path from "path"
import * as Tool from "./tool"
import { AppFileSystem } from "@opencode-ai/core/filesystem"
import { LSP } from "@/lsp/lsp"
import { InstanceRef } from "@/effect/instance-ref"
import DESCRIPTION from "./read.txt"
import { InstanceState } from "@/effect/instance-state"
import { assertExternalDirectoryEffect } from "./external-directory"
import { Instruction } from "../session/instruction"
import { isPdfAttachment, sniffAttachmentMime } from "@/util/media"
import { Reference } from "@/reference/reference"
import * as Log from "@opencode-ai/core/util/log"
import type { InstanceContext } from "@/project/instance-context"

const DEFAULT_READ_LIMIT = 2000
const MAX_LINE_LENGTH = 2000
Expand All @@ -18,6 +21,7 @@ const MAX_BYTES = 50 * 1024
const MAX_BYTES_LABEL = `${MAX_BYTES / 1024} KB`
const SAMPLE_BYTES = 4096
const SUPPORTED_IMAGE_MIMES = new Set(["image/jpeg", "image/png", "image/gif", "image/webp"])
const log = Log.create({ service: "tool.read" })

class ReadStop extends Schema.TaggedErrorClass<ReadStop>()("ReadStop", {}) {}

Expand Down Expand Up @@ -86,8 +90,16 @@ export const ReadTool = Tool.define(
).pipe(Effect.map((items: string[]) => items.sort((a, b) => a.localeCompare(b))))
})

const warm = Effect.fn("ReadTool.warm")(function* (filepath: string) {
yield* lsp.touchFile(filepath).pipe(Effect.ignore, Effect.forkIn(scope))
const warm = Effect.fn("ReadTool.warm")(function* (filepath: string, instance: InstanceContext) {
yield* lsp
.touchFile(filepath)
.pipe(
Effect.provideService(InstanceRef, instance),
Effect.catchCause((cause) =>
Effect.sync(() => log.warn("lsp warmup failed", { file: filepath, cause: Cause.pretty(cause) })),
),
Effect.forkIn(scope),
)
})

const readSample = Effect.fn("ReadTool.readSample")(function* (
Expand Down Expand Up @@ -314,7 +326,7 @@ export const ReadTool = Tool.define(
}
output += "\n</content>"

yield* warm(filepath)
yield* warm(filepath, instance)

if (loaded.length > 0) {
output += `\n\n<system-reminder>\n${loaded.map((item) => item.content).join("\n\n")}\n</system-reminder>`
Expand Down
197 changes: 196 additions & 1 deletion packages/opencode/test/session/processor-effect.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect"
import path from "path"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
Expand Down Expand Up @@ -195,6 +195,96 @@ const env = Layer.mergeAll(

const it = testEffect(env)

const lateToolLlm = Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: () =>
Stream.make(
{ type: "start" } satisfies LLM.Event,
{ type: "start-step", request: {}, warnings: [] } satisfies LLM.Event,
{ type: "tool-input-start", id: "call-1", toolName: "slow" } satisfies LLM.Event,
{ type: "tool-call", toolCallId: "call-1", toolName: "slow", input: {} } satisfies LLM.Event,
{
type: "finish-step",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
response: { id: "res", modelId: "test-model", timestamp: new Date() },
providerMetadata: undefined,
usage: {
inputTokens: 1,
outputTokens: 1,
totalTokens: 2,
inputTokenDetails: {
noCacheTokens: undefined,
cacheReadTokens: undefined,
cacheWriteTokens: undefined,
},
outputTokenDetails: {
textTokens: undefined,
reasoningTokens: undefined,
},
},
} satisfies LLM.Event,
{
type: "finish",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
totalUsage: {
inputTokens: 1,
outputTokens: 1,
totalTokens: 2,
inputTokenDetails: {
noCacheTokens: undefined,
cacheReadTokens: undefined,
cacheWriteTokens: undefined,
},
outputTokenDetails: {
textTokens: undefined,
reasoningTokens: undefined,
},
},
} satisfies LLM.Event,
),
}),
)
const lateToolEnv = Layer.mergeAll(
TestLLMServer.layer,
SessionProcessor.layer.pipe(
Layer.provide(lateToolLlm),
Layer.provide(summary),
Layer.provide(Image.defaultLayer),
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
Layer.provideMerge(deps),
),
)
const itLateTool = testEffect(lateToolEnv)

const interruptedToolInputLlm = Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: () =>
Stream.concat(
Stream.make(
{ type: "start" } satisfies LLM.Event,
{ type: "start-step", request: {}, warnings: [] } satisfies LLM.Event,
{ type: "tool-input-start", id: "call-1", toolName: "bash" } satisfies LLM.Event,
),
Stream.fail(new Error("InstanceRef not provided")),
),
}),
)
const interruptedToolInputEnv = Layer.mergeAll(
TestLLMServer.layer,
SessionProcessor.layer.pipe(
Layer.provide(interruptedToolInputLlm),
Layer.provide(summary),
Layer.provide(Image.defaultLayer),
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
Layer.provideMerge(deps),
),
)
const itInterruptedToolInput = testEffect(interruptedToolInputEnv)

const boot = Effect.fn("test.boot")(function* () {
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
Expand Down Expand Up @@ -253,6 +343,111 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
),
)

itLateTool.live("session.processor waits for late tool results before cleanup", () =>
provideTmpdirServer(
({ dir }) =>
Effect.gen(function* () {
const { processors, session, provider } = yield* boot()

const chat = yield* session.create({})
const parent = yield* user(chat.id, "run tool")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})

const run = yield* handle
.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "run tool" }],
tools: {},
})
.pipe(Effect.forkChild)

yield* waitFor(
Effect.sync(() =>
MessageV2.parts(msg.id).find(
(part): part is MessageV2.ToolPart => part.type === "tool" && part.state.status === "running",
),
),
"timed out waiting for running tool",
)
yield* Effect.sleep("350 millis")
yield* handle.completeToolCall("call-1", { title: "slow", metadata: {}, output: "done" })

const exit = yield* Fiber.await(run).pipe(Effect.timeout("1 second"))
const tool = MessageV2.parts(msg.id).find((part): part is MessageV2.ToolPart => part.type === "tool")

expect(Exit.isSuccess(exit)).toBe(true)
expect(tool?.state.status).toBe("completed")
if (tool?.state.status !== "completed") return
expect(tool.state.output).toBe("done")
}),
{ git: true, config: (url) => providerCfg(url) },
),
)

itInterruptedToolInput.live("session.processor does not wait forever for interrupted pending tool input", () =>
provideTmpdirServer(
({ dir }) =>
Effect.gen(function* () {
const { processors, session, provider } = yield* boot()

const chat = yield* session.create({})
const parent = yield* user(chat.id, "run tool")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})

const value = yield* handle
.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "run tool" }],
tools: {},
})
.pipe(Effect.timeout("1 second"))
const tool = MessageV2.parts(msg.id).find((part): part is MessageV2.ToolPart => part.type === "tool")

expect(value).toBe("stop")
expect(tool?.state.status).toBe("error")
if (tool?.state.status !== "error") return
expect(tool.state.input).toEqual({})
expect(tool.state.error).toBe("Tool execution aborted")
expect(tool.state.metadata?.interrupted).toBe(true)
}),
{ git: true, config: (url) => providerCfg(url) },
),
)

it.live("session.processor effect tests preserve text start time", () =>
provideTmpdirServer(
({ dir, llm }) =>
Expand Down
34 changes: 32 additions & 2 deletions packages/opencode/test/tool/read.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,39 @@ const referenceLayer = (flags: Partial<RuntimeFlags.Info> = {}) =>
Layer.provide(RuntimeFlags.layer(flags)),
)

const readLayer = (flags: Partial<RuntimeFlags.Info> = {}) =>
const readLayer = (flags: Partial<RuntimeFlags.Info> = {}, lspLayer = LSP.defaultLayer) =>
Layer.mergeAll(
Agent.defaultLayer,
AppFileSystem.defaultLayer,
CrossSpawnSpawner.defaultLayer,
Instruction.defaultLayer,
LSP.defaultLayer,
lspLayer,
referenceLayer(flags),
Truncate.defaultLayer,
)

const it = testEffect(readLayer())
const scout = testEffect(readLayer({ experimentalScout: true }))
const failingLsp = Layer.succeed(
LSP.Service,
LSP.Service.of({
init: () => Effect.void,
status: () => Effect.succeed([]),
hasClients: () => Effect.succeed(true),
touchFile: () => Effect.die(new Error("InstanceRef not provided")),
diagnostics: () => Effect.succeed({}),
hover: () => Effect.succeed(undefined),
definition: () => Effect.succeed([]),
references: () => Effect.succeed([]),
implementation: () => Effect.succeed([]),
documentSymbol: () => Effect.succeed([]),
workspaceSymbol: () => Effect.succeed([]),
prepareCallHierarchy: () => Effect.succeed([]),
incomingCalls: () => Effect.succeed([]),
outgoingCalls: () => Effect.succeed([]),
}),
)
const lspWarmupFailure = testEffect(readLayer({}, failingLsp))

const init = Effect.fn("ReadToolTest.init")(function* () {
const info = yield* ReadTool
Expand Down Expand Up @@ -149,6 +169,16 @@ const asks = () => {
}

describe("tool.read external_directory permission", () => {
lspWarmupFailure.live("returns file contents when best-effort LSP warmup defects", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped()
yield* put(path.join(dir, "test.ts"), "export const value = 1\n")

const result = yield* exec(dir, { filePath: path.join(dir, "test.ts") }).pipe(Effect.timeout("1 second"))
expect(result.output).toContain("export const value = 1")
}),
)

it.live("allows reading absolute path inside project directory", () =>
Effect.gen(function* () {
const dir = yield* tmpdirScoped()
Expand Down
Loading