Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions packages/opencode/src/session/processor.ts
Original file line number Diff line number Diff line change
Expand Up @@ -661,11 +661,12 @@ export const layer = Layer.effect(
}
ctx.reasoningMap = {}

yield* Effect.forEach(
Object.values(ctx.toolcalls),
(call) => Deferred.await(call.done).pipe(Effect.timeout("250 millis"), Effect.ignore),
{ concurrency: "unbounded" },
)
if (!aborted) {
yield* Effect.forEach(Object.values(ctx.toolcalls), (call) => Deferred.await(call.done), {
concurrency: "unbounded",
discard: true,
})
}

for (const toolCallID of Object.keys(ctx.toolcalls)) {
const match = yield* readToolCall(toolCallID)
Expand Down
124 changes: 123 additions & 1 deletion packages/opencode/test/session/processor-effect.test.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { NodeFileSystem } from "@effect/platform-node"
import { expect } from "bun:test"
import { Cause, Effect, Exit, Fiber, Layer } from "effect"
import { Cause, Effect, Exit, Fiber, Layer, Stream } from "effect"
import path from "path"
import type { Agent } from "../../src/agent/agent"
import { Agent as AgentSvc } from "../../src/agent/agent"
Expand Down Expand Up @@ -195,6 +195,70 @@ const env = Layer.mergeAll(

const it = testEffect(env)

const lateToolLlm = Layer.succeed(
LLM.Service,
LLM.Service.of({
stream: () =>
Stream.make(
{ type: "start" } satisfies LLM.Event,
{ type: "start-step", request: {}, warnings: [] } satisfies LLM.Event,
{ type: "tool-input-start", id: "call-1", toolName: "slow" } satisfies LLM.Event,
{ type: "tool-call", toolCallId: "call-1", toolName: "slow", input: {} } satisfies LLM.Event,
{
type: "finish-step",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
response: { id: "res", modelId: "test-model", timestamp: new Date() },
providerMetadata: undefined,
usage: {
inputTokens: 1,
outputTokens: 1,
totalTokens: 2,
inputTokenDetails: {
noCacheTokens: undefined,
cacheReadTokens: undefined,
cacheWriteTokens: undefined,
},
outputTokenDetails: {
textTokens: undefined,
reasoningTokens: undefined,
},
},
} satisfies LLM.Event,
{
type: "finish",
finishReason: "tool-calls",
rawFinishReason: "tool_calls",
totalUsage: {
inputTokens: 1,
outputTokens: 1,
totalTokens: 2,
inputTokenDetails: {
noCacheTokens: undefined,
cacheReadTokens: undefined,
cacheWriteTokens: undefined,
},
outputTokenDetails: {
textTokens: undefined,
reasoningTokens: undefined,
},
},
} satisfies LLM.Event,
),
}),
)
const lateToolEnv = Layer.mergeAll(
TestLLMServer.layer,
SessionProcessor.layer.pipe(
Layer.provide(lateToolLlm),
Layer.provide(summary),
Layer.provide(Image.defaultLayer),
Layer.provide(RuntimeFlags.layer({ experimentalEventSystem: true })),
Layer.provideMerge(deps),
),
)
const itLateTool = testEffect(lateToolEnv)

const boot = Effect.fn("test.boot")(function* () {
const processors = yield* SessionProcessor.Service
const session = yield* Session.Service
Expand Down Expand Up @@ -253,6 +317,64 @@ it.live("session.processor effect tests capture llm input cleanly", () =>
),
)

itLateTool.live("session.processor waits for late tool results before cleanup", () =>
provideTmpdirServer(
({ dir }) =>
Effect.gen(function* () {
const { processors, session, provider } = yield* boot()

const chat = yield* session.create({})
const parent = yield* user(chat.id, "run tool")
const msg = yield* assistant(chat.id, parent.id, path.resolve(dir))
const mdl = yield* provider.getModel(ref.providerID, ref.modelID)
const handle = yield* processors.create({
assistantMessage: msg,
sessionID: chat.id,
model: mdl,
})

const run = yield* handle
.process({
user: {
id: parent.id,
sessionID: chat.id,
role: "user",
time: parent.time,
agent: parent.agent,
model: { providerID: ref.providerID, modelID: ref.modelID },
} satisfies MessageV2.User,
sessionID: chat.id,
model: mdl,
agent: agent(),
system: [],
messages: [{ role: "user", content: "run tool" }],
tools: {},
})
.pipe(Effect.forkChild)

yield* waitFor(
Effect.sync(() =>
MessageV2.parts(msg.id).find(
(part): part is MessageV2.ToolPart => part.type === "tool" && part.state.status === "running",
),
),
"timed out waiting for running tool",
)
yield* Effect.sleep("350 millis")
yield* handle.completeToolCall("call-1", { title: "slow", metadata: {}, output: "done" })

const exit = yield* Fiber.await(run).pipe(Effect.timeout("1 second"))
const tool = MessageV2.parts(msg.id).find((part): part is MessageV2.ToolPart => part.type === "tool")

expect(Exit.isSuccess(exit)).toBe(true)
expect(tool?.state.status).toBe("completed")
if (tool?.state.status !== "completed") return
expect(tool.state.output).toBe("done")
}),
{ git: true, config: (url) => providerCfg(url) },
),
)

it.live("session.processor effect tests preserve text start time", () =>
provideTmpdirServer(
({ dir, llm }) =>
Expand Down
Loading