diff --git a/src/routes/chat-completions/handler.ts b/src/routes/chat-completions/handler.ts
index 04a5ae9ed..ecc5851ab 100644
--- a/src/routes/chat-completions/handler.ts
+++ b/src/routes/chat-completions/handler.ts
@@ -1,7 +1,6 @@
 import type { Context } from "hono"
 import consola from "consola"
-import { streamSSE, type SSEMessage } from "hono/streaming"
 
 import { awaitApproval } from "~/lib/approval"
 import { checkRateLimit } from "~/lib/rate-limit"
@@ -10,7 +9,6 @@ import { getTokenCount } from "~/lib/tokenizer"
 import { isNullish } from "~/lib/utils"
 import {
   createChatCompletions,
-  type ChatCompletionResponse,
   type ChatCompletionsPayload,
 } from "~/services/copilot/create-chat-completions"
@@ -39,7 +37,23 @@ export async function handleCompletion(c: Context) {
 
   if (state.manualApprove) await awaitApproval()
 
-  if (isNullish(payload.max_tokens)) {
+  // Normalize max_completion_tokens to max_tokens for upstream compatibility
+  const payloadAny = payload as unknown as Record<string, unknown>
+  if (
+    payloadAny.max_completion_tokens !== null
+    && payloadAny.max_completion_tokens !== undefined
+    && isNullish(payload.max_tokens)
+  ) {
+    payload = {
+      ...payload,
+      max_tokens: payloadAny.max_completion_tokens as number,
+    }
+    delete (payload as unknown as Record<string, unknown>).max_completion_tokens
+    consola.debug(
+      "Normalized max_completion_tokens to max_tokens:",
+      payload.max_tokens,
+    )
+  } else if (isNullish(payload.max_tokens)) {
     payload = {
       ...payload,
       max_tokens: selectedModel?.capabilities.limits.max_output_tokens,
@@ -49,20 +63,44 @@ export async function handleCompletion(c: Context) {
 
   const response = await createChatCompletions(payload)
 
-  if (isNonStreaming(response)) {
-    consola.debug("Non-streaming response:", JSON.stringify(response))
-    return c.json(response)
-  }
+  // Streaming: response is a raw fetch Response — pipe body directly
+  if (response instanceof Response) {
+    const startTime = Date.now()
+    let byteCount = 0
+    let chunkCount = 0
+
+    const monitor = new TransformStream<Uint8Array, Uint8Array>({
+      transform(chunk: Uint8Array, controller) {
+        chunkCount++
+        byteCount += chunk.byteLength
+        if (chunkCount === 1) {
+          consola.info(`TTFT: ${Date.now() - startTime}ms`)
+        }
+        controller.enqueue(chunk)
+      },
+      flush() {
+        consola.info(
+          `Stream complete: ${chunkCount} chunks, ${byteCount} bytes in ${Date.now() - startTime}ms`,
+        )
+      },
+    })
 
-  consola.debug("Streaming response")
-  return streamSSE(c, async (stream) => {
-    for await (const chunk of response) {
-      consola.debug("Streaming chunk:", JSON.stringify(chunk))
-      await stream.writeSSE(chunk as SSEMessage)
+    const body = response.body
+    if (!body) {
+      return c.text("No response body", 502)
     }
-  })
-}
 
-const isNonStreaming = (
-  response: Awaited<ReturnType<typeof createChatCompletions>>,
-): response is ChatCompletionResponse => Object.hasOwn(response, "choices")
+    return new Response(body.pipeThrough(monitor), {
+      status: 200,
+      headers: {
+        "Content-Type": "text/event-stream",
+        "Cache-Control": "no-cache",
+        Connection: "keep-alive",
+      },
+    })
+  }
+
+  // Non-streaming: response is a parsed JSON object
+  consola.debug("Non-streaming response:", JSON.stringify(response))
+  return c.json(response)
+}
diff --git a/src/routes/messages/handler.ts b/src/routes/messages/handler.ts
index 85dbf6243..778534634 100644
--- a/src/routes/messages/handler.ts
+++ b/src/routes/messages/handler.ts
@@ -1,6 +1,7 @@
 import type { Context } from "hono"
 import consola from "consola"
+import { events } from "fetch-event-stream"
 import { streamSSE } from "hono/streaming"
 
 import { awaitApproval } from "~/lib/approval"
@@ -9,7 +10,6 @@ import { state } from "~/lib/state"
 import {
   createChatCompletions,
   type ChatCompletionChunk,
-  type ChatCompletionResponse,
 } from "~/services/copilot/create-chat-completions"
 
 import {
@@ -40,52 +40,50 @@ export async function handleCompletion(c: Context) {
 
   const response = await createChatCompletions(openAIPayload)
 
-  if (isNonStreaming(response)) {
-    consola.debug(
-      "Non-streaming response from Copilot:",
-      JSON.stringify(response).slice(-400),
-    )
-    const anthropicResponse = translateToAnthropic(response)
-    consola.debug(
-      "Translated Anthropic response:",
-      JSON.stringify(anthropicResponse),
-    )
-    return c.json(anthropicResponse)
-  }
-
-  consola.debug("Streaming response from Copilot")
-  return streamSSE(c, async (stream) => {
-    const streamState: AnthropicStreamState = {
-      messageStartSent: false,
-      contentBlockIndex: 0,
-      contentBlockOpen: false,
-      toolCalls: {},
-    }
-
-    for await (const rawEvent of response) {
-      consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
-      if (rawEvent.data === "[DONE]") {
-        break
+  // Streaming: response is a raw fetch Response
+  if (response instanceof Response) {
+    consola.debug("Streaming response from Copilot")
+    return streamSSE(c, async (stream) => {
+      const streamState: AnthropicStreamState = {
+        messageStartSent: false,
+        contentBlockIndex: 0,
+        contentBlockOpen: false,
+        toolCalls: {},
       }
 
-      if (!rawEvent.data) {
-        continue
-      }
+      for await (const rawEvent of events(response)) {
+        consola.debug("Copilot raw stream event:", JSON.stringify(rawEvent))
+        if (rawEvent.data === "[DONE]") {
+          break
+        }
+
+        if (!rawEvent.data) {
+          continue
+        }
 
-      const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
-      const events = translateChunkToAnthropicEvents(chunk, streamState)
+        const chunk = JSON.parse(rawEvent.data) as ChatCompletionChunk
+        const sseEvents = translateChunkToAnthropicEvents(chunk, streamState)
 
-      for (const event of events) {
-        consola.debug("Translated Anthropic event:", JSON.stringify(event))
-        await stream.writeSSE({
-          event: event.type,
-          data: JSON.stringify(event),
-        })
+        for (const event of sseEvents) {
+          consola.debug("Translated Anthropic event:", JSON.stringify(event))
+          await stream.writeSSE({
+            event: event.type,
+            data: JSON.stringify(event),
+          })
+        }
       }
-    }
-  })
-}
+    })
+  }
 
-const isNonStreaming = (
-  response: Awaited<ReturnType<typeof createChatCompletions>>,
-): response is ChatCompletionResponse => Object.hasOwn(response, "choices")
+  // Non-streaming: response is a parsed JSON object
+  consola.debug(
+    "Non-streaming response from Copilot:",
+    JSON.stringify(response).slice(-400),
+  )
+  const anthropicResponse = translateToAnthropic(response)
+  consola.debug(
+    "Translated Anthropic response:",
+    JSON.stringify(anthropicResponse),
+  )
+  return c.json(anthropicResponse)
+}
diff --git a/src/services/copilot/create-chat-completions.ts b/src/services/copilot/create-chat-completions.ts
index 8534151da..16934280e 100644
--- a/src/services/copilot/create-chat-completions.ts
+++ b/src/services/copilot/create-chat-completions.ts
@@ -1,5 +1,4 @@
 import consola from "consola"
-import { events } from "fetch-event-stream"
 
 import { copilotHeaders, copilotBaseUrl } from "~/lib/api-config"
 import { HTTPError } from "~/lib/error"
@@ -40,7 +39,7 @@ export const createChatCompletions = async (
   }
 
   if (payload.stream) {
-    return events(response)
+    return response
   }
 
   return (await response.json()) as ChatCompletionResponse
diff --git a/src/start.ts b/src/start.ts
index 14abbbdff..3b267951b 100644
--- a/src/start.ts
+++ b/src/start.ts
@@ -117,6 +117,11 @@ export async function runServer(options: RunServerOptions): Promise<void> {
   serve({
     fetch: server.fetch as ServerHandler,
     port: options.port,
+    bun: {
+      // Disable idle timeout to prevent Bun from closing SSE/streaming connections
+      // Default is 10 seconds which kills long-running LLM streams
+      idleTimeout: 0,
+    },
   })
 }