From da9bf030fdcb7c9605bba8c5827721948f90616a Mon Sep 17 00:00:00 2001 From: John Doe Date: Fri, 5 Jun 2026 03:26:27 +0300 Subject: [PATCH 1/2] fix: 499 on completed Codex Responses streams Codex CLI closes the connection immediately after receiving the terminal SSE event (response.completed or response.incomplete). Previously, CCH observed the client abort before the upstream reader finished, so successful requests were logged as 499 CLIENT_ABORTED. Root cause: the internal reader that builds allContent breaks early on clientAbort, so allContent is truncated and misses the terminal event. Fix: track the Codex terminal state, usage, service_tier and prompt_cache_key incrementally in a TransformStream before tee(), so the data is captured regardless of abort timing. Changes: - response-handler.ts: add createCodexResponsesTerminalStateTracker (incremental SSE parser, last-event-wins, finalize() for trailing data that is not terminated by a blank line). The tracker is the primary source of truth for Codex deferred streams; allContent is the fallback. codexTerminalNonSuccess (failed/error) -> 502 + recordFailure. codexTerminalFinished (completed/incomplete) + clientAbort -> 200. providerType is captured in the deferred meta at set-time for failover. - stream-finalization.ts: add an optional providerType to DeferredStreamingFinalization; normalize it at set-time from the meta or the session provider. - tests: 30 cases covering the full terminal-state matrix (completed/failed/incomplete/error/response.error, abort timing, provider mismatch, multiline SSE, trailing event without a final \n\n, non-Codex streams unaffected, prompt_cache_key preservation). 
Co-Authored-By: Claude Opus 4.8 --- src/app/v1/_lib/proxy/response-handler.ts | 349 ++++- src/app/v1/_lib/proxy/stream-finalization.ts | 5 +- ...nse-handler-abort-listener-cleanup.test.ts | 1227 ++++++++++++++++- 3 files changed, 1540 insertions(+), 41 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index e811b3b59..74e61e324 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -515,6 +515,192 @@ type FinalizeDeferredStreamingResult = { billHedgeLosers: boolean; }; +type CodexResponsesTerminalState = "completed" | "failed" | "incomplete" | "error" | "none"; + +function applyCodexResponsesTerminalEvent( + terminalState: CodexResponsesTerminalState, + eventType: string +): CodexResponsesTerminalState { + switch (eventType) { + case "response.completed": + return "completed"; + case "response.failed": + return "failed"; + case "response.incomplete": + return "incomplete"; + case "error": + case "response.error": + return "error"; + default: + return terminalState; + } +} + +function getCodexResponsesTerminalState(allContent: string): CodexResponsesTerminalState { + let terminalState: CodexResponsesTerminalState = "none"; + for (const event of parseSSEData(allContent)) { + const data = + event.data && typeof event.data === "object" && !Array.isArray(event.data) + ? (event.data as { type?: unknown }) + : null; + const eventType = typeof data?.type === "string" ? 
data.type : event.event; + terminalState = applyCodexResponsesTerminalEvent(terminalState, eventType); + } + + return terminalState; +} + +function getCodexPromptCacheKeyFromSSEText(text: string): string | null { + for (const event of parseSSEData(text)) { + if (!event.data || typeof event.data !== "object" || Array.isArray(event.data)) continue; + const promptCacheKey = SessionManager.extractCodexPromptCacheKey( + event.data as Record + ); + if (promptCacheKey) { + return promptCacheKey; + } + } + return null; +} + +function getCompleteSSEEventTexts(text: string): string[] { + const events: string[] = []; + const boundaryPattern = /\r?\n\r?\n/g; + let start = 0; + + for (const match of text.matchAll(boundaryPattern)) { + const end = (match.index ?? 0) + match[0].length; + const eventText = text.slice(start, end); + start = end; + if (eventText.trim()) { + events.push(eventText); + } + } + + return events; +} + +function findLastCompleteSSEEventBoundary(text: string): number { + let lastBoundary = -1; + const boundaryPattern = /\r?\n\r?\n/g; + for (const match of text.matchAll(boundaryPattern)) { + lastBoundary = (match.index ?? 
0) + match[0].length; + } + return lastBoundary; +} + +function createCodexResponsesTerminalStateTracker(): { + push: (text: string) => void; + finalize: () => void; + getTerminalState: () => CodexResponsesTerminalState; + getUsageMetrics: () => UsageMetrics | null; + getServiceTier: () => string | null; + getPromptCacheKey: () => string | null; +} { + let bufferedText = ""; + let terminalState: CodexResponsesTerminalState = "none"; + let terminalUsageMetrics: UsageMetrics | null = null; + let terminalServiceTier: string | null = null; + let terminalPromptCacheKey: string | null = null; + + return { + push(text: string) { + bufferedText += text; + const boundary = findLastCompleteSSEEventBoundary(bufferedText); + if (boundary < 0) return; + + const completeText = bufferedText.slice(0, boundary); + bufferedText = bufferedText.slice(boundary); + for (const eventText of getCompleteSSEEventTexts(completeText)) { + terminalPromptCacheKey ??= getCodexPromptCacheKeyFromSSEText(eventText); + const parsedTerminalState = getCodexResponsesTerminalState(eventText); + if (parsedTerminalState !== "none") { + terminalState = parsedTerminalState; + terminalUsageMetrics = parseUsageFromResponseText(eventText, "codex").usageMetrics; + terminalServiceTier = parseServiceTierFromResponseText(eventText); + } + } + }, + finalize() { + if (!bufferedText) return; + + const remainder = bufferedText.trim(); + if (!remainder) return; + + bufferedText = ""; + let eventType = ""; + const dataLines: string[] = []; + for (const line of remainder.split(/\r?\n/)) { + if (line.startsWith("event:")) { + eventType = line.slice(6).trim(); + continue; + } + + if (line.startsWith("data:")) { + let value = line.slice(5); + if (value.startsWith(" ")) { + value = value.slice(1); + } + dataLines.push(value); + } + } + + const dataStr = dataLines.join("\n"); + let parsedData: unknown; + try { + parsedData = JSON.parse(dataStr); + } catch { + parsedData = dataStr; + } + + const parsedObject = + parsedData && 
typeof parsedData === "object" && !Array.isArray(parsedData) + ? (parsedData as Record) + : null; + const resolvedType = typeof parsedObject?.type === "string" ? parsedObject.type : eventType; + const newState = applyCodexResponsesTerminalEvent(terminalState, resolvedType); + if (newState !== "none" && newState !== terminalState) { + terminalState = newState; + if (parsedObject) { + terminalUsageMetrics = parseUsageFromResponseText(remainder, "codex").usageMetrics; + terminalServiceTier = parseServiceTierFromResponseText(remainder); + } + } + + if (!terminalPromptCacheKey && parsedObject) { + terminalPromptCacheKey = getCodexPromptCacheKeyFromSSEText(remainder); + } + }, + getTerminalState() { + return terminalState; + }, + getUsageMetrics() { + return terminalUsageMetrics; + }, + getServiceTier() { + return terminalServiceTier; + }, + getPromptCacheKey() { + return terminalPromptCacheKey; + }, + }; +} + +export const __codexResponsesTerminalStateTrackerForTests = { + create: createCodexResponsesTerminalStateTracker, +}; + +function getCodexTerminalErrorMessage(state: CodexResponsesTerminalState): string | null { + switch (state) { + case "failed": + return "CODEX_RESPONSE_FAILED"; + case "error": + return "CODEX_RESPONSE_ERROR"; + default: + return null; + } +} + /** * 若本次 SSE 被标记为“延迟结算”,则在流结束后补齐成功/失败的最终判定。 * @@ -538,7 +724,8 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( upstreamStatusCode: number, streamEndedNormally: boolean, clientAborted: boolean, - abortReason?: string + abortReason?: string, + forwardedCodexTerminalState: CodexResponsesTerminalState = "none" ): Promise { const meta = consumeDeferredStreamingFinalization(session); const provider = session.provider; @@ -550,13 +737,27 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( const providerIdForPersistence = meta?.providerId ?? provider?.id ?? 
null; const isHedgeWinner = meta?.isHedgeWinner === true; const billHedgeLosers = meta?.billHedgeLosers === true; - - // 仅在“上游 HTTP=200 且流自然结束”时做“假 200”检测: + const shouldInspectCodexTerminalState = + upstreamStatusCode >= 200 && upstreamStatusCode < 300 && meta?.providerType === "codex"; + const codexTerminalState = shouldInspectCodexTerminalState + ? forwardedCodexTerminalState !== "none" + ? forwardedCodexTerminalState + : getCodexResponsesTerminalState(allContent) + : "none"; + const codexTerminalErrorMessage = getCodexTerminalErrorMessage(codexTerminalState); + const codexTerminalNonSuccess = codexTerminalErrorMessage !== null; + const codexTerminalFinished = + codexTerminalState === "completed" || codexTerminalState === "incomplete"; + const streamCompletedForFinalization = + !codexTerminalNonSuccess && (streamEndedNormally || codexTerminalFinished); + + // 仅在“上游 HTTP=200 且流已完成”时做“假 200”检测: // - 非 200:HTTP 已经表明失败(无需额外启发式) - // - 非自然结束:内容可能是部分流/截断,启发式会显著提高误判风险 + // - 未完成:内容可能是部分流/截断,启发式会显著提高误判风险 + // - Codex terminal success 后的非正常结束:协议已给出 terminal result,可安全做最终检查 // // 此处返回 `{isError:false}` 仅表示“跳过检测”,最终仍会在下面按中断/超时视为失败结算。 - const shouldDetectFake200 = streamEndedNormally && upstreamStatusCode === 200; + const shouldDetectFake200 = streamCompletedForFinalization && upstreamStatusCode === 200; const detected = shouldDetectFake200 ? detectUpstreamErrorFromSseOrJsonText(allContent) : ({ isError: false } as const); @@ -578,11 +779,14 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( effectiveStatusCode = 502; } errorMessage = detected.detail ? `${detected.code}: ${detected.detail}` : detected.code; - } else if (!streamEndedNormally) { + } else if (codexTerminalNonSuccess) { + effectiveStatusCode = 502; + errorMessage = codexTerminalErrorMessage; + } else if (!streamCompletedForFinalization) { effectiveStatusCode = clientAborted ? 499 : 502; errorMessage = clientAborted ? "CLIENT_ABORTED" : (abortReason ?? 
"STREAM_ABORTED"); } else { - // streamEndedNormally=true + // streamEndedNormally=true, or Codex Responses emitted response.completed before a non-normal end. effectiveStatusCode = upstreamStatusCode; if (upstreamStatusCode >= 400) { @@ -596,7 +800,8 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( } const shouldClearSessionBindingOnFailure = - !streamEndedNormally || + codexTerminalNonSuccess || + !streamCompletedForFinalization || detected.isError || (upstreamStatusCode >= 400 && errorMessage !== null); @@ -654,12 +859,57 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( } } + if (codexTerminalNonSuccess) { + await clearSessionBinding(); + + logger.warn("[ResponseHandler] Codex Responses stream ended with terminal non-success", { + providerId: meta.providerId, + providerName: meta.providerName, + upstreamStatusCode: meta.upstreamStatusCode, + effectiveStatusCode, + terminalState: codexTerminalState, + errorMessage, + }); + + if (session.getEndpointPolicy().allowCircuitBreakerAccounting) { + try { + const { recordFailure } = await import("@/lib/circuit-breaker"); + await recordFailure(meta.providerId, new Error(errorMessage ?? "CODEX_RESPONSE_FAILED")); + } catch (cbError) { + logger.warn("[ResponseHandler] Failed to record Codex terminal failure", { + providerId: meta.providerId, + sessionId: session.sessionId ?? null, + error: cbError, + }); + } + } + + // NOTE: Do NOT call recordEndpointFailure here. A Codex terminal body-level + // failure means the endpoint returned an HTTP 2xx stream successfully. + session.addProviderToChain(providerForChain, { + endpointId: meta.endpointId, + endpointUrl: meta.endpointUrl, + reason: "retry_failed", + attemptNumber: meta.attemptNumber, + statusCode: effectiveStatusCode, + errorMessage: errorMessage ?? 
undefined, + }); + + return { + effectiveStatusCode, + errorMessage, + providerIdForPersistence, + isHedgeWinner, + billHedgeLosers, + }; + } + // 未自然结束:不更新 session 绑定(避免把会话粘到不稳定 provider),但要避免把它误记为 200 completed。 // // 同时,为了让故障转移/熔断能正确工作: // - 客户端主动中断:不计入熔断器(这通常不是供应商问题) // - 非客户端中断:计入 provider/endpoint 熔断失败(与 timeout 路径保持一致) - if (!streamEndedNormally) { + if (!streamCompletedForFinalization) { await clearSessionBinding(); if (!clientAborted && session.getEndpointPolicy().allowCircuitBreakerAccounting) { @@ -2229,14 +2479,32 @@ export class ProxyResponseHandler { // ⭐ 使用 TransformStream 包装流,以便在 idle timeout 时能关闭客户端流 // 这解决了 tee() 后 internalStream abort 不影响 clientStream 的问题 let streamController: TransformStreamDefaultController | null = null; + const deferredFinalizationMeta = peekDeferredStreamingFinalization(session); + const forwardedCodexTerminalTracker = createCodexResponsesTerminalStateTracker(); + const forwardedCodexTerminalDecoder = new TextDecoder(); + const shouldTrackForwardedCodexTerminalState = + (deferredFinalizationMeta?.providerType ?? 
provider.providerType) === "codex"; const controllableStream = processedStream.pipeThrough( new TransformStream({ start(controller) { streamController = controller; // 保存 controller 引用 }, transform(chunk, controller) { + if (shouldTrackForwardedCodexTerminalState) { + forwardedCodexTerminalTracker.push( + forwardedCodexTerminalDecoder.decode(chunk, { stream: true }) + ); + } controller.enqueue(chunk); // 透传数据 }, + flush() { + if (shouldTrackForwardedCodexTerminalState) { + const flushed = forwardedCodexTerminalDecoder.decode(); + if (flushed) { + forwardedCodexTerminalTracker.push(flushed); + } + } + }, }) ); @@ -2359,13 +2627,18 @@ export class ProxyResponseHandler { clientAborted: boolean, abortReason?: string ): Promise => { + if (shouldTrackForwardedCodexTerminalState) { + forwardedCodexTerminalTracker.finalize(); + } + const finalized = await finalizeDeferredStreamingFinalizationIfNeeded( session, allContent, statusCode, streamEndedNormally, clientAborted, - abortReason + abortReason, + forwardedCodexTerminalTracker.getTerminalState() ); const effectiveStatusCode = finalized.effectiveStatusCode; const streamErrorMessage = finalized.errorMessage; @@ -2410,9 +2683,15 @@ export class ProxyResponseHandler { tracker.endRequest(messageContext.user.id, messageContext.id); const usageResult = parseUsageFromResponseText(allContent, provider.providerType); - usageForCost = usageResult.usageMetrics; - - const actualServiceTier = parseServiceTierFromResponseText(allContent); + usageForCost = + (shouldTrackForwardedCodexTerminalState + ? forwardedCodexTerminalTracker.getUsageMetrics() + : null) ?? usageResult.usageMetrics; + + const actualServiceTier = + (shouldTrackForwardedCodexTerminalState + ? forwardedCodexTerminalTracker.getServiceTier() + : null) ?? 
parseServiceTierFromResponseText(allContent); const codexPriorityBillingDecision = await resolveCodexPriorityBillingDecision( session, actualServiceTier @@ -2433,27 +2712,37 @@ export class ProxyResponseHandler { maybeSetCodexContext1m(session, provider, usageForCost?.input_tokens); // Codex: Extract prompt_cache_key from SSE events and update session binding - if (provider.providerType === "codex" && session.sessionId && provider.id) { + const codexSessionProviderId = providerIdForPersistence ?? provider.id; + if ( + shouldTrackForwardedCodexTerminalState && + effectiveStatusCode >= 200 && + effectiveStatusCode < 300 && + session.sessionId && + codexSessionProviderId != null + ) { try { - const sseEvents = parseSSEData(allContent); - for (const event of sseEvents) { - if (typeof event.data === "object" && event.data) { - const promptCacheKey = SessionManager.extractCodexPromptCacheKey( - event.data as Record - ); - if (promptCacheKey) { - void SessionManager.updateSessionWithCodexCacheKey( - session.sessionId, - promptCacheKey, - provider.id, - session.authState?.key?.id ?? session.messageContext?.key?.id ?? null - ).catch((err) => { - logger.error("[ResponseHandler] Failed to update Codex session (stream):", err); - }); - break; // Only need first prompt_cache_key + let promptCacheKey: string | null = forwardedCodexTerminalTracker.getPromptCacheKey(); + if (!promptCacheKey) { + const sseEvents = parseSSEData(allContent); + for (const event of sseEvents) { + if (typeof event.data === "object" && event.data) { + promptCacheKey = SessionManager.extractCodexPromptCacheKey( + event.data as Record + ); + if (promptCacheKey) break; } } } + if (promptCacheKey) { + void SessionManager.updateSessionWithCodexCacheKey( + session.sessionId, + promptCacheKey, + codexSessionProviderId, + session.authState?.key?.id ?? session.messageContext?.key?.id ?? 
null + ).catch((err) => { + logger.error("[ResponseHandler] Failed to update Codex session (stream):", err); + }); + } } catch (parseError) { logger.trace("[ResponseHandler] Failed to parse SSE for Codex session:", parseError); } diff --git a/src/app/v1/_lib/proxy/stream-finalization.ts b/src/app/v1/_lib/proxy/stream-finalization.ts index 0f989bb3b..03881b10c 100644 --- a/src/app/v1/_lib/proxy/stream-finalization.ts +++ b/src/app/v1/_lib/proxy/stream-finalization.ts @@ -1,3 +1,4 @@ +import type { Provider } from "@/types/provider"; import type { ProxySession } from "./session"; /** @@ -19,6 +20,7 @@ import type { ProxySession } from "./session"; export type DeferredStreamingFinalization = { providerId: number; providerName: string; + providerType?: Provider["providerType"]; providerPriority: number; attemptNumber: number; totalProvidersAttempted: number; @@ -44,7 +46,8 @@ export function setDeferredStreamingFinalization( meta: DeferredStreamingFinalization ): void { // Forwarder 在识别到 SSE 时调用:标记该请求需要在流结束后“二次结算”。 - deferredMeta.set(session, meta); + const providerType = meta.providerType ?? session.provider?.providerType; + deferredMeta.set(session, providerType ? 
{ ...meta, providerType } : meta); } /** diff --git a/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts b/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts index 350c33d06..7384c62ff 100644 --- a/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts +++ b/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts @@ -1,7 +1,15 @@ import { beforeEach, describe, expect, it, vi } from "vitest"; import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy"; -import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; +import { + __codexResponsesTerminalStateTrackerForTests, + ProxyResponseHandler, +} from "@/app/v1/_lib/proxy/response-handler"; import type { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { setDeferredStreamingFinalization } from "@/app/v1/_lib/proxy/stream-finalization"; +import { recordFailure } from "@/lib/circuit-breaker"; +import { recordEndpointSuccess } from "@/lib/endpoint-circuit-breaker"; +import { SessionManager } from "@/lib/session-manager"; +import { updateMessageRequestDetails } from "@/repository/message"; import type { Provider } from "@/types/provider"; const testState = vi.hoisted(() => ({ @@ -73,6 +81,25 @@ vi.mock("@/lib/session-manager", () => ({ storeSessionRequestHeaders: vi.fn(), storeSessionResponseHeaders: vi.fn(), storeSessionUpstreamResponseMeta: vi.fn(), + extractCodexPromptCacheKey: vi.fn((responseData: Record) => { + const response = responseData.response as Record | undefined; + if (typeof response?.prompt_cache_key === "string" && response.prompt_cache_key.length > 0) { + return response.prompt_cache_key; + } + return typeof responseData.prompt_cache_key === "string" && + responseData.prompt_cache_key.length > 0 + ? 
responseData.prompt_cache_key + : null; + }), + updateSessionWithCodexCacheKey: vi.fn().mockResolvedValue({ + sessionId: "codex-cache-session", + updated: true, + }), + updateSessionBindingSmart: vi.fn().mockResolvedValue({ + updated: false, + reason: "test", + details: {}, + }), }, })); @@ -125,9 +152,20 @@ function makeProvider(overrides: Partial = {}): Provider { } as Provider; } -function makeSession(clientAbortSignal: AbortSignal | null, stream: boolean): ProxySession { - const endpointPolicy = resolveEndpointPolicy("/v1/chat/completions"); - const provider = makeProvider(); +function makeSession( + clientAbortSignal: AbortSignal | null, + stream: boolean, + overrides: { + provider?: Partial; + pathname?: string; + originalFormat?: string; + providerType?: string; + } = {} +): ProxySession { + const pathname = overrides.pathname ?? "/v1/chat/completions"; + const endpointPolicy = resolveEndpointPolicy(pathname); + const provider = makeProvider(overrides.provider); + const specialSettings: unknown[] = []; const session = { request: { model: "gpt-5.5", @@ -140,7 +178,7 @@ function makeSession(clientAbortSignal: AbortSignal | null, stream: boolean): Pr }, startTime: Date.now(), method: "POST", - requestUrl: new URL("http://localhost/v1/chat/completions"), + requestUrl: new URL(`http://localhost${pathname}`), headers: new Headers(), headerLog: "", userAgent: null, @@ -165,25 +203,30 @@ function makeSession(clientAbortSignal: AbortSignal | null, stream: boolean): Pr }, sessionId: null, requestSequence: 1, - originalFormat: "openai", - providerType: "openai", + originalFormat: overrides.originalFormat ?? "openai", + providerType: overrides.providerType ?? 
provider.providerType, originalModelName: "gpt-5.5", - originalUrlPathname: "/v1/chat/completions", + originalUrlPathname: pathname, providerChain: [], cacheTtlResolved: null, context1mApplied: false, - specialSettings: [], + specialSettings, cachedPriceData: undefined, cachedBillingModelSource: undefined, endpointPolicy, isHeaderModified: () => false, getEndpointPolicy: () => endpointPolicy, + getEndpoint: () => pathname, getContext1mApplied: () => false, getGroupCostMultiplier: () => 1, getOriginalModel: () => "gpt-5.5", getCurrentModel: () => "gpt-5.5", getProviderChain: () => [], - getSpecialSettings: () => [], + getSpecialSettings: () => specialSettings, + addSpecialSetting: (setting: unknown) => { + specialSettings.push(setting); + }, + getCodexPriorityBillingSource: async () => "requested", shouldPersistSessionDebugArtifacts: () => false, shouldTrackSessionObservability: () => false, getResolvedPricingByBillingSource: async () => null, @@ -197,12 +240,92 @@ function makeSession(clientAbortSignal: AbortSignal | null, stream: boolean): Pr return session as unknown as ProxySession; } +function setDeferredMeta(session: ProxySession, providerType?: Provider["providerType"]): void { + const meta = { + providerId: 99, + providerName: "test-provider", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1/responses", + upstreamStatusCode: 200, + }; + setDeferredStreamingFinalization(session, providerType ? 
{ ...meta, providerType } : meta); +} + +function makeCodexResponsesSession(clientAbortSignal: AbortSignal | null = null): ProxySession { + return makeSession(clientAbortSignal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); +} + +function createClosedStreamResponse(lines: string[]): Response { + return new Response(lines.join("\n"), { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function createHangingStream(firstChunk: string): { + stream: ReadableStream; + controller: AbortController; + enqueue: (chunk: string) => void; + error: (error: unknown) => void; +} { + const encoder = new TextEncoder(); + let controllerRef: ReadableStreamDefaultController | null = null; + const abortController = new AbortController(); + + return { + stream: new ReadableStream({ + start(controller) { + controllerRef = controller; + controller.enqueue(encoder.encode(firstChunk)); + abortController.signal.addEventListener( + "abort", + () => { + controller.error(Object.assign(new Error("aborted"), { name: "AbortError" })); + }, + { once: true } + ); + }, + }), + controller: abortController, + enqueue(chunk: string) { + controllerRef?.enqueue(encoder.encode(chunk)); + }, + error(error: unknown) { + controllerRef?.error(error); + }, + }; +} + +async function readFirstClientChunk(response: Response): Promise { + const reader = response.body?.getReader(); + if (!reader) return ""; + const { value } = await reader.read(); + reader.releaseLock(); + return value ? 
new TextDecoder().decode(value) : ""; +} + describe("ProxyResponseHandler client abort listener cleanup", () => { beforeEach(() => { testState.asyncTasks = []; + vi.clearAllMocks(); testState.cancelTask.mockClear(); testState.cleanupTask.mockClear(); vi.restoreAllMocks(); + vi.mocked(SessionManager.updateSessionBindingSmart).mockResolvedValue({ + updated: false, + reason: "test", + details: {}, + }); }); it("removes non-stream client abort listener after response processing completes", async () => { @@ -280,4 +403,1088 @@ describe("ProxyResponseHandler client abort listener cleanup", () => { expect(removeSpy.mock.calls.filter(([type]) => type === "abort")).toHaveLength(0); expect(testState.cancelTask).toHaveBeenCalled(); }); + + it("records normally closed Codex response.completed as successful", async () => { + const session = makeCodexResponsesSession(); + setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.completed"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(recordFailure).not.toHaveBeenCalled(); + expect(recordEndpointSuccess).toHaveBeenCalledWith(42); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "request_success", + statusCode: 200, + }) + ); + }); + + it("tracks Codex terminal usage with last-terminal-event semantics", () => { + const tracker = __codexResponsesTerminalStateTrackerForTests.create(); + + tracker.push( + [ + "event: response.completed", + 'data: 
{"type":"response.completed","response":{"status":"completed","service_tier":"priority","usage":{"input_tokens":12,"output_tokens":4}}}', + "", + "", + ].join("\n") + ); + + expect(tracker.getTerminalState()).toBe("completed"); + expect(tracker.getUsageMetrics()).toEqual({ + input_tokens: 12, + output_tokens: 4, + }); + expect(tracker.getServiceTier()).toBe("priority"); + + tracker.push( + [ + "event: response.failed", + 'data: {"type":"response.failed","response":{"status":"failed"}}', + "", + "", + ].join("\n") + ); + + expect(tracker.getTerminalState()).toBe("failed"); + expect(tracker.getUsageMetrics()).toBeNull(); + expect(tracker.getServiceTier()).toBeNull(); + }); + + it("records normally closed Codex response.failed as terminal failure", async () => { + const session = makeCodexResponsesSession(); + (session as ProxySession & { sessionId: string }).sessionId = "codex-session"; + setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.failed", + 'data: {"type":"response.failed","response":{"id":"resp_1","status":"failed"}}', + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.failed"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_FAILED", + }) + ); + expect(SessionManager.clearSessionProvider).toHaveBeenCalledWith("codex-session"); + expect(recordFailure).toHaveBeenCalledWith( + 99, + expect.objectContaining({ message: "CODEX_RESPONSE_FAILED" }) + ); + expect(recordEndpointSuccess).not.toHaveBeenCalled(); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "retry_failed", + statusCode: 502, + errorMessage: "CODEX_RESPONSE_FAILED", + }) + ); + }); + + it("records normally closed Codex response.error as terminal 
failure", async () => { + const session = makeCodexResponsesSession(); + setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.error", + 'data: {"type":"response.error","error":{"message":"boom"}}', + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.error"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + expect(recordFailure).toHaveBeenCalledWith( + 99, + expect.objectContaining({ message: "CODEX_RESPONSE_ERROR" }) + ); + expect(recordEndpointSuccess).not.toHaveBeenCalled(); + }); + + it("lets a normally closed plain-text error event override response.completed", async () => { + const session = makeCodexResponsesSession(); + setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "event: error", + "data: upstream connection closed", + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.completed"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + expect(recordFailure).toHaveBeenCalledWith( + 99, + expect.objectContaining({ message: "CODEX_RESPONSE_ERROR" }) + ); + expect(recordEndpointSuccess).not.toHaveBeenCalled(); + }); + + it("records normally closed Codex response.incomplete as successful terminal result", async () => { + const session = makeCodexResponsesSession(); + (session as ProxySession & { sessionId: string }).sessionId = "codex-session"; + 
setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.incomplete", + 'data: {"type":"response.incomplete","response":{"id":"resp_1","status":"incomplete"}}', + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.incomplete"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(SessionManager.clearSessionProvider).not.toHaveBeenCalledWith("codex-session"); + expect(recordFailure).not.toHaveBeenCalled(); + expect(recordEndpointSuccess).toHaveBeenCalledWith(42); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "request_success", + statusCode: 200, + }) + ); + }); + + it("keeps fake-200 detection after normally closed Codex response.completed", async () => { + const session = makeCodexResponsesSession(); + setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + 'data: {"error":{"message":"invalid api key"}}', + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.completed"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 401, + errorMessage: expect.stringContaining("FAKE_200_JSON_ERROR_MESSAGE_NON_EMPTY"), + }) + ); + expect(recordFailure).toHaveBeenCalledWith( + 99, + expect.objectContaining({ message: "FAKE_200_JSON_ERROR_MESSAGE_NON_EMPTY" }) + ); + expect(recordEndpointSuccess).not.toHaveBeenCalled(); + }); + + it("does not apply Codex terminal-state rules to normally closed non-Codex 
streams", async () => { + const session = makeSession(null, true, { + provider: { providerType: "openai-compatible" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "openai", + }); + setDeferredMeta(session); + const upstreamResponse = createClosedStreamResponse([ + "event: response.failed", + 'data: {"type":"response.failed","response":{"id":"resp_1","status":"failed"}}', + "", + "", + ]); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(response.text()).resolves.toContain("response.failed"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(recordFailure).not.toHaveBeenCalled(); + expect(recordEndpointSuccess).toHaveBeenCalledWith(42); + }); + + it("records Codex Responses stream as successful when client aborts after response.completed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + ( + session as ProxySession & { + getCodexPriorityBillingSource: () => Promise<"actual">; + } + ).getCodexPriorityBillingSource = async () => "actual"; + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed","service_tier":"priority","usage":{"input_tokens":12,"output_tokens":4}}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new 
Error("client_closed_after_completed")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 12, + outputTokens: 4, + specialSettings: expect.arrayContaining([ + expect.objectContaining({ + type: "codex_service_tier_result", + actualServiceTier: "priority", + resolvedFrom: "actual", + effectivePriority: true, + }), + ]), + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("preserves Codex prompt_cache_key when client aborts after response.completed", async () => { + const controller = new AbortController(); + const session = makeCodexResponsesSession(controller.signal); + (session as ProxySession & { sessionId: string }).sessionId = "codex-session"; + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed","prompt_cache_key":"cache-key-1","usage":{"input_tokens":12,"output_tokens":4}}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_completed")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(SessionManager.updateSessionWithCodexCacheKey).toHaveBeenCalledWith( + "codex-session", + "cache-key-1", + 99, + 2 + ); + }); + + it("keeps terminal failure when Codex 
Responses stream aborts after response.failed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.failed", + 'data: {"type":"response.failed","response":{"id":"resp_1","status":"failed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.failed"); + + controller.abort(new Error("client_closed_after_failed")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_FAILED", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "retry_failed", + statusCode: 502, + errorMessage: "CODEX_RESPONSE_FAILED", + }) + ); + }); + + it("does not update Codex prompt_cache_key after terminal failure", async () => { + const controller = new AbortController(); + const session = makeCodexResponsesSession(controller.signal); + (session as ProxySession & { sessionId: string }).sessionId = "codex-session"; + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.failed", + 'data: {"type":"response.failed","response":{"id":"resp_1","status":"failed","prompt_cache_key":"cache-key-1"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await 
ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.failed"); + + controller.abort(new Error("client_closed_after_failed")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(SessionManager.updateSessionWithCodexCacheKey).not.toHaveBeenCalled(); + }); + + it("keeps 499 CLIENT_ABORTED when Codex Responses stream aborts before response.completed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.output_text.delta", + 'data: {"type":"response.output_text.delta","delta":"partial"}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("partial"); + + controller.abort(new Error("client_closed_before_completed")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("records Codex terminal response.completed as successful on non-execution response paths", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses/resp_123", + originalFormat: 
"response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_completed_non_responses")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("records data-only Codex response.completed SSE as successful after client abort", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); 
+ + controller.abort(new Error("client_closed_after_completed_response_resource")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("uses deferred meta providerType when current session provider no longer looks like Codex", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "openai-compatible" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "openai", + }); + setDeferredMeta(session, "codex"); + const upstream = createHangingStream( + [ + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed","usage":{"input_tokens":12,"output_tokens":4}}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_completed_meta_codex")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 12, + outputTokens: 4, + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "CLIENT_ABORTED", + }) + ); + 
expect(session.addProviderToChain).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("does not use current session providerType to mark non-Codex meta as completed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session, "openai-compatible"); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_completed_meta_non_codex")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("keeps 499 CLIENT_ABORTED for non-Codex streams after response.completed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "openai-compatible" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "openai", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: 
response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_completed_non_codex")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("records Codex response.incomplete as successful when client aborts after terminal result", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.incomplete", + 'data: {"type":"response.incomplete","response":{"id":"resp_1","status":"incomplete"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.incomplete"); + + controller.abort(new Error("client_closed_after_incomplete")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + 
expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "request_success", + statusCode: 200, + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("keeps terminal failure when a failure follows response.completed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "event: response.failed", + 'data: {"type":"response.failed","response":{"id":"resp_1","status":"failed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_terminal_failure")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_FAILED", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "retry_failed", + statusCode: 502, + errorMessage: "CODEX_RESPONSE_FAILED", + }) + ); + }); + + it("keeps terminal error when Codex stream aborts after response.error", async () => { 
+ const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.error", + 'data: {"type":"response.error","error":{"message":"boom"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.error"); + + controller.abort(new Error("client_closed_after_response_error")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "retry_failed", + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + }); + + it("keeps terminal error when a plain-text error event follows response.completed", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "event: error", + "data: upstream connection closed", + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = 
await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_plain_text_error")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "retry_failed", + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + }); + + it("records Codex response.completed as successful when upstream aborts after terminal success", async () => { + const session = makeSession(null, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"id":"resp_1","status":"completed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + upstream.error(new Error("upstream_closed_after_completed")); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "STREAM_PROCESSING_ERROR", + }) + ); + expect(session.addProviderToChain).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + errorMessage: 
"STREAM_PROCESSING_ERROR", + }) + ); + }); + + it("records Codex Responses multiline completed event as successful after client abort", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.completed", + 'data: {"type":"response.completed",', + 'data: "response":{"id":"resp_1","status":"completed"}}', + "", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_multiline_completed")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + expect(updateMessageRequestDetails).not.toHaveBeenCalledWith( + 123, + expect.objectContaining({ + errorMessage: "CLIENT_ABORTED", + }) + ); + expect(session.addProviderToChain).not.toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "system_error", + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("client abort after response.error terminal event — recorded as 502", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + [ + "event: response.error", + 'data: {"type":"response.error","error":{"message":"boom"}}', + 
"", + "", + ].join("\n") + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.error"); + + controller.abort(new Error("client_closed_after_response_error")); + upstream.enqueue(":\n\n"); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + expect(session.addProviderToChain).toHaveBeenCalledWith( + expect.anything(), + expect.objectContaining({ + reason: "retry_failed", + statusCode: 502, + errorMessage: "CODEX_RESPONSE_ERROR", + }) + ); + }); + + it("client abort after response.completed without trailing newline — recorded as 200", async () => { + const controller = new AbortController(); + const session = makeSession(controller.signal, true, { + provider: { providerType: "codex" }, + pathname: "/v1/responses", + originalFormat: "response", + providerType: "codex", + }); + setDeferredMeta(session); + const upstream = createHangingStream( + 'event: response.completed\ndata: {"type":"response.completed","response":{"id":"resp_test"}}\n' + ); + const upstreamResponse = new Response(upstream.stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); + + const response = await ProxyResponseHandler.dispatch(session, upstreamResponse); + await expect(readFirstClientChunk(response)).resolves.toContain("response.completed"); + + controller.abort(new Error("client_closed_after_completed_without_trailing_newline")); + upstream.controller.abort(); + await drainAsyncTasks(); + + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + }) + ); + }); }); From cbdaedb1cbf8c86d116cc759fb1a14084a635b1e Mon Sep 17 00:00:00 
2001 From: John Doe Date: Fri, 5 Jun 2026 09:54:45 +0300 Subject: [PATCH 2/2] fix: address review comments on codex terminal-state tracker MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Gate fake-200 heuristic on streamEndedNormally instead of streamCompletedForFinalization: empty/truncated allContent on client-abort no longer triggers EMPTY_BODY→502 when the forwarded tracker already confirmed response.completed (chatgpt-codex @763) - finalize(): skip terminal classification when JSON parse fails (truncated mid-event buffer) to avoid misidentifying partial data as a completed state; remove redundant parsedObject guard for usage/service_tier (coderabbit @547/@660, gemini @672) - prompt_cache_key: switch to last-event-wins in getCodexPromptCacheKeyFromSSEText, tracker push(), and allContent fallback loop — consistent with usage/service_tier semantics (coderabbit @563) - Flush TextDecoder before tracker finalize() to drain pending multi-byte UTF-8 sequences that abort skips (greptile @2507) - Add explanatory comment on intentional event.event fallback in getCodexResponsesTerminalState (greptile @668) - Add 3 regression tests: P1 fix verification, truncation safety, last-wins prompt_cache_key (33 tests total, all pass) Co-Authored-By: Claude Opus 4.8 --- src/app/v1/_lib/proxy/response-handler.ts | 61 +++++++++++-------- ...nse-handler-abort-listener-cleanup.test.ts | 40 ++++++++++++ 2 files changed, 76 insertions(+), 25 deletions(-) diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index 74e61e324..9da8cb4e8 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -543,6 +543,9 @@ function getCodexResponsesTerminalState(allContent: string): CodexResponsesTermi event.data && typeof event.data === "object" && !Array.isArray(event.data) ? 
(event.data as { type?: unknown }) : null; + // Intentional: fall back to event.event for events with non-JSON body (e.g. "error" events + // with plain-text data). This path processes complete events from allContent (post-tee), + // where the forwarded tracker already holds the terminal verdict — so truncation risk is nil. const eventType = typeof data?.type === "string" ? data.type : event.event; terminalState = applyCodexResponsesTerminalEvent(terminalState, eventType); } @@ -551,16 +554,17 @@ function getCodexResponsesTerminalState(allContent: string): CodexResponsesTermi } function getCodexPromptCacheKeyFromSSEText(text: string): string | null { + let lastKey: string | null = null; for (const event of parseSSEData(text)) { if (!event.data || typeof event.data !== "object" || Array.isArray(event.data)) continue; const promptCacheKey = SessionManager.extractCodexPromptCacheKey( event.data as Record ); if (promptCacheKey) { - return promptCacheKey; + lastKey = promptCacheKey; } } - return null; + return lastKey; } function getCompleteSSEEventTexts(text: string): string[] { @@ -612,7 +616,8 @@ function createCodexResponsesTerminalStateTracker(): { const completeText = bufferedText.slice(0, boundary); bufferedText = bufferedText.slice(boundary); for (const eventText of getCompleteSSEEventTexts(completeText)) { - terminalPromptCacheKey ??= getCodexPromptCacheKeyFromSSEText(eventText); + const keyFromEvent = getCodexPromptCacheKeyFromSSEText(eventText); + if (keyFromEvent) terminalPromptCacheKey = keyFromEvent; const parsedTerminalState = getCodexResponsesTerminalState(eventText); if (parsedTerminalState !== "none") { terminalState = parsedTerminalState; @@ -628,14 +633,8 @@ function createCodexResponsesTerminalStateTracker(): { if (!remainder) return; bufferedText = ""; - let eventType = ""; const dataLines: string[] = []; for (const line of remainder.split(/\r?\n/)) { - if (line.startsWith("event:")) { - eventType = line.slice(6).trim(); - continue; - } - if 
(line.startsWith("data:")) { let value = line.slice(5); if (value.startsWith(" ")) { @@ -657,14 +656,19 @@ function createCodexResponsesTerminalStateTracker(): { parsedData && typeof parsedData === "object" && !Array.isArray(parsedData) ? (parsedData as Record) : null; - const resolvedType = typeof parsedObject?.type === "string" ? parsedObject.type : eventType; - const newState = applyCodexResponsesTerminalEvent(terminalState, resolvedType); + // Only classify terminal state when we have a valid JSON object with a string type field. + // If JSON parse failed (truncated mid-event), parsedObject is null and we skip classification + // to avoid misidentifying a partially-received event as a terminal state. + // Intentional contrast with push() which can rely on event.event for complete events. + const resolvedType = typeof parsedObject?.type === "string" ? parsedObject.type : null; + const newState = resolvedType + ? applyCodexResponsesTerminalEvent(terminalState, resolvedType) + : "none"; if (newState !== "none" && newState !== terminalState) { terminalState = newState; - if (parsedObject) { - terminalUsageMetrics = parseUsageFromResponseText(remainder, "codex").usageMetrics; - terminalServiceTier = parseServiceTierFromResponseText(remainder); - } + // parsedObject is guaranteed non-null here (resolvedType is derived from it), guard is redundant. 
+ terminalUsageMetrics = parseUsageFromResponseText(remainder, "codex").usageMetrics; + terminalServiceTier = parseServiceTierFromResponseText(remainder); } if (!terminalPromptCacheKey && parsedObject) { @@ -751,16 +755,18 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( const streamCompletedForFinalization = !codexTerminalNonSuccess && (streamEndedNormally || codexTerminalFinished); - // 仅在“上游 HTTP=200 且流已完成”时做“假 200”检测: + // 仅在"上游 HTTP=200 且流正常结束"时做"假 200"检测: // - 非 200:HTTP 已经表明失败(无需额外启发式) - // - 未完成:内容可能是部分流/截断,启发式会显著提高误判风险 - // - Codex terminal success 后的非正常结束:协议已给出 terminal result,可安全做最终检查 + // - 未完成/客户端中断:allContent 可能被截断,空内容会被误判为 EMPTY_BODY→502; + // 此时已有 terminal-tracker 给出权威结果(completed→200,failed/error→502), + // 不需要也不能依赖对不完整 body 的启发式。 // - // 此处返回 `{isError:false}` 仅表示“跳过检测”,最终仍会在下面按中断/超时视为失败结算。 - const shouldDetectFake200 = streamCompletedForFinalization && upstreamStatusCode === 200; - const detected = shouldDetectFake200 - ? detectUpstreamErrorFromSseOrJsonText(allContent) - : ({ isError: false } as const); + // 此处返回 `{isError:false}` 仅表示"跳过检测",最终仍会在下面按中断/超时视为失败结算。 + const shouldDetectFake200 = streamEndedNormally && upstreamStatusCode === 200; + const detected = + shouldDetectFake200 && !codexTerminalNonSuccess + ? detectUpstreamErrorFromSseOrJsonText(allContent) + : ({ isError: false } as const); // “内部结算用”的状态码(不会改变客户端实际 HTTP 状态码)。 // - 假 200:优先映射为“推断得到的 4xx/5xx”(未命中则回退 502),确保内部统计/熔断/会话绑定把它当作失败。 @@ -2628,6 +2634,11 @@ export class ProxyResponseHandler { abortReason?: string ): Promise => { if (shouldTrackForwardedCodexTerminalState) { + // Flush any pending multi-byte UTF-8 sequence from the decoder. On normal close, + // TransformStream flush() already drains it; on abort, flush() is never called. + // TextDecoder.decode() without {stream:true} flushes the internal state — idempotent. 
+ const decoderTail = forwardedCodexTerminalDecoder.decode(); + if (decoderTail) forwardedCodexTerminalTracker.push(decoderTail); forwardedCodexTerminalTracker.finalize(); } @@ -2726,10 +2737,10 @@ export class ProxyResponseHandler { const sseEvents = parseSSEData(allContent); for (const event of sseEvents) { if (typeof event.data === "object" && event.data) { - promptCacheKey = SessionManager.extractCodexPromptCacheKey( + const key = SessionManager.extractCodexPromptCacheKey( event.data as Record ); - if (promptCacheKey) break; + if (key) promptCacheKey = key; // last-event-wins, no break } } } diff --git a/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts b/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts index 7384c62ff..72a6b26ee 100644 --- a/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts +++ b/tests/unit/proxy/response-handler-abort-listener-cleanup.test.ts @@ -1487,4 +1487,44 @@ describe("ProxyResponseHandler client abort listener cleanup", () => { }) ); }); + + it("tracker: response.completed recognized even when allContent is empty (Fix 1 P1 regression)", () => { + const tracker = __codexResponsesTerminalStateTrackerForTests.create(); + tracker.push( + 'event: response.completed\ndata: {"type":"response.completed","usage":{"input_tokens":10,"output_tokens":5}}\n\n' + ); + tracker.finalize(); + expect(tracker.getTerminalState()).toBe("completed"); + // Fix 1 ensures shouldDetectFake200 uses streamEndedNormally (not streamCompletedForFinalization) + // so empty allContent won't trigger EMPTY_BODY->502 when tracker shows completed + }); + + it("tracker finalize: truncated JSON event is not classified as terminal state (Fix 2)", () => { + const tracker = __codexResponsesTerminalStateTrackerForTests.create(); + tracker.push('event: response.completed\ndata: {"type":"respon'); + tracker.finalize(); + expect(tracker.getTerminalState()).toBe("none"); + }); + + it("tracker push: last prompt_cache_key wins over earlier 
ones (Fix 3)", () => { + const tracker = __codexResponsesTerminalStateTrackerForTests.create(); + tracker.push( + [ + "event: response.created", + 'data: {"type":"response.created","response":{"prompt_cache_key":"key-first"}}', + "", + "", + ].join("\n") + ); + tracker.push( + [ + "event: response.completed", + 'data: {"type":"response.completed","response":{"prompt_cache_key":"key-last"}}', + "", + "", + ].join("\n") + ); + tracker.finalize(); + expect(tracker.getPromptCacheKey()).toBe("key-last"); + }); });