fix(proxy): finalize completed responses streams after client abort#1251
Conversation
📝 WalkthroughWalkthroughPR 在 ResponseFixer 与 ResponseHandler 中修正 499 状态误记录问题。当客户端中止但上游已返回 HTTP 200 且包含有效 token 时,新增 clientAbortCompleteSuccess 逻辑将其重分类为成功;同时引入 SSE 惰性 chunk 过滤(仅 Responses 格式),并优化客户端断开边界的 drain 超时控制。 ChangesSSE 过滤与客户端中止处理
Estimated code review effort🎯 3 (Moderate) | ⏱️ ~25 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 4 | ❌ 1❌ Failed checks (1 warning)
✅ Passed checks (4 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. ✨ Finishing Touches🧪 Generate unit tests (beta)
Thanks for using CodeRabbit! It's free for OSS, and your support helps us grow. If you like it, consider giving us a shout-out. Comment |
There was a problem hiding this comment.
Code Review
This pull request introduces a mechanism to filter out inert or empty chat completion chunks from SSE streams and updates the response handler to treat client-aborted streams as successful if final usage metrics have already been received. Unit tests have been added to verify both features. The review feedback suggests simplifying the isInertChatCompletionChoice helper function by removing a redundant check for the 'content' key, which is already covered by the fallback validation.
Important
The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.
| function isInertChatCompletionChoice(choice: unknown): boolean { | ||
| if (!isRecord(choice)) return false; | ||
| if (choice.finish_reason != null) return false; | ||
|
|
||
| const delta = choice.delta; | ||
| if (!isRecord(delta)) { | ||
| return true; | ||
| } | ||
|
|
||
| for (const [key, value] of Object.entries(delta)) { | ||
| if (key === "role") continue; | ||
| if (key === "content") { | ||
| if (hasMeaningfulValue(value)) return false; | ||
| continue; | ||
| } | ||
| if (hasMeaningfulValue(value)) return false; | ||
| } | ||
|
|
||
| return true; | ||
| } |
There was a problem hiding this comment.
The key === "content" check inside the loop is redundant because the fallback hasMeaningfulValue(value) check performs the exact same validation and returns false if the value is meaningful. We can simplify the loop by removing this redundant block.
function isInertChatCompletionChoice(choice: unknown): boolean {
if (!isRecord(choice)) return false;
if (choice.finish_reason != null) return false;
const delta = choice.delta;
if (!isRecord(delta)) {
return true;
}
for (const [key, value] of Object.entries(delta)) {
if (key === "role") continue;
if (hasMeaningfulValue(value)) return false;
}
return true;
}There was a problem hiding this comment.
💡 Codex Review
Here are some automated review suggestions for this pull request.
Reviewed commit: 1ac5195544
ℹ️ About Codex in GitHub
Codex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
- Open a pull request for review
- Mark a draft as ready
- Comment "@codex review".
If Codex has suggestions, it will comment; otherwise it will react with 👍.
When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback".
| // Do not cancel internal accounting on pure client disconnect. If the | ||
| // upstream stream has already completed, the tee'd internal branch can | ||
| // still drain buffered final usage and record the request as successful. | ||
| // Idle/response timeout paths still abort via abortController. |
There was a problem hiding this comment.
Keep aborting unfinished streams on client disconnect
When the downstream client closes before the upstream SSE has finished, this listener now leaves the internal tee running instead of cancelling/aborting it. The internal reader can drain the upstream to done, and finalizeStream is then called with streamEndedNormally=true even though clientAborted=true, so the request is persisted as the upstream 2xx success rather than a 499 and the provider keeps generating a response the client will never receive. This only intended to rescue already-completed streams, but it changes normal early disconnects into completed billable requests.
Useful? React with 👍 / 👎.
There was a problem hiding this comment.
🧹 Nitpick comments (2)
tests/unit/proxy/response-handler-client-abort-drain.test.ts (1)
91-106: ⚡ Quick win缺少
emitProxyLangfuseTrace的 mock
ProxyResponseHandler在 finalize 流程中会调用emitProxyLangfuseTrace(来自@/lib/langfuse/emit-proxy-trace),但此测试文件未 mock 该模块。如果该模块有外部依赖(如网络调用或配置读取),可能导致测试不稳定或意外失败。建议添加:
vi.mock("`@/lib/langfuse/emit-proxy-trace`", () => ({ emitProxyLangfuseTrace: vi.fn(), }));🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/unit/proxy/response-handler-client-abort-drain.test.ts` around lines 91 - 106, The test is missing a mock for the emitProxyLangfuseTrace call used by ProxyResponseHandler.finalize; add a mock for "`@/lib/langfuse/emit-proxy-trace`" that exports emitProxyLangfuseTrace as a vi.fn() so the finalize flow won't perform external work—look for tests referencing ProxyResponseHandler.finalize and add vi.mock("`@/lib/langfuse/emit-proxy-trace`", () => ({ emitProxyLangfuseTrace: vi.fn() })) alongside the other vi.mock calls.src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts (1)
208-262: 💤 Low value可选:考虑补充边界情况测试覆盖
当前测试用例验证了核心场景(惰性 chunk 被过滤),但可以考虑补充以下边界情况的测试,以增强置信度:
- 非惰性 chunk 不被过滤:验证包含实际 content 的 chunk 被保留
- 带 finish_reason 的 chunk 不被过滤:验证完成标记的 chunk 被保留
- 带 usage 的 chunk 不被过滤:验证包含用量数据的 chunk 被保留
- originalFormat !== "response" 时不触发过滤:验证过滤逻辑仅在 Responses 格式时生效
不过,当前测试已覆盖主流路径,上述补充可在后续迭代中添加。
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts` around lines 208 - 262, Add additional unit tests in response-fixer.test.ts that cover the edge cases the reviewer suggested: create tests that (1) send a chat.completion.chunk with a non-empty choices[0].delta.content and assert ResponseFixer.process(session, response).text() still includes that chunk (use createSession() with session.originalFormat = "response"), (2) send a chunk containing a finish_reason field and assert it is preserved, (3) send a chunk containing usage data and assert it is preserved, and (4) create a session with originalFormat !== "response" and assert that the filtering logic does not run (i.e., the empty chat chunk remains). For each test reuse the existing pattern (ReadableStream + SSE lines, ResponseFixer.process, await fixed.text()) and assert presence/absence of identifiers like "chat.completion.chunk", "finish_reason", and usage keys to locate behavior around ResponseFixer.process and createSession.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Nitpick comments:
In `@src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts`:
- Around line 208-262: Add additional unit tests in response-fixer.test.ts that
cover the edge cases the reviewer suggested: create tests that (1) send a
chat.completion.chunk with a non-empty choices[0].delta.content and assert
ResponseFixer.process(session, response).text() still includes that chunk (use
createSession() with session.originalFormat = "response"), (2) send a chunk
containing a finish_reason field and assert it is preserved, (3) send a chunk
containing usage data and assert it is preserved, and (4) create a session with
originalFormat !== "response" and assert that the filtering logic does not run
(i.e., the empty chat chunk remains). For each test reuse the existing pattern
(ReadableStream + SSE lines, ResponseFixer.process, await fixed.text()) and
assert presence/absence of identifiers like "chat.completion.chunk",
"finish_reason", and usage keys to locate behavior around ResponseFixer.process
and createSession.
In `@tests/unit/proxy/response-handler-client-abort-drain.test.ts`:
- Around line 91-106: The test is missing a mock for the emitProxyLangfuseTrace
call used by ProxyResponseHandler.finalize; add a mock for
"`@/lib/langfuse/emit-proxy-trace`" that exports emitProxyLangfuseTrace as a
vi.fn() so the finalize flow won't perform external work—look for tests
referencing ProxyResponseHandler.finalize and add
vi.mock("`@/lib/langfuse/emit-proxy-trace`", () => ({ emitProxyLangfuseTrace:
vi.fn() })) alongside the other vi.mock calls.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 5647b540-ded8-4997-ba41-aa5e0c546862
📒 Files selected for processing (4)
src/app/v1/_lib/proxy/response-fixer/index.tssrc/app/v1/_lib/proxy/response-fixer/response-fixer.test.tssrc/app/v1/_lib/proxy/response-handler.tstests/unit/proxy/response-handler-client-abort-drain.test.ts
| // Do not cancel internal accounting on pure client disconnect. If the | ||
| // upstream stream has already completed, the tee'd internal branch can | ||
| // still drain buffered final usage and record the request as successful. | ||
| // Idle/response timeout paths still abort via abortController. |
There was a problem hiding this comment.
[Medium] [PERFORMANCE-ISSUE] Unbounded resource usage when idle timeout is disabled and upstream hangs
Why this is a problem: The client abort handler no longer calls abortController.abort() or AsyncTaskManager.cancel(taskId). The processing loop at line 2620 also no longer checks session.clientAbortSignal?.aborted, relying entirely on abortController being triggered by the idle timeout. However, when streamingIdleTimeoutMs is 0 (disabled), startIdleTimer() returns early and no timeout is ever set. In this configuration, if the client disconnects and the upstream hangs (stops sending data but does not close the connection), reader.read() blocks indefinitely:
- The
finallyblock at line 2883 never executes, so the reader lock is never released - Accumulated chunks remain in memory
- The processing promise never resolves
AsyncTaskManager.cleanup()at line 2896 never runs
The AsyncTaskManager has a 10-minute stale threshold that calls cancel(), but that aborts the task's own AbortController (returned by register()), not the local abortController that the processing loop checks, so it does not break the loop.
Suggested fix: Add a bounded drain window in the client abort handler that acts as a safety net when idle timeout is disabled:
let drainTimeoutId: NodeJS.Timeout | null = null;
const cleanupClientAbortListener = bindClientAbortListener(session.clientAbortSignal, () => {
logger.debug("ResponseHandler: Client disconnected, cleaning up", {
taskId,
providerId: provider.id,
messageId: messageContext.id,
});
// Allow internal stream to drain for finalization, but cap the drain
// window to prevent indefinite resource usage when idle timeout is
// disabled and the upstream hangs.
const maxDrainMs = idleTimeoutMs === Infinity ? 60_000 : idleTimeoutMs;
drainTimeoutId = setTimeout(() => {
if (\!abortController.signal.aborted) {
logger.info("ResponseHandler: Max drain window exceeded after client disconnect", {
taskId,
providerId: provider.id,
});
abortController.abort();
}
}, maxDrainMs);
});
// ... then in the finally block, add:
if (drainTimeoutId) {
clearTimeout(drainTimeoutId);
}This reuses the configured idle timeout when available and falls back to 60 seconds when disabled, preserving the drain behavior while bounding resource usage.
There was a problem hiding this comment.
{"body": "## Code Review Summary\n\nThis PR introduces two independent features: (1) allowing internal stream accounting to continue after client disconnect so completed upstream Responses streams can be finalized with their real 2xx status and usage, and (2) filtering inert chat.completion.chunk data lines from Responses SSE output. The implementation is well-structured with conservative reclassification logic and good test coverage. One resource-leak concern was identified when idle timeout is disabled.\n\n### PR Size: L\n- Lines changed: 651\n- Files changed: 4\n\nSplit suggestion: This PR contains two independent changes that could be reviewed separately:\n1. Response-fixer SSE filtering (response-fixer/index.ts + test) - filters inert chat completion chunks from Responses streams\n2. Client abort drain (response-handler.ts + test) - allows internal accounting to continue after client disconnect\n\n### Issues Found\n\n| Category | Critical | High | Medium | Low |\n|----------|----------|------|--------|-----|\n| Logic/Bugs | 0 | 0 | 0 | 0 |\n| Security | 0 | 0 | 0 | 0 |\n| Error Handling | 0 | 0 | 0 | 0 |\n| Types | 0 | 0 | 0 | 0 |\n| Comments/Docs | 0 | 0 | 0 | 0 |\n| Tests | 0 | 0 | 0 | 0 |\n| Performance | 0 | 0 | 1 | 0 |\n| Simplification | 0 | 0 | 0 | 0 |\n\n### Medium Priority Issues (Should Fix)\n\n1. Unbounded resource usage when idle timeout disabled (response-handler.ts:2238)\n - When streamingIdleTimeoutMs is 0 (disabled) and a client disconnects, the internal processing loop has no safety net to abort if the upstream hangs. The old code called abortController.abort() on client disconnect; the new code intentionally skips this but relies on idle timeout as the sole safety mechanism. When idle timeout is disabled, reader.read() can block indefinitely, preventing the finally block from releasing the reader lock, freeing memory, or cleaning up the AsyncTaskManager entry. The suggested fix is to add a bounded drain window (e.g., 60s fallback) in the client abort handler.\n\n### Review Coverage\n- [x] Logic and correctness\n- [x] Security (OWASP Top 10)\n- [x] Error handling\n- [x] Type safety\n- [x] Documentation accuracy\n- [x] Test coverage\n- [x] Code clarity\n\n---\nAutomated review by Claude AI"}
There was a problem hiding this comment.
Code Review Summary
This PR introduces two independent features: (1) allowing internal stream accounting to continue after client disconnect so completed upstream Responses streams can be finalized with their real 2xx status and usage, and (2) filtering inert chat.completion.chunk data lines from Responses SSE output. The implementation is well-structured with conservative reclassification logic and good test coverage. One resource-leak concern was identified when idle timeout is disabled.
PR Size: L
- Lines changed: 651
- Files changed: 4
Split suggestion: This PR contains two independent changes that could be reviewed separately:
- Response-fixer SSE filtering (
response-fixer/index.ts+ test) - filters inert chat completion chunks from Responses streams - Client abort drain (
response-handler.ts+ test) - allows internal accounting to continue after client disconnect
Issues Found
| Category | Critical | High | Medium | Low |
|---|---|---|---|---|
| Logic/Bugs | 0 | 0 | 0 | 0 |
| Security | 0 | 0 | 0 | 0 |
| Error Handling | 0 | 0 | 0 | 0 |
| Types | 0 | 0 | 0 | 0 |
| Comments/Docs | 0 | 0 | 0 | 0 |
| Tests | 0 | 0 | 0 | 0 |
| Performance | 0 | 0 | 1 | 0 |
| Simplification | 0 | 0 | 0 | 0 |
Medium Priority Issues (Should Fix)
- Unbounded resource usage when idle timeout disabled (
response-handler.ts:2238)- When
streamingIdleTimeoutMsis 0 (disabled) and a client disconnects, the internal processing loop has no safety net to abort if the upstream hangs. The old code calledabortController.abort()on client disconnect; the new code intentionally skips this but relies on idle timeout as the sole safety mechanism. When idle timeout is disabled,reader.read()can block indefinitely, preventing thefinallyblock from releasing the reader lock, freeing memory, or cleaning up the AsyncTaskManager entry. The suggested fix is to add a bounded drain window (e.g., 60s fallback) in the client abort handler.
- When
Review Coverage
- Logic and correctness
- Security (OWASP Top 10)
- Error handling
- Type safety
- Documentation accuracy
- Test coverage
- Code clarity
Automated review by Claude AI
1ac5195 to
0e07d38
Compare
💡 Codex Reviewclaude-code-hub/src/app/v1/_lib/proxy/response-handler.ts Lines 3881 to 3884 in 0e07d38 When the initial provider loses to an alternative, ℹ️ About Codex in GitHubCodex has been enabled to automatically review pull requests in this repo. Reviews are triggered when you
If Codex has suggestions, it will comment; otherwise it will react with 👍. When you sign up for Codex through ChatGPT, Codex can also answer questions or update the PR, like "@codex address that feedback". |
|
This looks like a valid issue in the hedge-loser billing path, but it belongs to the current I’ll track this separately so #1251 stays scoped to the 499/client-abort fix. |
Summary
499 CLIENT_ABORTEDchat.completion.chunkSSE data lines from/v1/responsesoutput, while preserving non-empty chunks, finish markers, usage-bearing chunks, and non-Responses sessionsRelated Issues:
499 CLIENT_ABORTED/v1/responsesstream area; EPIPE crash addressed separately in fix(lifecycle): don't crash process on benign broken-pipe from streaming disconnect (#1234) #1239tee()Approach
Client-abort finalization
finalizeDeferredStreamingFinalizationIfNeededreclassifies client-aborted streams as success only when the upstream status is 2xx, no upstream error payload is detected, and positive usage was parsed.499, and timeout/upstream failures still finalize as failures.responseControllerand local accounting loop if the drain window is exceeded.Responses SSE sanitizer
ResponseFixerstrips only inertchat.completion.chunkdata:lines forsession.originalFormat === "response".finish_reason, usage, or non-Responses format are preserved.Rebased on
devdevand preserves feat(billing): 供应商竞速输家计费(保活拿回响应 + 幂等累加回写) #1247's hedge-loser billing semantics.finalized.billHedgeLosers, so client-abort success reclassification does not clobber additive loser billing.Commits
fix(proxy): finalize complete responses after client abortfix(proxy): sanitize inert chat chunks in responses streamsValidation
bunx @biomejs/biome@2.4.15 check src/app/v1/_lib/proxy/response-fixer/index.ts src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts src/app/v1/_lib/proxy/response-handler.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts bun run test src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts tests/unit/proxy/response-handler-client-abort-drain.test.ts bun run buildAdditional local check:
bun run testThis full local test run currently fails in two files outside this PR's changed paths:
tests/unit/k8s-deploy-shell-helpers.test.ts- macOStr: Illegal byte sequencein the/dev/urandomfallback pathsrc/components/ui/__tests__/language-switcher.test.tsx- expectedconsole.errorcall is not emitted in the blockedsessionStoragetestThose failures are reproducible when run individually and are not in files touched by this PR.
Greptile Summary
This PR addresses two distinct problems in the Responses streaming proxy path: (1) false
499 CLIENT_ABORTEDstatus codes recorded for completed upstream streams when the downstream client disconnects before the internal accounting loop observes the final SSE event, and (2) schema-validation failures in strict Responses clients when OpenAI-compatible upstreams inject emptychat.completion.chunkevents into a/v1/responsesstream. It also introduces a hedge-loser billing feature that drains losing-race-attempt responses in the background to capture and bill their real token usage.tee'd accounting branch now continues draining buffered data rather than being cancelled immediately; a newclientAbortCompleteSuccessgate reclassifies the stream as the real 2xx status when 2xx upstream code, no fake-error detected, and positive billable tokens are all confirmed.ResponseFixer.filterInertResponsesChatCompletionChunksstrips inertchat.completion.chunkdata lines (empty content, no tool calls, no finish reason, no usage) from Responses SSE output only whensession.originalFormat === \"response\", with the blank-line separator following each filtered line also elided via theskipNextBlankLineflag.winner: winnerCost + SUM(hedge_losers);loser: costUsd += delta) that is commutative regardless of ordering.Confidence Score: 5/5
Safe to merge; the drain-based reclassification is conservatively gated on three independent checks, the SSE sanitizer early-exits and returns original bytes when no inert chunk is present, and the hedge-loser billing SQL is commutative and idempotent regardless of winner/loser write ordering.
The clientAbortCompleteSuccess reclassification requires 2xx upstream status, no fake-error detected, and positive billable tokens — all three must hold simultaneously, making false reclassification of genuinely aborted streams effectively impossible. The SSE sanitizer's JSON.parse safety gate means malformed or partial chunks pass through unchanged. The concurrent-write SQL for hedge-loser billing converges to the correct grand total under all Postgres row-lock orderings because the winner uses an idempotent replacement (winnerCost + SUM(current losers)) and each loser uses an additive delta with a JSONB dedup guard. Four new unit tests cover the critical paths including the drain-timeout bound. Only a stale inline comment remains.
No files require special attention; the comment at line 558 of response-handler.ts is a documentation nit with no behavioral impact.
Important Files Changed
Sequence Diagram
sequenceDiagram participant Client participant ProxyResponseHandler participant InternalLoop as Internal Accounting Loop participant DB as Database Client->>ProxyResponseHandler: POST /v1/responses (streaming) ProxyResponseHandler->>ProxyResponseHandler: tee() body into client branch + internal branch ProxyResponseHandler-->>Client: stream chunks Note over Client: Client closes connection early Client--xProxyResponseHandler: disconnect (clientAbortSignal.aborted) ProxyResponseHandler->>ProxyResponseHandler: bindClientAbortListener fires Note over ProxyResponseHandler: start clientAbortDrainTimer (60s), do NOT cancel internal loop InternalLoop->>InternalLoop: continues draining buffered tee branch InternalLoop->>InternalLoop: reads response.completed with usage tokens alt "streamEndedNormally=false, clientAborted=true, 2xx + no fake error + positive usage" InternalLoop->>DB: "updateMessageRequestDetails(statusCode=200)" Note over InternalLoop: Reclassified as success else upstream truly aborted mid-stream InternalLoop->>DB: "updateMessageRequestDetails(statusCode=499, CLIENT_ABORTED)" end ProxyResponseHandler->>ProxyResponseHandler: clearClientAbortDrainTimer()Prompt To Fix All With AI
Reviews (2): Last reviewed commit: "fix(proxy): sanitize inert chat chunks i..." | Re-trigger Greptile