fix: streaming timeout, Anthropic-native fallback, and routing on the provider-architecture branch#84
Merged
samueltuyizere merged 10 commits intoJun 20, 2026
Conversation
…l schema
Two warnings from kilo-code-bot:
1. StreamingTimeoutMs only guards request startup; once GetStreamingBody
returns, the body read was tied to the request context (no timeout),
so a mid-stream stall could sit forever. Pass the per-model attempt
context into ProxyStream/ProxyResponsesStream/ProxyGeminiStream and
the raw Anthropic io.Copy, and wrap the upstream body with a tiny
ctxio.NewCtxReadCloser so the body Read also respects the deadline.
2. transformTools panicked on valid JSON that unmarshals to a nil map
(e.g. " null " with decorative whitespace). Treat that case the same
as a successful parse of "{}" — fall back to the default schema.
Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
…e and update streaming error handlers
…onfig fix Apply the full feature set, test coverage, and documentation from PR routatic#80 (fix: stabilize Anthropic-native streaming, timeout handling, and fallback cancellation) onto the new provider-architecture branch (4fe96a7). Also address issues raised in the post-merge review: * Add streaming_timeout_ms to OpenCodeGoConfig and OpenCodeZenConfig, with HTTP client relying on per-request context timeouts. * Add RequestTimeout and StreamingTimeout helpers on OpenCodeClient, and StreamIdleTimeout for per-byte idle-gap enforcement. * Expose heartbeat-paused flag during raw Anthropic streaming to prevent keepalive injection into SSE frames. * Bind streaming body reads to the per-attempt ctx via ctxio (NewCtxReader / NewCtxReadCloser) so streaming_timeout_ms aborts mid-stream, with ErrStreamReadCanceled surfaced. * Stop fallback chain early on parent ctx cancellation or deadline exceeded, and do not record client-cancel as a circuit-breaker failure. * Harden transformTools: skip empty/whitespace names, normalize null/empty schemas, validate type==object and properties is an object, guard schemaObj nil for whitespace null. * ApplyDefaults now falls back to StreamingTimeoutMs before TimeoutMs when StreamTimeoutMs is unset, so the user's streaming_timeout_ms is honored by the idle watchdog. * Expand IsAnthropicModel and isAnthropicNativeGo to include minimax-m2.5/2.7/3 and qwen-plus on the Go provider — these models reject OpenAI-format streaming with 400 and must use the Anthropic-native /v1/messages branch. * Document Streaming Scenario Routing in CONFIGURATION.md and README.md; add streaming_timeout_ms to config.example.json. * Reload messaging in atomic.go now reports timeout changes as effective immediately. * Add walkthrough.md documenting the integration and timeout fix. Co-authored-by: factory-droid[bot] <138933559+factory-droid[bot]@users.noreply.github.com>
- Explicitly discard Write return value in concurrent test to satisfy Go vet - Remove redundant type assertion on NewCtxReader return value
… and streamline SSE error messages
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
Summary
This PR integrates the streaming/fallback/timeout stabilization work from the original PR #80 onto the new provider-architecture base (Provider Abstraction + Unified Request Model + Routing Policy Engine, PR #82), and adds two fixes surfaced during the post-merge review.
The original PR #80 (#80) was opened against the pre-#82 architecture and is now obsolete; it has been closed. This PR is its replacement, re-applied cleanly on top of the current
main(4fe96a7).Why a new PR instead of rebasing #80
PR #80 was authored against the pre-architecture-rewrite base. After the provider architecture (#82) was merged, the streaming/fallback code paths moved from the handler/transformer layer into the new provider/router layer (scenarios.go, fallback.go refactor, request transformer split, Anthropic/Zen/Responses/Gemini routing policy engine). A clean re-integration on the current base is more readable than a rebase, and lets the new architecture own the work from the start.
What's included
From PR #80 (re-applied on the new architecture)
streaming_timeout_msfor bothopencode_goandopencode_zen, with helpersRequestTimeout(model)andStreamingTimeout(model)on the client.http.Client.Timeout; per-request contexts carry the timeout. The proxy never kills a stream that is actively producing bytes, and the server-levelWriteTimeoutis set to0. Each upstream read uses a per-Readdeadline viahttp.ResponseController.SetReadDeadlinethat is renewed on every successful byte.handleStreamingderives each attempt from the client request context and binds body reads via the newctxio.NewCtxReadClosersostreaming_timeout_msaborts mid-stream (returnsErrStreamReadCanceled).ExecuteWithFallbackshort-circuits on parentctx.Err()and oncontext.DeadlineExceeded, and does not record client cancellation as a circuit-breaker failure.:keepaliveinjection into event frames (theclient disconnected during anthropic stream error="context canceled"/Unexpected EOFfailure mode seen on Claude Code).responseWriterserializes concurrent writes (heartbeat + stream body copy) with a mutex, and exposes aflushWriterwrapper for raw passthrough so events are not buffered in net/http'sbufio.Writer.transformToolshardening: skip empty/whitespace names, normalizenull/{}/missing schemas, validatetype == "object", validatepropertiesis an object, guardschemaObjnil for whitespace null (panic fix from PR fix: stabilize Anthropic-native streaming, timeout handling, and fallback cancellation #80 review).enable_streaming_scenario_routing) documented inCONFIGURATION.mdandREADME.md.atomic.goreports timeout changes as effective immediately for bothtimeout_msandstreaming_timeout_mson both Go and Zen.walkthrough.mddocuments the integration and timeout fix for future maintainers.New on this branch (post-#80 review findings)
loader.go::applyDefaults: whenstream_timeout_msis unset, the loader now inheritsstreaming_timeout_msbefore falling back totimeout_ms. Without this, a user-configured 600sstreaming_timeout_mswas silently downgraded to 300s by the idle watchdog (StreamIdleTimeoutreadsStreamTimeoutMsonly). New regression test:TestDefaults_StreamingTimeoutFallback.IsAnthropicModelandisAnthropicNativeGonow routeminimax-m2.5,minimax-m2.7,minimax-m3, andqwen-plusthrough/v1/messagesdirectly. These models reject OpenAI-format streaming with tools (400 invalid params, function name or parameters is empty); the previous code only routedqwen3.7-maxthrough the Anthropic-native path on the Go side, so all other Minimax/Qwen variants fell back to the broken Chat Completions branch.Commits (5)
Tests
TestRequestTimeout_*andTestStreamingTimeout_*for both Go and Zen providers, including default fallback, configured override, and small-value edge cases.TestExecuteWithFallback_*for cancelled parent context, parent deadline exceeded, per-model timeout, circuit-breaker accounting under cancellation.TestHandleStreaming_*for configurable timeout, client-context cancellation, mid-stream disconnect, per-model timeout fallback, Anthropic raw no-keepalive injection, concurrent response-writer behavior.TestHandleNonStreaming_ParentContextCanceled_No502andTestHandleNonStreaming_ParentDeadlineExceeded_No502to prevent the false-502 all models failedregression.TestTransformTools_*covering empty names, whitespace names,null/{}schemas, missingtype, missingproperties, malformed JSON, valid schema preservation, and the whitespace-null panic guard.TestDefaults_StreamingTimeoutFallbackfor the loader fix.TestNewCtxReader_*andTestNewCtxReadCloser_*for the newctxiohelpers.All packages pass under
go test -count=1 ./....Validation
go build ./... go test -count=1 ./...Manual checks recommended:
tools=29+,long_context → minimax-m3Unexpected EOF/ noclient disconnected during anthropic stream error="context canceled"for Anthropic-native raw streamsall models failedstreaming_timeout_ms: 600000inconfig.jsonactually results in a 600s idle gap (verify via log:stream idle timeout reachedfires after ~600s of upstream silence, not 300s)Files
22 files changed, +2369 / -112:
CONFIGURATION.md(+46)README.md(+1)walkthrough.md(new, +46)configs/config.example.json(+6/-2)internal/client/opencode.go(refactored timeout helpers)internal/client/opencode_test.go(+157)internal/config/atomic.go(timeout reload messaging)internal/config/config.go(StreamingTimeoutMsfield)internal/config/loader.go(timeout fallback tostreaming_timeout_ms)internal/config/loader_test.go(+42)internal/handlers/messages.go(responseWriter mutex, flushWriter, ctxio binding, parent-ctx short-circuit)internal/handlers/messages_test.go(+1106)internal/handlers/streaming.go(+2)internal/provider/opencode_go.go(isAnthropicNativeGoexpanded)internal/router/fallback.go(parent-ctx/parent-deadline short-circuit, no circuit-breaker on cancel)internal/router/fallback_test.go(new, +267)internal/transformer/ctxio.go(new, +76)internal/transformer/ctxio_test.go(new, +101)internal/transformer/request.go(transformToolshardening)internal/transformer/request_test.go(+205)internal/transformer/stream.go(heartbeat-paused flag for raw Anthropic)internal/transformer/stream_test.go(+49)Related