diff --git a/e2e-chatbot-app-next/server/src/routes/chat.ts b/e2e-chatbot-app-next/server/src/routes/chat.ts index d5428237..e3ed54c5 100644 --- a/e2e-chatbot-app-next/server/src/routes/chat.ts +++ b/e2e-chatbot-app-next/server/src/routes/chat.ts @@ -239,28 +239,120 @@ chatRouter.post('/', requireAuth, async (req: Request, res: Response) => { /** * We manually create the stream to have access to the stream writer. - * This allows us to inject custom stream parts like data-error. + * + * Instead of writer.merge(), we read the inner UI stream manually so that + * the execute promise stays unresolved until we're fully done — including + * a potential generateText fallback. This keeps the outer stream (and its + * onFinish) waiting, because createUIMessageStream only closes the + * controller and fires onFinish once every promise in + * ongoingStreamPromises has settled. */ const stream = createUIMessageStream({ execute: async ({ writer }) => { - writer.merge( - result.toUIMessageStream({ - originalMessages: uiMessages, - generateMessageId: generateUUID, - sendReasoning: true, - sendSources: true, - onError: (error) => { - console.error('Stream error:', error); + let streamingFailed = false; + let streamErrorMessage: string | undefined; + let hasStartBeenForwarded = false; + let hasStartStepBeenForwarded = false; + let hasContentBeenForwarded = false; + + const uiStream = result.toUIMessageStream({ + originalMessages: uiMessages, + generateMessageId: generateUUID, + sendReasoning: true, + sendSources: true, + onError: (error) => { + console.error('Stream error:', error); + streamingFailed = true; + streamErrorMessage = + error instanceof Error ? error.message : JSON.stringify(error); + return streamErrorMessage; + }, + }); - const errorMessage = - error instanceof Error ? error.message : JSON.stringify(error); + const reader = uiStream.getReader(); + try { + while (true) { + const { done, value } = await reader.read(); + if (done) break; + + if (value.type === 'error') { + streamingFailed = true; + streamErrorMessage = streamErrorMessage || value.errorText; + continue; + } + + if (value.type === 'start') hasStartBeenForwarded = true; + if (value.type === 'start-step') hasStartStepBeenForwarded = true; + if ( + value.type === 'text-delta' || + value.type === 'reasoning-delta' + ) { + hasContentBeenForwarded = true; + } + + writer.write(value); + } + } catch (readError) { + streamingFailed = true; + streamErrorMessage = + readError instanceof Error + ? readError.message + : String(readError); + } - writer.write({ type: 'data-error', data: errorMessage }); + if (streamingFailed && !hasContentBeenForwarded) { + console.log( + '[Chat] Streaming failed before content was sent, ' + + 'attempting generateText fallback...', + ); + try { + const fallbackResult = await generateText({ + model, + messages: await convertToModelMessages(uiMessages), + headers: { + [CONTEXT_HEADER_CONVERSATION_ID]: id, + [CONTEXT_HEADER_USER_ID]: + session.user.email ?? session.user.id, + }, + }); - return errorMessage; - }, - }), - ); + finalUsage = fallbackResult.usage; + + const textId = generateUUID(); + if (!hasStartBeenForwarded) { + writer.write({ type: 'start' }); + } + if (!hasStartStepBeenForwarded) { + writer.write({ type: 'start-step' }); + } + writer.write({ type: 'text-start', id: textId }); + writer.write({ + type: 'text-delta', + id: textId, + delta: fallbackResult.text, + }); + writer.write({ type: 'text-end', id: textId }); + writer.write({ type: 'finish-step' }); + writer.write({ type: 'finish', finishReason: 'stop' }); + } catch (fallbackError) { + console.error( + '[Chat] generateText fallback also failed:', + fallbackError, + ); + const errorMsg = + fallbackError instanceof Error + ? fallbackError.message + : JSON.stringify(fallbackError); + writer.write({ type: 'error', errorText: errorMsg }); + } + } else if (streamingFailed && hasContentBeenForwarded) { + writer.write({ + type: 'error', + errorText: + streamErrorMessage || + 'Stream failed after partial content was already sent', + }); + } }, onFinish: async ({ responseMessage }) => { console.log(