From 7d5e0bc00a0f5dbfb2caedb6d0704b351d4346c7 Mon Sep 17 00:00:00 2001 From: minicx Date: Sat, 16 May 2026 22:40:35 +0300 Subject: [PATCH 1/4] feat(session): add configurable fallback model chain Signed-off-by: minicx --- packages/core/src/session-event.ts | 13 + packages/core/src/session-message.ts | 5 + packages/opencode/src/agent/agent.ts | 11 + packages/opencode/src/cli/cmd/tui/app.tsx | 7 + .../src/cli/cmd/tui/context/sync-v2.tsx | 8 + .../tui/feature-plugins/system/session-v2.tsx | 16 +- .../src/cli/cmd/tui/routes/session/index.tsx | 24 +- packages/opencode/src/config/agent.ts | 4 + packages/opencode/src/config/config.ts | 6 + packages/opencode/src/session/fallback.ts | 408 +++++++++++ packages/opencode/src/session/llm.ts | 664 ++++++++++-------- packages/opencode/src/session/message-v2.ts | 3 + packages/opencode/src/session/processor.ts | 56 +- .../opencode/src/session/projectors-next.ts | 20 + packages/opencode/src/session/prompt.ts | 15 + packages/opencode/src/session/retry.ts | 9 +- 16 files changed, 959 insertions(+), 310 deletions(-) create mode 100644 packages/opencode/src/session/fallback.ts diff --git a/packages/core/src/session-event.ts b/packages/core/src/session-event.ts index a98d9cc05144..761724894ab5 100644 --- a/packages/core/src/session-event.ts +++ b/packages/core/src/session-event.ts @@ -210,6 +210,18 @@ export namespace Reasoning { export type Ended = typeof Ended.Type } +export namespace Model { + export const Updated = EventV2.define({ + type: "session.next.model.updated", + ...options, + schema: { + ...Base, + model: ModelV2.Ref, + }, + }) + export type Updated = typeof Updated.Type +} + export namespace Tool { export namespace Input { export const Started = EventV2.define({ @@ -366,6 +378,7 @@ export const All = Schema.Union( [ AgentSwitched, ModelSwitched, + Model.Updated, Prompted, Synthetic, Shell.Started, diff --git a/packages/core/src/session-message.ts b/packages/core/src/session-message.ts index 73b6dd7da2b9..30424e021128 100644 --- a/packages/core/src/session-message.ts +++ b/packages/core/src/session-message.ts @@ -113,6 +113,11 @@ export class AssistantTool extends Schema.Class("Session.Message. export class AssistantText extends Schema.Class("Session.Message.Assistant.Text")({ type: Schema.Literal("text"), text: Schema.String, + ignored: Schema.optional(Schema.Boolean), + synthetic: Schema.optional(Schema.Boolean), + fallbackNotice: Schema.optional( + Schema.Union([Schema.Literal("using"), Schema.Literal("switch"), Schema.Literal("resume")]), + ), }) {} export class AssistantReasoning extends Schema.Class("Session.Message.Assistant.Reasoning")({ diff --git a/packages/opencode/src/agent/agent.ts b/packages/opencode/src/agent/agent.ts index ce6cf30b6d55..a9ddcb39d04d 100644 --- a/packages/opencode/src/agent/agent.ts +++ b/packages/opencode/src/agent/agent.ts @@ -42,6 +42,14 @@ export const Info = Schema.Struct({ }), ), variant: Schema.optional(Schema.String), + fallbacks: Schema.optional( + Schema.Array( + Schema.Struct({ + providerID: ProviderID, + modelID: ModelID, + }), + ), + ), prompt: Schema.optional(Schema.String), options: Schema.Record(Schema.String, Schema.Unknown), steps: Schema.optional(Schema.Finite), @@ -292,6 +300,9 @@ export const layer = Layer.effect( native: false, } if (value.model) item.model = Provider.parseModel(value.model) + if (value.fallbacks) { + item.fallbacks = value.fallbacks.map((f) => Provider.parseModel(f)) + } item.variant = value.variant ?? item.variant item.prompt = value.prompt ?? 
item.prompt item.description = value.description ?? item.description diff --git a/packages/opencode/src/cli/cmd/tui/app.tsx b/packages/opencode/src/cli/cmd/tui/app.tsx index af9df4d42f46..0adc221de3ed 100644 --- a/packages/opencode/src/cli/cmd/tui/app.tsx +++ b/packages/opencode/src/cli/cmd/tui/app.tsx @@ -846,6 +846,13 @@ function App(props: { onSnapshot?: () => Promise }) { } }) + event.on("llm.fallback.triggered", (evt) => { + toast.show({ + message: `Falling back to ${evt.properties.modelID} (${evt.properties.reason})`, + variant: "warning", + }) + }) + event.on("session.error", (evt) => { const error = evt.properties.error if (error && typeof error === "object" && error.name === "MessageAbortedError") return diff --git a/packages/opencode/src/cli/cmd/tui/context/sync-v2.tsx b/packages/opencode/src/cli/cmd/tui/context/sync-v2.tsx index d9d23999d21a..eb8c638c6ad0 100644 --- a/packages/opencode/src/cli/cmd/tui/context/sync-v2.tsx +++ b/packages/opencode/src/cli/cmd/tui/context/sync-v2.tsx @@ -143,6 +143,14 @@ export const { use: useSyncV2, provider: SyncProviderV2 } = createSimpleContext( currentAssistant.snapshot = { ...currentAssistant.snapshot, end: event.properties.snapshot } }) break + case "session.next.model.updated": + update(event.properties.sessionID, (draft) => { + const match = + activeAssistant(draft) ?? + [...draft].reverse().find((m): m is SessionMessageAssistant => m.type === "assistant") + if (match) match.model = event.properties.model + }) + break case "session.next.step.failed": update(event.properties.sessionID, (draft) => { const currentAssistant = activeAssistant(draft) diff --git a/packages/opencode/src/cli/cmd/tui/feature-plugins/system/session-v2.tsx b/packages/opencode/src/cli/cmd/tui/feature-plugins/system/session-v2.tsx index 5017b77b0012..469bd1559be4 100644 --- a/packages/opencode/src/cli/cmd/tui/feature-plugins/system/session-v2.tsx +++ b/packages/opencode/src/cli/cmd/tui/feature-plugins/system/session-v2.tsx @@ -365,7 +365,17 @@ function AssistantMessage(props: { } function AssistantText(props: { part: SessionMessageAssistantText; syntax: SyntaxStyle }) { - const { theme } = useTheme() + const { theme, subtleSyntax } = useTheme() + const fallbackNotice = props.part.fallbackNotice + const isFallback = fallbackNotice != null + const fg = fallbackNotice === "resume" + ? theme.success + : isFallback + ? theme.error + : props.part.ignored + ? theme.textMuted + : theme.text + const syntaxStyle = isFallback || props.part.ignored ? subtleSyntax() : props.syntax return ( @@ -373,10 +383,10 @@ function AssistantText(props: { part: SessionMessageAssistantText; syntax: Synta filetype="markdown" drawUnstyledText={false} streaming={true} - syntaxStyle={props.syntax} + syntaxStyle={syntaxStyle} content={props.part.text.trim()} conceal={true} - fg={theme.text} + fg={fg} /> diff --git a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx index ce651fdbe46c..a6fb9f617bd8 100644 --- a/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx +++ b/packages/opencode/src/cli/cmd/tui/routes/session/index.tsx @@ -1408,6 +1408,10 @@ function AssistantMessage(props: { message: AssistantMessage; parts: Part[]; las const sync = useSync() const messages = createMemo(() => sync.data.message[props.message.sessionID] ?? 
[]) const model = createMemo(() => Model.name(ctx.providers(), props.message.providerID, props.message.modelID)) + const providerName = createMemo(() => { + const p = ctx.providers()?.get(props.message.providerID) + return p?.name ?? props.message.providerID + }) const final = createMemo(() => { return props.message.finish && !["tool-calls", "unknown"].includes(props.message.finish) @@ -1477,7 +1481,7 @@ function AssistantMessage(props: { message: AssistantMessage; parts: Part[]; las ▣{" "} {" "} {Locale.titlecase(props.message.mode)} - · {model()} + · {model()} ({providerName()}) · {Locale.duration(duration())} @@ -1578,18 +1582,30 @@ function ReasoningPart(props: { last: boolean; part: ReasoningPart; message: Ass function TextPart(props: { last: boolean; part: TextPart; message: AssistantMessage }) { const ctx = use() - const { theme, syntax } = useTheme() + const { theme, syntax, subtleSyntax } = useTheme() + const fallbackNotice = () => (props.part as { fallbackNotice?: "using" | "switch" | "resume" }).fallbackNotice + const isFallback = () => fallbackNotice() != null + const fg = () => + fallbackNotice() === "resume" + ? theme.success + : isFallback() + ? theme.error + : (props.part as { ignored?: boolean }).ignored + ? theme.textMuted + : theme.markdownText + const syntaxStyle = () => + isFallback() || (props.part as { ignored?: boolean }).ignored ? subtleSyntax() : syntax() return ( diff --git a/packages/opencode/src/config/agent.ts b/packages/opencode/src/config/agent.ts index a6719e86743a..39c381e1784b 100644 --- a/packages/opencode/src/config/agent.ts +++ b/packages/opencode/src/config/agent.ts @@ -46,6 +46,9 @@ const AgentSchema = Schema.StructWithRest( }), maxSteps: Schema.optional(PositiveInt).annotate({ description: "@deprecated Use 'steps' field instead." 
}), permission: Schema.optional(ConfigPermission.Info), + fallbacks: Schema.optional(Schema.mutable(Schema.Array(ConfigModelID))).annotate({ + description: "Fallback models to try when the primary model fails, in provider/model format", + }), }), [Schema.Record(Schema.String, Schema.Any)], ) @@ -65,6 +68,7 @@ const KNOWN_KEYS = new Set([ "maxSteps", "options", "permission", + "fallbacks", "disable", "tools", ]) diff --git a/packages/opencode/src/config/config.ts b/packages/opencode/src/config/config.ts index b13d3a8c8131..93e3bb154bc6 100644 --- a/packages/opencode/src/config/config.ts +++ b/packages/opencode/src/config/config.ts @@ -165,6 +165,12 @@ export const Info = Schema.Struct({ model: Schema.optional(ConfigModelID).annotate({ description: "Model to use in the format of provider/model, eg anthropic/claude-2", }), + fallbacks: Schema.optional(Schema.mutable(Schema.Array(ConfigModelID))).annotate({ + description: "Fallback models to try when the primary model fails, in provider/model format", + }), + cooldown_seconds: Schema.optional(Schema.Number.check(Schema.isGreaterThan(0))).annotate({ + description: "Duration in seconds to put a provider/model in cooldown after a retryable error (default: 300)", + }), small_model: Schema.optional(ConfigModelID).annotate({ description: "Small model to use for tasks like title generation in the format of provider/model", }), diff --git a/packages/opencode/src/session/fallback.ts b/packages/opencode/src/session/fallback.ts new file mode 100644 index 000000000000..7bf8827d6388 --- /dev/null +++ b/packages/opencode/src/session/fallback.ts @@ -0,0 +1,408 @@ +import { BusEvent } from "@/bus/bus-event" +import { Schema, Effect, Clock, Cause, Duration } from "effect" +import * as Stream from "effect/Stream" +import * as Option from "effect/Option" +import { ProviderID, ModelID } from "@/provider/schema" +import { SessionID } from "./schema" +import { SessionRetry } from "./retry" +import type { Err } from "./retry" +import type { Provider } from "@/provider/provider" +import type { Bus } from "@/bus" +import type * as Log from "@opencode-ai/core/util/log" + +export const QUOTA_COOLDOWN_MS = 6 * 60 * 60 * 1000 +export const DEFAULT_COOLDOWN_SECONDS = 300 +export const NOTICE_REASON_MAX_LENGTH = 40 +export const WAIT_CAP_MS = 30_000 + +export const FALLBACK_NOTICE_ID = "fallback-notice" +export const FALLBACK_RESUME_ID = "fallback-resume" +export const FALLBACK_USING_ID = "fallback-using" + +export class CooldownManager { + private store = new Map() + + put(providerID: string, modelID: string, durationMs: number): Effect.Effect { + const key = `${providerID}/${modelID}` + const s = this.store + return Effect.gen(function* () { + const now = yield* Clock.currentTimeMillis + s.set(key, now + durationMs) + }) + } + + isCooledDown(providerID: string, modelID: string): Effect.Effect { + const key = `${providerID}/${modelID}` + const s = this.store + return Effect.gen(function* () { + const now = yield* Clock.currentTimeMillis + const expiry = s.get(key) + if (expiry === undefined) return false + if (now >= expiry) { + s.delete(key) + return false + } + return true + }) + } + + remaining(providerID: string, modelID: string): Effect.Effect { + const key = `${providerID}/${modelID}` + const s = this.store + return Effect.gen(function* () { + const now = yield* Clock.currentTimeMillis + const expiry = s.get(key) + if (expiry === undefined) return undefined + const left = expiry - now + if (left <= 0) { + s.delete(key) + return undefined + } + return left + }) + } + 
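+  // Expired entries are pruned lazily: isCooledDown and remaining delete a
+  // stale key on read, while clear() below drops one eagerly (used as a last
+  // resort in withFallback when no model in the chain resolves).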
+ clear(providerID: string, modelID: string): void { + this.store.delete(`${providerID}/${modelID}`) + } +} + +export const FallbackTriggered = BusEvent.define( + "llm.fallback.triggered", + Schema.Struct({ + sessionID: SessionID, + modelID: ModelID, + providerID: ProviderID, + reason: Schema.String, + }), +) + +export const FallbackUsed = BusEvent.define( + "llm.fallback.used", + Schema.Struct({ + sessionID: SessionID, + modelID: ModelID, + providerID: ProviderID, + }), +) + +export type ChainEntry = { providerID: string; modelID: string } + +export type ClassifiedError = { + error: unknown + isRetryable: boolean + retryInfo: SessionRetry.Retryable | undefined + reason: string +} + +export type StreamChunk = { + type: string + id?: string + text?: string + providerMetadata?: unknown + error?: unknown + [key: string]: unknown +} + +export type ProviderStreamResult = { + fullStream: AsyncIterable +} + +export interface FallbackDeps { + provider: { + getModel: (providerID: ProviderID, modelID: ModelID) => Effect.Effect + getProvider: (providerID: ProviderID) => Effect.Effect + } + bus: Bus.Interface + config: { + get: () => Effect.Effect<{ cooldown_seconds?: number }, unknown> + } + classifyError: ( + cause: Cause.Cause, + prevProviderID: string, + prevModelID: string, + cooldownSeconds: number, + ) => ClassifiedError | null + call: (model: Provider.Model, providerID: string, modelID: string) => Effect.Effect + log: Log.Logger + cooldown: CooldownManager + sessionFallbackState: SessionFallbackState +} + +export type FallbackInput = { + sessionID: string + model: Provider.Model & { providerID: string; id: string } + fallbacks?: Array<{ providerID: string; modelID: string }> + abort: AbortSignal +} + +export function isRetryable(error: Err, provider: string): boolean { + return SessionRetry.retryable(error, provider) !== undefined +} + +function noticeChunks(text: string, id: string): StreamChunk[] { + return [ + { type: "text-start", id }, + { type: "text-delta", id, text }, + { type: "text-end", id }, + ] +} + +async function* filterErrors(fullStream: AsyncIterable): AsyncGenerator { + for await (const chunk of fullStream) { + if (chunk.type === "error") { + const err = chunk.error + throw err instanceof Error ? err : new Error(String(err)) + } + yield chunk + } +} + +function toStream(result: ProviderStreamResult): Stream.Stream { + return Stream.fromAsyncIterable(filterErrors(result.fullStream), (e) => + e instanceof Error ? e : new Error(String(e)), + ) +} + +function cooldownDurationMs(error: ClassifiedError, cooldownSeconds: number): number { + if (error.retryInfo?.quotaLimit) return QUOTA_COOLDOWN_MS + const headers = (error.error as { data?: { responseHeaders?: Record; statusCode?: number } })?.data + ?.responseHeaders + if (headers) { + const retryAfterMs = headers["retry-after-ms"] + if (retryAfterMs) { + const parsed = Number.parseFloat(retryAfterMs) + if (!Number.isNaN(parsed)) return parsed + } + const retryAfter = headers["retry-after"] + if (retryAfter) { + const parsed = Number.parseFloat(retryAfter) * 1000 + if (!Number.isNaN(parsed)) return Math.ceil(parsed) + const dateParsed = Date.parse(retryAfter) - Date.now() + if (!Number.isNaN(dateParsed) && dateParsed > 0) return Math.ceil(dateParsed) + } + } + return cooldownSeconds * 1000 +} + +function truncateReason(reason: string): string { + return reason.length > NOTICE_REASON_MAX_LENGTH ? reason.slice(0, NOTICE_REASON_MAX_LENGTH - 3) + "..." : reason +} + +// Session-scoped state. 
A session is "on fallback" if its last successful +// stream used a fallback. We remember this across turns so that when primary +// recovers we can show "Switched back" notice. Cleared when we successfully +// return to primary. +export class SessionFallbackState { + private flags = new Set() + + isOnFallback(sessionID: string): boolean { + return this.flags.has(sessionID) + } + + markOnFallback(sessionID: string): void { + this.flags.add(sessionID) + } + + clear(sessionID: string): void { + this.flags.delete(sessionID) + } +} + +export function withFallback( + input: FallbackInput, + deps: FallbackDeps, +): Effect.Effect, unknown> { + return Effect.gen(function* () { + const cooldown = deps.cooldown + const cfg = yield* deps.config.get() + const cooldownSeconds = cfg.cooldown_seconds ?? DEFAULT_COOLDOWN_SECONDS + const fallbacks = input.fallbacks ?? [] + const primary: ChainEntry = { providerID: input.model.providerID, modelID: input.model.id } + + if (fallbacks.length === 0) { + // No fallbacks configured: just return the primary stream as-is. + const primaryResult = yield* deps.call(input.model, primary.providerID, primary.modelID) + return toStream(primaryResult) + } + + const wasOnFallback = deps.sessionFallbackState.isOnFallback(input.sessionID) + + const chainFallback = ( + stream: Stream.Stream, + prevEntry: ChainEntry, + entry: ChainEntry, + ): Stream.Stream => { + const el = deps.log.clone().tag("providerID", entry.providerID).tag("modelID", entry.modelID) + return stream.pipe( + Stream.catchCause((cause) => + Stream.unwrap( + Effect.gen(function* () { + if (input.abort.aborted) return yield* Effect.fail(new Error("Request aborted")) + + if (yield* cooldown.isCooledDown(entry.providerID, entry.modelID)) { + el.info("skipping cooled-down fallback") + return yield* Effect.failCause(cause) + } + + const resolved = yield* deps.provider + .getModel(ProviderID.make(entry.providerID), ModelID.make(entry.modelID)) + .pipe(Effect.option) + if (Option.isNone(resolved)) { + el.info("fallback model not found, skipping") + return yield* Effect.failCause(cause) + } + const model = resolved.value + + const classified = deps.classifyError(cause, prevEntry.providerID, prevEntry.modelID, cooldownSeconds) + if (!classified) { + el.info("non-retryable error, not falling back") + return yield* Effect.failCause(cause) + } + + const durationMs = cooldownDurationMs(classified, cooldownSeconds) + yield* cooldown.put(prevEntry.providerID, prevEntry.modelID, durationMs) + el.info("stream error, falling back", { cooldownMs: durationMs }) + + yield* deps.bus.publish(FallbackTriggered, { + sessionID: SessionID.make(input.sessionID), + modelID: ModelID.make(prevEntry.modelID), + providerID: ProviderID.make(prevEntry.providerID), + reason: classified.reason, + }) + + deps.sessionFallbackState.markOnFallback(input.sessionID) + yield* deps.bus.publish(FallbackUsed, { + sessionID: SessionID.make(input.sessionID), + modelID: ModelID.make(entry.modelID), + providerID: ProviderID.make(entry.providerID), + }) + + const providerInfo = yield* deps.provider + .getProvider(ProviderID.make(entry.providerID)) + .pipe(Effect.option) + const providerName = Option.isSome(providerInfo) ? providerInfo.value.name : entry.providerID + const reason = truncateReason(classified.reason) + const notice = `~> Switching to ${model.name} (${providerName})${reason ? 
` — ${reason}` : ""}` + + const fallbackResult = yield* deps.call(model, entry.providerID, entry.modelID) + const fallbackStream = toStream(fallbackResult) + // Inject text-end for the in-progress text part before the notice + // so the previous (errored) text part is cleanly closed in the UI. + return Stream.concat(Stream.fromIterable(noticeChunks(notice, FALLBACK_NOTICE_ID)), fallbackStream) + }), + ), + ), + ) + } + + // Build the chain: primary first, then fallbacks in order. + const chain: Array = [primary, ...fallbacks] + + // Determine starting entry: first chain element that is not on cooldown. + let startIdx = 0 + for (let i = 0; i < chain.length; i++) { + if (!(yield* cooldown.isCooledDown(chain[i].providerID, chain[i].modelID))) { + startIdx = i + break + } + // If every entry is on cooldown the loop falls through with startIdx === 0; + // we still try primary (after a bounded wait below) so user gets a clear + // error rather than an indefinite hang. + if (i === chain.length - 1) startIdx = 0 + } + + // If every entry is on cooldown, sleep up to WAIT_CAP_MS for the soonest + // entry to come back, then proceed with whichever model is now available. + if (startIdx === 0 && (yield* cooldown.isCooledDown(primary.providerID, primary.modelID))) { + let soonest = Infinity + for (const e of chain) { + const r = (yield* cooldown.remaining(e.providerID, e.modelID)) ?? Infinity + if (r < soonest) soonest = r + } + const sleepMs = Math.min(soonest, WAIT_CAP_MS) + if (Number.isFinite(sleepMs) && sleepMs > 0) { + deps.log.info("all models on cooldown, waiting", { sleepMs }) + yield* Effect.sleep(Duration.millis(sleepMs)) + } + // Re-evaluate startIdx after sleep. + for (let i = 0; i < chain.length; i++) { + if (!(yield* cooldown.isCooledDown(chain[i].providerID, chain[i].modelID))) { + startIdx = i + break + } + } + } + + // Resolve the starting model. If the chosen entry's model is somehow + // unresolvable, skip ahead to the next resolvable one. + let resolvedIdx = startIdx + let resolvedModel: Option.Option = Option.none() + for (let i = startIdx; i < chain.length; i++) { + const e = chain[i] + const candidate = yield* deps.provider + .getModel(ProviderID.make(e.providerID), ModelID.make(e.modelID)) + .pipe(Effect.option) + if (Option.isSome(candidate)) { + resolvedIdx = i + resolvedModel = candidate + break + } + } + if (Option.isNone(resolvedModel)) { + deps.log.warn("no models resolvable, attempting primary as last resort") + cooldown.clear(primary.providerID, primary.modelID) + const primaryResult = yield* deps.call(input.model, primary.providerID, primary.modelID) + return toStream(primaryResult) + } + + const startEntry = chain[resolvedIdx] + const startModel = resolvedIdx === 0 ? input.model : resolvedModel.value + const startResult = yield* deps.call(startModel, startEntry.providerID, startEntry.modelID) + let stream: Stream.Stream = toStream(startResult) + + if (resolvedIdx === 0) { + // We are starting on primary. If we were previously on a fallback in + // this session, surface a resume notice and clear the flag. + if (wasOnFallback) { + const providerInfo = yield* deps.provider.getProvider(ProviderID.make(primary.providerID)).pipe(Effect.option) + const providerName = Option.isSome(providerInfo) ? 
providerInfo.value.name : primary.providerID + const notice = `~> Switched back to ${input.model.name} (${providerName})` + stream = Stream.concat(Stream.fromIterable(noticeChunks(notice, FALLBACK_RESUME_ID)), stream) + deps.sessionFallbackState.clear(input.sessionID) + // Publish FallbackUsed pointing back to primary so the processor + // updates the assistant message model field. + yield* deps.bus.publish(FallbackUsed, { + sessionID: SessionID.make(input.sessionID), + modelID: ModelID.make(primary.modelID), + providerID: ProviderID.make(primary.providerID), + }) + } + } else { + // Cold-start on a fallback. Mark session as on fallback and emit + // FallbackUsed so the assistant message picks up the right model. + deps.sessionFallbackState.markOnFallback(input.sessionID) + yield* deps.bus.publish(FallbackUsed, { + sessionID: SessionID.make(input.sessionID), + modelID: ModelID.make(startEntry.modelID), + providerID: ProviderID.make(startEntry.providerID), + }) + // Show a muted "using" notice so the user sees why we didn't start on + // primary. No FallbackTriggered toast: this is a cooldown-recovery + // start, not a fresh failure. + const providerInfo = yield* deps.provider + .getProvider(ProviderID.make(startEntry.providerID)) + .pipe(Effect.option) + const providerName = Option.isSome(providerInfo) ? providerInfo.value.name : startEntry.providerID + const notice = `~> Using ${startModel.name} (${providerName}) while ${input.model.name} is cooling down` + stream = Stream.concat(Stream.fromIterable(noticeChunks(notice, FALLBACK_USING_ID)), stream) + } + + // Wrap with chainFallback for every entry after the start. + for (let i = resolvedIdx + 1; i < chain.length; i++) { + stream = chainFallback(stream, chain[i - 1], chain[i]) + } + + return stream + }) +} diff --git a/packages/opencode/src/session/llm.ts b/packages/opencode/src/session/llm.ts index 0cf3a2398f9b..4a3afd685a55 100644 --- a/packages/opencode/src/session/llm.ts +++ b/packages/opencode/src/session/llm.ts @@ -1,6 +1,6 @@ import { Provider } from "@/provider/provider" import * as Log from "@opencode-ai/core/util/log" -import { Context, Effect, Layer, Record } from "effect" +import { Cause, Context, Effect, Layer, Record } from "effect" import * as Stream from "effect/Stream" import { streamText, wrapLanguageModel, type ModelMessage, type Tool, tool, jsonSchema } from "ai" import { mergeDeep } from "remeda" @@ -9,7 +9,7 @@ import { ProviderTransform } from "@/provider/transform" import { Config } from "@/config/config" import { InstanceState } from "@/effect/instance-state" import type { Agent } from "@/agent/agent" -import type { MessageV2 } from "./message-v2" +import { MessageV2 } from "./message-v2" import { Plugin } from "@/plugin" import { SystemPrompt } from "./system" import { Permission } from "@/permission" @@ -23,6 +23,15 @@ import { EffectBridge } from "@/effect/bridge" import { RuntimeFlags } from "@/effect/runtime-flags" import * as Option from "effect/Option" import * as OtelTracer from "@effect/opentelemetry/Tracer" +import { + CooldownManager, + SessionFallbackState, + isRetryable, + withFallback, + type ClassifiedError, +} from "./fallback" +import { ProviderID } from "@/provider/schema" +import { SessionRetry } from "./retry" const log = Log.create({ service: "llm" }) export const OUTPUT_TOKEN_MAX = ProviderTransform.OUTPUT_TOKEN_MAX @@ -45,6 +54,7 @@ export type StreamInput = { tools: Record retries?: number toolChoice?: "auto" | "required" | "none" + fallbacks?: Array<{ providerID: string; modelID: string }> 
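+  // Optional chain tried in order when `model` fails with a retryable error;
+  // see withFallback in ./fallback.ts for the cooldown and notice semantics.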
} export type StreamRequest = StreamInput & { @@ -62,7 +72,7 @@ export class Service extends Context.Service()("@opencode/LL const live: Layer.Layer< Service, never, - Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service | RuntimeFlags.Service + Auth.Service | Config.Service | Provider.Service | Plugin.Service | Permission.Service | Bus.Service | RuntimeFlags.Service > = Layer.effect( Service, Effect.gen(function* () { @@ -71,7 +81,44 @@ const live: Layer.Layer< const provider = yield* Provider.Service const plugin = yield* Plugin.Service const perm = yield* Permission.Service + const bus = yield* Bus.Service const flags = yield* RuntimeFlags.Service + const cooldown = new CooldownManager() + const sessionFallbackState = new SessionFallbackState() + + const classifyError = ( + cause: Cause.Cause, + prevProviderID: string, + _prevModelID: string, + _cooldownSeconds: number, + ): ClassifiedError | null => { + const error = Cause.squash(cause) + let err = MessageV2.fromError(error, { providerID: ProviderID.make(prevProviderID) }) + if ( + !MessageV2.APIError.isInstance(err) && + !MessageV2.ContextOverflowError.isInstance(err) && + !MessageV2.AbortedError.isInstance(err) + ) { + err = new MessageV2.APIError({ + message: + typeof error === "string" + ? error + : error instanceof Error + ? error.message + : "Unknown stream error", + isRetryable: true, + }).toObject() + } + if (MessageV2.AbortedError.isInstance(err)) return null + if (!isRetryable(err, prevProviderID)) return null + const retryInfo = SessionRetry.retryable(err as unknown as SessionRetry.Err, prevProviderID) + return { + error: err, + isRetryable: true, + retryInfo, + reason: retryInfo?.message ?? "error", + } + } const run = Effect.fn("LLM.run")(function* (input: StreamRequest) { const l = log @@ -87,321 +134,341 @@ const live: Layer.Layer< providerID: input.model.providerID, }) - const [language, cfg, item, info] = yield* Effect.all( - [ - provider.getLanguage(input.model), - config.get(), - provider.getProvider(input.model.providerID), - auth.get(input.model.providerID), - ], - { concurrency: "unbounded" }, - ) - - // TODO: move this to a proper hook - const isOpenaiOauth = item.id === "openai" && info?.type === "oauth" - - const system: string[] = [] - system.push( - [ - // use agent prompt otherwise provider prompt - ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(input.model)), - // any custom prompt passed into this call - ...input.system, - // any custom prompt from last user message - ...(input.user.system ? [input.user.system] : []), - ] - .filter((x) => x) - .join("\n"), - ) - - const header = system[0] - yield* plugin.trigger( - "experimental.chat.system.transform", - { sessionID: input.sessionID, model: input.model }, - { system }, - ) - // rejoin to maintain 2-part structure for caching if header unchanged - if (system.length > 2 && system[0] === header) { - const rest = system.slice(1) - system.length = 0 - system.push(header, rest.join("\n")) - } - - const variant = - !input.small && input.model.variants && input.user.model.variant - ? input.model.variants[input.user.model.variant] - : {} - const base = input.small - ? 
ProviderTransform.smallOptions(input.model) - : ProviderTransform.options({ - model: input.model, - sessionID: input.sessionID, - providerOptions: item.options, - }) - const options = mergeOptions(mergeOptions(mergeOptions(base, input.model.options), input.agent.options), variant) - if (isOpenaiOauth) { - options.instructions = system.join("\n") - } - - const isWorkflow = language instanceof GitLabWorkflowLanguageModel - const messages = isOpenaiOauth - ? input.messages - : isWorkflow - ? input.messages - : [ - ...system.map( - (x): ModelMessage => ({ - role: "system", - content: x, - }), - ), - ...input.messages, - ] - - const params = yield* plugin.trigger( - "chat.params", - { - sessionID: input.sessionID, - agent: input.agent.name, - model: input.model, - provider: item, - message: input.user, - }, - { - temperature: input.model.capabilities.temperature - ? (input.agent.temperature ?? ProviderTransform.temperature(input.model)) - : undefined, - topP: input.agent.topP ?? ProviderTransform.topP(input.model), - topK: ProviderTransform.topK(input.model), - maxOutputTokens: ProviderTransform.maxOutputTokens(input.model, flags.outputTokenMax), - options, - }, - ) - - const { headers } = yield* plugin.trigger( - "chat.headers", - { - sessionID: input.sessionID, - agent: input.agent.name, - model: input.model, - provider: item, - message: input.user, - }, - { - headers: {}, - }, - ) - + const cfg = yield* config.get() const tools = resolveTools(input) - // GitHub Copilot may require the tools parameter when message history contains - // tool calls but no tools are active (e.g. compaction). Inject a stub tool that - // is never meant to be invoked. LiteLLM-backed providers are excluded. - if ( - input.model.providerID.includes("github-copilot") && - Object.keys(tools).length === 0 && - hasToolCalls(input.messages) - ) { - tools["_noop"] = tool({ - description: "Do not call this tool. It exists only for API compatibility and must never be invoked.", - inputSchema: jsonSchema({ - type: "object", - properties: { - reason: { type: "string", description: "Unused" }, - }, - }), - execute: async () => ({ output: "", title: "", metadata: {} }), - }) - } - const sortedTools = Object.fromEntries(Object.entries(tools).toSorted(([a], [b]) => a.localeCompare(b))) - - // Wire up toolExecutor for DWS workflow models so that tool calls - // from the workflow service are executed via opencode's tool system - // and results sent back over the WebSocket. 
- if (language instanceof GitLabWorkflowLanguageModel) { - const workflowModel = language as GitLabWorkflowLanguageModel & { - sessionID?: string - sessionPreapprovedTools?: string[] - approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }> - } - workflowModel.sessionID = input.sessionID - workflowModel.systemPrompt = system.join("\n") - workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => { - const t = sortedTools[toolName] - if (!t || !t.execute) { - return { result: "", error: `Unknown tool: ${toolName}` } + const tryProvider = (model: Provider.Model, providerID: string, _modelID: string) => + Effect.gen(function* () { + const language = yield* provider.getLanguage(model) + const [item, info] = yield* Effect.all( + [provider.getProvider(providerID), auth.get(providerID)], + { concurrency: "unbounded" }, + ) + + // TODO: move this to a proper hook + const isOpenaiOauth = item.id === "openai" && info?.type === "oauth" + + const system: string[] = [] + system.push( + [ + // use agent prompt otherwise provider prompt + ...(input.agent.prompt ? [input.agent.prompt] : SystemPrompt.provider(model)), + // any custom prompt passed into this call + ...input.system, + // any custom prompt from last user message + ...(input.user.system ? [input.user.system] : []), + ] + .filter((x) => x) + .join("\n"), + ) + + const header = system[0] + yield* plugin.trigger( + "experimental.chat.system.transform", + { sessionID: input.sessionID, model }, + { system }, + ) + // rejoin to maintain 2-part structure for caching if header unchanged + if (system.length > 2 && system[0] === header) { + const rest = system.slice(1) + system.length = 0 + system.push(header, rest.join("\n")) } - try { - const result = await t.execute!(JSON.parse(argsJson), { - toolCallId: _requestID, - messages: input.messages, - abortSignal: input.abort, + + // GitHub Copilot may require the tools parameter when message history contains + // tool calls but no tools are active (e.g. compaction). Inject a stub tool that + // is never meant to be invoked. + if ( + model.providerID.includes("github-copilot") && + Object.keys(tools).length === 0 && + hasToolCalls(input.messages) + ) { + tools["_noop"] = tool({ + description: "Do not call this tool. It exists only for API compatibility and must never be invoked.", + inputSchema: jsonSchema({ + type: "object", + properties: { + reason: { type: "string", description: "Unused" }, + }, + }), + execute: async () => ({ output: "", title: "", metadata: {} }), }) - const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result)) - return { - result: output, - metadata: typeof result === "object" ? result?.metadata : undefined, - title: typeof result === "object" ? result?.title : undefined, - } - } catch (e: any) { - return { result: "", error: e.message ?? String(e) } } - } - - const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? 
[]) - workflowModel.sessionPreapprovedTools = Object.keys(sortedTools).filter((name) => { - const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission)) - return !match || match.action !== "ask" - }) + const sortedTools = Object.fromEntries(Object.entries(tools).toSorted(([a], [b]) => a.localeCompare(b))) - const bridge = yield* EffectBridge.make() - const approvedToolsForSession = new Set() - workflowModel.approvalHandler = bridge.bind(async (approvalTools) => { - const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[] - // Auto-approve tools that were already approved in this session - // (prevents infinite approval loops for server-side MCP tools) - if (uniqueNames.every((name) => approvedToolsForSession.has(name))) { - return { approved: true } - } + if (language instanceof GitLabWorkflowLanguageModel) { + const workflowModel = language as GitLabWorkflowLanguageModel & { + sessionID?: string + sessionPreapprovedTools?: string[] + approvalHandler?: (approvalTools: { name: string; args: string }[]) => Promise<{ approved: boolean }> + } + workflowModel.sessionID = input.sessionID + workflowModel.systemPrompt = system.join("\n") + workflowModel.toolExecutor = async (toolName, argsJson, _requestID) => { + const t = sortedTools[toolName] + if (!t || !t.execute) { + return { result: "", error: `Unknown tool: ${toolName}` } + } + try { + const result = await t.execute!(JSON.parse(argsJson), { + toolCallId: _requestID, + messages: input.messages, + abortSignal: input.abort, + }) + const output = typeof result === "string" ? result : (result?.output ?? JSON.stringify(result)) + return { + result: output, + metadata: typeof result === "object" ? result?.metadata : undefined, + title: typeof result === "object" ? result?.title : undefined, + } + } catch (e: any) { + return { result: "", error: e.message ?? String(e) } + } + } - const id = PermissionID.ascending() - let unsub: (() => void) | undefined - try { - unsub = Bus.subscribe(Permission.Event.Replied, (evt) => { - if (evt.properties.requestID === id) void evt.properties.reply + const ruleset = Permission.merge(input.agent.permission ?? [], input.permission ?? []) + workflowModel.sessionPreapprovedTools = Object.keys(sortedTools).filter((name) => { + const match = ruleset.findLast((rule) => Wildcard.match(name, rule.permission)) + return !match || match.action !== "ask" }) - const toolPatterns = approvalTools.map((t: { name: string; args: string }) => { + + const bridge = yield* EffectBridge.make() + const approvedToolsForSession = new Set() + workflowModel.approvalHandler = bridge.bind(async (approvalTools) => { + const uniqueNames = [...new Set(approvalTools.map((t: { name: string }) => t.name))] as string[] + if (uniqueNames.every((name) => approvedToolsForSession.has(name))) { + return { approved: true } + } + + const id = PermissionID.ascending() + let unsub: (() => void) | undefined try { - const parsed = JSON.parse(t.args) as Record - const title = (parsed?.title ?? parsed?.name ?? "") as string - return title ? `${t.name}: ${title}` : t.name + unsub = Bus.subscribe(Permission.Event.Replied, (evt) => { + if (evt.properties.requestID === id) void evt.properties.reply + }) + const toolPatterns = approvalTools.map((t: { name: string; args: string }) => { + try { + const parsed = JSON.parse(t.args) as Record + const title = (parsed?.title ?? parsed?.name ?? "") as string + return title ? 
`${t.name}: ${title}` : t.name + } catch { + return t.name + } + }) + const uniquePatterns = [...new Set(toolPatterns)] as string[] + await bridge.promise( + perm.ask({ + id, + sessionID: SessionID.make(input.sessionID), + permission: "workflow_tool_approval", + patterns: uniquePatterns, + metadata: { tools: approvalTools }, + always: uniquePatterns, + ruleset: [], + }), + ) + for (const name of uniqueNames) approvedToolsForSession.add(name) + workflowModel.sessionPreapprovedTools = [ + ...(workflowModel.sessionPreapprovedTools ?? []), + ...uniqueNames, + ] + return { approved: true } } catch { - return t.name + return { approved: false } + } finally { + unsub?.() } }) - const uniquePatterns = [...new Set(toolPatterns)] as string[] - await bridge.promise( - perm.ask({ - id, - sessionID: SessionID.make(input.sessionID), - permission: "workflow_tool_approval", - patterns: uniquePatterns, - metadata: { tools: approvalTools }, - always: uniquePatterns, - ruleset: [], - }), - ) - for (const name of uniqueNames) approvedToolsForSession.add(name) - workflowModel.sessionPreapprovedTools = [...(workflowModel.sessionPreapprovedTools ?? []), ...uniqueNames] - return { approved: true } - } catch { - return { approved: false } - } finally { - unsub?.() } - }) - } - const tracer = cfg.experimental?.openTelemetry - ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer)) - : undefined - const telemetryTracer = tracer - ? new Proxy(tracer, { - get(target, prop, receiver) { - if (prop !== "startSpan") return Reflect.get(target, prop, receiver) - return (...args: Parameters) => { - const span = target.startSpan(...args) - span.setAttribute("session.id", input.sessionID) - return span - } - }, - }) - : undefined + const variant = + !input.small && model.variants && input.user.model.variant + ? model.variants[input.user.model.variant] + : {} + const base = input.small + ? ProviderTransform.smallOptions(model) + : ProviderTransform.options({ + model, + sessionID: input.sessionID, + providerOptions: item.options, + }) + const options = mergeOptions(mergeOptions(mergeOptions(base, model.options), input.agent.options), variant) + if (isOpenaiOauth) { + options.instructions = system.join("\n") + } - const opencodeProjectID = input.model.providerID.startsWith("opencode") - ? (yield* InstanceState.context).project.id - : undefined + const isWorkflow = language instanceof GitLabWorkflowLanguageModel + const messages = isOpenaiOauth + ? input.messages + : isWorkflow + ? input.messages + : [ + ...system.map( + (x): ModelMessage => ({ + role: "system", + content: x, + }), + ), + ...input.messages, + ] + + const params = yield* plugin.trigger( + "chat.params", + { + sessionID: input.sessionID, + agent: input.agent.name, + model, + provider: item, + message: input.user, + }, + { + temperature: model.capabilities.temperature + ? (input.agent.temperature ?? ProviderTransform.temperature(model)) + : undefined, + topP: input.agent.topP ?? 
ProviderTransform.topP(model), + topK: ProviderTransform.topK(model), + maxOutputTokens: ProviderTransform.maxOutputTokens(model, flags.outputTokenMax), + options, + }, + ) - return streamText({ - onError(error) { - l.error("stream error", { - error, - }) - }, - async experimental_repairToolCall(failed) { - const lower = failed.toolCall.toolName.toLowerCase() - if (lower !== failed.toolCall.toolName && sortedTools[lower]) { - l.info("repairing tool call", { - tool: failed.toolCall.toolName, - repaired: lower, - }) - return { - ...failed.toolCall, - toolName: lower, - } - } - return { - ...failed.toolCall, - input: JSON.stringify({ - tool: failed.toolCall.toolName, - error: failed.error.message, - }), - toolName: "invalid", - } - }, - temperature: params.temperature, - topP: params.topP, - topK: params.topK, - providerOptions: ProviderTransform.providerOptions(input.model, params.options), - activeTools: Object.keys(sortedTools).filter((x) => x !== "invalid"), - tools: sortedTools, - toolChoice: input.toolChoice, - maxOutputTokens: params.maxOutputTokens, - abortSignal: input.abort, - headers: { - ...(input.model.providerID.startsWith("opencode") - ? { - "x-opencode-project": opencodeProjectID, - "x-opencode-session": input.sessionID, - "x-opencode-request": input.user.id, - "x-opencode-client": flags.client, - "User-Agent": `opencode/${InstallationVersion}`, - } - : { - "x-session-affinity": input.sessionID, - ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), - "User-Agent": `opencode/${InstallationVersion}`, - }), - ...input.model.headers, - ...headers, - }, - maxRetries: input.retries ?? 0, - messages, - model: wrapLanguageModel({ - model: language, - middleware: [ + const { headers } = yield* plugin.trigger( + "chat.headers", { - specificationVersion: "v3" as const, - async transformParams(args) { - if (args.type === "stream") { - // @ts-expect-error - args.params.prompt = ProviderTransform.message(args.params.prompt, input.model, options) + sessionID: input.sessionID, + agent: input.agent.name, + model, + provider: item, + message: input.user, + }, + { + headers: {}, + }, + ) + + const tracer = cfg.experimental?.openTelemetry + ? Option.getOrUndefined(yield* Effect.serviceOption(OtelTracer.OtelTracer)) + : undefined + const telemetryTracer = tracer + ? new Proxy(tracer, { + get(target, prop, receiver) { + if (prop !== "startSpan") return Reflect.get(target, prop, receiver) + return (...args: Parameters) => { + const span = target.startSpan(...args) + span.setAttribute("session.id", input.sessionID) + return span + } + }, + }) + : undefined + + const opencodeProjectID = model.providerID.startsWith("opencode") + ? 
(yield* InstanceState.context).project.id + : undefined + + return streamText({ + onError(error) { + l.error("stream error", { + error, + }) + }, + async experimental_repairToolCall(failed) { + const lower = failed.toolCall.toolName.toLowerCase() + if (lower !== failed.toolCall.toolName && sortedTools[lower]) { + l.info("repairing tool call", { + tool: failed.toolCall.toolName, + repaired: lower, + }) + return { + ...failed.toolCall, + toolName: lower, } - return args.params + } + return { + ...failed.toolCall, + input: JSON.stringify({ + tool: failed.toolCall.toolName, + error: failed.error.message, + }), + toolName: "invalid", + } + }, + temperature: params.temperature, + topP: params.topP, + topK: params.topK, + providerOptions: ProviderTransform.providerOptions(model, params.options), + activeTools: Object.keys(sortedTools).filter((x) => x !== "invalid"), + tools: sortedTools, + toolChoice: input.toolChoice, + maxOutputTokens: params.maxOutputTokens, + abortSignal: input.abort, + headers: { + ...(model.providerID.startsWith("opencode") + ? { + "x-opencode-project": opencodeProjectID, + "x-opencode-session": input.sessionID, + "x-opencode-request": input.user.id, + "x-opencode-client": flags.client, + "User-Agent": `opencode/${InstallationVersion}`, + } + : { + "x-session-affinity": input.sessionID, + ...(input.parentSessionID ? { "x-parent-session-id": input.parentSessionID } : {}), + "User-Agent": `opencode/${InstallationVersion}`, + }), + ...model.headers, + ...headers, + }, + maxRetries: input.retries ?? 0, + messages, + model: wrapLanguageModel({ + model: language, + middleware: [ + { + specificationVersion: "v3" as const, + async transformParams(args) { + if (args.type === "stream") { + // @ts-expect-error + args.params.prompt = ProviderTransform.message(args.params.prompt, model, options) + } + return args.params + }, + }, + ], + }), + experimental_telemetry: { + isEnabled: cfg.experimental?.openTelemetry, + functionId: "session.llm", + tracer: telemetryTracer, + metadata: { + userId: cfg.username ?? "unknown", + sessionId: input.sessionID, }, }, - ], - }), - experimental_telemetry: { - isEnabled: cfg.experimental?.openTelemetry, - functionId: "session.llm", - tracer: telemetryTracer, - metadata: { - userId: cfg.username ?? "unknown", - sessionId: input.sessionID, + }) + }) + + return yield* withFallback( + { + sessionID: input.sessionID, + model: input.model as Provider.Model & { providerID: string; id: string }, + fallbacks: input.fallbacks, + abort: input.abort, + }, + { + provider: { + getModel: (p, m) => provider.getModel(p, m) as Effect.Effect, + getProvider: (p) => provider.getProvider(p) as Effect.Effect, }, + bus, + config: { get: () => config.get() as Effect.Effect<{ cooldown_seconds?: number }, unknown> }, + classifyError, + call: tryProvider, + log: l, + cooldown, + sessionFallbackState, }, - }) + ) }) const stream: Interface["stream"] = (input) => @@ -412,10 +479,8 @@ const live: Layer.Layer< Effect.sync(() => new AbortController()), (ctrl) => Effect.sync(() => ctrl.abort()), ) - const result = yield* run({ ...input, abort: ctrl.signal }) - - return Stream.fromAsyncIterable(result.fullStream, (e) => (e instanceof Error ? 
e : new Error(String(e)))) + return result as unknown as Stream.Stream }), ), ) @@ -433,6 +498,7 @@ export const defaultLayer = Layer.suspend(() => Layer.provide(Provider.defaultLayer), Layer.provide(Plugin.defaultLayer), Layer.provide(RuntimeFlags.defaultLayer), + Layer.provide(Bus.defaultLayer), ), ) diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts index d3d6a1dfcc10..f6efc428dd8f 100644 --- a/packages/opencode/src/session/message-v2.ts +++ b/packages/opencode/src/session/message-v2.ts @@ -100,6 +100,9 @@ export const TextPart = Schema.Struct({ text: Schema.String, synthetic: Schema.optional(Schema.Boolean), ignored: Schema.optional(Schema.Boolean), + fallbackNotice: Schema.optional( + Schema.Union([Schema.Literal("using"), Schema.Literal("switch"), Schema.Literal("resume")]), + ), time: Schema.optional( Schema.Struct({ start: NonNegativeInt, diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts index aac893075f00..ed539f98952f 100644 --- a/packages/opencode/src/session/processor.ts +++ b/packages/opencode/src/session/processor.ts @@ -14,9 +14,11 @@ import { isOverflow } from "./overflow" import { PartID } from "./schema" import type { SessionID } from "./schema" import { SessionRetry } from "./retry" +import { FALLBACK_NOTICE_ID, FALLBACK_RESUME_ID, FALLBACK_USING_ID, FallbackUsed } from "./fallback" +import { ModelID, ProviderID } from "@/provider/schema" import { SessionStatus } from "./status" import { SessionSummary } from "./summary" -import type { Provider } from "@/provider/provider" +import { Provider } from "@/provider/provider" import { Question } from "@/question" import { errorMessage } from "@/util/error" import * as Log from "@opencode-ai/core/util/log" @@ -102,6 +104,7 @@ export const layer = Layer.effect( const image = yield* Image.Service const events = yield* EventV2Bridge.Service const flags = yield* RuntimeFlags.Service + const providerSvc = yield* Provider.Service const create = Effect.fn("SessionProcessor.create")(function* (input: Input) { // Pre-capture snapshot before the LLM stream starts. The AI SDK @@ -553,8 +556,16 @@ export const layer = Layer.effect( return } - case "text-start": - if (!ctx.assistantMessage.summary) { + case "text-start": { + const fallbackNotice = + value.id === FALLBACK_USING_ID + ? ("using" as const) + : value.id === FALLBACK_NOTICE_ID + ? ("switch" as const) + : value.id === FALLBACK_RESUME_ID + ? ("resume" as const) + : undefined + if (!ctx.assistantMessage.summary && !fallbackNotice) { // TODO(v2): Temporary dual-write while migrating session messages to v2 events. if (flags.experimentalEventSystem) { yield* events.publish(SessionEvent.Text.Started, { @@ -571,9 +582,13 @@ export const layer = Layer.effect( text: "", time: { start: Date.now() }, metadata: value.providerMetadata, + ...(fallbackNotice + ? { ignored: true as const, synthetic: true as const, fallbackNotice } + : {}), } yield* session.updatePart(ctx.currentText) return + } case "text-delta": if (!ctx.currentText) return @@ -723,6 +738,40 @@ export const layer = Layer.effect( ctx.needsCompaction = false ctx.shouldBreak = (yield* config.get()).experimental?.continue_loop_on_deny !== true + // Subscribe to fallback events scoped to this process call. When the + // LLM layer switches to a fallback model, update the in-flight + // assistant message and publish Model.Updated so the projector + + // sync layers pick up the new model. 
The subscription is detached + // via Effect.ensuring after the stream finishes (success or failure). + const unsubscribeFallback = yield* bus.subscribeCallback(FallbackUsed, (evt) => + Effect.runFork( + Effect.gen(function* () { + if (evt.properties.sessionID !== ctx.sessionID) return + ctx.assistantMessage.modelID = ModelID.make(evt.properties.modelID) + ctx.assistantMessage.providerID = ProviderID.make(evt.properties.providerID) + // Swap ctx.model so subsequent finish-step events bill at the + // new provider's pricing and the overflow check uses the new + // context limit. Failures are non-fatal — keep the old model. + const swapped = yield* providerSvc + .getModel(ProviderID.make(evt.properties.providerID), ModelID.make(evt.properties.modelID)) + .pipe(Effect.option) + if (swapped._tag === "Some") ctx.model = swapped.value + yield* session.updateMessage(ctx.assistantMessage) + if (flags.experimentalEventSystem) { + yield* events.publish(SessionEvent.Model.Updated, { + sessionID: ctx.sessionID, + model: { + id: ModelV2.ID.make(evt.properties.modelID), + providerID: ProviderV2.ID.make(evt.properties.providerID), + variant: ModelV2.VariantID.make(ctx.assistantMessage.variant ?? "default"), + }, + timestamp: DateTime.makeUnsafe(Date.now()), + }) + } + }), + ), + ) + return yield* Effect.gen(function* () { yield* Effect.gen(function* () { ctx.currentText = undefined @@ -780,6 +829,7 @@ export const layer = Layer.effect( ), Effect.catch(halt), Effect.ensuring(cleanup()), + Effect.ensuring(Effect.sync(() => unsubscribeFallback())), ) if (ctx.needsCompaction) return "compact" diff --git a/packages/opencode/src/session/projectors-next.ts b/packages/opencode/src/session/projectors-next.ts index ae5b9c5d2fb9..9f41a476f3e5 100644 --- a/packages/opencode/src/session/projectors-next.ts +++ b/packages/opencode/src/session/projectors-next.ts @@ -140,6 +140,26 @@ export default [ .run() update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.model.switched", data }) }), + SyncEvent.project(EventV2Bridge.toSyncDefinition(SessionEvent.Model.Updated), (db, data, _event) => { + const row = db + .select() + .from(SessionMessageTable) + .where(and(eq(SessionMessageTable.session_id, data.sessionID), eq(SessionMessageTable.type, "assistant"))) + .orderBy(desc(SessionMessageTable.id)) + .limit(1) + .get() + if (row) { + const message = decodeMessage({ ...row.data, id: row.id, type: row.type }) + if (message.type === "assistant") { + message.model = data.model + const { id: _id, type: _type, ...updateData } = message + db.update(SessionMessageTable) + .set({ data: encodeMessageData(updateData) }) + .where(eq(SessionMessageTable.id, row.id)) + .run() + } + } + }), SyncEvent.project(EventV2Bridge.toSyncDefinition(SessionEvent.Prompted), (db, data, event) => { update(db, { id: SessionMessage.ID.make(event.id), type: "session.next.prompted", data }) }), diff --git a/packages/opencode/src/session/prompt.ts b/packages/opencode/src/session/prompt.ts index ba9a4d6f1a0f..973e2afb5909 100644 --- a/packages/opencode/src/session/prompt.ts +++ b/packages/opencode/src/session/prompt.ts @@ -352,6 +352,12 @@ export const layer = Layer.effect( const msgs = onlySubtasks ? [{ role: "user" as const, content: subtasks.map((p) => p.prompt).join("\n") }] : yield* MessageV2.toModelMessagesEffect(context, mdl) + // Title fallback: only agent-specific. 
We deliberately do NOT inherit + // cfg.fallbacks here — that list is sized for the main chat model and + // typically contains heavy/expensive models, which is wrong for the + // small title-generation pass. Users wanting title fallback configure + // it explicitly on the title agent. + const titleFallbacks = ag.fallbacks?.length ? ag.fallbacks : undefined const text = yield* llm .stream({ agent: ag, @@ -362,6 +368,7 @@ export const layer = Layer.effect( model: mdl, sessionID: input.session.id, retries: 2, + fallbacks: titleFallbacks, messages: [{ role: "user", content: "Generate a title for this conversation:\n" }, ...msgs], }) .pipe( @@ -1818,6 +1825,13 @@ NOTE: At any point in time through this workflow you should feel free to ask the const system = [...env, ...instructions, ...(skills ? [skills] : [])] const format = lastUser.format ?? { type: "text" as const } if (format.type === "json_schema") system.push(STRUCTURED_OUTPUT_SYSTEM_PROMPT) + const cfgForFallbacks = yield* config.get() + const fallbacks = + agent.fallbacks?.length + ? agent.fallbacks + : cfgForFallbacks.fallbacks?.length + ? cfgForFallbacks.fallbacks.map((f: string) => Provider.parseModel(f)) + : undefined const result = yield* handle.process({ user: lastUser, agent, @@ -1829,6 +1843,7 @@ NOTE: At any point in time through this workflow you should feel free to ask the tools, model, toolChoice: format.type === "json_schema" ? "required" : undefined, + fallbacks, }) if (structured !== undefined) { diff --git a/packages/opencode/src/session/retry.ts b/packages/opencode/src/session/retry.ts index 463bc27a95db..51cb333353fb 100644 --- a/packages/opencode/src/session/retry.ts +++ b/packages/opencode/src/session/retry.ts @@ -12,6 +12,7 @@ export type RetryReason = "free_tier_limit" | "account_rate_limit" | (string & { export type Retryable = { message: string + quotaLimit?: boolean action?: { reason: RetryReason provider: string @@ -118,7 +119,13 @@ export function retryable(error: Err, provider: string) { }, } } - return { message: error.data.message.includes("Overloaded") ? "Provider is overloaded" : error.data.message } + const lowerMsg = error.data.message.toLowerCase() + const isQuotaLimit = + lowerMsg.includes("weekly") || lowerMsg.includes("monthly") || lowerMsg.includes("exceeded your") + return { + message: error.data.message.includes("Overloaded") ? "Provider is overloaded" : error.data.message, + ...(isQuotaLimit ? 
{ quotaLimit: true } : {}), + } } // Check for rate limit patterns in plain text error messages From 676dd749a7951c0454349e0e00e5e5230c8de500 Mon Sep 17 00:00:00 2001 From: minicx Date: Sat, 16 May 2026 22:53:48 +0300 Subject: [PATCH 2/4] test(fallback): cover bus-driven model swap Signed-off-by: minicx --- .../opencode/test/session/fallback.test.ts | 402 ++++++++++++++++++ .../test/session/processor-effect.test.ts | 121 ++++++ 2 files changed, 523 insertions(+) create mode 100644 packages/opencode/test/session/fallback.test.ts diff --git a/packages/opencode/test/session/fallback.test.ts b/packages/opencode/test/session/fallback.test.ts new file mode 100644 index 000000000000..df8df5404d88 --- /dev/null +++ b/packages/opencode/test/session/fallback.test.ts @@ -0,0 +1,402 @@ +import { describe, expect, test } from "bun:test" +import { Cause, Effect, Layer, Stream } from "effect" +import * as TestClock from "effect/testing/TestClock" +import { + CooldownManager, + DEFAULT_COOLDOWN_SECONDS, + FALLBACK_NOTICE_ID, + FALLBACK_RESUME_ID, + FALLBACK_USING_ID, + FallbackTriggered, + FallbackUsed, + NOTICE_REASON_MAX_LENGTH, + QUOTA_COOLDOWN_MS, + SessionFallbackState, + WAIT_CAP_MS, + withFallback, + type ClassifiedError, + type FallbackDeps, + type FallbackInput, + type ProviderStreamResult, + type StreamChunk, +} from "../../src/session/fallback" + +// --------------------------------------------------------------------------- +// CooldownManager +// --------------------------------------------------------------------------- + +describe("CooldownManager", () => { + test("isCooledDown returns false when no cooldown has been set", () => + Effect.runPromise( + Effect.gen(function* () { + const cm = new CooldownManager() + expect(yield* cm.isCooledDown("a", "x")).toBe(false) + }), + )) + + test("isCooledDown returns true after put", () => + Effect.runPromise( + Effect.gen(function* () { + const cm = new CooldownManager() + yield* cm.put("a", "x", 60_000) + expect(yield* cm.isCooledDown("a", "x")).toBe(true) + }), + )) + + test("isCooledDown returns false after expiry and cleans up the entry", () => + Effect.runPromise( + Effect.gen(function* () { + const cm = new CooldownManager() + yield* cm.put("a", "x", 100) + yield* TestClock.adjust("150 millis") + expect(yield* cm.isCooledDown("a", "x")).toBe(false) + expect(yield* cm.remaining("a", "x")).toBeUndefined() + }).pipe(Effect.provide(Layer.empty)), + )) + + test("clear removes an active cooldown", () => + Effect.runPromise( + Effect.gen(function* () { + const cm = new CooldownManager() + yield* cm.put("a", "x", 60_000) + cm.clear("a", "x") + expect(yield* cm.isCooledDown("a", "x")).toBe(false) + }), + )) + + test("put overwrites existing expiry with the new shorter value", () => + Effect.runPromise( + Effect.gen(function* () { + const cm = new CooldownManager() + yield* cm.put("a", "x", 60_000) + yield* cm.put("a", "x", 100) + yield* TestClock.adjust("150 millis") + expect(yield* cm.isCooledDown("a", "x")).toBe(false) + }).pipe(Effect.provide(Layer.empty)), + )) + + test("remaining returns ms left when active", () => + Effect.runPromise( + Effect.gen(function* () { + const cm = new CooldownManager() + yield* cm.put("a", "x", 60_000) + const left = yield* cm.remaining("a", "x") + expect(left).toBeGreaterThan(59_000) + expect(left!).toBeLessThanOrEqual(60_000) + }), + )) +}) + +// --------------------------------------------------------------------------- +// SessionFallbackState +// 
--------------------------------------------------------------------------- + +describe("SessionFallbackState", () => { + test("isOnFallback is false by default", () => { + const s = new SessionFallbackState() + expect(s.isOnFallback("sess")).toBe(false) + }) + + test("markOnFallback then isOnFallback returns true", () => { + const s = new SessionFallbackState() + s.markOnFallback("sess") + expect(s.isOnFallback("sess")).toBe(true) + }) + + test("clear removes only the targeted session", () => { + const s = new SessionFallbackState() + s.markOnFallback("a") + s.markOnFallback("b") + s.clear("a") + expect(s.isOnFallback("a")).toBe(false) + expect(s.isOnFallback("b")).toBe(true) + }) +}) + +// --------------------------------------------------------------------------- +// Constants +// --------------------------------------------------------------------------- + +describe("fallback constants", () => { + test("notice ids are stable strings", () => { + expect(FALLBACK_NOTICE_ID).toBe("fallback-notice") + expect(FALLBACK_RESUME_ID).toBe("fallback-resume") + expect(FALLBACK_USING_ID).toBe("fallback-using") + }) + test("QUOTA_COOLDOWN_MS is 6 hours", () => expect(QUOTA_COOLDOWN_MS).toBe(6 * 60 * 60 * 1000)) + test("DEFAULT_COOLDOWN_SECONDS is 300", () => expect(DEFAULT_COOLDOWN_SECONDS).toBe(300)) + test("NOTICE_REASON_MAX_LENGTH is 40", () => expect(NOTICE_REASON_MAX_LENGTH).toBe(40)) + test("WAIT_CAP_MS is 30 seconds", () => expect(WAIT_CAP_MS).toBe(30_000)) +}) + +// --------------------------------------------------------------------------- +// withFallback — mock harness +// --------------------------------------------------------------------------- + +type PublishedEvent = { type: "triggered" | "used"; properties: Record } + +function makeStream(chunks: StreamChunk[]): ProviderStreamResult { + return { + fullStream: (async function* () { + for (const c of chunks) yield c + })(), + } +} + +function errorStream(message: string): ProviderStreamResult { + return { + fullStream: (async function* () { + yield { type: "error", error: new Error(message) } + })(), + } +} + +function makeDeps(overrides: Partial & { events?: PublishedEvent[] } = {}): FallbackDeps { + const events = overrides.events ?? 
[] + const baseModel = { name: "Mock", id: "mock", providerID: "mock" } as any + const baseProvider = { name: "Mock Provider", id: "mock" } as any + return { + provider: { + getModel: (p, m) => + Effect.succeed({ ...baseModel, providerID: p, id: m, name: `${p}:${m}` } as any) as Effect.Effect< + any, + unknown + >, + getProvider: (p) => Effect.succeed({ ...baseProvider, id: p, name: `${p}-name` } as any), + ...overrides.provider, + }, + bus: { + publish: (def: any, properties: any) => + Effect.sync(() => { + if (def.type === "llm.fallback.triggered") events.push({ type: "triggered", properties }) + if (def.type === "llm.fallback.used") events.push({ type: "used", properties }) + }), + } as any, + config: { + get: () => Effect.succeed({ cooldown_seconds: 60 }) as Effect.Effect<{ cooldown_seconds?: number }, unknown>, + ...overrides.config, + }, + classifyError: ((_cause: Cause.Cause) => ({ + error: new Error("classified"), + isRetryable: true, + retryInfo: undefined, + reason: "test error", + })) satisfies FallbackDeps["classifyError"], + call: ((_model: any) => + Effect.succeed(makeStream([{ type: "text-delta", id: "1", text: "ok" }]))) as FallbackDeps["call"], + log: { + info: () => {}, + warn: () => {}, + error: () => {}, + debug: () => {}, + clone() { + return this + }, + tag() { + return this + }, + time: () => ({ + stop() {}, + [Symbol.dispose]() {}, + }), + } as any, + cooldown: new CooldownManager(), + sessionFallbackState: new SessionFallbackState(), + ...overrides, + } +} + +const primaryModel = { providerID: "primary", id: "primary-model", name: "Primary" } as any + +const collect = (stream: Stream.Stream) => + Effect.runPromise(stream.pipe(Stream.runCollect, Effect.map((c) => Array.from(c)))) + +// --------------------------------------------------------------------------- +// withFallback behaviour +// --------------------------------------------------------------------------- + +describe("withFallback", () => { + test("returns primary stream as-is when no fallbacks are configured", async () => { + const events: PublishedEvent[] = [] + const deps = makeDeps({ events }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + abort: new AbortController().signal, + } + const stream = await Effect.runPromise(withFallback(input, deps) as Effect.Effect) + const chunks = await collect(stream) + expect(chunks).toEqual([{ type: "text-delta", id: "1", text: "ok" }]) + expect(events).toEqual([]) + expect(deps.sessionFallbackState.isOnFallback("s")).toBe(false) + }) + + test("primary succeeds → fallbacks are wrapped but not invoked, no events emitted", async () => { + const events: PublishedEvent[] = [] + const callTargets: string[] = [] + const deps = makeDeps({ + events, + call: (model: any, p, m) => { + callTargets.push(`${p}/${m}`) + return Effect.succeed(makeStream([{ type: "text-delta", id: "p", text: "primary-ok" }])) as any + }, + }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + fallbacks: [{ providerID: "fb", modelID: "fb-model" }], + abort: new AbortController().signal, + } + const stream = await Effect.runPromise(withFallback(input, deps) as Effect.Effect) + const chunks = await collect(stream) + expect(chunks).toEqual([{ type: "text-delta", id: "p", text: "primary-ok" }]) + expect(callTargets).toEqual(["primary/primary-model"]) + expect(events).toEqual([]) + }) + + test("primary errors mid-stream → falls back, prepends switch notice, emits both events", async () => { + const events: PublishedEvent[] = [] + const callTargets: 
string[] = [] + const deps = makeDeps({ + events, + call: (model: any, p, m) => { + callTargets.push(`${p}/${m}`) + if (p === "primary") { + return Effect.succeed(errorStream("primary boom")) as any + } + return Effect.succeed(makeStream([{ type: "text-delta", id: "f", text: "fallback-ok" }])) as any + }, + }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + fallbacks: [{ providerID: "fb", modelID: "fb-model" }], + abort: new AbortController().signal, + } + const stream = await Effect.runPromise(withFallback(input, deps) as Effect.Effect) + const chunks = await collect(stream) + const noticeStart = chunks.find((c) => c.type === "text-start" && c.id === FALLBACK_NOTICE_ID) + const noticeDelta = chunks.find((c) => c.type === "text-delta" && c.id === FALLBACK_NOTICE_ID) + expect(noticeStart).toBeDefined() + expect(noticeDelta?.text).toContain("Switching to") + expect(chunks.some((c) => c.type === "text-delta" && c.id === "f")).toBe(true) + + expect(callTargets).toEqual(["primary/primary-model", "fb/fb-model"]) + expect(events.map((e) => e.type)).toEqual(["triggered", "used"]) + expect(events[1].properties.modelID).toBe("fb-model") + expect(deps.sessionFallbackState.isOnFallback("s")).toBe(true) + }) + + test("primary on cooldown → cold-starts on fallback with 'using' notice, only FallbackUsed event", async () => { + const events: PublishedEvent[] = [] + const callTargets: string[] = [] + const cm = new CooldownManager() + await Effect.runPromise(cm.put("primary", "primary-model", 60_000)) + const deps = makeDeps({ + events, + cooldown: cm, + call: (model: any, p, m) => { + callTargets.push(`${p}/${m}`) + return Effect.succeed(makeStream([{ type: "text-delta", id: "f", text: "fb" }])) as any + }, + }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + fallbacks: [{ providerID: "fb", modelID: "fb-model" }], + abort: new AbortController().signal, + } + const stream = await Effect.runPromise(withFallback(input, deps) as Effect.Effect) + const chunks = await collect(stream) + const usingNotice = chunks.find((c) => c.type === "text-delta" && c.id === FALLBACK_USING_ID) + expect(usingNotice).toBeDefined() + expect(usingNotice?.text).toContain("Using") + expect(callTargets).toEqual(["fb/fb-model"]) + // Cold-start on cooldown does NOT publish FallbackTriggered (that is for + // mid-stream errors only) but DOES publish FallbackUsed so the processor + // can update the assistant message model. 
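+    // The session is also marked as on-fallback here, which is what lets the
+    // resume notice fire on the next successful primary run (see the recovery test below).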
+ expect(events.map((e) => e.type)).toEqual(["used"]) + expect(deps.sessionFallbackState.isOnFallback("s")).toBe(true) + }) + + test("primary recovers after previous fallback → prepends resume notice, clears session state", async () => { + const events: PublishedEvent[] = [] + const state = new SessionFallbackState() + state.markOnFallback("s") + const deps = makeDeps({ events, sessionFallbackState: state }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + fallbacks: [{ providerID: "fb", modelID: "fb-model" }], + abort: new AbortController().signal, + } + const stream = await Effect.runPromise(withFallback(input, deps) as Effect.Effect) + const chunks = await collect(stream) + const resumeDelta = chunks.find((c) => c.type === "text-delta" && c.id === FALLBACK_RESUME_ID) + expect(resumeDelta).toBeDefined() + expect(resumeDelta?.text).toContain("Switched back to") + expect(state.isOnFallback("s")).toBe(false) + expect(events.map((e) => e.type)).toEqual(["used"]) + expect(events[0].properties.modelID).toBe("primary-model") + }) + + test("all models on cooldown → bounded wait, then takes whichever expires first", async () => { + const events: PublishedEvent[] = [] + const callTargets: string[] = [] + const cm = new CooldownManager() + // Primary cools down sooner than fallback so after the wait we should + // start on primary, not fallback. + await Effect.runPromise(cm.put("primary", "primary-model", 200)) + await Effect.runPromise(cm.put("fb", "fb-model", 60_000)) + const deps = makeDeps({ + events, + cooldown: cm, + call: (model: any, p, m) => { + callTargets.push(`${p}/${m}`) + return Effect.succeed(makeStream([{ type: "text-delta", id: "p", text: "ok" }])) as any + }, + }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + fallbacks: [{ providerID: "fb", modelID: "fb-model" }], + abort: new AbortController().signal, + } + const program = Effect.gen(function* () { + const fiber = yield* Effect.fork(withFallback(input, deps) as Effect.Effect) + // Move past the soonest expiry; the implementation caps the wait at + // WAIT_CAP_MS so this also exercises the cap-respect path. + yield* TestClock.adjust("300 millis") + return yield* fiber + }).pipe(Effect.provide(Layer.empty)) + const stream = await Effect.runPromise(program) + const chunks = await collect(stream) + expect(chunks.some((c) => c.type === "text-delta" && c.id === "p")).toBe(true) + // Primary expired first, so we should have started on primary, not fb. + expect(callTargets[0]).toBe("primary/primary-model") + }) + + test("non-retryable error from primary is not caught — propagates as failure", async () => { + const deps = makeDeps({ + classifyError: () => null, + call: (_model: any, p) => + p === "primary" + ? 
(Effect.succeed(errorStream("auth fail")) as any) + : (Effect.succeed(makeStream([{ type: "text-delta", id: "f", text: "fb" }])) as any), + }) + const input: FallbackInput = { + sessionID: "s", + model: primaryModel, + fallbacks: [{ providerID: "fb", modelID: "fb-model" }], + abort: new AbortController().signal, + } + const stream = await Effect.runPromise(withFallback(input, deps) as Effect.Effect) + const exit = await Effect.runPromise(stream.pipe(Stream.runCollect, Effect.exit)) + expect(exit._tag).toBe("Failure") + }) + + test("FallbackTriggered / FallbackUsed events are defined and have stable types", () => { + expect(FallbackTriggered.type).toBe("llm.fallback.triggered") + expect(FallbackUsed.type).toBe("llm.fallback.used") + }) +}) diff --git a/packages/opencode/test/session/processor-effect.test.ts b/packages/opencode/test/session/processor-effect.test.ts index 78c7e4c64228..2c04b11efab4 100644 --- a/packages/opencode/test/session/processor-effect.test.ts +++ b/packages/opencode/test/session/processor-effect.test.ts @@ -24,6 +24,7 @@ import { CrossSpawnSpawner } from "@opencode-ai/core/cross-spawn-spawner" import { provideTmpdirServer } from "../fixture/fixture" import { testEffect } from "../lib/effect" import { raw, reply, TestLLMServer } from "../lib/llm-server" +import { FallbackUsed } from "../../src/session/fallback" import { SyncEvent } from "@/sync" import { RuntimeFlags } from "@/effect/runtime-flags" import { EventV2Bridge } from "@/event-v2-bridge" @@ -64,6 +65,18 @@ const cfg = { cost: { input: 0, output: 0 }, options: {}, }, + "fallback-model": { + id: "fallback-model", + name: "Fallback Model", + attachment: false, + reasoning: false, + temperature: false, + tool_call: true, + release_date: "2025-01-01", + limit: { context: 100000, output: 10000 }, + cost: { input: 0, output: 0 }, + options: {}, + }, }, options: { apiKey: "test-key", @@ -853,3 +866,111 @@ it.live("session.processor effect tests mark interruptions aborted without manua { git: true, config: (url) => providerCfg(url) }, ), ) + +// --------------------------------------------------------------------------- +// Fallback integration: FallbackUsed bus event must update the assistant +// message model so subsessions / title / main session all see the right +// provider/model after a stream-level switch. Regression test for the +// `usedFallback`-mutation bug from PR #26292. +// --------------------------------------------------------------------------- + +it.live("session.processor stream error with fallbacks switches model on the message", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + const bus = yield* Bus.Service + + // Primary errors mid-stream; fallback returns a clean text response. 
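+        // Two responses are queued below: the streamError is consumed by the primary
+        // model and the plain text reply by the fallback, which is why llm.calls is
+        // asserted to be 2 further down.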
+ yield* llm.push(reply().streamError("boom").item()) + yield* llm.text("recovered") + + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "fallback please") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + + const used: string[] = [] + const off = yield* bus.subscribeCallback(FallbackUsed, (evt) => { + used.push(`${evt.properties.providerID}/${evt.properties.modelID}`) + }) + + const handle = yield* processors.create({ + assistantMessage: msg, + sessionID: chat.id, + model: mdl, + }) + + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "fallback please" }], + tools: {}, + fallbacks: [{ providerID: ref.providerID, modelID: ModelID.make("fallback-model") }], + }) + + off() + + expect(value).toBe("continue") + // Primary failed, fallback succeeded → 2 upstream calls. + expect(yield* llm.calls).toBe(2) + // FallbackUsed fired exactly once and points to the fallback model. + expect(used).toEqual([`${ref.providerID}/fallback-model`]) + // Processor subscription updated the in-memory assistant message. + expect(handle.message.modelID).toBe("fallback-model") + expect(handle.message.providerID).toBe(ref.providerID) + // Persisted message also reflects the new model. + const stored = yield* MessageV2.get({ sessionID: chat.id, messageID: msg.id }) + if (stored.info.role === "assistant") { + expect(stored.info.modelID).toBe("fallback-model") + } + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) + +it.live("session.processor stream error without fallbacks halts without switching model", () => + provideTmpdirServer( + ({ dir, llm }) => + Effect.gen(function* () { + const { processors, session, provider } = yield* boot() + yield* llm.push(reply().streamError("boom").item()) + const chat = yield* session.create({}) + const parent = yield* user(chat.id, "no fallback") + const msg = yield* assistant(chat.id, parent.id, path.resolve(dir)) + const mdl = yield* provider.getModel(ref.providerID, ref.modelID) + const handle = yield* processors.create({ assistantMessage: msg, sessionID: chat.id, model: mdl }) + const value = yield* handle.process({ + user: { + id: parent.id, + sessionID: chat.id, + role: "user", + time: parent.time, + agent: parent.agent, + model: { providerID: ref.providerID, modelID: ref.modelID }, + } satisfies MessageV2.User, + sessionID: chat.id, + model: mdl, + agent: agent(), + system: [], + messages: [{ role: "user", content: "no fallback" }], + tools: {}, + }) + expect(value).toBe("stop") + // Model on the message stays at primary; no switching happened. + expect(handle.message.modelID).toBe(ref.modelID) + expect(handle.message.error).toBeDefined() + }), + { git: true, config: (url) => providerCfg(url) }, + ), +) From 4990b62ad1179ac3f5649969eab89a80a6fcb538 Mon Sep 17 00:00:00 2001 From: minicx Date: Sat, 16 May 2026 23:36:23 +0300 Subject: [PATCH 3/4] fix(session): filter ignored fallback notices from assistant messages Fallback notices (e.g. 'Using GLM-4.5-Air while ... 
is cooling down') are stored with ignored: true but were not filtered out
when converting assistant parts to model messages, causing them to leak
into the LLM context and be repeated in responses.

---
 packages/opencode/src/session/message-v2.ts | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/packages/opencode/src/session/message-v2.ts b/packages/opencode/src/session/message-v2.ts
index f6efc428dd8f..24c7cf973461 100644
--- a/packages/opencode/src/session/message-v2.ts
+++ b/packages/opencode/src/session/message-v2.ts
@@ -775,7 +775,7 @@ export const toModelMessagesEffect = Effect.fnUntraced(function* (
         return part.metadata?.anthropic?.signature != null
       })
       for (const part of msg.parts) {
-        if (part.type === "text") {
+        if (part.type === "text" && !part.ignored) {
           const text = part.text === "" && hasSignedReasoning ? " " : part.text
           assistantMessage.parts.push({
             type: "text",

From 298596c843e5663ae30768acc381c9a7fb9b729c Mon Sep 17 00:00:00 2001
From: minicx
Date: Sun, 17 May 2026 16:12:01 +0300
Subject: [PATCH 4/4] fix(session): add Provider.defaultLayer to processor defaultLayer

---
 packages/opencode/src/session/processor.ts | 1 +
 1 file changed, 1 insertion(+)

diff --git a/packages/opencode/src/session/processor.ts b/packages/opencode/src/session/processor.ts
index ed539f98952f..6f6b065d10db 100644
--- a/packages/opencode/src/session/processor.ts
+++ b/packages/opencode/src/session/processor.ts
@@ -857,6 +857,7 @@ export const defaultLayer = Layer.suspend(() =>
     Layer.provide(Session.defaultLayer),
     Layer.provide(Snapshot.defaultLayer),
     Layer.provide(Agent.defaultLayer),
+    Layer.provide(Provider.defaultLayer),
     Layer.provide(LLM.defaultLayer),
     Layer.provide(Permission.defaultLayer),
     Layer.provide(Plugin.defaultLayer),