Skip to content
Open

nice2 #136

Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
124 changes: 108 additions & 16 deletions e2e-chatbot-app-next/server/src/routes/chat.ts
Original file line number Diff line number Diff line change
Expand Up @@ -239,28 +239,120 @@ chatRouter.post('/', requireAuth, async (req: Request, res: Response) => {

/**
* We manually create the stream to have access to the stream writer.
* This allows us to inject custom stream parts like data-error.
*
* Instead of writer.merge(), we read the inner UI stream manually so that
* the execute promise stays unresolved until we're fully done — including
* a potential generateText fallback. This keeps the outer stream (and its
* onFinish) waiting, because createUIMessageStream only closes the
* controller and fires onFinish once every promise in
* ongoingStreamPromises has settled.
*/
const stream = createUIMessageStream({
execute: async ({ writer }) => {
writer.merge(
result.toUIMessageStream({
originalMessages: uiMessages,
generateMessageId: generateUUID,
sendReasoning: true,
sendSources: true,
onError: (error) => {
console.error('Stream error:', error);
let streamingFailed = false;
let streamErrorMessage: string | undefined;
let hasStartBeenForwarded = false;
let hasStartStepBeenForwarded = false;
let hasContentBeenForwarded = false;

const uiStream = result.toUIMessageStream({
originalMessages: uiMessages,
generateMessageId: generateUUID,
sendReasoning: true,
sendSources: true,
onError: (error) => {
console.error('Stream error:', error);
streamingFailed = true;
streamErrorMessage =
error instanceof Error ? error.message : JSON.stringify(error);
return streamErrorMessage;
},
});

const errorMessage =
error instanceof Error ? error.message : JSON.stringify(error);
const reader = uiStream.getReader();
try {
while (true) {
const { done, value } = await reader.read();
if (done) break;

if (value.type === 'error') {
streamingFailed = true;
streamErrorMessage = streamErrorMessage || value.errorText;
continue;
}

if (value.type === 'start') hasStartBeenForwarded = true;
if (value.type === 'start-step') hasStartStepBeenForwarded = true;
if (
value.type === 'text-delta' ||
value.type === 'reasoning-delta'
) {
hasContentBeenForwarded = true;
}

writer.write(value);
}
} catch (readError) {
streamingFailed = true;
streamErrorMessage =
readError instanceof Error
? readError.message
: String(readError);
}

writer.write({ type: 'data-error', data: errorMessage });
if (streamingFailed && !hasContentBeenForwarded) {
console.log(
'[Chat] Streaming failed before content was sent, ' +
'attempting generateText fallback...',
);
try {
const fallbackResult = await generateText({
model,
messages: await convertToModelMessages(uiMessages),
headers: {
[CONTEXT_HEADER_CONVERSATION_ID]: id,
[CONTEXT_HEADER_USER_ID]:
session.user.email ?? session.user.id,
},
});

return errorMessage;
},
}),
);
finalUsage = fallbackResult.usage;

const textId = generateUUID();
if (!hasStartBeenForwarded) {
writer.write({ type: 'start' });
}
if (!hasStartStepBeenForwarded) {
writer.write({ type: 'start-step' });
}
writer.write({ type: 'text-start', id: textId });
writer.write({
type: 'text-delta',
id: textId,
delta: fallbackResult.text,
});
writer.write({ type: 'text-end', id: textId });
writer.write({ type: 'finish-step' });
writer.write({ type: 'finish', finishReason: 'stop' });
} catch (fallbackError) {
console.error(
'[Chat] generateText fallback also failed:',
fallbackError,
);
const errorMsg =
fallbackError instanceof Error
? fallbackError.message
: JSON.stringify(fallbackError);
writer.write({ type: 'error', errorText: errorMsg });
}
} else if (streamingFailed && hasContentBeenForwarded) {
writer.write({
type: 'error',
errorText:
streamErrorMessage ||
'Stream failed after partial content was already sent',
});
}
},
onFinish: async ({ responseMessage }) => {
console.log(
Expand Down