diff --git a/src/app/v1/_lib/proxy/response-fixer/index.ts b/src/app/v1/_lib/proxy/response-fixer/index.ts index 56f25928c..8fdae6aa3 100644 --- a/src/app/v1/_lib/proxy/response-fixer/index.ts +++ b/src/app/v1/_lib/proxy/response-fixer/index.ts @@ -24,6 +24,9 @@ const DEFAULT_CONFIG: ResponseFixerConfig = { maxFixSize: 1024 * 1024, }; +const UTF8_DECODER = new TextDecoder(); +const UTF8_ENCODER = new TextEncoder(); + function nowMs(): number { if (typeof performance !== "undefined" && typeof performance.now === "function") { return performance.now(); @@ -31,6 +34,45 @@ function nowMs(): number { return Date.now(); } +function isRecord(value: unknown): value is Record { + return typeof value === "object" && value !== null && !Array.isArray(value); +} + +function hasMeaningfulValue(value: unknown): boolean { + if (value == null) return false; + if (typeof value === "string") return value.length > 0; + if (Array.isArray(value)) return value.length > 0; + if (isRecord(value)) return Object.keys(value).length > 0; + return true; +} + +function isInertChatCompletionChoice(choice: unknown): boolean { + if (!isRecord(choice)) return false; + if (choice.finish_reason != null) return false; + + const delta = choice.delta; + if (!isRecord(delta)) { + return true; + } + + for (const [key, value] of Object.entries(delta)) { + if (key === "role") continue; + if (hasMeaningfulValue(value)) return false; + } + + return true; +} + +function isInertChatCompletionChunkPayload(payload: unknown): boolean { + if (!isRecord(payload)) return false; + if (payload.object !== "chat.completion.chunk") return false; + if (hasMeaningfulValue(payload.usage)) return false; + + const choices = payload.choices; + if (!Array.isArray(choices) || choices.length === 0) return false; + return choices.every(isInertChatCompletionChoice); +} + function toArrayBufferUint8Array(input: Uint8Array): Uint8Array { // Response/BodyInit 在 DOM 类型中要求 ArrayBufferView(buffer 为 ArrayBuffer),这里避免 SharedArrayBuffer 类型污染 if (input.buffer instanceof ArrayBuffer) { @@ -385,6 +427,13 @@ export class ResponseFixer { } } + const filtered = ResponseFixer.filterInertResponsesChatCompletionChunks(session, data); + if (filtered.applied) { + applied.sse.applied = true; + applied.sse.details ??= filtered.details; + data = filtered.data; + } + controller.enqueue(data); }, flush(controller) { @@ -418,6 +467,13 @@ export class ResponseFixer { } } + const filtered = ResponseFixer.filterInertResponsesChatCompletionChunks(session, data); + if (filtered.applied) { + applied.sse.applied = true; + applied.sse.details ??= filtered.details; + data = filtered.data; + } + controller.enqueue(data); } @@ -526,6 +582,75 @@ export class ResponseFixer { return { data: concatUint8Chunks(chunks), applied }; } + private static filterInertResponsesChatCompletionChunks( + session: ProxySession, + data: Uint8Array + ): { data: Uint8Array; applied: boolean; details?: string } { + if (session.originalFormat !== "response") { + return { data, applied: false }; + } + + const text = UTF8_DECODER.decode(data); + if (!text.includes('"chat.completion.chunk"')) { + return { data, applied: false }; + } + + const lines = text.split("\n"); + const out: string[] = []; + let applied = false; + let skipNextBlankLine = false; + + for (let i = 0; i < lines.length; i += 1) { + const line = lines[i]; + const hasLineBreak = i < lines.length - 1; + + if (skipNextBlankLine && ResponseFixer.isBlankSseSeparatorLine(line)) { + skipNextBlankLine = false; + continue; + } + skipNextBlankLine = false; + + if (ResponseFixer.isInertChatCompletionDataLine(line)) { + applied = true; + skipNextBlankLine = true; + continue; + } + + out.push(line); + if (hasLineBreak) out.push("\n"); + } + + if (!applied) { + return { data, applied: false }; + } + + return { + data: UTF8_ENCODER.encode(out.join("")), + applied: true, + details: "filtered_inert_chat_completion_chunk", + }; + } + + private static isInertChatCompletionDataLine(line: string): boolean { + if (!line.startsWith("data:")) return false; + + let payloadText = line.slice(5); + if (payloadText.startsWith(" ")) { + payloadText = payloadText.slice(1); + } + if (!payloadText.startsWith("{")) return false; + + try { + return isInertChatCompletionChunkPayload(JSON.parse(payloadText)); + } catch { + return false; + } + } + + private static isBlankSseSeparatorLine(line: string): boolean { + return line === "" || line === "\r"; + } + private static fixMaybeDataJsonLine( line: Uint8Array, jsonFixer: JsonFixer diff --git a/src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts b/src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts index 2e4f2728c..e99602d55 100644 --- a/src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts +++ b/src/app/v1/_lib/proxy/response-fixer/response-fixer.test.ts @@ -44,6 +44,20 @@ function createSession() { } as any; } +function createSseResponse(payloadLines: string[]): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue(encoder.encode(payloadLines.join("\n"))); + controller.close(); + }, + }); + + return new Response(stream, { + headers: { "content-type": "text/event-stream" }, + }); +} + describe("ResponseFixer", () => { beforeEach(() => { vi.clearAllMocks(); @@ -205,6 +219,163 @@ describe("ResponseFixer", () => { expect(session.getSpecialSettings()).toBeNull(); }); + test("流式 Responses SSE:应过滤上游混入的空 Chat Completions chunk", async () => { + const { ResponseFixer } = await import("./index"); + + const session = createSession(); + session.originalFormat = "response"; + const encoder = new TextEncoder(); + const emptyChatChunk = { + id: "chatcmpl-dummy", + object: "chat.completion.chunk", + created: 1780753978, + model: "gpt-5.5", + choices: [{ index: 0, delta: { role: "assistant", content: "" } }], + }; + const responseDelta = { + type: "response.output_text.delta", + delta: "Hi", + }; + const responseCompleted = { + type: "response.completed", + response: { id: "resp_test", object: "response" }, + }; + + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + [ + `data: ${JSON.stringify(emptyChatChunk)}`, + "", + "event: response.output_text.delta", + `data: ${JSON.stringify(responseDelta)}`, + "", + "event: response.completed", + `data: ${JSON.stringify(responseCompleted)}`, + "", + ].join("\n") + ) + ); + controller.close(); + }, + }); + + const response = new Response(stream, { + headers: { "content-type": "text/event-stream" }, + }); + + const fixed = await ResponseFixer.process(session, response); + const text = await fixed.text(); + + expect(text).not.toContain("chat.completion.chunk"); + expect(text).not.toContain("chatcmpl-dummy"); + expect(text.startsWith("event: response.output_text.delta")).toBe(true); + expect(text).toContain("response.output_text.delta"); + expect(text).toContain("response.completed"); + expect(session.getSpecialSettings()).not.toBeNull(); + }); + + test("流式 Responses SSE:包含实际 content 的 Chat Completions chunk 应保留", async () => { + const { ResponseFixer } = await import("./index"); + + const session = createSession(); + session.originalFormat = "response"; + const chatChunk = { + id: "chatcmpl-content", + object: "chat.completion.chunk", + created: 1780753978, + model: "gpt-5.5", + choices: [{ index: 0, delta: { role: "assistant", content: "Hi" } }], + }; + + const fixed = await ResponseFixer.process( + session, + createSseResponse([`data: ${JSON.stringify(chatChunk)}`, ""]) + ); + const text = await fixed.text(); + + expect(text).toContain("chat.completion.chunk"); + expect(text).toContain("chatcmpl-content"); + expect(text).toContain('"content":"Hi"'); + expect(session.getSpecialSettings()).toBeNull(); + }); + + test("流式 Responses SSE:带 finish_reason 的 Chat Completions chunk 应保留", async () => { + const { ResponseFixer } = await import("./index"); + + const session = createSession(); + session.originalFormat = "response"; + const chatChunk = { + id: "chatcmpl-finish", + object: "chat.completion.chunk", + created: 1780753978, + model: "gpt-5.5", + choices: [{ index: 0, delta: {}, finish_reason: "stop" }], + }; + + const fixed = await ResponseFixer.process( + session, + createSseResponse([`data: ${JSON.stringify(chatChunk)}`, ""]) + ); + const text = await fixed.text(); + + expect(text).toContain("chat.completion.chunk"); + expect(text).toContain("chatcmpl-finish"); + expect(text).toContain("finish_reason"); + expect(session.getSpecialSettings()).toBeNull(); + }); + + test("流式 Responses SSE:带 usage 的 Chat Completions chunk 应保留", async () => { + const { ResponseFixer } = await import("./index"); + + const session = createSession(); + session.originalFormat = "response"; + const chatChunk = { + id: "chatcmpl-usage", + object: "chat.completion.chunk", + created: 1780753978, + model: "gpt-5.5", + choices: [{ index: 0, delta: { role: "assistant", content: "" } }], + usage: { prompt_tokens: 1, completion_tokens: 1, total_tokens: 2 }, + }; + + const fixed = await ResponseFixer.process( + session, + createSseResponse([`data: ${JSON.stringify(chatChunk)}`, ""]) + ); + const text = await fixed.text(); + + expect(text).toContain("chat.completion.chunk"); + expect(text).toContain("chatcmpl-usage"); + expect(text).toContain("prompt_tokens"); + expect(session.getSpecialSettings()).toBeNull(); + }); + + test("流式非 Responses SSE:空 Chat Completions chunk 应保留", async () => { + const { ResponseFixer } = await import("./index"); + + const session = createSession(); + session.originalFormat = "chat"; + const emptyChatChunk = { + id: "chatcmpl-chat-format", + object: "chat.completion.chunk", + created: 1780753978, + model: "gpt-5.5", + choices: [{ index: 0, delta: { role: "assistant", content: "" } }], + }; + + const fixed = await ResponseFixer.process( + session, + createSseResponse([`data: ${JSON.stringify(emptyChatChunk)}`, ""]) + ); + const text = await fixed.text(); + + expect(text).toContain("chat.completion.chunk"); + expect(text).toContain("chatcmpl-chat-format"); + expect(session.getSpecialSettings()).toBeNull(); + }); + test("流式 SSE:无换行且超过 maxFixSize 时应降级输出,避免无限缓冲", async () => { const { ResponseFixer } = await import("./index"); diff --git a/src/app/v1/_lib/proxy/response-handler.ts b/src/app/v1/_lib/proxy/response-handler.ts index e811b3b59..76d2c83d3 100644 --- a/src/app/v1/_lib/proxy/response-handler.ts +++ b/src/app/v1/_lib/proxy/response-handler.ts @@ -560,6 +560,24 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( const detected = shouldDetectFake200 ? detectUpstreamErrorFromSseOrJsonText(allContent) : ({ isError: false } as const); + const clientAbortCompleteSuccess = (() => { + if ( + streamEndedNormally || + !clientAborted || + upstreamStatusCode < 200 || + upstreamStatusCode >= 300 + ) { + return false; + } + + const abortDetected = detectUpstreamErrorFromSseOrJsonText(allContent); + if (abortDetected.isError) { + return false; + } + + const { usageMetrics } = parseUsageFromResponseText(allContent, provider?.providerType); + return hasPositiveBillableTokens(usageMetrics); + })(); // “内部结算用”的状态码(不会改变客户端实际 HTTP 状态码)。 // - 假 200:优先映射为“推断得到的 4xx/5xx”(未命中则回退 502),确保内部统计/熔断/会话绑定把它当作失败。 @@ -578,6 +596,9 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( effectiveStatusCode = 502; } errorMessage = detected.detail ? `${detected.code}: ${detected.detail}` : detected.code; + } else if (clientAbortCompleteSuccess) { + effectiveStatusCode = upstreamStatusCode; + errorMessage = null; } else if (!streamEndedNormally) { effectiveStatusCode = clientAborted ? 499 : 502; errorMessage = clientAborted ? "CLIENT_ABORTED" : (abortReason ?? "STREAM_ABORTED"); @@ -596,7 +617,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( } const shouldClearSessionBindingOnFailure = - !streamEndedNormally || + (!streamEndedNormally && !clientAbortCompleteSuccess) || detected.isError || (upstreamStatusCode >= 400 && errorMessage !== null); @@ -659,7 +680,7 @@ async function finalizeDeferredStreamingFinalizationIfNeeded( // 同时,为了让故障转移/熔断能正确工作: // - 客户端主动中断:不计入熔断器(这通常不是供应商问题) // - 非客户端中断:计入 provider/endpoint 熔断失败(与 timeout 路径保持一致) - if (!streamEndedNormally) { + if (!streamEndedNormally && !clientAbortCompleteSuccess) { await clearSessionBinding(); if (!clientAborted && session.getEndpointPolicy().allowCircuitBreakerAccounting) { @@ -2246,28 +2267,53 @@ export class ProxyResponseHandler { // 使用 AsyncTaskManager 管理后台处理任务 const taskId = `stream-${messageContext?.id || `unknown-${Date.now()}`}`; const abortController = new AbortController(); + const idleTimeoutMs = + provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; + const clientAbortDrainTimeoutMs = idleTimeoutMs === Infinity ? 60_000 : idleTimeoutMs; // ⭐ 提升 idleTimeoutId 到外部作用域,以便客户端断开时能清除 let idleTimeoutId: NodeJS.Timeout | null = null; + let clientAbortDrainTimeoutId: NodeJS.Timeout | null = null; + const clearClientAbortDrainTimer = () => { + if (clientAbortDrainTimeoutId) { + clearTimeout(clientAbortDrainTimeoutId); + clientAbortDrainTimeoutId = null; + } + }; const cleanupClientAbortListener = bindClientAbortListener(session.clientAbortSignal, () => { logger.debug("ResponseHandler: Client disconnected, cleaning up", { taskId, providerId: provider.id, messageId: messageContext.id, }); - - // 客户端断开时清除 idle timeout,避免任务已取消后仍误触发。 - if (idleTimeoutId) { - clearTimeout(idleTimeoutId); - idleTimeoutId = null; - logger.debug("ResponseHandler: Idle timeout cleared due to client disconnect", { + // Do not cancel internal accounting on pure client disconnect. If the + // upstream stream has already completed, the tee'd internal branch can + // still drain buffered final usage and record the request as successful. + // Idle/response timeout paths still abort via abortController. + clearClientAbortDrainTimer(); + clientAbortDrainTimeoutId = setTimeout(() => { + logger.info("ResponseHandler: Client abort drain window exceeded", { taskId, providerId: provider.id, + messageId: messageContext.id, + clientAbortDrainTimeoutMs, }); - } - AsyncTaskManager.cancel(taskId); - abortController.abort(); + try { + const sessionWithController = session as typeof session & { + responseController?: AbortController; + }; + sessionWithController.responseController?.abort(new Error("client_abort_drain_timeout")); + } catch (e) { + logger.warn("ResponseHandler: Failed to abort upstream after client drain timeout", { + taskId, + providerId: provider.id, + error: e, + }); + } + + abortController.abort(new Error("client_abort_drain_timeout")); + }, clientAbortDrainTimeoutMs); }); const processingPromise = (async () => { @@ -2280,9 +2326,6 @@ export class ProxyResponseHandler { let usageForCost: UsageMetrics | null = null; let isFirstChunk = true; // ⭐ 标记是否为第一块数据 - // ⭐ 静默期 Watchdog:监控流式请求中途卡住(无新数据推送) - const idleTimeoutMs = - provider.streamingIdleTimeoutMs > 0 ? provider.streamingIdleTimeoutMs : Infinity; const startIdleTimer = () => { if (idleTimeoutMs === Infinity) return; // 禁用时跳过 clearIdleTimer(); // 清除旧的 @@ -2655,7 +2698,7 @@ export class ProxyResponseHandler { let streamEndedNormally = false; while (true) { // 检查取消信号 - if (session.clientAbortSignal?.aborted || abortController.signal.aborted) { + if (abortController.signal.aborted) { logger.info("ResponseHandler: Stream processing cancelled", { taskId, providerId: provider.id, @@ -2933,6 +2976,7 @@ export class ProxyResponseHandler { } finally { // 确保资源释放 cleanupClientAbortListener(); + clearClientAbortDrainTimer(); clearIdleTimer(); // ⭐ 清除静默期计时器(防止泄漏) try { reader.releaseLock(); diff --git a/tests/unit/proxy/response-handler-client-abort-drain.test.ts b/tests/unit/proxy/response-handler-client-abort-drain.test.ts new file mode 100644 index 000000000..d2dfa54be --- /dev/null +++ b/tests/unit/proxy/response-handler-client-abort-drain.test.ts @@ -0,0 +1,512 @@ +import { beforeEach, describe, expect, it, vi } from "vitest"; +import { resolveEndpointPolicy } from "@/app/v1/_lib/proxy/endpoint-policy"; +import { ProxyResponseHandler } from "@/app/v1/_lib/proxy/response-handler"; +import { ProxySession } from "@/app/v1/_lib/proxy/session"; +import { setDeferredStreamingFinalization } from "@/app/v1/_lib/proxy/stream-finalization"; +import { AsyncTaskManager } from "@/lib/async-task-manager"; +import { updateMessageRequestDetails, updateMessageRequestDuration } from "@/repository/message"; +import type { Provider } from "@/types/provider"; + +const asyncTasks: Promise[] = []; + +vi.mock("@/app/v1/_lib/proxy/response-fixer", () => ({ + ResponseFixer: { + process: async (_session: unknown, response: Response) => response, + }, +})); + +vi.mock("@/lib/async-task-manager", () => ({ + AsyncTaskManager: { + register: vi.fn((_taskId: string, promise: Promise) => { + asyncTasks.push(promise); + return new AbortController(); + }), + cleanup: vi.fn(), + cancel: vi.fn(), + }, +})); + +vi.mock("@/lib/config/system-settings-cache", () => ({ + getCachedSystemSettings: vi.fn(async () => ({ billNonSuccessfulRequests: false })), +})); + +vi.mock("@/lib/langfuse/emit-proxy-trace", () => ({ + emitProxyLangfuseTrace: vi.fn(), +})); + +vi.mock("@/lib/logger", () => ({ + logger: { + debug: vi.fn(), + info: vi.fn(), + warn: vi.fn(), + error: vi.fn(), + trace: vi.fn(), + }, +})); + +vi.mock("@/lib/price-sync/cloud-price-updater", () => ({ + requestCloudPriceTableSync: vi.fn(), +})); + +vi.mock("@/lib/proxy-status-tracker", () => ({ + ProxyStatusTracker: { + getInstance: () => ({ + endRequest: vi.fn(), + }), + }, +})); + +vi.mock("@/lib/rate-limit", () => ({ + RateLimitService: { + trackCost: vi.fn(), + trackUserDailyCost: vi.fn(), + decrementLeaseBudget: vi.fn(), + }, +})); + +vi.mock("@/lib/redis/live-chain-store", () => ({ + deleteLiveChain: vi.fn(), +})); + +vi.mock("@/lib/session-manager", () => ({ + SessionManager: { + clearSessionProvider: vi.fn(), + extractCodexPromptCacheKey: vi.fn(), + storeSessionResponse: vi.fn(), + storeSessionRequestPhaseSnapshot: vi.fn(), + storeSessionResponsePhaseSnapshot: vi.fn(), + storeSessionRequestHeaders: vi.fn(), + storeSessionResponseHeaders: vi.fn(), + storeSessionSpecialSettings: vi.fn(), + storeSessionUpstreamRequestMeta: vi.fn(), + storeSessionUpstreamResponseMeta: vi.fn(), + updateSessionProvider: vi.fn(), + updateSessionUsage: vi.fn(), + updateSessionWithCodexCacheKey: vi.fn(), + }, +})); + +vi.mock("@/lib/session-tracker", () => ({ + SessionTracker: { + refreshSession: vi.fn(), + }, +})); + +vi.mock("@/lib/circuit-breaker", () => ({ + recordFailure: vi.fn(), + recordSuccess: vi.fn(), +})); + +vi.mock("@/lib/endpoint-circuit-breaker", () => ({ + recordEndpointFailure: vi.fn(), + recordEndpointSuccess: vi.fn(), + resetEndpointCircuit: vi.fn(), +})); + +vi.mock("@/repository/message", () => ({ + updateMessageRequestCostWithBreakdown: vi.fn(), + updateMessageRequestDetails: vi.fn(), + updateMessageRequestDuration: vi.fn(), +})); + +function createProvider(): Provider { + return { + id: 1, + name: "avemujica-responses", + url: "https://api.test.invalid/v1", + key: "sk-test", + providerVendorId: null, + providerType: "codex", + isEnabled: true, + weight: 1, + priority: 1, + groupPriorities: null, + costMultiplier: 1, + groupTag: "OpenAI", + modelRedirects: null, + allowedModels: null, + mcpPassthroughType: "none", + mcpPassthroughUrl: null, + preserveClientIp: false, + limit5hUsd: null, + limitDailyUsd: null, + dailyResetMode: "fixed", + dailyResetTime: "00:00", + limitWeeklyUsd: null, + limitMonthlyUsd: null, + limitTotalUsd: null, + totalCostResetAt: null, + limitConcurrentSessions: 0, + maxRetryAttempts: null, + circuitBreakerFailureThreshold: 5, + circuitBreakerOpenDuration: 1_800_000, + circuitBreakerHalfOpenSuccessThreshold: 2, + proxyUrl: null, + proxyFallbackToDirect: false, + firstByteTimeoutStreamingMs: 0, + streamingIdleTimeoutMs: 0, + requestTimeoutNonStreamingMs: 0, + websiteUrl: null, + faviconUrl: null, + cacheTtlPreference: null, + context1mPreference: null, + codexReasoningEffortPreference: null, + codexReasoningSummaryPreference: null, + codexTextVerbosityPreference: null, + codexParallelToolCallsPreference: null, + anthropicMaxTokensPreference: null, + anthropicThinkingBudgetPreference: null, + geminiGoogleSearchPreference: null, + tpm: 0, + rpm: 0, + rpd: 0, + cc: 0, + createdAt: new Date(), + updatedAt: new Date(), + deletedAt: null, + } as Provider; +} + +function createSession(signal: AbortSignal): ProxySession { + const provider = createProvider(); + const user = { id: 1, name: "admin" }; + const key = { id: 2, name: "Omni" }; + const session = Object.create(ProxySession.prototype) as ProxySession; + + Object.assign(session, { + authState: { success: true, user, key, apiKey: "sk-test" }, + cacheTtlResolved: null, + clientAbortSignal: signal, + context: {}, + context1mApplied: false, + forwardedRequestBody: "", + headerLog: "", + headers: new Headers(), + method: "POST", + messageContext: { + id: 123, + createdAt: new Date(), + user, + key, + apiKey: "sk-test", + }, + originalFormat: "response", + originalModelName: "gpt-5.4-mini", + originalUrlPathname: "/v1/responses", + provider, + providerChain: [], + providerType: "codex", + request: { + log: "", + message: { model: "gpt-5.4-mini", stream: true }, + model: "gpt-5.4-mini", + }, + requestSequence: 1, + requestUrl: new URL("http://localhost/v1/responses"), + sessionId: null, + specialSettings: [], + startTime: Date.now(), + ttfbMs: null, + userAgent: "Go-http-client/1.1", + userName: "admin", + addProviderToChain(this: ProxySession & { providerChain: unknown[] }, prov: Provider, meta) { + this.providerChain.push({ id: prov.id, name: prov.name, ...(meta ?? {}) }); + }, + clearResponseTimeout: vi.fn(), + getContext1mApplied: () => false, + getCurrentModel: () => "gpt-5.4-mini", + getEndpoint: () => "/v1/responses", + getEndpointPolicy: () => resolveEndpointPolicy("/v1/responses"), + getGroupCostMultiplier: () => 1, + getOriginalModel: () => "gpt-5.4-mini", + getProviderChain: () => session.providerChain, + getResolvedPricingByBillingSource: async () => null, + getSpecialSettings: () => [], + isHeaderModified: () => false, + recordTtfb: vi.fn(), + releaseAgent: vi.fn(), + setContext1mApplied: vi.fn(), + shouldPersistSessionDebugArtifacts: () => false, + shouldTrackSessionObservability: () => false, + }); + + return session; +} + +function createResponsesSse(): Response { + const body = [ + `event: response.output_text.done\ndata: ${JSON.stringify({ + type: "response.output_text.done", + text: "短标题", + })}`, + `event: response.completed\ndata: ${JSON.stringify({ + type: "response.completed", + response: { + id: "resp_test", + model: "gpt-5.4-mini-2026-03-17", + usage: { + input_tokens: 463, + output_tokens: 11, + }, + }, + })}`, + "", + ].join("\n\n"); + + return new Response(body, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function createErroredResponsesSse(): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + `event: response.output_text.delta\ndata: ${JSON.stringify({ + type: "response.output_text.delta", + delta: "短", + })}\n\n` + ) + ); + const error = new Error("Response transmission interrupted"); + error.name = "ResponseAborted"; + controller.error(error); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function createHangingResponsesSse(upstreamSignal: AbortSignal): Response { + const encoder = new TextEncoder(); + const stream = new ReadableStream({ + start(controller) { + controller.enqueue( + encoder.encode( + `event: response.output_text.delta\ndata: ${JSON.stringify({ + type: "response.output_text.delta", + delta: "短", + })}\n\n` + ) + ); + upstreamSignal.addEventListener( + "abort", + () => { + const error = new Error("client_abort_drain_timeout"); + error.name = "AbortError"; + controller.error(error); + }, + { once: true } + ); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +function createCompletedThenErroredResponsesSse(): Response { + const encoder = new TextEncoder(); + const chunks = [ + `event: response.output_text.done\ndata: ${JSON.stringify({ + type: "response.output_text.done", + text: "短标题", + })}\n\n`, + `event: response.completed\ndata: ${JSON.stringify({ + type: "response.completed", + response: { + id: "resp_test", + model: "gpt-5.4-mini-2026-03-17", + usage: { + input_tokens: 463, + output_tokens: 11, + }, + }, + })}\n\n`, + ]; + let index = 0; + + const stream = new ReadableStream({ + pull(controller) { + if (index < chunks.length) { + controller.enqueue(encoder.encode(chunks[index++])); + return; + } + + const error = new Error("Response transmission interrupted after final usage"); + error.name = "ResponseAborted"; + controller.error(error); + }, + }); + + return new Response(stream, { + status: 200, + headers: { "content-type": "text/event-stream" }, + }); +} + +async function drainAsyncTasks(): Promise { + while (asyncTasks.length > 0) { + const tasks = asyncTasks.splice(0, asyncTasks.length); + await Promise.allSettled(tasks); + await new Promise((resolve) => setTimeout(resolve, 0)); + } +} + +describe("ProxyResponseHandler stream client abort finalization", () => { + beforeEach(() => { + asyncTasks.splice(0, asyncTasks.length); + vi.clearAllMocks(); + }); + + it("finalizes a complete upstream responses stream as success when the downstream client already closed", async () => { + const controller = new AbortController(); + controller.abort(); + const session = createSession(controller.signal); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createResponsesSse()); + await drainAsyncTasks(); + + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDuration).toHaveBeenCalledWith(123, expect.any(Number)); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 463, + outputTokens: 11, + }) + ); + }); + + it("reclassifies a client-aborted stream as success when final usage was already received", async () => { + const controller = new AbortController(); + controller.abort(); + const session = createSession(controller.signal); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createCompletedThenErroredResponsesSse()); + await drainAsyncTasks(); + + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 200, + inputTokens: 463, + outputTokens: 11, + providerChain: [ + expect.objectContaining({ + reason: "request_success", + statusCode: 200, + }), + ], + }) + ); + }); + + it("keeps a genuinely aborted upstream responses stream as 499", async () => { + const controller = new AbortController(); + controller.abort(); + const session = createSession(controller.signal); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch(session, createErroredResponsesSse()); + await drainAsyncTasks(); + + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + }); + + it("bounds client-abort drain when the upstream stream hangs", async () => { + vi.useFakeTimers(); + try { + const clientController = new AbortController(); + const upstreamController = new AbortController(); + const session = createSession(clientController.signal); + Object.assign(session, { responseController: upstreamController }); + setDeferredStreamingFinalization(session, { + providerId: 1, + providerName: "avemujica-responses", + providerPriority: 1, + attemptNumber: 1, + totalProvidersAttempted: 1, + isFirstAttempt: true, + isFailoverSuccess: false, + endpointId: 42, + endpointUrl: "https://api.test.invalid/v1", + upstreamStatusCode: 200, + }); + + await ProxyResponseHandler.dispatch( + session, + createHangingResponsesSse(upstreamController.signal) + ); + clientController.abort(); + + await vi.advanceTimersByTimeAsync(60_000); + const tasks = asyncTasks.splice(0, asyncTasks.length); + await Promise.allSettled(tasks); + + expect(upstreamController.signal.aborted).toBe(true); + expect(AsyncTaskManager.cancel).not.toHaveBeenCalled(); + expect(updateMessageRequestDetails).toHaveBeenCalledWith( + 123, + expect.objectContaining({ + statusCode: 499, + errorMessage: "CLIENT_ABORTED", + }) + ); + } finally { + vi.useRealTimers(); + } + }); +});