Skip to content

Commit 856363c

Browse files
committed
perf(run-events): unblock bus dispatch with runFork in subscriber callbacks
Bus.subscribeCallback wraps the callback's return value in Effect.tryPromise(() => Promise.resolve().then(() => callback(msg))) inside a runForEach loop, so a Promise-returning callback (such as Effect.runPromise) serializes handler completion per subscription — each descendant question/permission event waits for the previous handler's reject/reply round trip before dispatch advances. Replace Effect.runPromise with Effect.runFork. runFork returns a Fiber synchronously (non-thenable), so the bus's tryPromise resolves immediately and the next event dispatches without waiting. Handler defects no longer surface through Bus.on's tryPromise.catch, so wrap the forked effect with Effect.tapCause + log.error. Drops the per-event Promise wrapper allocation and unblocks concurrent dispatch for long-running subagent loops with many simultaneous descendants. Addresses audit finding F7 (Opus diamond review, 2026-04-22).
1 parent 379dbf4 commit 856363c

1 file changed

Lines changed: 26 additions & 3 deletions

File tree

packages/opencode/src/cli/cmd/run-events.ts

Lines changed: 26 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { Effect, Option } from "effect"
1+
import { Effect, Fiber, Option } from "effect"
22
import { Bus } from "@/bus"
33
import { Permission } from "@/permission"
44
import { Question } from "@/question"
@@ -86,8 +86,29 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
8686
}
8787
}
8888

89+
// bus.subscribeCallback wraps the callback in an Effect.tryPromise-based
90+
// subscription handler, so a Promise-returning callback (like Effect.runPromise)
91+
// serializes handler completion per subscription. runFork returns a Fiber
92+
// synchronously (non-thenable), unblocking dispatch so descendant question/
93+
// permission events are processed concurrently — important for long-running
94+
// subagent loops with many simultaneous descendants. Defects inside the forked
95+
// fiber do not surface through that subscription callback wrapper, so log them
96+
// here instead. Track in-flight fibers so unsubscribe() can interrupt them and
97+
// bound handler work to the RunEvents lifecycle.
98+
const inflight = new Set<Fiber.Fiber<void>>()
99+
const fork = (effect: Effect.Effect<void>) => {
100+
const fiber: Fiber.Fiber<void> = Effect.runFork(
101+
effect.pipe(
102+
Effect.tapCause((cause) => Effect.sync(() => log.error("handler failed", { cause }))),
103+
Effect.ensuring(Effect.sync(() => inflight.delete(fiber))),
104+
),
105+
)
106+
inflight.add(fiber)
107+
return fiber
108+
}
109+
89110
const unsubQuestion = yield* bus.subscribeCallback(Question.Event.Asked, (evt) =>
90-
Effect.runPromise(
111+
fork(
91112
Effect.gen(function* () {
92113
const mine = yield* isDescendant(evt.properties.sessionID)
93114
if (!mine) return
@@ -98,7 +119,7 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
98119
)
99120

100121
const unsubPermission = yield* bus.subscribeCallback(Permission.Event.Asked, (evt) =>
101-
Effect.runPromise(
122+
fork(
102123
Effect.gen(function* () {
103124
const mine = yield* isDescendant(evt.properties.sessionID)
104125
if (!mine) return
@@ -115,6 +136,8 @@ export const make = Effect.fn("RunEvents.make")(function* (config: Config) {
115136
const unsubscribe = () => {
116137
unsubQuestion()
117138
unsubPermission()
139+
inflight.forEach((fiber) => Effect.runFork(Fiber.interrupt(fiber)))
140+
inflight.clear()
118141
}
119142

120143
return { stats, unsubscribe } satisfies Handle

0 commit comments

Comments
 (0)