diff --git a/src/app/v1/_lib/proxy/combine-abort-signals.ts b/src/app/v1/_lib/proxy/combine-abort-signals.ts new file mode 100644 index 000000000..94086b6a4 --- /dev/null +++ b/src/app/v1/_lib/proxy/combine-abort-signals.ts @@ -0,0 +1,55 @@ +/** + * 组合多个 AbortSignal 为单个信号,并返回显式 cleanup。 + * + * 优先使用原生 `AbortSignal.any`(Node.js 20.3+ / V8 内部管理 listener)。 + * 仅在原生不可用(例如 Next.js standalone 覆盖全局 AbortSignal)时使用 polyfill。 + * + * Polyfill 路径必须由调用方在请求生命周期结束时调用 cleanup,否则源信号上的 abort + * listener 会一直持有闭包(包含 combinedController、cleanups 数组及源信号引用), + * 导致 session/请求体无法被 GC——和 #1113 修复的 client abort listener 是同一类泄漏。 + */ +export interface CombinedAbortSignal { + signal: AbortSignal; + cleanup: () => void; +} + +const NOOP_CLEANUP = () => {}; + +export function combineAbortSignals(signals: AbortSignal[]): CombinedAbortSignal { + if ("any" in AbortSignal && typeof AbortSignal.any === "function") { + return { signal: AbortSignal.any(signals), cleanup: NOOP_CLEANUP }; + } + + const combinedController = new AbortController(); + const detachers: Array<() => void> = []; + let cleaned = false; + + const cleanup = () => { + if (cleaned) return; + cleaned = true; + for (const detach of detachers) { + detach(); + } + detachers.length = 0; + }; + + for (const signal of signals) { + if (signal.aborted) { + combinedController.abort(); + cleanup(); + break; + } + + const abortHandler = () => { + combinedController.abort(); + cleanup(); + }; + + signal.addEventListener("abort", abortHandler, { once: true }); + detachers.push(() => { + signal.removeEventListener("abort", abortHandler); + }); + } + + return { signal: combinedController.signal, cleanup }; +} diff --git a/src/app/v1/_lib/proxy/forwarder.ts b/src/app/v1/_lib/proxy/forwarder.ts index 8b6f383f1..c54625203 100644 --- a/src/app/v1/_lib/proxy/forwarder.ts +++ b/src/app/v1/_lib/proxy/forwarder.ts @@ -52,6 +52,7 @@ import { buildProxyUrl } from "../url"; import { rectifyBillingHeader } from "./billing-header-rectifier"; import { bindClientAbortListener } from "./client-abort-listener"; import { deriveClientSafeUpstreamErrorMessage } from "./client-error-message"; +import { combineAbortSignals } from "./combine-abort-signals"; import { isStandardProxyEndpointPath } from "./endpoint-family-catalog"; import { resolveEndpointPolicy, shouldEnforceStrictEndpointPoolPolicy } from "./endpoint-policy"; import { @@ -2706,56 +2707,18 @@ export class ProxyForwarder { } // 2. 组合双路信号:response + client - let combinedSignal: AbortSignal | undefined; const signals = [responseController.signal]; if (session.clientAbortSignal) { signals.push(session.clientAbortSignal); } - // ⭐ AbortSignal.any 实现(兼容所有环境) - // 原因:Next.js standalone 可能覆盖全局 AbortSignal,导致原生 any 方法不可用 - if ("any" in AbortSignal && typeof AbortSignal.any === "function") { - // 优先使用原生实现(Node.js 20.3+) - combinedSignal = AbortSignal.any(signals); - logger.debug("ProxyForwarder: Using native AbortSignal.any", { - signalCount: signals.length, - }); - } else { - // Polyfill: 手动实现多信号组合逻辑 - logger.debug("ProxyForwarder: Using AbortSignal.any polyfill", { - signalCount: signals.length, - reason: "Native AbortSignal.any not available", - }); - - const combinedController = new AbortController(); - const cleanupHandlers: Array<() => void> = []; - - // 为每个信号添加监听器 - for (const signal of signals) { - // 如果已经有信号中断,立即中断组合信号 - if (signal.aborted) { - combinedController.abort(); - break; - } - - // 监听信号中断事件 - const abortHandler = () => { - // 中断组合信号 - combinedController.abort(); - // 清理所有监听器(避免内存泄漏) - cleanupHandlers.forEach((cleanup) => cleanup()); - }; - - signal.addEventListener("abort", abortHandler, { once: true }); - - // 记录清理函数 - cleanupHandlers.push(() => { - signal.removeEventListener("abort", abortHandler); - }); - } - - combinedSignal = combinedController.signal; - } + // 优先 Node 20.3+ 原生 AbortSignal.any(V8 内部管理 listener,无需手动 cleanup); + // Next.js standalone 覆盖全局时 fallback 到 polyfill,由调用方在请求结束时调用 + // cleanupCombinedSignal 解绑源信号上的 listener,避免持有 session/请求体闭包。 + const { signal: combinedSignal, cleanup: cleanupCombinedSignal } = combineAbortSignals(signals); + logger.debug("ProxyForwarder: Combined abort signals", { + signalCount: signals.length, + }); const init: UndiciFetchOptions = { method: session.method, @@ -2849,6 +2812,9 @@ export class ProxyForwarder { clearTimeout(responseTimeoutId); } + // Polyfill 路径上需要主动解绑源信号的 abort listener(response-handler 不会执行)。 + cleanupCombinedSignal(); + // Release agent ref count on fetch failure (request never started streaming) const releaseKey = proxyConfig?.cacheKey ?? directConnectionCacheKey; const releaseDispatcherId = proxyConfig?.dispatcherId ?? directConnectionDispatcherId; @@ -3281,6 +3247,8 @@ export class ProxyForwarder { if (errorReleaseKey && errorReleaseDispatcherId) { getGlobalAgentPool().releaseAgent(errorReleaseKey, errorReleaseDispatcherId); } + // 同上:response-handler 不会跑,polyfill 路径上的源信号 listener 必须在此解绑。 + cleanupCombinedSignal(); } } @@ -3308,14 +3276,19 @@ export class ProxyForwarder { // Attach agent release callback for in-flight reference counting. // response-handler must call this in its finally block after the stream is fully consumed. + // 同时复用此回调作为 combineAbortSignals polyfill 的 cleanup 入口:response-handler 已经 + // 保证在请求结束时(成功/异常)幂等地调用 releaseAgent,把 cleanup 合并到这里就不必再 + // 改造 response-handler 的所有 finally 调用点。两个动作互不影响,cleanup 内部自带 cleaned + // 标志,重复调用安全。 const agentCacheKeyToRelease = proxyConfig?.cacheKey ?? directConnectionCacheKey; const agentDispatcherIdToRelease = proxyConfig?.dispatcherId ?? directConnectionDispatcherId; - if (agentCacheKeyToRelease && agentDispatcherIdToRelease) { - const pool = getGlobalAgentPool(); - sessionWithTimeout.releaseAgent = () => { + const pool = agentCacheKeyToRelease && agentDispatcherIdToRelease ? getGlobalAgentPool() : null; + sessionWithTimeout.releaseAgent = () => { + if (pool && agentCacheKeyToRelease && agentDispatcherIdToRelease) { pool.releaseAgent(agentCacheKeyToRelease, agentDispatcherIdToRelease); - }; - } + } + cleanupCombinedSignal(); + }; return response; } diff --git a/tests/unit/proxy/combine-abort-signals.test.ts b/tests/unit/proxy/combine-abort-signals.test.ts new file mode 100644 index 000000000..7a86ce975 --- /dev/null +++ b/tests/unit/proxy/combine-abort-signals.test.ts @@ -0,0 +1,90 @@ +import { afterEach, beforeEach, describe, expect, it } from "vitest"; +import { combineAbortSignals } from "@/app/v1/_lib/proxy/combine-abort-signals"; + +type MutableAbortSignal = { any?: unknown }; + +describe("combineAbortSignals", () => { + describe("native AbortSignal.any path", () => { + it("delegates to AbortSignal.any when available and cleanup is noop", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + + expect(signal.aborted).toBe(false); + c1.abort(); + expect(signal.aborted).toBe(true); + + // cleanup should be safe to call (noop) — no listeners owned by us. + expect(() => cleanup()).not.toThrow(); + }); + }); + + describe("polyfill path (AbortSignal.any unavailable)", () => { + let originalAny: unknown; + + beforeEach(() => { + originalAny = (AbortSignal as MutableAbortSignal).any; + // 赋 undefined 让 helper 的 `typeof ... === "function"` check 走 polyfill; + // delete 在部分 V8 版本上对 static 不生效,赋值更可靠。 + (AbortSignal as MutableAbortSignal).any = undefined; + }); + + afterEach(() => { + (AbortSignal as MutableAbortSignal).any = originalAny; + }); + + it("aborts combined signal when any source aborts", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(false); + c2.abort(); + expect(signal.aborted).toBe(true); + }); + + it("source-side abort listeners are detached after cleanup is invoked", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(false); + + // 模拟请求正常完成:调用方在 finally 中触发 cleanup。 + cleanup(); + + // 源信号此后再 abort,不应再传播到组合信号(listener 已解绑)。 + c1.abort(); + c2.abort(); + expect(signal.aborted).toBe(false); + }); + + it("auto-cleans listeners when a source aborts (does not require explicit cleanup)", () => { + const c1 = new AbortController(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + c1.abort(); + expect(signal.aborted).toBe(true); + + // 二次 cleanup 必须幂等(请求结束的 finally 仍会调)。 + expect(() => cleanup()).not.toThrow(); + expect(() => cleanup()).not.toThrow(); + }); + + it("immediately aborts and cleans up when a source signal is already aborted", () => { + const c1 = new AbortController(); + c1.abort(); + const c2 = new AbortController(); + + const { signal, cleanup } = combineAbortSignals([c1.signal, c2.signal]); + expect(signal.aborted).toBe(true); + + // 后到的源 abort 不应再触发任何路径(已 cleanup)。 + c2.abort(); + expect(signal.aborted).toBe(true); + expect(() => cleanup()).not.toThrow(); + }); + }); +});