diff --git a/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts b/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts index 22fba3a3..9651da97 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/attempt_test.ts @@ -114,7 +114,7 @@ const installRepo = (): InMemoryRepo => { test('generate native chat-completions target calls provider.callChatCompletions', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -132,7 +132,7 @@ test('generate native chat-completions target calls provider.callChatCompletions test('generate translate-to-messages branch routes through messagesAttempt', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -161,6 +161,7 @@ test('generate translate-to-responses branch routes through responsesAttempt', a ok: true, events: makeProtocolFrames([{ type: 'response.completed', sequence_number: 0, response: respResp }]), modelKey: 'k', + headers: new Headers(), })); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -180,7 +181,7 @@ test('generate inherits invocation headers across translation to Messages', asyn let observedHeaders: Record | undefined; const callMessages = vi.fn(async (_model: unknown, _body: unknown, _signal?: AbortSignal, headers?: Record): Promise> => { observedHeaders = headers; - return { ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k' }; + return { ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers() }; }); const result = await chatCompletionsAttempt.generate({ payload: makePayload(), @@ -212,6 +213,7 @@ test('generate inherits invocation headers across translation to Responses', asy ok: true, events: makeProtocolFrames([{ type: 'response.completed', sequence_number: 0, response: respResp }]), modelKey: 'k', + headers: new Headers(), }; }); const result = await chatCompletionsAttempt.generate({ @@ -226,3 +228,25 @@ test('generate inherits invocation headers across translation to Responses', asy await collectEvents(result.events); assertEquals(observedHeaders?.['x-test'], 'abc'); }); + +test('generate propagates upstream response headers onto the EventResult so respond can forward them', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'cf-ray': 'cf_ray_cc', + }); + const callChatCompletions = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + const result = await chatCompletionsAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store: createNonResponsesSourceStore(API_KEY_ID), + candidate: makeCandidate({ callChatCompletions }), + }); + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('cf-ray'), 'cf_ray_cc'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts b/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts index eda9aa6e..93adb4c8 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/http_test.ts @@ -106,7 +106,7 @@ const makeCandidate = (overrides: { test('POST /v1/chat/completions streams a successful SSE body', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -127,7 +127,7 @@ test('POST /v1/chat/completions streams a successful SSE body', async () => { test('POST /v1/chat/completions returns a single JSON body when stream is omitted', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -147,7 +147,7 @@ test('POST /v1/chat/completions returns a single JSON body when stream is omitte test('POST /v1/chat/completions omits the usage-only chunk unless stream_options.include_usage is set', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -166,7 +166,7 @@ test('POST /v1/chat/completions omits the usage-only chunk unless stream_options test('POST /v1/chat/completions emits the usage-only chunk when stream_options.include_usage is true', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -194,7 +194,7 @@ test('POST /v1/chat/completions emits the usage-only chunk when stream_options.i test('POST /v1/chat/completions does not write any non-auth Hono context slot', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); diff --git a/packages/gateway/src/data-plane/llm/chat-completions/respond.ts b/packages/gateway/src/data-plane/llm/chat-completions/respond.ts index 07e56679..1d3f5b50 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/respond.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/respond.ts @@ -5,7 +5,7 @@ import { CHAT_COMPLETIONS_MISSING_TERMINAL_MESSAGE, collectChatCompletionsProtoc import { chatCompletionsProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import type { ChatCompletionsStreamEvent, ChatCompletionsResult } from '@floway-dev/protocols/chat-completions'; import { chatCompletionsErrorPayloadMessage } from '@floway-dev/protocols/chat-completions'; @@ -47,13 +47,14 @@ export const respondChatCompletions = async ( const usage = response.usage ? tokenUsageFromChatCompletionsUsage(response.usage) : null; await recordUsage(ctx, metadata.modelIdentity, usage); recordPerformance(ctx, metadata.performance, state.failed); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: internalChatCompletionsErrorResponse(502, toInternalDebugError(error, 'chat-completions')) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts b/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts index b852e442..e8e488ea 100644 --- a/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/chat-completions/serve_test.ts @@ -146,7 +146,7 @@ const collectEvents = async (events: AsyncIterable test('generate routes a native Chat Completions candidate end to end', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_a', callChatCompletions })]); @@ -166,7 +166,7 @@ test('generate routes a native Chat Completions candidate end to end', async () test('generate translates through the Messages target when only that endpoint is exposed', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'messages-model-key', + ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'messages-model-key', headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_m', targetApi: 'messages', callMessages })]); @@ -185,7 +185,7 @@ test('generate translates through the Messages target when only that endpoint is test('generate translates through the Responses target when only that endpoint is exposed', async () => { installRepo(); const callResponses = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'responses-model-key', + ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'responses-model-key', headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_r', targetApi: 'responses', callResponses })]); @@ -210,7 +210,7 @@ test('generate stops at the first candidate even when it yields an upstream erro ok: false, response: firstError, modelKey: 'first-key', })); const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callChatCompletions: firstCall }), @@ -234,7 +234,7 @@ test('generate stops at the first candidate even when it yields an upstream erro test('generate is a routing no-op when the payload carries no reasoning carriers (degenerate path)', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'test-model-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callChatCompletions }), diff --git a/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts b/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts index 92f9c3a0..81033482 100644 --- a/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/gemini/attempt_test.ts @@ -123,7 +123,7 @@ const collectEvents = async (events: AsyncIterable test('generate translates through Chat Completions when targetApi is chat-completions', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); const result = await geminiAttempt.generate({ payload: makePayload(), @@ -141,7 +141,7 @@ test('generate translates through Chat Completions when targetApi is chat-comple test('generate translates through Messages when targetApi is messages', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); const result = await geminiAttempt.generate({ payload: makePayload(), @@ -159,7 +159,7 @@ test('generate translates through Messages when targetApi is messages', async () test('generate translates through Responses when targetApi is responses', async () => { installRepo(); const callResponses = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', + ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', headers: new Headers(), })); const result = await geminiAttempt.generate({ payload: makePayload(), @@ -260,3 +260,25 @@ test('countTokens refuses a non-messages candidate', async () => { if (!(thrown instanceof Error)) throw new Error('expected an Error to be thrown'); assertEquals(thrown.message.includes("targetApi='messages'"), true); }); + +test('generate propagates upstream response headers through the chat-completions translation', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'x-request-id': 'req_gemini_xyz', + }); + const callChatCompletions = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + const result = await geminiAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store: createNonResponsesSourceStore(API_KEY_ID), + candidate: makeCandidate({ targetApi: 'chat-completions', callChatCompletions }), + }); + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('x-request-id'), 'req_gemini_xyz'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/gemini/http_test.ts b/packages/gateway/src/data-plane/llm/gemini/http_test.ts index 96c6ce3f..57cf1167 100644 --- a/packages/gateway/src/data-plane/llm/gemini/http_test.ts +++ b/packages/gateway/src/data-plane/llm/gemini/http_test.ts @@ -113,7 +113,7 @@ const makeMessagesEvents = (): readonly MessagesStreamEvent[] => [ test('POST /v1beta/models/:model:generateContent returns a single JSON body for non-stream generate', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -133,7 +133,7 @@ test('POST /v1beta/models/:model:generateContent returns a single JSON body for test('POST /v1beta/models/:model:streamGenerateContent streams a Gemini SSE body', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callChatCompletions })]); @@ -196,7 +196,7 @@ test('POST /v1beta/models/:model:countTokens accepts the generateContentRequest test('POST /v1beta/models/:model:generateContent translates through Messages target end to end', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'messages', callMessages })]); @@ -234,7 +234,7 @@ test('POST /v1beta/models/models/:model:generateContent accepts the models/ pref let resolvedModel: string | undefined; const callChatCompletions = vi.fn(async (model): Promise> => { resolvedModel = (model as { id: string }).id; - return { ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k' }; + return { ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers() }; }); queueCandidates([makeCandidate({ callChatCompletions })]); diff --git a/packages/gateway/src/data-plane/llm/gemini/respond.ts b/packages/gateway/src/data-plane/llm/gemini/respond.ts index 5b2dae51..c8cd6ac9 100644 --- a/packages/gateway/src/data-plane/llm/gemini/respond.ts +++ b/packages/gateway/src/data-plane/llm/gemini/respond.ts @@ -6,7 +6,7 @@ import { GEMINI_MISSING_TERMINAL_MESSAGE, isGeminiErrorEvent, isGeminiTerminalEv import { geminiProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import { type ProtocolFrame, sseCommentFrame, sseFrame } from '@floway-dev/protocols/common'; import type { GeminiErrorResponse, GeminiResult, GeminiStreamEvent, GeminiUsageMetadata } from '@floway-dev/protocols/gemini'; @@ -45,13 +45,14 @@ export const respondGemini = async ( const metadata = await eventResultMetadata(result); await recordUsage(ctx, metadata.modelIdentity, tokenUsageFromGeminiResponse(response)); recordPerformance(ctx, metadata.performance, state.failed); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: geminiCollectErrorResponse(error) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/gemini/serve_test.ts b/packages/gateway/src/data-plane/llm/gemini/serve_test.ts index d20bc338..c7cb7c7e 100644 --- a/packages/gateway/src/data-plane/llm/gemini/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/gemini/serve_test.ts @@ -146,7 +146,7 @@ const expectType = (r: T, k: K) test('generate translates through native Chat Completions target end to end', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'chat-completions', callChatCompletions })]); @@ -165,7 +165,7 @@ test('generate translates through native Chat Completions target end to end', as test('generate translates through Messages when only that endpoint is exposed', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'messages', callMessages })]); @@ -184,7 +184,7 @@ test('generate translates through Messages when only that endpoint is exposed', test('generate translates through Responses when only that endpoint is exposed', async () => { installRepo(); const callResponses = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', + ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ targetApi: 'responses', callResponses })]); @@ -209,7 +209,7 @@ test('generate stops at the first candidate even when it yields an upstream erro ok: false, response: firstError, modelKey: 'first-key', })); const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'second-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', targetApi: 'chat-completions', callChatCompletions: firstCall }), @@ -234,7 +234,7 @@ test('generate stops at the first candidate even when it yields an upstream erro test('generate is a routing no-op for a bare user-text request (degenerate path)', async () => { installRepo(); const callChatCompletions = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeChatCompletionsEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', targetApi: 'chat-completions', callChatCompletions }), diff --git a/packages/gateway/src/data-plane/llm/messages/attempt_test.ts b/packages/gateway/src/data-plane/llm/messages/attempt_test.ts index 515e63b4..64a405ae 100644 --- a/packages/gateway/src/data-plane/llm/messages/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/attempt_test.ts @@ -115,7 +115,7 @@ const installRepo = (): InMemoryRepo => { test('generate native messages target calls provider.callMessages with no rewrite', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); const result = await messagesAttempt.generate({ payload: makePayload(), @@ -144,6 +144,7 @@ test('generate translate-to-responses branch routes through responsesAttempt', a ok: true, events: makeProtocolFrames([{ type: 'response.completed', sequence_number: 0, response: respResp }]), modelKey: 'k', + headers: new Headers(), })); const result = await messagesAttempt.generate({ payload: makePayload(), @@ -206,7 +207,7 @@ test('generate attaches the performance context and records upstream_success', a backgroundScheduler: promise => { background.push(promise); }, }; const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'gpt-test', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'gpt-test', headers: new Headers(), })); const result = await messagesAttempt.generate({ @@ -235,3 +236,26 @@ test('generate attaches the performance context and records upstream_success', a assertEquals(upstreamSamples[0]?.upstream, 'up_perf'); assertEquals(upstreamSamples[0]?.requests, 1); }); + +test('generate propagates upstream response headers onto the EventResult so respond can forward them', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'request-id': 'req_messages_xyz', + }); + const callMessages = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + const result = await messagesAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store: createNonResponsesSourceStore(API_KEY_ID), + candidate: makeCandidate({ callMessages }), + }); + + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('request-id'), 'req_messages_xyz'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/messages/http_test.ts b/packages/gateway/src/data-plane/llm/messages/http_test.ts index ec1d9d5e..25014bd0 100644 --- a/packages/gateway/src/data-plane/llm/messages/http_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/http_test.ts @@ -113,7 +113,7 @@ const makeCandidate = (overrides: { test('POST /v1/messages streams a successful SSE body', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callMessages })]); @@ -134,7 +134,7 @@ test('POST /v1/messages streams a successful SSE body', async () => { test('POST /v1/messages returns a single JSON body when stream is omitted', async () => { installRepo(); const callMessages = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: new Headers(), })); queueCandidates([makeCandidate({ callMessages })]); @@ -189,3 +189,60 @@ test('POST /v1/messages/count_tokens proxies the upstream measurement body', asy assertEquals(body.input_tokens, 99); assertEquals(callMessagesCountTokens.mock.calls.length, 1); }); + +test('POST /v1/messages forwards upstream response headers end-to-end (streaming) and strips hop-by-hop / cookies', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'anthropic-ratelimit-unified-remaining': '99', + 'request-id': 'req_e2e_stream', + 'openai-version': '2024-10-21', + 'connection': 'close', + 'set-cookie': 'session=secret', + }); + const callMessages = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + queueCandidates([makeCandidate({ callMessages })]); + + const response = await makeApp().request('/v1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ model: 'test-model', max_tokens: 32, stream: true, messages: [{ role: 'user', content: 'hello' }] }), + }); + + assertEquals(response.status, 200); + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(response.headers.get('anthropic-ratelimit-unified-remaining'), '99'); + assertEquals(response.headers.get('request-id'), 'req_e2e_stream'); + assertEquals(response.headers.get('openai-version'), '2024-10-21'); + // hop-by-hop and cookies are stripped. `connection` is special-cased + // because Hono's streamSSE writer sets its own `keep-alive`; assert + // upstream's distinctive `close` did not survive instead of asserting + // absence. + assert(response.headers.get('connection') !== 'close'); + assertEquals(response.headers.get('set-cookie'), null); + await response.text(); +}); + +test('POST /v1/messages forwards upstream response headers end-to-end (non-streaming)', async () => { + installRepo(); + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'cf-ray': 'cf_ray_e2e', + }); + const callMessages = vi.fn(async (): Promise> => ({ + ok: true, events: makeProtocolFrames(makeMessagesEvents()), modelKey: 'k', headers: upstreamHeaders, + })); + queueCandidates([makeCandidate({ callMessages })]); + + const response = await makeApp().request('/v1/messages', { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ model: 'test-model', max_tokens: 32, messages: [{ role: 'user', content: 'hello' }] }), + }); + + assertEquals(response.status, 200); + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(response.headers.get('cf-ray'), 'cf_ray_e2e'); +}); diff --git a/packages/gateway/src/data-plane/llm/messages/respond.ts b/packages/gateway/src/data-plane/llm/messages/respond.ts index 0008704c..6f00b3f2 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond.ts @@ -5,7 +5,7 @@ import { MESSAGES_MISSING_TERMINAL_MESSAGE, collectMessagesProtocolEventsToResul import { messagesProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import { type ProtocolFrame, sseFrame } from '@floway-dev/protocols/common'; import type { MessagesMessageDeltaEvent, MessagesStreamEvent, MessagesUsage } from '@floway-dev/protocols/messages'; @@ -47,13 +47,14 @@ export const respondMessages = async ( const metadata = await eventResultMetadata(result); await recordUsage(ctx, metadata.modelIdentity, tokenUsageFromMessagesUsage(response.usage)); recordPerformance(ctx, metadata.performance, state.failed); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: internalMessagesErrorResponse(502, toInternalDebugError(error, 'messages')) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/messages/respond_test.ts b/packages/gateway/src/data-plane/llm/messages/respond_test.ts index 91b18629..e0581bad 100644 --- a/packages/gateway/src/data-plane/llm/messages/respond_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/respond_test.ts @@ -1,9 +1,14 @@ +import { Hono } from 'hono'; import { test } from 'vitest'; -import { createMessagesStreamUsageState, tokenUsageFromMessagesFrame } from './respond.ts'; -import { eventFrame } from '@floway-dev/protocols/common'; +import { createMessagesStreamUsageState, respondMessages, tokenUsageFromMessagesFrame } from './respond.ts'; +import { initRepo } from '../../../repo/index.ts'; +import { InMemoryRepo } from '../../../repo/memory.ts'; +import type { GatewayCtx } from '../shared/gateway-ctx.ts'; +import { doneFrame, eventFrame, type ProtocolFrame } from '@floway-dev/protocols/common'; import type { MessagesStreamEvent } from '@floway-dev/protocols/messages'; -import { assertEquals } from '@floway-dev/test-utils'; +import { eventResult, type ExecuteResult } from '@floway-dev/provider'; +import { assert, assertEquals, testTelemetryModelIdentity } from '@floway-dev/test-utils'; const stop = () => eventFrame({ type: 'message_stop' } satisfies MessagesStreamEvent); @@ -205,3 +210,120 @@ test('Messages stream usage falls back to the rolled-up cache_creation when the output: 1, }); }); + +// --- header forwarding --- + +const forwardedHeadersFixture = (): Headers => new Headers({ + // forwardable: vendor traces, plan billing, vendor `x-*`, arbitrary custom + 'anthropic-ratelimit-unified-status': 'allowed_warning', + 'anthropic-ratelimit-unified-fallback-percentage': '50', + 'request-id': 'req_anthropic_abc', + 'cf-ray': 'cf_ray_xyz', + 'openai-version': '2024-10-21', + 'x-custom-thing': 'ok', + // blocked: hop-by-hop, body framing, cookies. Distinctive values so we can + // tell the upstream's header from anything Hono's writers add. + 'connection': 'close', + 'transfer-encoding': 'gzip', + 'content-length': '999', + 'content-encoding': 'br', + 'content-type': 'application/x-upstream-quirk', + 'set-cookie': 'session=secret', +}); + +const makeRespondCtx = (): GatewayCtx => ({ + apiKeyId: 'key_respond_test', + upstreamIds: null, + wantsStream: false, + runtimeLocation: 'test', + backgroundScheduler: () => {}, + requestStartedAt: 0, + currentColo: null, +}); + +const messagesEventsForRespond = (): readonly MessagesStreamEvent[] => [ + { + type: 'message_start', + message: { + id: 'msg_1', type: 'message', role: 'assistant', content: [], model: 'claude-test', + stop_reason: null, stop_sequence: null, + usage: { input_tokens: 3, output_tokens: 0 }, + }, + }, + { type: 'content_block_start', index: 0, content_block: { type: 'text', text: '' } }, + { type: 'content_block_delta', index: 0, delta: { type: 'text_delta', text: 'hi' } }, + { type: 'content_block_stop', index: 0 }, + { type: 'message_delta', delta: { stop_reason: 'end_turn', stop_sequence: null }, usage: { output_tokens: 1 } }, + { type: 'message_stop' }, +]; + +const messagesProtocolFrames = async function* (): AsyncGenerator> { + for (const event of messagesEventsForRespond()) yield eventFrame(event); + yield doneFrame(); +}; + +const callRespond = async (wantsStream: boolean): Promise => { + initRepo(new InMemoryRepo()); + const app = new Hono(); + let captured: Response | undefined; + app.get('/', async c => { + const result: ExecuteResult> = eventResult( + messagesProtocolFrames(), + testTelemetryModelIdentity, + { headers: forwardedHeadersFixture() }, + ); + const { response } = await respondMessages(c, result, wantsStream, makeRespondCtx()); + captured = response; + return response; + }); + await app.request('/'); + if (!captured) throw new Error('respondMessages did not produce a Response'); + return captured; +}; + +test('respondMessages forwards upstream headers and strips hop-by-hop / framing / cookie headers on the non-streaming JSON response', async () => { + const response = await callRespond(false); + // forwarded verbatim + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed_warning'); + assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); + assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); + assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); + assertEquals(response.headers.get('openai-version'), '2024-10-21'); + assertEquals(response.headers.get('x-custom-thing'), 'ok'); + // hop-by-hop and cookies dropped + assertEquals(response.headers.get('connection'), null); + assertEquals(response.headers.get('transfer-encoding'), null); + assertEquals(response.headers.get('set-cookie'), null); + // framing headers dropped — upstream values would mis-frame the response; + // Response.json sets its own content-type, which must not echo upstream's + assertEquals(response.headers.get('content-length'), null); + assertEquals(response.headers.get('content-encoding'), null); + assertEquals(response.headers.get('content-type'), 'application/json'); +}); + +test('respondMessages forwards upstream headers and strips hop-by-hop / framing / cookie headers on the streaming SSE response', async () => { + const response = await callRespond(true); + // forwarded verbatim + assertEquals(response.headers.get('anthropic-ratelimit-unified-status'), 'allowed_warning'); + assertEquals(response.headers.get('anthropic-ratelimit-unified-fallback-percentage'), '50'); + assertEquals(response.headers.get('request-id'), 'req_anthropic_abc'); + assertEquals(response.headers.get('cf-ray'), 'cf_ray_xyz'); + assertEquals(response.headers.get('openai-version'), '2024-10-21'); + assertEquals(response.headers.get('x-custom-thing'), 'ok'); + // hop-by-hop and cookies dropped. `connection` and `transfer-encoding` + // are special-cased: Hono's streamSSE writer sets its own `keep-alive` / + // `chunked`, so we assert upstream's distinctive values did not survive + // rather than asserting absence. + assert(response.headers.get('connection') !== 'close'); + assert(response.headers.get('transfer-encoding') !== 'gzip'); + assertEquals(response.headers.get('set-cookie'), null); + // framing headers dropped; streamSSE writes its own text/event-stream and + // never emits content-length or content-encoding for a streamed body + assertEquals(response.headers.get('content-length'), null); + assertEquals(response.headers.get('content-encoding'), null); + assertEquals(response.headers.get('content-type')?.split(';')[0], 'text/event-stream'); + // Drain the body so the lazy generator releases its resources and the + // background `finally` block in `streamSSE` doesn't keep the test runner + // alive. + await response.text(); +}); diff --git a/packages/gateway/src/data-plane/llm/messages/serve_test.ts b/packages/gateway/src/data-plane/llm/messages/serve_test.ts index 6bc2e87e..d77aa67d 100644 --- a/packages/gateway/src/data-plane/llm/messages/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/messages/serve_test.ts @@ -165,6 +165,7 @@ test('generate routes a native Messages candidate end to end', async () => { ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_a', callMessages })]); @@ -187,6 +188,7 @@ test('generate translates through the Responses target when only that endpoint i ok: true, events: makeProtocolFrames([makeResponsesResultEvent()]), modelKey: 'responses-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ upstream: 'up_r', targetApi: 'responses', callResponses })]); @@ -211,7 +213,7 @@ test('generate stops at the first candidate even when it yields an upstream erro ok: false, response: firstError, modelKey: 'first-key', })); const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames(makeMessagesResultEvents('msg_second')), modelKey: 'second-key', + ok: true, events: makeProtocolFrames(makeMessagesResultEvents('msg_second')), modelKey: 'second-key', headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callMessages: firstCall }), @@ -238,6 +240,7 @@ test('generate is a routing no-op when the payload carries no reasoning carriers ok: true, events: makeProtocolFrames(makeMessagesResultEvents()), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([ makeCandidate({ upstream: 'up_a', callMessages }), diff --git a/packages/gateway/src/data-plane/llm/responses/attempt.ts b/packages/gateway/src/data-plane/llm/responses/attempt.ts index cc8d20c9..9c8935c8 100644 --- a/packages/gateway/src/data-plane/llm/responses/attempt.ts +++ b/packages/gateway/src/data-plane/llm/responses/attempt.ts @@ -95,8 +95,11 @@ export const responsesAttempt = { responseId, }), chainResult.modelIdentity, - chainResult.performance, - chainResult.finalMetadata, + { + performance: chainResult.performance, + finalMetadata: chainResult.finalMetadata, + headers: chainResult.headers, + }, ); }, @@ -249,6 +252,6 @@ const callResponsesCompactAsExecuteResult = async ( return eventResult( syntheticEventsFromResult(providerResult.result), telemetryModelIdentity(candidate, providerResult.modelKey), - context, + { performance: context }, ); }; diff --git a/packages/gateway/src/data-plane/llm/responses/attempt_test.ts b/packages/gateway/src/data-plane/llm/responses/attempt_test.ts index 94dbc0d2..1b3da86e 100644 --- a/packages/gateway/src/data-plane/llm/responses/attempt_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/attempt_test.ts @@ -157,6 +157,7 @@ test('generate native success wraps the upstream event stream once', async () => ok: true, events: makeProviderEvents([completedEvent]), modelKey: 'test-model-key', + headers: new Headers(), })); const candidate = makeCandidate(callResponses); @@ -345,6 +346,7 @@ test('generate inherits invocation headers across translation to Messages', asyn yield doneFrame(); })(), modelKey: 'k', + headers: new Headers(), }; }, }); @@ -441,7 +443,7 @@ test('generate seeds privatePayload before interceptors so the web-search shim r ]; const callResponses = vi.fn(async (_model, body): Promise> => { capturedBody = body as { input?: unknown[] }; - return { ok: true, events: makeProviderEvents(upstreamEvents), modelKey: 'test-model-key' }; + return { ok: true, events: makeProviderEvents(upstreamEvents), modelKey: 'test-model-key', headers: new Headers() }; }); const candidate = makeCandidate(callResponses); // The shim early-returns inactive unless the binding has the flag. The @@ -502,3 +504,37 @@ test('generate seeds privatePayload before interceptors so the web-search shim r 'shim emitted the not-preserved placeholder despite a stored private payload', ); }); + +test('generate propagates upstream response headers onto the EventResult so respond can forward them', async () => { + installRepo(); + const completedEvent: ResponsesStreamEvent = { + type: 'response.completed', + sequence_number: 0, + response: makeResponsesResult(), + }; + const upstreamHeaders = new Headers({ + 'anthropic-ratelimit-unified-status': 'allowed', + 'request-id': 'req_resp_xyz', + }); + const callResponses = vi.fn(async (): Promise> => ({ + ok: true, + events: makeProviderEvents([completedEvent]), + modelKey: 'test-model-key', + headers: upstreamHeaders, + })); + const candidate = makeCandidate(callResponses); + const store = createResponsesHttpStore(API_KEY_ID, true); + const result = await responsesAttempt.generate({ + payload: makePayload(), + ctx: makeGatewayCtx(), + store, + candidate, + snapshotMode: 'append', + }); + + assertEquals(result.type, 'events'); + if (result.type !== 'events') throw new Error('unreachable'); + assertEquals(result.headers?.get('anthropic-ratelimit-unified-status'), 'allowed'); + assertEquals(result.headers?.get('request-id'), 'req_resp_xyz'); + await collectEvents(result.events); +}); diff --git a/packages/gateway/src/data-plane/llm/responses/http_test.ts b/packages/gateway/src/data-plane/llm/responses/http_test.ts index 40066bf2..f445f084 100644 --- a/packages/gateway/src/data-plane/llm/responses/http_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/http_test.ts @@ -127,6 +127,7 @@ test('POST /v1/responses streams a successful SSE body', async () => { ok: true, events: makeProviderEvents([completedEvent()]), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ callResponses })]); @@ -153,6 +154,7 @@ test('POST /v1/responses returns a single JSON body when stream is omitted', asy ok: true, events: makeProviderEvents([completedEvent('resp_nonstream')]), modelKey: 'test-model-key', + headers: new Headers(), })); queueCandidates([makeCandidate({ callResponses })]); @@ -268,6 +270,7 @@ test('POST /v1/responses rewrites the codex-auto-review alias before routing', a ok: true, events: makeProviderEvents([completedEvent()]), modelKey: 'test-model-key', + headers: new Headers(), }; }); queueCandidates([makeCandidate({ callResponses })]); diff --git a/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts b/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts index b25be487..31284c93 100644 --- a/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts +++ b/packages/gateway/src/data-plane/llm/responses/interceptors/canonicalize-encrypted-content.ts @@ -53,5 +53,9 @@ export const withReasoningEncryptedContentCanonicalized: ResponsesInterceptor = const result: ExecuteResult> = await run(); if (result.type !== 'events') return result; - return eventResult(canonicalizeEncryptedContent(result.events), result.modelIdentity, result.performance, result.finalMetadata); + return eventResult(canonicalizeEncryptedContent(result.events), result.modelIdentity, { + performance: result.performance, + finalMetadata: result.finalMetadata, + headers: result.headers, + }); }; diff --git a/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts b/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts index 2c37ccca..32a448b0 100644 --- a/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/interceptors/retry-cyber-policy_test.ts @@ -134,7 +134,7 @@ const protocolResult = (events: readonly ResponsesStreamEvent[], modelIdentity = for (const event of events) yield eventFrame(event); })(), modelIdentity, - performance, + { performance }, ); const collectFrames = async (events: AsyncIterable>): Promise[]> => { diff --git a/packages/gateway/src/data-plane/llm/responses/respond.ts b/packages/gateway/src/data-plane/llm/responses/respond.ts index 58795f9c..e5071206 100644 --- a/packages/gateway/src/data-plane/llm/responses/respond.ts +++ b/packages/gateway/src/data-plane/llm/responses/respond.ts @@ -5,7 +5,7 @@ import { RESPONSES_MISSING_TERMINAL_MESSAGE, collectResponsesProtocolEventsToRes import { responsesProtocolFrameToSSEFrame } from './events/to-sse.ts'; import { tokenUsage } from '../../shared/telemetry/usage.ts'; import type { GatewayCtx } from '../shared/gateway-ctx.ts'; -import { SourceStreamState, eventResultMetadata, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; +import { SourceStreamState, eventResultMetadata, forwardUpstreamHeaders, mergeForwardedUpstreamHeaders, plainResultToResponse, recordPerformance, recordUsage } from '../shared/respond.ts'; import { type StreamCompletion, writeSSEFrames } from '../shared/stream/sse.ts'; import { type ProtocolFrame, sseCommentFrame, sseFrame } from '@floway-dev/protocols/common'; import { isResponsesTerminalEvent, type ResponsesResult, type ResponsesStreamEvent, responsesResultFromStreamEvent } from '@floway-dev/protocols/responses'; @@ -44,13 +44,14 @@ export const respondResponses = async ( const metadata = await eventResultMetadata(result); await recordUsage(ctx, metadata.modelIdentity, tokenUsageFromResponsesResult(response)); recordPerformance(ctx, metadata.performance, state.failed || response.status === 'failed'); - return { success: true, response: Response.json(response) }; + return { success: true, response: Response.json(response, { headers: mergeForwardedUpstreamHeaders(undefined, result.headers) }) }; } catch (error) { recordPerformance(ctx, result.performance, true); return { success: false, response: internalResponsesErrorResponse(502, toInternalDebugError(error, 'responses')) }; } } + forwardUpstreamHeaders(c, result.headers); const response = streamSSE(c, async stream => { let completion: StreamCompletion = 'error'; try { diff --git a/packages/gateway/src/data-plane/llm/responses/serve_test.ts b/packages/gateway/src/data-plane/llm/responses/serve_test.ts index 825044ce..54424904 100644 --- a/packages/gateway/src/data-plane/llm/responses/serve_test.ts +++ b/packages/gateway/src/data-plane/llm/responses/serve_test.ts @@ -142,6 +142,7 @@ test('generate routes a native Responses candidate end to end', async () => { ok: true, events: makeProtocolFrames([completed]), modelKey: 'test-model-key', + headers: new Headers(), })); const candidate = makeCandidate({ upstream: 'up_a', callResponses }); queueCandidates([candidate]); @@ -201,7 +202,7 @@ test('generate stops at the first candidate even when it yields an upstream erro response: makeResponsesResult('resp_second'), }; const secondCall = vi.fn(async (): Promise> => ({ - ok: true, events: makeProtocolFrames([completed]), modelKey: 'second-key', + ok: true, events: makeProtocolFrames([completed]), modelKey: 'second-key', headers: new Headers(), })); const first = makeCandidate({ upstream: 'up_a', callResponses: firstCall }); const second = makeCandidate({ upstream: 'up_b', callResponses: secondCall }); @@ -422,6 +423,7 @@ test('generate falls through translate-out to messages target', async () => { { type: 'message_stop' }, ]), modelKey: 'messages-key', + headers: new Headers(), })); const upstreamModel = stubUpstreamModel(); const provider = stubProvider({ callMessages }); @@ -475,6 +477,7 @@ test('generate falls through translate-out to chat-completions target', async () }, ]), modelKey: 'chat-completions-key', + headers: new Headers(), })); const upstreamModel = stubUpstreamModel(); const provider = stubProvider({ callChatCompletions }); @@ -518,6 +521,7 @@ test('generate reuses an existing input row when a later turn echoes the same us response: makeResponsesResult(`resp_turn_${turn}`), }]), modelKey: 'test-model-key', + headers: new Headers(), }; }); const store = createResponsesHttpStore(API_KEY_ID, true); diff --git a/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts b/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts index 323f391f..66dd9a04 100644 --- a/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts +++ b/packages/gateway/src/data-plane/llm/shared/attempt-helpers.ts @@ -35,6 +35,6 @@ export const providerStreamResultToExecuteResult = async ( return eventResult( withUpstreamTelemetry(providerResult.events as AsyncIterable>, ctx, context, candidate.targetApi, durationMs), telemetryModelIdentity(candidate, providerResult.modelKey), - context, + { performance: context, headers: providerResult.headers }, ); }; diff --git a/packages/gateway/src/data-plane/llm/shared/respond.ts b/packages/gateway/src/data-plane/llm/shared/respond.ts index 10789cdc..cc4b9cd0 100644 --- a/packages/gateway/src/data-plane/llm/shared/respond.ts +++ b/packages/gateway/src/data-plane/llm/shared/respond.ts @@ -1,3 +1,5 @@ +import type { Context } from 'hono'; + import type { StreamCompletion } from './stream/sse.ts'; import type { TokenUsage } from '../../../repo/types.ts'; import { recordRequestPerformance } from '../../shared/telemetry/performance.ts'; @@ -60,3 +62,66 @@ export const recordUsage = async (ctx: GatewayCtx, modelIdentity: TelemetryModel export const recordPerformance = (ctx: GatewayCtx, context: EventResultMetadata['performance'], failed: boolean): void => { recordRequestPerformance(ctx.backgroundScheduler, context, failed, performance.now() - ctx.requestStartedAt); }; + +// Upstream response headers we propagate verbatim to the downstream client. +// A blocklist (not an allowlist): operators want to see what the upstream +// actually sent — vendor traces (`request-id`, `cf-ray`), plan-billing state +// (`anthropic-ratelimit-*`, which the official `claude-code` CLI's `/status` +// indicator reads), and any future `x-*` an upstream introduces. We only +// strip what we MUST or what would actively break downstream framing: +// +// - hop-by-hop headers (RFC 7230 §6.1) MUST NOT be forwarded by +// intermediaries. +// - `content-length` / `content-encoding` / `content-type` are managed by +// the streaming layer: it rewrites the body (SSE re-framing, optional +// decompression + re-encode) so upstream's values would mis-frame the +// downstream response. The SSE writer sets its own `text/event-stream`; +// non-SSE pass-throughs hand content-type back via their own path. +// - `set-cookie` / `set-cookie2`: we didn't issue these and propagating +// upstream session bindings is a footgun. +const BLOCKED_UPSTREAM_HEADERS: ReadonlySet = new Set([ + // hop-by-hop (RFC 7230 §6.1) + 'connection', + 'keep-alive', + 'proxy-authenticate', + 'proxy-authorization', + 'te', + 'trailer', + 'transfer-encoding', + 'upgrade', + // body framing — owned by the streaming layer + 'content-length', + 'content-encoding', + 'content-type', + // cookies + 'set-cookie', + 'set-cookie2', +]); + +export const isForwardableUpstreamHeader = (name: string): boolean => + !BLOCKED_UPSTREAM_HEADERS.has(name.toLowerCase()); + +// Stages forwardable upstream headers onto the Hono context so the next +// `c.newResponse` (or `streamSSE`'s internal `c.newResponse`) emits them on +// the response. Hono's `c.header()` is the only knob that survives a later +// `c.json` or `streamSSE` call without being overwritten. Safe to call with +// `undefined` so callers can pass `result.headers` directly. +export const forwardUpstreamHeaders = (c: Context, headers: Headers | undefined): void => { + if (!headers) return; + for (const [name, value] of headers) { + if (isForwardableUpstreamHeader(name)) c.header(name, value); + } +}; + +// Returns a `HeadersInit` extending `base` with every forwardable entry from +// `upstream`. Used by non-streaming JSON responses where the response is +// built directly (`Response.json(...)`) instead of through Hono's `c`. +export const mergeForwardedUpstreamHeaders = (base: HeadersInit | undefined, upstream: Headers | undefined): HeadersInit => { + const merged = new Headers(base); + if (upstream) { + for (const [name, value] of upstream) { + if (isForwardableUpstreamHeader(name)) merged.set(name, value); + } + } + return merged; +}; diff --git a/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts b/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts index d4b00182..870fad12 100644 --- a/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts +++ b/packages/provider-codex/src/interceptors/responses/inject-default-instructions_test.ts @@ -9,7 +9,7 @@ import { assertEquals, stubUpstreamModel } from '@floway-dev/test-utils'; const stubRequest = {}; const okEvents = (): Promise> => - Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test' }); + Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test', headers: new Headers() }); const invocation = (payload: ResponsesPayload): ResponsesBoundaryCtx => ({ payload, diff --git a/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts b/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts index 6d4b702e..c7d93d24 100644 --- a/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts +++ b/packages/provider-codex/src/interceptors/responses/inject-session-id_test.ts @@ -9,7 +9,7 @@ import { assert, assertEquals, stubUpstreamModel } from '@floway-dev/test-utils' const stubRequest = {}; const okEvents = (): Promise> => - Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test' }); + Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test', headers: new Headers() }); const invocation = (payload: ResponsesPayload, headers: Record = {}): ResponsesBoundaryCtx => ({ payload, diff --git a/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts b/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts index 70594678..7f523a1e 100644 --- a/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts +++ b/packages/provider-codex/src/interceptors/responses/strip-unsupported-fields_test.ts @@ -9,7 +9,7 @@ import { assertEquals, assertFalse, stubUpstreamModel } from '@floway-dev/test-u const stubRequest = {}; const okEvents = (): Promise> => - Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test' }); + Promise.resolve({ ok: true, events: (async function* () {})(), modelKey: 'test', headers: new Headers() }); const invocation = (payload: ResponsesPayload): ResponsesBoundaryCtx => ({ payload, diff --git a/packages/provider-copilot/src/provider.ts b/packages/provider-copilot/src/provider.ts index b28e9b65..8ca4e105 100644 --- a/packages/provider-copilot/src/provider.ts +++ b/packages/provider-copilot/src/provider.ts @@ -227,7 +227,13 @@ export const createCopilotProvider = async (record: UpstreamRecord): Promise>, ): Promise>> => { const stream = await streamPromise; - if (stream.ok) return eventResult(stream.events as AsyncIterable>, placeholderIdentity(stream.modelKey)); + if (stream.ok) { + return eventResult( + stream.events as AsyncIterable>, + placeholderIdentity(stream.modelKey), + { headers: stream.headers }, + ); + } return await readUpstreamError(stream.response); }; @@ -241,7 +247,12 @@ export const createCopilotProvider = async (record: UpstreamRecord): Promise => { if (result.type === 'events') { - return { ok: true, events: result.events as AsyncIterable>, modelKey }; + return { + ok: true, + events: result.events as AsyncIterable>, + modelKey, + ...(result.headers ? { headers: result.headers } : {}), + }; } if (result.type === 'upstream-error') { return { ok: false, response: upstreamErrorToResponse(result), modelKey }; diff --git a/packages/provider/src/provider.ts b/packages/provider/src/provider.ts index 893faf0c..7b8675dd 100644 --- a/packages/provider/src/provider.ts +++ b/packages/provider/src/provider.ts @@ -41,12 +41,17 @@ export interface ProviderCallResult { // Streaming endpoints (Messages / Responses / ChatCompletions) return decoded // protocol frames directly — the provider drives the upstream fetch, parses // the SSE wire via @floway-dev/protocols, and emits the typed event stream. +// `ok: true` optionally carries the raw upstream `Headers` so the source-side +// `respond` layer can forward them to the downstream client (blocklist in +// gateway `shared/respond.ts` — hop-by-hop, body framing, cookies). Absent +// on lifted/synthesized streams that have no upstream Response behind them, +// matching the same shape on `EventResult`. // `ok: false` carries the raw upstream Response verbatim so the gateway -// boundary can relay status + body unchanged. Non-2xx-but-not-SSE responses -// throw from the provider as a contract violation (provider always forces -// stream=true on streaming endpoints). +// boundary can relay status + body + headers unchanged. Non-2xx-but-not-SSE +// responses throw from the provider as a contract violation (provider always +// forces stream=true on streaming endpoints). export type ProviderStreamResult = - | { ok: true; events: AsyncIterable>; modelKey: string } + | { ok: true; events: AsyncIterable>; modelKey: string; headers?: Headers } | { ok: false; response: Response; modelKey: string }; // `/responses/compact` is non-streaming — the upstream returns a single diff --git a/packages/provider/src/result.ts b/packages/provider/src/result.ts index 24d7048b..e1d2365c 100644 --- a/packages/provider/src/result.ts +++ b/packages/provider/src/result.ts @@ -7,6 +7,10 @@ export interface EventResult { modelIdentity: TelemetryModelIdentity; performance?: PerformanceTelemetryContext; finalMetadata?: Promise; + // Raw upstream response headers for the source-side `respond` layer to + // forward (blocklist in gateway `shared/respond.ts`). Absent on + // lifted/synthesized streams that have no upstream Response behind them. + headers?: Headers; } export interface EventResultMetadata { @@ -43,15 +47,21 @@ export interface PlainResult { export type ExecuteResult = EventResult | UpstreamErrorResult | InternalErrorResult; +export interface EventResultOptions { + performance?: PerformanceTelemetryContext; + finalMetadata?: Promise; + headers?: Headers; +} + export const eventResult = ( events: AsyncIterable, modelIdentity: TelemetryModelIdentity, - performance?: PerformanceTelemetryContext, - finalMetadata?: Promise, + options: EventResultOptions = {}, ): EventResult => { const result: EventResult = { type: 'events', events, modelIdentity }; - if (performance !== undefined) result.performance = performance; - if (finalMetadata !== undefined) result.finalMetadata = finalMetadata; + if (options.performance !== undefined) result.performance = options.performance; + if (options.finalMetadata !== undefined) result.finalMetadata = options.finalMetadata; + if (options.headers !== undefined) result.headers = options.headers; return result; }; diff --git a/packages/provider/src/streaming.ts b/packages/provider/src/streaming.ts index 2153efd2..d449c5df 100644 --- a/packages/provider/src/streaming.ts +++ b/packages/provider/src/streaming.ts @@ -39,5 +39,5 @@ export const streamingProviderCall = async ( const snippet = await readBodySnippet(response); throw new Error(`Upstream returned ${response.status} with content-type "${contentType || 'unknown'}" but stream is required (provider must force stream=true and return text/event-stream when response.ok). Body: ${snippet}`); } - return { ok: true, events: parser(response.body, { signal }), modelKey }; + return { ok: true, events: parser(response.body, { signal }), modelKey, headers: response.headers }; };