From 3f1fac52d98a51ae47b3b434a56012ee91b2687c Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 00:38:52 +0800 Subject: [PATCH 1/3] feat(gateway): forward anthropic-ratelimit-* + request-id + cf-ray response headers across LLM surfaces Real Claude Code's `/status` indicator reads anthropic-ratelimit-unified-* headers off every /v1/messages response. When CC is pointed at our gateway, those headers must reach the downstream client untouched or the status bar shows nothing. Operator support tickets and live debugging additionally need request-id / x-request-id (Anthropic / OpenAI vendor traces) and cf-ray (Cloudflare edge trace) to flow through verbatim. Adds a prefix allowlist (`anthropic-ratelimit-`) plus an exact-name allowlist (`request-id`, `x-request-id`, `cf-ray`) on the response composer; future ratelimit dimensions the upstream introduces (e.g. a future `anthropic-ratelimit-tier-*`) are forwarded automatically without touching the composition logic. Two helpers cover both response shapes: `forwardUpstreamHeaders` stages the allowlisted entries onto the Hono context so `streamSSE` emits them, `mergeForwardedUpstreamHeaders` builds a `HeadersInit` for the non-streaming `Response.json` path. Both accept `undefined` so callers can pass `result.headers` directly. Wired into all four LLM surfaces (Messages / Chat Completions / Responses / Gemini). `EventResult.headers` is the field providers populate from the upstream Response so the source-side `respond` layer can read them; the provider-side plumbing lands as part of the broader provider rework. --- .../llm/chat-completions/respond.ts | 5 +- .../src/data-plane/llm/gemini/respond.ts | 5 +- .../src/data-plane/llm/messages/respond.ts | 5 +- .../data-plane/llm/messages/respond_test.ts | 98 ++++++++++++++++++- .../src/data-plane/llm/responses/respond.ts | 5 +- .../src/data-plane/llm/shared/respond.ts | 50 ++++++++++ packages/provider/src/result.ts | 8 ++ 7 files changed, 165 insertions(+), 11 deletions(-) diff --git a/packages/gateway/src/data-plane/llm/chat-completions/respond.ts b/packages/gateway/src/data-plane/llm/chat-completions/respond.ts index 07e56679b..1d3f5b50b 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/respond.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/respond.ts @@ -5,7 +5,7 @@ import { CHAT_COMPLETIONS_MISSING_TERMINAL_MESSAGE, collectChatCompletionsProtoc import { chatCompletionsProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import type { ChatCompletionsStreamEvent, ChatCompletionsResult } from '@floway-dev/protocols/chat-completions'; import { chatCompletionsErrorPayloadMessage } from '@floway-dev/protocols/chat-completions'; @@ -47,13 +47,14 @@ export const respondChatCompletions = async ( const usage = response.usage ? tokenUsageFromChatCompletionsUsage(response.usage) : null; await recordUsage(ctx, metadata.modelIdentity, usage); recordPerformance(ctx, metadata.performance, state.failed); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: internalChatCompletionsErrorResponse(502, toInternalDebugError(error, 'chat-completions')) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/gemini/respond.ts b/packages/gateway/src/data-plane/llm/gemini/respond.ts index 5b2dae517..c8cd6ac97 100644 --- a/packages/gateway/src/data-plane/llm/gemini/respond.ts +++ b/packages/gateway/src/data-plane/llm/gemini/respond.ts @@ -6,7 +6,7 @@ import { GEMINI_MISSING_TERMINAL_MESSAGE, isGeminiErrorEvent, isGeminiTerminalEv import { geminiProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import { type ProtocolFrame, sseCommentFrame, sseFrame } from '@floway-dev/protocols/common'; import type { GeminiErrorResponse, GeminiResult, GeminiStreamEvent, GeminiUsageMetadata } from '@floway-dev/protocols/gemini'; @@ -45,13 +45,14 @@ export const respondGemini = async ( const metadata = await eventResultMetadata(result); await recordUsage(ctx, metadata.modelIdentity, tokenUsageFromGeminiResponse(response)); recordPerformance(ctx, metadata.performance, state.failed); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: geminiCollectErrorResponse(error) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/messages/respond.ts b/packages/gateway/src/data-plane/llm/messages/respond.ts index 0008704cf..6f00b3f2d 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond.ts @@ -5,7 +5,7 @@ import { MESSAGES_MISSING_TERMINAL_MESSAGE, collectMessagesProtocolEventsToResul import { messagesProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import { type ProtocolFrame, sseFrame } from '@floway-dev/protocols/common'; import type { MessagesMessageDeltaEvent, MessagesStreamEvent, MessagesUsage } from '@floway-dev/protocols/messages'; @@ -47,13 +47,14 @@ export const respondMessages = async ( const metadata = await eventResultMetadata(result); await recordUsage(ctx, metadata.modelIdentity, tokenUsageFromMessagesUsage(response.usage)); recordPerformance(ctx, metadata.performance, state.failed); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: internalMessagesErrorResponse(502, toInternalDebugError(error, 'messages')) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/messages/respond_test.ts b/packages/gateway/src/data-plane/llm/messages/respond_test.ts index 91b186293..56d1b9cdb 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond_test.ts @@ -1,9 +1,14 @@ +import { Hono } from 'hono'; import { test } from 'vitest'; -import { createMessagesStreamUsageState, tokenUsageFromMessagesFrame } from './respond.ts'; -import { eventFrame } from '@floway-dev/protocols/common'; +import { createMessagesStreamUsageState, respondMessages, tokenUsageFromMessagesFrame } from './respond.ts'; +import { initRepo } from '../../../repo/index.ts'; +import { InMemoryRepo } from '../../../repo/memory.ts'; +import type { GatewayCtx } from '../shared/gateway-ctx.ts'; +import { doneFrame, eventFrame, type ProtocolFrame } from '@floway-dev/protocols/common'; import type { MessagesStreamEvent } from '@floway-dev/protocols/messages'; -import { assertEquals } from '@floway-dev/test-utils'; +import { eventResult, type ExecuteResult } from '@floway-dev/provider'; +import { assertEquals, testTelemetryModelIdentity } from '@floway-dev/test-utils'; const stop = () => eventFrame({ type: 'message_stop' } satisfies MessagesStreamEvent); @@ -205,3 +210,90 @@ test('Messages stream usage falls back to the rolled-up cache_creation when the output: 1, }); }); + +// --- header forwarding --- + +const forwardedHeadersFixture = (): Headers => new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed_warning', + 'anthropic-ratelimit-unified-fallback-percentage': '50', + 'request-id': 'req_anthropic_abc', + 'cf-ray': 'cf_ray_xyz', + 'x-internal-cache-id': 'cache-abc', + 'content-type': 'text/event-stream', +}); + +const makeRespondCtx = (): GatewayCtx => ({ + apiKeyId: 'key_respond_test', + upstreamIds: null, + wantsStream: false, + runtimeLocation: 'test', + backgroundScheduler: () => {}, + requestStartedAt: 0, + currentColo: null, +}); + +const messagesEventsForRespond = (): readonly MessagesStreamEvent[] => [ + { + type: 'message_start', + message: { + id: 'msg_1', type: 'message', role: 'assistant', content: [], model: 'claude-test', + stop_reason: null, stop_sequence: null, + usage: { input_tokens: 3, output_tokens: 0 }, + }, + }, + { type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }, + { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'hi' } }, + { type: 'content_block_stop', index: 0 }, + { type: 'message_delta', delta: { stop_reason: 'end_turn', stop_sequence: null }, usage: { output_tokens: 1 } }, + { type: 'message_stop' }, +]; + +const messagesProtocolFrames = async function* (): AsyncGenerator> { + for (const event of messagesEventsForRespond()) yield eventFrame(event); + yield doneFrame(); +}; + +const callRespond = async (wantsStream: boolean): Promise => { + initRepo(new InMemoryRepo()); + const app = new Hono(); + let captured: Response | undefined; + app.get('/', async c => { + const result: ExecuteResult> = eventResult( + messagesProtocolFrames(), + testTelemetryModelIdentity, + undefined, + undefined, + forwardedHeadersFixture(), + ); + const { response } = await respondMessages(c, result, wantsStream, makeRespondCtx()); + captured = response; + return response; + }); + await app.request('/'); + if (!captured) throw new Error('respondMessages did not produce a Response'); + return captured; +}; + +test('respondMessages forwards allowlisted upstream headers on the non-streaming JSON response', async () => { + const response = await callRespond(false); + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed_warning'); + assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); + assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); + assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); + // The allowlist is by prefix or exact name — unrelated upstream headers + // must not be proxied to the client. + assertEquals(response.headers.get('x-internal-cache-id'), null); +}); + +test('respondMessages forwards allowlisted upstream headers on the streaming SSE response', async () => { + const response = await callRespond(true); + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed_warning'); + assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); + assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); + assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); + assertEquals(response.headers.get('x-internal-cache-id'), null); + // Drain the body so the lazy generator releases its resources and the + // background `finally` block in `streamSSE` doesn't keep the test runner + // alive. + await response.text(); +}); diff --git a/packages/gateway/src/data-plane/llm/responses/respond.ts b/packages/gateway/src/data-plane/llm/responses/respond.ts index 58795f9c7..e50712060 100644 --- a/packages/gateway/src/data-plane/llm/responses/respond.ts +++ b/packages/gateway/src/data-plane/llm/responses/respond.ts @@ -5,7 +5,7 @@ import { RESPONSES_MISSING_TERMINAL_MESSAGE, collectResponsesProtocolEventsToRes import { responsesProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import { type ProtocolFrame, sseCommentFrame, sseFrame } from '@floway-dev/protocols/common'; import { isResponsesTerminalEvent, type ResponsesResult, type ResponsesStreamEvent, responsesResultFromStreamEvent } from '@floway-dev/protocols/responses'; @@ -44,13 +44,14 @@ export const respondResponses = async ( const metadata = await eventResultMetadata(result); await recordUsage(ctx, metadata.modelIdentity, tokenUsageFromResponsesResult(response)); recordPerformance(ctx, metadata.performance, state.failed || response.status === 'failed'); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: internalResponsesErrorResponse(502, toInternalDebugError(error, 'responses')) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/shared/respond.ts b/packages/gateway/src/data-plane/llm/shared/respond.ts index 10789cdc1..4bb383d64 100644 --- a/packages/gateway/src/data-plane/llm/shared/respond.ts +++ b/packages/gateway/src/data-plane/llm/shared/respond.ts @@ -1,3 +1,5 @@ +import type { Context } from 'hono'; + import type { StreamCompletion } from './stream/sse.ts'; import type { TokenUsage } from '../../../repo/types.ts'; import { recordRequestPerformance } from '../../shared/telemetry/performance.ts'; @@ -60,3 +62,51 @@ export const recordUsage = async (ctx: GatewayCtx, modelIdentity: TelemetryModel export const recordPerformance = (ctx: GatewayCtx, context: EventResultMetadata['performance'], failed: boolean): void => { recordRequestPerformance(ctx.backgroundScheduler, context, failed, performance.now() - ctx.requestStartedAt); }; + +// Upstream-emitted hints we propagate verbatim to the downstream client. +// +// The prefix list covers Anthropic's plan-billing surface: the +// `anthropic-ratelimit-unified-*` family carries quotas, resets, and warning +// thresholds, and the official `claude-code` CLI's `/status` indicator reads +// them. Dropping the headers makes the gateway look like an account with no +// rate-limit state. The allowlist is by prefix so new dimensions the upstream +// introduces (e.g. a future `anthropic-ratelimit-tier-*`) are forwarded +// automatically. +// +// The exact-name list covers operator-trace identifiers — `request-id` / +// `x-request-id` (Anthropic / OpenAI vendor traces) and `cf-ray` (Cloudflare's +// edge trace). Support tickets and live debugging rely on these reaching the +// downstream client unmodified. +const FORWARDED_HEADER_PREFIXES = ['anthropic-ratelimit-'] as const; +const FORWARDED_HEADER_NAMES = new Set(['request-id', 'x-request-id', 'cf-ray']); + +export const isForwardableUpstreamHeader = (name: string): boolean => { + const lowered = name.toLowerCase(); + if (FORWARDED_HEADER_NAMES.has(lowered)) return true; + return FORWARDED_HEADER_PREFIXES.some(prefix => lowered.startsWith(prefix)); +}; + +// Stages allowlisted upstream headers onto the Hono context so the next +// `c.newResponse` (or `streamSSE`'s internal `c.newResponse`) emits them on +// the response. Hono's `c.header()` is the only knob that survives a later +// `c.json` or `streamSSE` call without being overwritten. Safe to call with +// `undefined` so callers can pass `result.headers` directly. +export const forwardUpstreamHeaders = (c: Context, headers: Headers | undefined): void => { + if (!headers) return; + for (const [name, value] of headers) { + if (isForwardableUpstreamHeader(name)) c.header(name, value); + } +}; + +// Returns a `HeadersInit` extending `base` with every allowlisted entry from +// `upstream`. Used by non-streaming JSON responses where the response is +// built directly (`Response.json(...)`) instead of through Hono's `c`. +export const mergeForwardedUpstreamHeaders = (base: HeadersInit | undefined, upstream: Headers | undefined): HeadersInit => { + const merged = new Headers(base); + if (upstream) { + for (const [name, value] of upstream) { + if (isForwardableUpstreamHeader(name)) merged.set(name, value); + } + } + return merged; +}; diff --git a/packages/provider/src/result.ts b/packages/provider/src/result.ts index 24d7048b2..c47a31a43 100644 --- a/packages/provider/src/result.ts +++ b/packages/provider/src/result.ts @@ -7,6 +7,12 @@ export interface EventResult { modelIdentity: TelemetryModelIdentity; performance?: PerformanceTelemetryContext; finalMetadata?: Promise; + // Upstream HTTP response headers, propagated from the provider so the + // source-side `respond` layer can forward billing/rate-limit hints + // (`anthropic-ratelimit-*`) and operator-trace identifiers (`request-id`, + // `x-request-id`, `cf-ray`) verbatim to the downstream client. Absent on + // lifted/synthesized streams that have no raw upstream Response. + headers?: Headers; } export interface EventResultMetadata { @@ -48,10 +54,12 @@ export const eventResult = ( modelIdentity: TelemetryModelIdentity, performance?: PerformanceTelemetryContext, finalMetadata?: Promise, + headers?: Headers, ): EventResult => { const result: EventResult = { type: 'events', events, modelIdentity }; if (performance !== undefined) result.performance = performance; if (finalMetadata !== undefined) result.finalMetadata = finalMetadata; + if (headers !== undefined) result.headers = headers; return result; }; From 14f607041ddc45f1291ee0e8fe56a6fe4a3c931c Mon Sep 17 00:00:00 2001 From: Menci Date: Sat, 20 Jun 2026 01:31:53 +0800 Subject: [PATCH 2/3] fix(provider): carry upstream Response headers on ProviderStreamResult so respond layer forwards them MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The header-forwarding helpers added in commit f3446be0 read `result.headers` at 8 respond.ts call sites, but no provider was populating the field — every streaming success funneled `undefined` to `eventResult`. Without the wire, the allowlist (`anthropic-ratelimit-*`, `request-id`, `x-request-id`, `cf-ray`) was dead code on the happy path. Thread the upstream `Headers` through the streaming-success branch of `ProviderStreamResult` and propagate it from `streamingProviderCall` (populated for every provider that goes through it: Copilot, Custom, Azure, Codex) all the way to the EventResult that `respond` reads. The single shared `providerStreamResultToExecuteResult` helper is the seam where every protocol's `attempt` converts ProviderStreamResult into EventResult, so wiring it once covers Messages, Chat Completions, Responses, and Gemini (which reaches its upstreams via translation through the other three). The field stays optional on `ProviderStreamResult.ok:true`, matching the same shape on `EventResult`: synthesized streams that have no upstream Response behind them (e.g. a future Copilot boundary interceptor that constructs events from a non-wire source) genuinely have nothing to forward, so the contract reflects that rather than forcing producers to fabricate an empty `Headers`. Also forwards `headers` through the two existing EventResult rebuild sites that drop fields by default — the Responses `canonicalize-encrypted-content` interceptor and the `responsesAttempt.generate` wrap that mints the stored response id — so a header that survives the provider boundary survives the inner chain too. Tests added: - Per-protocol `attempt_test.ts`: stub the provider with a known Headers fixture and assert it lands on the resulting EventResult (Messages, Chat Completions, Responses, Gemini via Chat Completions). - `messages/http_test.ts`: full provider → attempt → respond chain for both streaming and non-streaming, asserting allowlisted entries reach the outgoing HTTP response and non-allowlisted ones do not. --- .../llm/chat-completions/attempt_test.ts | 30 +++++++++-- .../llm/chat-completions/http_test.ts | 10 ++-- .../llm/chat-completions/serve_test.ts | 10 ++-- .../src/data-plane/llm/gemini/attempt_test.ts | 28 ++++++++-- .../src/data-plane/llm/gemini/http_test.ts | 8 +-- .../src/data-plane/llm/gemini/serve_test.ts | 10 ++-- .../data-plane/llm/messages/attempt_test.ts | 28 +++++++++- .../src/data-plane/llm/messages/http_test.ts | 54 ++++++++++++++++++- .../data-plane/llm/messages/respond_test.ts | 6 +-- .../src/data-plane/llm/messages/serve_test.ts | 5 +- .../src/data-plane/llm/responses/attempt.ts | 9 ++-- .../data-plane/llm/responses/attempt_test.ts | 38 ++++++++++++- .../src/data-plane/llm/responses/http_test.ts | 3 ++ .../canonicalize-encrypted-content.ts | 6 ++- .../interceptors/retry-cyber-policy_test.ts | 2 +- .../data-plane/llm/responses/serve_test.ts | 6 ++- .../data-plane/llm/shared/attempt-helpers.ts | 2 +- .../src/data-plane/llm/shared/respond.ts | 2 +- .../inject-default-instructions_test.ts | 2 +- .../responses/inject-session-id_test.ts | 2 +- .../strip-unsupported-fields_test.ts | 2 +- packages/provider-copilot/src/provider.ts | 15 +++++- packages/provider/src/provider.ts | 12 +++-- packages/provider/src/result.ts | 24 +++++---- packages/provider/src/streaming.ts | 2 +- 25 files changed, 251 insertions(+), 65 deletions(-) diff --git a/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts b/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts index 22fba3a31..9651da970 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts @@ -114,7 +114,7 @@ const installRepo = (): InMemoryRepo => { test('generate native chat-completions target calls provider.callChatCompletions', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -132,7 +132,7 @@ test('generate native chat-completions target calls provider.callChatCompletions test('generate translate-to-messages branch routes through messagesAttempt', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -161,6 +161,7 @@ test('generate translate-to-responses branch routes through responsesAttempt', a ok: true, events: makeProtocolFrames([{ type: 'response.completed', sequence_number: 0, response: respResp }]), modelKey: 'k', + headers: new Headers(), })); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -180,7 +181,7 @@ test('generate inherits invocation headers across translation to Messages', asyn let observedHeaders: Record | undefined; const callMessages = vi.fn(async (_model: unknown, _body: unknown, _signal?: AbortSignal, headers?: Record): Promise> => { observedHeaders = headers; - return { ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k' }; + return { ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers() }; }); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -212,6 +213,7 @@ test('generate inherits invocation headers across translation to Responses', asy ok: true, events: makeProtocolFrames([{ type: 'response.completed', sequence_number: 0, response: respResp }]), modelKey: 'k', + headers: new Headers(), }; }); const result = await chatCompletionsAttempt.generate({ @@ -226,3 +228,25 @@ test('generate inherits invocation headers across translation to Responses', asy await collectEvents(result.events); assertEquals(observedHeaders?.['x-test'], 'abc'); }); + +test('generate propagates upstream response headers onto the EventResult so respond can forward them', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'cf-ray': 'cf_ray_cc', + }); + const callChatCompletions = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + const result = await chatCompletionsAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store: createNonResponsesSourceStore(API_KEY_ID), + candidate: makeCandidate({ callChatCompletions }), + }); + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('cf-ray'), 'cf_ray_cc'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts b/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts index eda9aa6ee..93adb4c83 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts @@ -106,7 +106,7 @@ const makeCandidate = (overrides: { test('POST /v1/chat/completions streams a successful SSE body', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -127,7 +127,7 @@ test('POST /v1/chat/completions streams a successful SSE body', async () => { test('POST /v1/chat/completions returns a single JSON body when stream is omitted', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -147,7 +147,7 @@ test('POST /v1/chat/completions returns a single JSON body when stream is omitte test('POST /v1/chat/completions omits the usage-only chunk unless stream_options.include_usage is set', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -166,7 +166,7 @@ test('POST /v1/chat/completions omits the usage-only chunk unless stream_options test('POST /v1/chat/completions emits the usage-only chunk when stream_options.include_usage is true', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -194,7 +194,7 @@ test('POST /v1/chat/completions emits the usage-only chunk when stream_options.i test('POST /v1/chat/completions does not write any non-auth Hono context slot', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); diff --git a/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts b/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts index b852e4423..e8e488ea9 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts @@ -146,7 +146,7 @@ const collectEvents = async (events: AsyncIterable test('generate routes a native Chat Completions candidate end to end', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_a', callChatCompletions })]); @@ -166,7 +166,7 @@ test('generate routes a native Chat Completions candidate end to end', async () test('generate translates through the Messages target when only that endpoint is exposed', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'messages-model-key', + ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'messages-model-key', headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_m', targetApi: 'messages', callMessages })]); @@ -185,7 +185,7 @@ test('generate translates through the Messages target when only that endpoint is test('generate translates through the Responses target when only that endpoint is exposed', async () => { installRepo(); const callResponses = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'responses-model-key', + ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'responses-model-key', headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_r', targetApi: 'responses', callResponses })]); @@ -210,7 +210,7 @@ test('generate stops at the first candidate even when it yields an upstream erro ok: false, response: firstError, modelKey: 'first-key', })); const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callChatCompletions: firstCall }), @@ -234,7 +234,7 @@ test('generate stops at the first candidate even when it yields an upstream erro test('generate is a routing no-op when the payload carries no reasoning carriers (degenerate path)', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callChatCompletions }), diff --git a/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts b/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts index 92f9c3a03..810334829 100644 --- a/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts @@ -123,7 +123,7 @@ const collectEvents = async (events: AsyncIterable test('generate translates through Chat Completions when targetApi is chat-completions', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); const result = await geminiAttempt.generate({ payload: makePayload(), @@ -141,7 +141,7 @@ test('generate translates through Chat Completions when targetApi is chat-comple test('generate translates through Messages when targetApi is messages', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); const result = await geminiAttempt.generate({ payload: makePayload(), @@ -159,7 +159,7 @@ test('generate translates through Messages when targetApi is messages', async () test('generate translates through Responses when targetApi is responses', async () => { installRepo(); const callResponses = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', + ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', headers: new Headers(), })); const result = await geminiAttempt.generate({ payload: makePayload(), @@ -260,3 +260,25 @@ test('countTokens refuses a non-messages candidate', async () => { if (!(thrown instanceof Error)) throw new Error('expected an Error to be thrown'); assertEquals(thrown.message.includes("targetApi='messages'"), true); }); + +test('generate propagates upstream response headers through the chat-completions translation', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'x-request-id': 'req_gemini_xyz', + }); + const callChatCompletions = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + const result = await geminiAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store: createNonResponsesSourceStore(API_KEY_ID), + candidate: makeCandidate({ targetApi: 'chat-completions', callChatCompletions }), + }); + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('x-request-id'), 'req_gemini_xyz'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/gemini/http_test.ts b/packages/gateway/src/data-plane/llm/gemini/http_test.ts index 96c6ce3f8..57cf11670 100644 --- a/packages/gateway/src/data-plane/llm/gemini/http_test.ts +++ b/packages/gateway/src/data-plane/llm/gemini/http_test.ts @@ -113,7 +113,7 @@ const makeMessagesEvents = (): readonly MessagesStreamEvent[] => [ test('POST /v1beta/models/:model:generateContent returns a single JSON body for non-stream generate', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -133,7 +133,7 @@ test('POST /v1beta/models/:model:generateContent returns a single JSON body for test('POST /v1beta/models/:model:streamGenerateContent streams a Gemini SSE body', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -196,7 +196,7 @@ test('POST /v1beta/models/:model:countTokens accepts the generateContentRequest test('POST /v1beta/models/:model:generateContent translates through Messages target end to end', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'messages', callMessages })]); @@ -234,7 +234,7 @@ test('POST /v1beta/models/models/:model:generateContent accepts the models/ pref let resolvedModel: string | undefined; const callChatCompletions = vi.fn(async (model): Promise> => { resolvedModel = (model as { id: string }).id; - return { ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k' }; + return { ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers() }; }); queueCandidates([makeCandidate({ callChatCompletions })]); diff --git a/packages/gateway/src/data-plane/llm/gemini/serve_test.ts b/packages/gateway/src/data-plane/llm/gemini/serve_test.ts index d20bc3387..c7cb7c7ef 100644 --- a/packages/gateway/src/data-plane/llm/gemini/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/gemini/serve_test.ts @@ -146,7 +146,7 @@ const expectType = (r: T, k: K) test('generate translates through native Chat Completions target end to end', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'chat-completions', callChatCompletions })]); @@ -165,7 +165,7 @@ test('generate translates through native Chat Completions target end to end', as test('generate translates through Messages when only that endpoint is exposed', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'messages', callMessages })]); @@ -184,7 +184,7 @@ test('generate translates through Messages when only that endpoint is exposed', test('generate translates through Responses when only that endpoint is exposed', async () => { installRepo(); const callResponses = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', + ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'responses', callResponses })]); @@ -209,7 +209,7 @@ test('generate stops at the first candidate even when it yields an upstream erro ok: false, response: firstError, modelKey: 'first-key', })); const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', targetApi: 'chat-completions', callChatCompletions: firstCall }), @@ -234,7 +234,7 @@ test('generate stops at the first candidate even when it yields an upstream erro test('generate is a routing no-op for a bare user-text request (degenerate path)', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', targetApi: 'chat-completions', callChatCompletions }), diff --git a/packages/gateway/src/data-plane/llm/messages/attempt_test.ts b/packages/gateway/src/data-plane/llm/messages/attempt_test.ts index 515e63b4a..64a405aea 100644 --- a/packages/gateway/src/data-plane/llm/messages/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/attempt_test.ts @@ -115,7 +115,7 @@ const installRepo = (): InMemoryRepo => { test('generate native messages target calls provider.callMessages with no rewrite', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); const result = await messagesAttempt.generate({ payload: makePayload(), @@ -144,6 +144,7 @@ test('generate translate-to-responses branch routes through responsesAttempt', a ok: true, events: makeProtocolFrames([{ type: 'response.completed', sequence_number: 0, response: respResp }]), modelKey: 'k', + headers: new Headers(), })); const result = await messagesAttempt.generate({ payload: makePayload(), @@ -206,7 +207,7 @@ test('generate attaches the performance context and records upstream_success', a backgroundScheduler: promise => { background.push(promise); }, }; const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'gpt-test', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'gpt-test', headers: new Headers(), })); const result = await messagesAttempt.generate({ @@ -235,3 +236,26 @@ test('generate attaches the performance context and records upstream_success', a assertEquals(upstreamSamples[0]?.upstream, 'up_perf'); assertEquals(upstreamSamples[0]?.requests, 1); }); + +test('generate propagates upstream response headers onto the EventResult so respond can forward them', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'request-id': 'req_messages_xyz', + }); + const callMessages = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + const result = await messagesAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store: createNonResponsesSourceStore(API_KEY_ID), + candidate: makeCandidate({ callMessages }), + }); + + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('request-id'), 'req_messages_xyz'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/messages/http_test.ts b/packages/gateway/src/data-plane/llm/messages/http_test.ts index ec1d9d5e8..f842a7640 100644 --- a/packages/gateway/src/data-plane/llm/messages/http_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/http_test.ts @@ -113,7 +113,7 @@ const makeCandidate = (overrides: { test('POST /v1/messages streams a successful SSE body', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callMessages })]); @@ -134,7 +134,7 @@ test('POST /v1/messages streams a successful SSE body', async () => { test('POST /v1/messages returns a single JSON body when stream is omitted', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callMessages })]); @@ -189,3 +189,53 @@ test('POST /v1/messages/count_tokens proxies the upstream measurement body', asy assertEquals(body.input_tokens, 99); assertEquals(callMessagesCountTokens.mock.calls.length, 1); }); + +test('POST /v1/messages forwards allowlisted upstream response headers end-to-end (streaming)', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'anthropic-ratelimit-unified-remaining': '99', + 'request-id': 'req_e2e_stream', + 'x-internal-cache-id': 'cache-omit', + }); + const callMessages = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + queueCandidates([makeCandidate({ callMessages })]); + + const response = await makeApp().request('/v1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ model: 'test-model', max_tokens: 32, stream: true, messages: [{ role: 'user', content: 'hello' }] }), + }); + + assertEquals(response.status, 200); + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(response.headers.get('anthropic-ratelimit-unified-remaining'), '99'); + assertEquals(response.headers.get('request-id'), 'req_e2e_stream'); + // Non-allowlisted upstream headers must not leak through. + assertEquals(response.headers.get('x-internal-cache-id'), null); + await response.text(); +}); + +test('POST /v1/messages forwards allowlisted upstream response headers end-to-end (non-streaming)', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'cf-ray': 'cf_ray_e2e', + }); + const callMessages = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + queueCandidates([makeCandidate({ callMessages })]); + + const response = await makeApp().request('/v1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ model: 'test-model', max_tokens: 32, messages: [{ role: 'user', content: 'hello' }] }), + }); + + assertEquals(response.status, 200); + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(response.headers.get('cf-ray'), 'cf_ray_e2e'); +}); diff --git a/packages/gateway/src/data-plane/llm/messages/respond_test.ts b/packages/gateway/src/data-plane/llm/messages/respond_test.ts index 56d1b9cdb..eef257969 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond_test.ts @@ -261,9 +261,7 @@ const callRespond = async (wantsStream: boolean): Promise => { const result: ExecuteResult> = eventResult( messagesProtocolFrames(), testTelemetryModelIdentity, - undefined, - undefined, - forwardedHeadersFixture(), + { headers: forwardedHeadersFixture() }, ); const { response } = await respondMessages(c, result, wantsStream, makeRespondCtx()); captured = response; @@ -280,8 +278,6 @@ test('respondMessages forwards allowlisted upstream headers on the non-streaming assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); - // The allowlist is by prefix or exact name — unrelated upstream headers - // must not be proxied to the client. assertEquals(response.headers.get('x-internal-cache-id'), null); }); diff --git a/packages/gateway/src/data-plane/llm/messages/serve_test.ts b/packages/gateway/src/data-plane/llm/messages/serve_test.ts index 6bc2e87e9..d77aa67dd 100644 --- a/packages/gateway/src/data-plane/llm/messages/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/serve_test.ts @@ -165,6 +165,7 @@ test('generate routes a native Messages candidate end to end', async () => { ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_a', callMessages })]); @@ -187,6 +188,7 @@ test('generate translates through the Responses target when only that endpoint i ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'responses-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_r', targetApi: 'responses', callResponses })]); @@ -211,7 +213,7 @@ test('generate stops at the first candidate even when it yields an upstream erro ok: false, response: firstError, modelKey: 'first-key', })); const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesResultEvents('msg_second')), modelKey: 'second-key', + ok: true, events: makeProtocolFrames(makeMessagesResultEvents('msg_second')), modelKey: 'second-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callMessages: firstCall }), @@ -238,6 +240,7 @@ test('generate is a routing no-op when the payload carries no reasoning carriers ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callMessages }), diff --git a/packages/gateway/src/data-plane/llm/responses/attempt.ts b/packages/gateway/src/data-plane/llm/responses/attempt.ts index cc8d20c91..9c8935c89 100644 --- a/packages/gateway/src/data-plane/llm/responses/attempt.ts +++ b/packages/gateway/src/data-plane/llm/responses/attempt.ts @@ -95,8 +95,11 @@ export const responsesAttempt = { responseId, }), chainResult.modelIdentity, - chainResult.performance, - chainResult.finalMetadata, + { + performance: chainResult.performance, + finalMetadata: chainResult.finalMetadata, + headers: chainResult.headers, + }, ); }, @@ -249,6 +252,6 @@ const callResponsesCompactAsExecuteResult = async ( return eventResult( syntheticEventsFromResult(providerResult.result), telemetryModelIdentity(candidate, providerResult.modelKey), - context, + { performance: context }, ); }; diff --git a/packages/gateway/src/data-plane/llm/responses/attempt_test.ts b/packages/gateway/src/data-plane/llm/responses/attempt_test.ts index 94dbc0d29..1b3da86ec 100644 --- a/packages/gateway/src/data-plane/llm/responses/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/attempt_test.ts @@ -157,6 +157,7 @@ test('generate native success wraps the upstream event stream once', async () => ok: true, events: makeProviderEvents([completedEvent]), modelKey: 'test-model-key', + headers: new Headers(), })); const candidate = makeCandidate(callResponses); @@ -345,6 +346,7 @@ test('generate inherits invocation headers across translation to Messages', asyn yield doneFrame(); })(), modelKey: 'k', + headers: new Headers(), }; }, }); @@ -441,7 +443,7 @@ test('generate seeds privatePayload before interceptors so the web-search shim r ]; const callResponses = vi.fn(async (_model, body): Promise> => { capturedBody = body as { input?: unknown[] }; - return { ok: true, events: makeProviderEvents(upstreamEvents), modelKey: 'test-model-key' }; + return { ok: true, events: makeProviderEvents(upstreamEvents), modelKey: 'test-model-key', headers: new Headers() }; }); const candidate = makeCandidate(callResponses); // The shim early-returns inactive unless the binding has the flag. The @@ -502,3 +504,37 @@ test('generate seeds privatePayload before interceptors so the web-search shim r 'shim emitted the not-preserved placeholder despite a stored private payload', ); }); + +test('generate propagates upstream response headers onto the EventResult so respond can forward them', async () => { + installRepo(); + const completedEvent: ResponsesStreamEvent = { + type: 'response.completed', + sequence_number: 0, + response: makeResponsesResult(), + }; + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'request-id': 'req_resp_xyz', + }); + const callResponses = vi.fn(async (): Promise> => ({ + ok: true, + events: makeProviderEvents([completedEvent]), + modelKey: 'test-model-key', + headers: upstreamHeaders, + })); + const candidate = makeCandidate(callResponses); + const store = createResponsesHttpStore(API_KEY_ID, true); + const result = await responsesAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store, + candidate, + snapshotMode: 'append', + }); + + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('request-id'), 'req_resp_xyz'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/responses/http_test.ts b/packages/gateway/src/data-plane/llm/responses/http_test.ts index 40066bf24..f445f0841 100644 --- a/packages/gateway/src/data-plane/llm/responses/http_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/http_test.ts @@ -127,6 +127,7 @@ test('POST /v1/responses streams a successful SSE body', async () => { ok: true, events: makeProviderEvents([completedEvent()]), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ callResponses })]); @@ -153,6 +154,7 @@ test('POST /v1/responses returns a single JSON body when stream is omitted', asy ok: true, events: makeProviderEvents([completedEvent('resp_nonstream')]), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ callResponses })]); @@ -268,6 +270,7 @@ test('POST /v1/responses rewrites the codex-auto-review alias before routing', a ok: true, events: makeProviderEvents([completedEvent()]), modelKey: 'test-model-key', + headers: new Headers(), }; }); queueCandidates([makeCandidate({ callResponses })]); diff --git a/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts b/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts index b25be487a..31284c93e 100644 --- a/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts +++ b/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts @@ -53,5 +53,9 @@ export const withReasoningEncryptedContentCanonicalized: ResponsesInterceptor = const result: ExecuteResult> = await run(); if (result.type !== 'events') return result; - return eventResult(canonicalizeEncryptedContent(result.events), result.modelIdentity, result.performance, result.finalMetadata); + return eventResult(canonicalizeEncryptedContent(result.events), result.modelIdentity, { + performance: result.performance, + finalMetadata: result.finalMetadata, + headers: result.headers, + }); }; diff --git a/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts b/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts index 2c37cccac..32a448b00 100644 --- a/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts @@ -134,7 +134,7 @@ const protocolResult = (events: readonly ResponsesStreamEvent[], modelIdentity = for (const event of events) yield eventFrame(event); })(), modelIdentity, - performance, + { performance }, ); const collectFrames = async (events: AsyncIterable>): Promise[]> => { diff --git a/packages/gateway/src/data-plane/llm/responses/serve_test.ts b/packages/gateway/src/data-plane/llm/responses/serve_test.ts index 825044cea..544249043 100644 --- a/packages/gateway/src/data-plane/llm/responses/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/serve_test.ts @@ -142,6 +142,7 @@ test('generate routes a native Responses candidate end to end', async () => { ok: true, events: makeProtocolFrames([completed]), modelKey: 'test-model-key', + headers: new Headers(), })); const candidate = makeCandidate({ upstream: 'up_a', callResponses }); queueCandidates([candidate]); @@ -201,7 +202,7 @@ test('generate stops at the first candidate even when it yields an upstream erro response: makeResponsesResult('resp_second'), }; const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([completed]), modelKey: 'second-key', + ok: true, events: makeProtocolFrames([completed]), modelKey: 'second-key', headers: new Headers(), })); const first = makeCandidate({ upstream: 'up_a', callResponses: firstCall }); const second = makeCandidate({ upstream: 'up_b', callResponses: secondCall }); @@ -422,6 +423,7 @@ test('generate falls through translate-out to messages target', async () => { { type: 'message_stop' }, ]), modelKey: 'messages-key', + headers: new Headers(), })); const upstreamModel = stubUpstreamModel(); const provider = stubProvider({ callMessages }); @@ -475,6 +477,7 @@ test('generate falls through translate-out to chat-completions target', async () }, ]), modelKey: 'chat-completions-key', + headers: new Headers(), })); const upstreamModel = stubUpstreamModel(); const provider = stubProvider({ callChatCompletions }); @@ -518,6 +521,7 @@ test('generate reuses an existing input row when a later turn echoes the same us response: makeResponsesResult(`resp_turn_${turn}`), }]), modelKey: 'test-model-key', + headers: new Headers(), }; }); const store = createResponsesHttpStore(API_KEY_ID, true); diff --git a/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts b/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts index 323f391f1..66dd9a046 100644 --- a/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts +++ b/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts @@ -35,6 +35,6 @@ export const providerStreamResultToExecuteResult = async ( return eventResult( withUpstreamTelemetry(providerResult.events as AsyncIterable>, ctx, context, candidate.targetApi, durationMs), telemetryModelIdentity(candidate, providerResult.modelKey), - context, + { performance: context, headers: providerResult.headers }, ); }; diff --git a/packages/gateway/src/data-plane/llm/shared/respond.ts b/packages/gateway/src/data-plane/llm/shared/respond.ts index 4bb383d64..4c20cb36b 100644 --- a/packages/gateway/src/data-plane/llm/shared/respond.ts +++ b/packages/gateway/src/data-plane/llm/shared/respond.ts @@ -80,7 +80,7 @@ export const recordPerformance = (ctx: GatewayCtx, context: EventResultMetadata[ const FORWARDED_HEADER_PREFIXES = ['anthropic-ratelimit-'] as const; const FORWARDED_HEADER_NAMES = new Set(['request-id', 'x-request-id', 'cf-ray']); -export const isForwardableUpstreamHeader = (name: string): boolean => { +const isForwardableUpstreamHeader = (name: string): boolean => { const lowered = name.toLowerCase(); if (FORWARDED_HEADER_NAMES.has(lowered)) return true; return FORWARDED_HEADER_PREFIXES.some(prefix => lowered.startsWith(prefix)); diff --git a/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts b/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts index d4b001820..870fad12e 100644 --- a/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts +++ b/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts @@ -9,7 +9,7 @@ import { assertEquals, stubUpstreamModel } from '@floway-dev/test-utils'; const stubRequest = {}; const okEvents = (): Promise> => - Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test' }); + Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test', headers: new Headers() }); const invocation = (payload: ResponsesPayload): ResponsesBoundaryCtx => ({ payload, diff --git a/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts b/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts index 6d4b702e6..c7d93d24f 100644 --- a/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts +++ b/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts @@ -9,7 +9,7 @@ import { assert, assertEquals, stubUpstreamModel } from '@floway-dev/test-utils' const stubRequest = {}; const okEvents = (): Promise> => - Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test' }); + Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test', headers: new Headers() }); const invocation = (payload: ResponsesPayload, headers: Record = {}): ResponsesBoundaryCtx => ({ payload, diff --git a/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts b/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts index 70594678a..7f523a1e3 100644 --- a/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts +++ b/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts @@ -9,7 +9,7 @@ import { assertEquals, assertFalse, stubUpstreamModel } from '@floway-dev/test-u const stubRequest = {}; const okEvents = (): Promise> => - Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test' }); + Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test', headers: new Headers() }); const invocation = (payload: ResponsesPayload): ResponsesBoundaryCtx => ({ payload, diff --git a/packages/provider-copilot/src/provider.ts b/packages/provider-copilot/src/provider.ts index b28e9b65a..8ca4e1051 100644 --- a/packages/provider-copilot/src/provider.ts +++ b/packages/provider-copilot/src/provider.ts @@ -227,7 +227,13 @@ export const createCopilotProvider = async (record: UpstreamRecord): Promise>, ): Promise>> => { const stream = await streamPromise; - if (stream.ok) return eventResult(stream.events as AsyncIterable>, placeholderIdentity(stream.modelKey)); + if (stream.ok) { + return eventResult( + stream.events as AsyncIterable>, + placeholderIdentity(stream.modelKey), + { headers: stream.headers }, + ); + } return await readUpstreamError(stream.response); }; @@ -241,7 +247,12 @@ export const createCopilotProvider = async (record: UpstreamRecord): Promise => { if (result.type === 'events') { - return { ok: true, events: result.events as AsyncIterable>, modelKey }; + return { + ok: true, + events: result.events as AsyncIterable>, + modelKey, + ...(result.headers ? { headers: result.headers } : {}), + }; } if (result.type === 'upstream-error') { return { ok: false, response: upstreamErrorToResponse(result), modelKey }; diff --git a/packages/provider/src/provider.ts b/packages/provider/src/provider.ts index 893faf0cd..8f32e4e03 100644 --- a/packages/provider/src/provider.ts +++ b/packages/provider/src/provider.ts @@ -41,12 +41,16 @@ export interface ProviderCallResult { // Streaming endpoints (Messages / Responses / ChatCompletions) return decoded // protocol frames directly — the provider drives the upstream fetch, parses // the SSE wire via @floway-dev/protocols, and emits the typed event stream. +// `ok: true` optionally carries the raw upstream `Headers` so the source-side +// `respond` layer can forward an allowlist (see gateway `shared/respond.ts`). +// Absent on lifted/synthesized streams that have no upstream Response behind +// them, matching the same shape on `EventResult`. // `ok: false` carries the raw upstream Response verbatim so the gateway -// boundary can relay status + body unchanged. Non-2xx-but-not-SSE responses -// throw from the provider as a contract violation (provider always forces -// stream=true on streaming endpoints). +// boundary can relay status + body + headers unchanged. Non-2xx-but-not-SSE +// responses throw from the provider as a contract violation (provider always +// forces stream=true on streaming endpoints). export type ProviderStreamResult = - | { ok: true; events: AsyncIterable>; modelKey: string } + | { ok: true; events: AsyncIterable>; modelKey: string; headers?: Headers } | { ok: false; response: Response; modelKey: string }; // `/responses/compact` is non-streaming — the upstream returns a single diff --git a/packages/provider/src/result.ts b/packages/provider/src/result.ts index c47a31a43..a2922cd4b 100644 --- a/packages/provider/src/result.ts +++ b/packages/provider/src/result.ts @@ -7,11 +7,9 @@ export interface EventResult { modelIdentity: TelemetryModelIdentity; performance?: PerformanceTelemetryContext; finalMetadata?: Promise; - // Upstream HTTP response headers, propagated from the provider so the - // source-side `respond` layer can forward billing/rate-limit hints - // (`anthropic-ratelimit-*`) and operator-trace identifiers (`request-id`, - // `x-request-id`, `cf-ray`) verbatim to the downstream client. Absent on - // lifted/synthesized streams that have no raw upstream Response. + // Raw upstream response headers for the source-side `respond` layer to + // forward (allowlist in gateway `shared/respond.ts`). Absent on + // lifted/synthesized streams that have no upstream Response behind them. headers?: Headers; } @@ -49,17 +47,21 @@ export interface PlainResult { export type ExecuteResult = EventResult | UpstreamErrorResult | InternalErrorResult; +export interface EventResultOptions { + performance?: PerformanceTelemetryContext; + finalMetadata?: Promise; + headers?: Headers; +} + export const eventResult = ( events: AsyncIterable, modelIdentity: TelemetryModelIdentity, - performance?: PerformanceTelemetryContext, - finalMetadata?: Promise, - headers?: Headers, + options: EventResultOptions = {}, ): EventResult => { const result: EventResult = { type: 'events', events, modelIdentity }; - if (performance !== undefined) result.performance = performance; - if (finalMetadata !== undefined) result.finalMetadata = finalMetadata; - if (headers !== undefined) result.headers = headers; + if (options.performance !== undefined) result.performance = options.performance; + if (options.finalMetadata !== undefined) result.finalMetadata = options.finalMetadata; + if (options.headers !== undefined) result.headers = options.headers; return result; }; diff --git a/packages/provider/src/streaming.ts b/packages/provider/src/streaming.ts index 2153efd2c..d449c5dff 100644 --- a/packages/provider/src/streaming.ts +++ b/packages/provider/src/streaming.ts @@ -39,5 +39,5 @@ export const streamingProviderCall = async ( const snippet = await readBodySnippet(response); throw new Error(`Upstream returned ${response.status} with content-type "${contentType || 'unknown'}" but stream is required (provider must force stream=true and return text/event-stream when response.ok). Body: ${snippet}`); } - return { ok: true, events: parser(response.body, { signal }), modelKey }; + return { ok: true, events: parser(response.body, { signal }), modelKey, headers: response.headers }; }; From dc06eca07dece96fa4864a0aedb1d6e7af7a675b Mon Sep 17 00:00:00 2001 From: Menci Date: Sun, 21 Jun 2026 01:24:21 +0800 Subject: [PATCH 3/3] refactor(gateway): forward upstream headers by default, blocklist only what must be stripped MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The allowlist was over-restrictive: every new useful upstream header (e.g. `openai-version`, `openai-organization`, `openai-processing-ms`, `anthropic-organization-id`, future vendor `x-*`) required a code change. Operators want to see what the upstream sent verbatim, and there is no security argument for hiding response headers a well-behaved upstream emits. `isForwardableUpstreamHeader` now returns `true` for everything except a fixed blocklist: - hop-by-hop per RFC 7230 §6.1 — `connection`, `keep-alive`, `proxy-*`, `te`, `trailer`, `transfer-encoding`, `upgrade`. MUST NOT be forwarded by intermediaries. - body framing — `content-length`, `content-encoding`, `content-type`. The streaming layer rewrites the body (SSE re-framing, optional decompression + re-encode), so upstream values would mis-frame the downstream response. - cookies — `set-cookie`, `set-cookie2`. The gateway did not issue these; propagating upstream session bindings is a footgun. Tests retire the `x-internal-cache-id` fixture (it is no longer "blocked" under the new policy) and assert the blocklist semantics directly: arbitrary `openai-version` / `x-custom-thing` flow through, hop-by-hop / framing / cookie entries are stripped. The streaming-SSE assertions use distinctive upstream values for `connection` and `transfer-encoding` because Hono's `streamSSE` writer sets its own `keep-alive` / `chunked` that we cannot suppress at the respond layer. --- .../src/data-plane/llm/messages/http_test.ts | 17 ++++-- .../data-plane/llm/messages/respond_test.ts | 48 ++++++++++++--- .../src/data-plane/llm/shared/respond.ts | 59 ++++++++++++------- packages/provider/src/provider.ts | 7 ++- packages/provider/src/result.ts | 2 +- 5 files changed, 95 insertions(+), 38 deletions(-) diff --git a/packages/gateway/src/data-plane/llm/messages/http_test.ts b/packages/gateway/src/data-plane/llm/messages/http_test.ts index f842a7640..25014bd0c 100644 --- a/packages/gateway/src/data-plane/llm/messages/http_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/http_test.ts @@ -190,13 +190,15 @@ test('POST /v1/messages/count_tokens proxies the upstream measurement body', asy assertEquals(callMessagesCountTokens.mock.calls.length, 1); }); -test('POST /v1/messages forwards allowlisted upstream response headers end-to-end (streaming)', async () => { +test('POST /v1/messages forwards upstream response headers end-to-end (streaming) and strips hop-by-hop / cookies', async () => { installRepo(); const upstreamHeaders = new Headers({ 'anthropic-ratelimit-unified-status': 'allowed', 'anthropic-ratelimit-unified-remaining': '99', 'request-id': 'req_e2e_stream', - 'x-internal-cache-id': 'cache-omit', + 'openai-version': '2024-10-21', + 'connection': 'close', + 'set-cookie': 'session=secret', }); const callMessages = vi.fn(async (): Promise> => ({ ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, @@ -213,12 +215,17 @@ test('POST /v1/messages forwards allowlisted upstream response headers end-to-en assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed'); assertEquals(response.headers.get('anthropic-ratelimit-unified-remaining'), '99'); assertEquals(response.headers.get('request-id'), 'req_e2e_stream'); - // Non-allowlisted upstream headers must not leak through. - assertEquals(response.headers.get('x-internal-cache-id'), null); + assertEquals(response.headers.get('openai-version'), '2024-10-21'); + // hop-by-hop and cookies are stripped. `connection` is special-cased + // because Hono's streamSSE writer sets its own `keep-alive`; assert + // upstream's distinctive `close` did not survive instead of asserting + // absence. + assert(response.headers.get('connection') !== 'close'); + assertEquals(response.headers.get('set-cookie'), null); await response.text(); }); -test('POST /v1/messages forwards allowlisted upstream response headers end-to-end (non-streaming)', async () => { +test('POST /v1/messages forwards upstream response headers end-to-end (non-streaming)', async () => { installRepo(); const upstreamHeaders = new Headers({ 'anthropic-ratelimit-unified-status': 'allowed', diff --git a/packages/gateway/src/data-plane/llm/messages/respond_test.ts b/packages/gateway/src/data-plane/llm/messages/respond_test.ts index eef257969..e0581bad4 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond_test.ts @@ -8,7 +8,7 @@ import type { GatewayCtx } from '../shared/gateway-ctx.ts'; import { doneFrame, eventFrame, type ProtocolFrame } from '@floway-dev/protocols/common'; import type { MessagesStreamEvent } from '@floway-dev/protocols/messages'; import { eventResult, type ExecuteResult } from '@floway-dev/provider'; -import { assertEquals, testTelemetryModelIdentity } from '@floway-dev/test-utils'; +import { assert, assertEquals, testTelemetryModelIdentity } from '@floway-dev/test-utils'; const stop = () => eventFrame({ type: 'message_stop' } satisfies MessagesStreamEvent); @@ -214,12 +214,21 @@ test('Messages stream usage falls back to the rolled-up cache_creation when the // --- header forwarding --- const forwardedHeadersFixture = (): Headers => new Headers({ + // forwardable: vendor traces, plan billing, vendor `x-*`, arbitrary custom 'anthropic-ratelimit-unified-status': 'allowed_warning', 'anthropic-ratelimit-unified-fallback-percentage': '50', 'request-id': 'req_anthropic_abc', 'cf-ray': 'cf_ray_xyz', - 'x-internal-cache-id': 'cache-abc', - 'content-type': 'text/event-stream', + 'openai-version': '2024-10-21', + 'x-custom-thing': 'ok', + // blocked: hop-by-hop, body framing, cookies. Distinctive values so we can + // tell the upstream's header from anything Hono's writers add. + 'connection': 'close', + 'transfer-encoding': 'gzip', + 'content-length': '999', + 'content-encoding': 'br', + 'content-type': 'application/x-upstream-quirk', + 'set-cookie': 'session=secret', }); const makeRespondCtx = (): GatewayCtx => ({ @@ -272,22 +281,47 @@ const callRespond = async (wantsStream: boolean): Promise => { return captured; }; -test('respondMessages forwards allowlisted upstream headers on the non-streaming JSON response', async () => { +test('respondMessages forwards upstream headers and strips hop-by-hop / framing / cookie headers on the non-streaming JSON response', async () => { const response = await callRespond(false); + // forwarded verbatim assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed_warning'); assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); - assertEquals(response.headers.get('x-internal-cache-id'), null); + assertEquals(response.headers.get('openai-version'), '2024-10-21'); + assertEquals(response.headers.get('x-custom-thing'), 'ok'); + // hop-by-hop and cookies dropped + assertEquals(response.headers.get('connection'), null); + assertEquals(response.headers.get('transfer-encoding'), null); + assertEquals(response.headers.get('set-cookie'), null); + // framing headers dropped — upstream values would mis-frame the response; + // Response.json sets its own content-type, which must not echo upstream's + assertEquals(response.headers.get('content-length'), null); + assertEquals(response.headers.get('content-encoding'), null); + assertEquals(response.headers.get('content-type'), 'application/json'); }); -test('respondMessages forwards allowlisted upstream headers on the streaming SSE response', async () => { +test('respondMessages forwards upstream headers and strips hop-by-hop / framing / cookie headers on the streaming SSE response', async () => { const response = await callRespond(true); + // forwarded verbatim assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed_warning'); assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); - assertEquals(response.headers.get('x-internal-cache-id'), null); + assertEquals(response.headers.get('openai-version'), '2024-10-21'); + assertEquals(response.headers.get('x-custom-thing'), 'ok'); + // hop-by-hop and cookies dropped. `connection` and `transfer-encoding` + // are special-cased: Hono's streamSSE writer sets its own `keep-alive` / + // `chunked`, so we assert upstream's distinctive values did not survive + // rather than asserting absence. + assert(response.headers.get('connection') !== 'close'); + assert(response.headers.get('transfer-encoding') !== 'gzip'); + assertEquals(response.headers.get('set-cookie'), null); + // framing headers dropped; streamSSE writes its own text/event-stream and + // never emits content-length or content-encoding for a streamed body + assertEquals(response.headers.get('content-length'), null); + assertEquals(response.headers.get('content-encoding'), null); + assertEquals(response.headers.get('content-type')?.split(';')[0], 'text/event-stream'); // Drain the body so the lazy generator releases its resources and the // background `finally` block in `streamSSE` doesn't keep the test runner // alive. diff --git a/packages/gateway/src/data-plane/llm/shared/respond.ts b/packages/gateway/src/data-plane/llm/shared/respond.ts index 4c20cb36b..cc4b9cd01 100644 --- a/packages/gateway/src/data-plane/llm/shared/respond.ts +++ b/packages/gateway/src/data-plane/llm/shared/respond.ts @@ -63,30 +63,45 @@ export const recordPerformance = (ctx: GatewayCtx, context: EventResultMetadata[ recordRequestPerformance(ctx.backgroundScheduler, context, failed, performance.now() - ctx.requestStartedAt); }; -// Upstream-emitted hints we propagate verbatim to the downstream client. +// Upstream response headers we propagate verbatim to the downstream client. +// A blocklist (not an allowlist): operators want to see what the upstream +// actually sent — vendor traces (`request-id`, `cf-ray`), plan-billing state +// (`anthropic-ratelimit-*`, which the official `claude-code` CLI's `/status` +// indicator reads), and any future `x-*` an upstream introduces. We only +// strip what we MUST or what would actively break downstream framing: // -// The prefix list covers Anthropic's plan-billing surface: the -// `anthropic-ratelimit-unified-*` family carries quotas, resets, and warning -// thresholds, and the official `claude-code` CLI's `/status` indicator reads -// them. Dropping the headers makes the gateway look like an account with no -// rate-limit state. The allowlist is by prefix so new dimensions the upstream -// introduces (e.g. a future `anthropic-ratelimit-tier-*`) are forwarded -// automatically. -// -// The exact-name list covers operator-trace identifiers — `request-id` / -// `x-request-id` (Anthropic / OpenAI vendor traces) and `cf-ray` (Cloudflare's -// edge trace). Support tickets and live debugging rely on these reaching the -// downstream client unmodified. -const FORWARDED_HEADER_PREFIXES = ['anthropic-ratelimit-'] as const; -const FORWARDED_HEADER_NAMES = new Set(['request-id', 'x-request-id', 'cf-ray']); +// - hop-by-hop headers (RFC 7230 §6.1) MUST NOT be forwarded by +// intermediaries. +// - `content-length` / `content-encoding` / `content-type` are managed by +// the streaming layer: it rewrites the body (SSE re-framing, optional +// decompression + re-encode) so upstream's values would mis-frame the +// downstream response. The SSE writer sets its own `text/event-stream`; +// non-SSE pass-throughs hand content-type back via their own path. +// - `set-cookie` / `set-cookie2`: we didn't issue these and propagating +// upstream session bindings is a footgun. +const BLOCKED_UPSTREAM_HEADERS: ReadonlySet = new Set([ + // hop-by-hop (RFC 7230 §6.1) + 'connection', + 'keep-alive', + 'proxy-authenticate', + 'proxy-authorization', + 'te', + 'trailer', + 'transfer-encoding', + 'upgrade', + // body framing — owned by the streaming layer + 'content-length', + 'content-encoding', + 'content-type', + // cookies + 'set-cookie', + 'set-cookie2', +]); -const isForwardableUpstreamHeader = (name: string): boolean => { - const lowered = name.toLowerCase(); - if (FORWARDED_HEADER_NAMES.has(lowered)) return true; - return FORWARDED_HEADER_PREFIXES.some(prefix => lowered.startsWith(prefix)); -}; +export const isForwardableUpstreamHeader = (name: string): boolean => + !BLOCKED_UPSTREAM_HEADERS.has(name.toLowerCase()); -// Stages allowlisted upstream headers onto the Hono context so the next +// Stages forwardable upstream headers onto the Hono context so the next // `c.newResponse` (or `streamSSE`'s internal `c.newResponse`) emits them on // the response. Hono's `c.header()` is the only knob that survives a later // `c.json` or `streamSSE` call without being overwritten. Safe to call with @@ -98,7 +113,7 @@ export const forwardUpstreamHeaders = (c: Context, headers: Headers | undefined) } }; -// Returns a `HeadersInit` extending `base` with every allowlisted entry from +// Returns a `HeadersInit` extending `base` with every forwardable entry from // `upstream`. Used by non-streaming JSON responses where the response is // built directly (`Response.json(...)`) instead of through Hono's `c`. export const mergeForwardedUpstreamHeaders = (base: HeadersInit | undefined, upstream: Headers | undefined): HeadersInit => { diff --git a/packages/provider/src/provider.ts b/packages/provider/src/provider.ts index 8f32e4e03..7b8675dd4 100644 --- a/packages/provider/src/provider.ts +++ b/packages/provider/src/provider.ts @@ -42,9 +42,10 @@ export interface ProviderCallResult { // protocol frames directly — the provider drives the upstream fetch, parses // the SSE wire via @floway-dev/protocols, and emits the typed event stream. // `ok: true` optionally carries the raw upstream `Headers` so the source-side -// `respond` layer can forward an allowlist (see gateway `shared/respond.ts`). -// Absent on lifted/synthesized streams that have no upstream Response behind -// them, matching the same shape on `EventResult`. +// `respond` layer can forward them to the downstream client (blocklist in +// gateway `shared/respond.ts` — hop-by-hop, body framing, cookies). Absent +// on lifted/synthesized streams that have no upstream Response behind them, +// matching the same shape on `EventResult`. // `ok: false` carries the raw upstream Response verbatim so the gateway // boundary can relay status + body + headers unchanged. Non-2xx-but-not-SSE // responses throw from the provider as a contract violation (provider always diff --git a/packages/provider/src/result.ts b/packages/provider/src/result.ts index a2922cd4b..e1d2365c0 100644 --- a/packages/provider/src/result.ts +++ b/packages/provider/src/result.ts @@ -8,7 +8,7 @@ export interface EventResult { performance?: PerformanceTelemetryContext; finalMetadata?: Promise; // Raw upstream response headers for the source-side `respond` layer to - // forward (allowlist in gateway `shared/respond.ts`). Absent on + // forward (blocklist in gateway `shared/respond.ts`). Absent on // lifted/synthesized streams that have no upstream Response behind them. headers?: Headers; }