Skip to content

Commit 4f51faa

Browse files
committed
feat: AI assistant streaming with early change application
Add real-time streaming support for AI assistant responses and apply structured changes (workflow YAML, job code) before text finishes streaming. Backend: - Handle SSE streaming events from Apollo (text chunks, status, changes) - Broadcast streaming events via PubSub to ai_session topics - Forward streaming events through Phoenix Channel to WebSocket clients - Handle Mint transport errors gracefully during SSE streaming - Fix message_processor crash on non-map error tuples Frontend: - Add streaming state (content, status, changes) to AI assistant store - Register streaming event handlers in AIChannelRegistry - Render streaming content in MessageList during AI response - Auto-apply workflow YAML and auto-preview job code when streaming changes arrive, before text finishes streaming - Deduplicate auto-apply to prevent double-application when final message arrives
1 parent b6a5d2f commit 4f51faa

19 files changed

Lines changed: 1402 additions & 437 deletions

assets/js/collaborative-editor/components/AIAssistantPanelWrapper.tsx

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,9 @@ import {
1212
useAISessionId,
1313
useAISessionType,
1414
useAIStore,
15+
useAIStreamingChanges,
16+
useAIStreamingContent,
17+
useAIStreamingStatus,
1518
useAIWorkflowTemplateContext,
1619
} from '../hooks/useAIAssistant';
1720
import { useAISessionCommands } from '../hooks/useAIChannelRegistry';
@@ -122,6 +125,9 @@ export function AIAssistantPanelWrapper({
122125
} = useAISessionCommands();
123126
const messages = useAIMessages();
124127
const isLoading = useAIIsLoading();
128+
const streamingContent = useAIStreamingContent();
129+
const streamingStatus = useAIStreamingStatus();
130+
const streamingChanges = useAIStreamingChanges();
125131
const sessionId = useAISessionId();
126132
const sessionType = useAISessionType();
127133
const connectionState = useAIConnectionState();
@@ -539,6 +545,58 @@ export function AIAssistantPanelWrapper({
539545
onPreview: handlePreviewJobCode,
540546
});
541547

548+
// Auto-apply streaming changes as soon as they arrive (before text finishes)
549+
// This triggers the same apply/preview logic that normally runs on new_message,
550+
// but earlier — as soon as Apollo sends the structured changes event.
551+
const appliedStreamingChangesRef = useRef<Record<string, unknown> | null>(
552+
null
553+
);
554+
// Track whether we applied via streaming so we can skip the duplicate
555+
// auto-apply when the final new_message arrives
556+
const appliedViaStreamingRef = useRef(false);
557+
useEffect(() => {
558+
if (!streamingChanges || !canApplyChanges) return;
559+
// Avoid re-applying the same streaming changes object
560+
if (appliedStreamingChangesRef.current === streamingChanges) return;
561+
appliedStreamingChangesRef.current = streamingChanges;
562+
563+
if (aiMode?.page === 'workflow_template' && 'yaml' in streamingChanges) {
564+
const yaml = streamingChanges['yaml'] as string;
565+
if (yaml) {
566+
appliedViaStreamingRef.current = true;
567+
void handleApplyWorkflow(yaml, '__streaming__');
568+
}
569+
} else if (aiMode?.page === 'job_code' && 'code' in streamingChanges) {
570+
const code = streamingChanges['code'] as string;
571+
if (code) {
572+
appliedViaStreamingRef.current = true;
573+
handlePreviewJobCode(code, '__streaming__');
574+
}
575+
}
576+
}, [
577+
streamingChanges,
578+
aiMode?.page,
579+
canApplyChanges,
580+
handleApplyWorkflow,
581+
handlePreviewJobCode,
582+
]);
583+
584+
// When a new assistant message with code arrives after we already applied
585+
// via streaming, mark it as already applied to prevent duplicate auto-apply
586+
// and update previewingMessageId to the real ID to prevent diff flicker
587+
useEffect(() => {
588+
if (!appliedViaStreamingRef.current) return;
589+
590+
const latestAssistantMessage = [...messages]
591+
.reverse()
592+
.find(m => m.role === 'assistant' && m.code && m.status === 'success');
593+
594+
if (latestAssistantMessage) {
595+
appliedMessageIdsRef.current.add(latestAssistantMessage.id);
596+
appliedViaStreamingRef.current = false;
597+
}
598+
}, [messages, appliedMessageIdsRef]);
599+
542600
return (
543601
<div
544602
className="flex h-full flex-shrink-0"
@@ -626,6 +684,8 @@ export function AIAssistantPanelWrapper({
626684
}
627685
onRetryMessage={handleRetryMessage}
628686
isWriteDisabled={isWriteDisabled}
687+
streamingContent={streamingContent}
688+
streamingStatus={streamingStatus}
629689
/>
630690
</AIAssistantPanel>
631691
</div>

assets/js/collaborative-editor/components/MessageList.tsx

Lines changed: 124 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
import { useEffect, useRef, useState } from 'react';
1+
import { useEffect, useMemo, useRef, useState } from 'react';
22
import Markdown from 'react-markdown';
33
import remarkGfm from 'remark-gfm';
44

@@ -9,6 +9,11 @@ import type { Message } from '../types/ai-assistant';
99

1010
import { Tooltip } from './Tooltip';
1111

12+
const STREAMING_MESSAGE_ID = '__streaming__' as const;
13+
14+
const PROSE_CLASSES =
15+
'text-sm text-gray-700 leading-relaxed prose prose-sm max-w-none prose-headings:font-medium prose-h1:text-lg prose-h1:text-gray-900 prose-h1:mb-3 prose-h2:text-base prose-h2:text-gray-900 prose-h2:mb-2 prose-h2:mt-5 prose-h3:text-sm prose-h3:text-gray-900 prose-h3:mb-2 prose-h3:font-semibold prose-p:mb-3 prose-p:last:mb-0 prose-p:text-gray-700 prose-ul:list-disc prose-ul:pl-5 prose-ul:mb-3 prose-ul:space-y-1 prose-ol:list-decimal prose-ol:pl-5 prose-ol:mb-3 prose-ol:space-y-1 prose-li:text-gray-700 prose-strong:font-medium prose-strong:text-gray-900 prose-em:italic prose-a:text-primary-600 prose-a:hover:text-primary-700 prose-a:underline prose-a:font-normal prose-code:px-1.5 prose-code:py-0.5 prose-code:bg-gray-100 prose-code:text-gray-800 prose-code:rounded prose-code:text-xs prose-code:font-mono prose-code:font-normal prose-pre:rounded-md prose-pre:bg-slate-100 prose-pre:border-2 prose-pre:border-slate-200 prose-pre:text-slate-800 prose-pre:p-4 prose-pre:overflow-x-auto prose-pre:text-xs prose-pre:font-mono prose-pre:mb-4';
16+
1217
/**
1318
* Custom code block component for react-markdown
1419
* Renders code with COPY/ADD action buttons
@@ -384,8 +389,9 @@ interface MessageListProps {
384389
showAddButtons?: boolean;
385390
showApplyButton?: boolean;
386391
onRetryMessage?: (messageId: string) => void;
387-
/** Whether write actions (Apply/Add) are disabled due to readonly mode */
388392
isWriteDisabled?: boolean;
393+
streamingContent?: string | null;
394+
streamingStatus?: string | null;
389395
}
390396

391397
export function MessageList({
@@ -400,6 +406,8 @@ export function MessageList({
400406
showApplyButton = false,
401407
onRetryMessage,
402408
isWriteDisabled = false,
409+
streamingContent,
410+
streamingStatus,
403411
}: MessageListProps) {
404412
const loadingRef = useRef<HTMLDivElement>(null);
405413
const messagesEndRef = useRef<HTMLDivElement>(null);
@@ -417,10 +425,34 @@ export function MessageList({
417425

418426
useEffect(() => {
419427
if (isLoading && loadingRef.current) {
420-
loadingRef.current.scrollIntoView({ behavior: 'smooth', block: 'end' });
428+
loadingRef.current.scrollIntoView({
429+
behavior: 'smooth',
430+
block: 'end',
431+
});
421432
}
422433
}, [isLoading]);
423434

435+
const scrollTimeoutRef = useRef<ReturnType<typeof setTimeout>>();
436+
437+
useEffect(() => {
438+
if (streamingContent && messagesEndRef.current) {
439+
if (scrollTimeoutRef.current) {
440+
clearTimeout(scrollTimeoutRef.current);
441+
}
442+
scrollTimeoutRef.current = setTimeout(() => {
443+
messagesEndRef.current?.scrollIntoView({
444+
behavior: 'instant',
445+
block: 'end',
446+
});
447+
}, 100);
448+
}
449+
return () => {
450+
if (scrollTimeoutRef.current) {
451+
clearTimeout(scrollTimeoutRef.current);
452+
}
453+
};
454+
}, [streamingContent]);
455+
424456
if (messages.length === 0) {
425457
return (
426458
<div
@@ -435,26 +467,59 @@ export function MessageList({
435467
);
436468
}
437469

470+
// Build a unified message list: real messages + optional streaming placeholder.
471+
// The streaming message renders in the same loop as finalized messages,
472+
// so the transition from streaming → final is a seamless in-place update
473+
// instead of a DOM unmount/remount flash.
474+
const displayMessages = useMemo(() => {
475+
if (streamingContent) {
476+
return [
477+
...messages,
478+
{
479+
id: STREAMING_MESSAGE_ID,
480+
role: 'assistant' as const,
481+
content: streamingContent,
482+
status: 'streaming' as const,
483+
} as Message & { status: 'streaming' },
484+
];
485+
}
486+
return messages;
487+
}, [messages, streamingContent]);
488+
489+
const isStreaming = (message: Message) => message.id === STREAMING_MESSAGE_ID;
490+
438491
return (
439492
<div className="h-full overflow-y-auto" data-testid="message-list">
440-
{messages.map(message => (
493+
{displayMessages.map(message => (
441494
<div
442495
key={message.id}
443496
data-role={`${message.role}-message`}
444497
className={cn('group px-6 py-4')}
445498
>
446499
<div className="max-w-3xl mx-auto">
447500
{message.role === 'assistant' ? (
448-
<div data-testid="assistant-message">
501+
<div
502+
data-testid={
503+
isStreaming(message)
504+
? 'streaming-message'
505+
: 'assistant-message'
506+
}
507+
>
449508
<div className="space-y-3">
450509
<MarkdownContent
451-
content={message.content}
452-
showAddButtons={showAddButtons && !message.code}
510+
content={
511+
isStreaming(message)
512+
? message.content + ' ▍'
513+
: message.content
514+
}
515+
showAddButtons={
516+
!isStreaming(message) && showAddButtons && !message.code
517+
}
453518
isWriteDisabled={isWriteDisabled}
454-
className="text-sm text-gray-700 leading-relaxed prose prose-sm max-w-none prose-headings:font-medium prose-h1:text-lg prose-h1:text-gray-900 prose-h1:mb-3 prose-h2:text-base prose-h2:text-gray-900 prose-h2:mb-2 prose-h2:mt-5 prose-h3:text-sm prose-h3:text-gray-900 prose-h3:mb-2 prose-h3:font-semibold prose-p:mb-3 prose-p:last:mb-0 prose-p:text-gray-700 prose-ul:list-disc prose-ul:pl-5 prose-ul:mb-3 prose-ul:space-y-1 prose-ol:list-decimal prose-ol:pl-5 prose-ol:mb-3 prose-ol:space-y-1 prose-li:text-gray-700 prose-strong:font-medium prose-strong:text-gray-900 prose-em:italic prose-a:text-primary-600 prose-a:hover:text-primary-700 prose-a:underline prose-a:font-normal prose-code:px-1.5 prose-code:py-0.5 prose-code:bg-gray-100 prose-code:text-gray-800 prose-code:rounded prose-code:text-xs prose-code:font-mono prose-code:font-normal prose-pre:rounded-md prose-pre:bg-slate-100 prose-pre:border-2 prose-pre:border-slate-200 prose-pre:text-slate-800 prose-pre:p-4 prose-pre:overflow-x-auto prose-pre:text-xs prose-pre:font-mono prose-pre:mb-4"
519+
className={PROSE_CLASSES}
455520
/>
456521

457-
{message.code && (
522+
{!isStreaming(message) && message.code && (
458523
<div className="rounded-lg overflow-hidden border border-gray-200 bg-white">
459524
<div
460525
className={cn(
@@ -524,7 +589,7 @@ export function MessageList({
524589
</div>
525590
)}
526591

527-
{message.status === 'error' && (
592+
{!isStreaming(message) && message.status === 'error' && (
528593
<div
529594
className="flex items-center gap-2 px-3 py-2 rounded-lg bg-red-50 border border-red-200"
530595
data-testid="ai-error-message"
@@ -552,7 +617,7 @@ export function MessageList({
552617
</div>
553618
)}
554619

555-
{message.status === 'processing' && (
620+
{!isStreaming(message) && message.status === 'processing' && (
556621
<div className="flex items-center gap-2 text-gray-600">
557622
<div className="flex items-center gap-1">
558623
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce" />
@@ -562,45 +627,47 @@ export function MessageList({
562627
</div>
563628
)}
564629

565-
<div className="mt-2 flex items-center gap-2 text-xs text-gray-400">
566-
<span>{formatTimestamp(message.inserted_at)}</span>
567-
<span></span>
568-
<button
569-
type="button"
570-
onClick={() => {
571-
void (async () => {
572-
const success = await doCopy(message.content);
573-
if (success) {
574-
setCopiedMessageId(message.id);
575-
setTimeout(() => setCopiedMessageId(null), 2000);
576-
}
577-
})();
578-
}}
579-
className={cn(
580-
'flex items-center gap-1 transition-colors duration-200',
581-
copiedMessageId === message.id
582-
? 'text-green-600'
583-
: 'text-gray-400 hover:text-gray-600'
584-
)}
585-
title={
586-
copiedMessageId === message.id
587-
? 'Copied!'
588-
: 'Copy message'
589-
}
590-
>
591-
<span
630+
{!isStreaming(message) && (
631+
<div className="mt-2 flex items-center gap-2 text-xs text-gray-400 animate-[fade-in-keys_0.3s_ease-in]">
632+
<span>{formatTimestamp(message.inserted_at)}</span>
633+
<span></span>
634+
<button
635+
type="button"
636+
onClick={() => {
637+
void (async () => {
638+
const success = await doCopy(message.content);
639+
if (success) {
640+
setCopiedMessageId(message.id);
641+
setTimeout(() => setCopiedMessageId(null), 2000);
642+
}
643+
})();
644+
}}
592645
className={cn(
593-
'h-3 w-3',
646+
'flex items-center gap-1 transition-colors duration-200',
594647
copiedMessageId === message.id
595-
? 'hero-check'
596-
: 'hero-clipboard-document'
648+
? 'text-green-600'
649+
: 'text-gray-400 hover:text-gray-600'
597650
)}
598-
/>
599-
<span>
600-
{copiedMessageId === message.id ? 'Copied' : 'Copy'}
601-
</span>
602-
</button>
603-
</div>
651+
title={
652+
copiedMessageId === message.id
653+
? 'Copied!'
654+
: 'Copy message'
655+
}
656+
>
657+
<span
658+
className={cn(
659+
'h-3 w-3',
660+
copiedMessageId === message.id
661+
? 'hero-check'
662+
: 'hero-clipboard-document'
663+
)}
664+
/>
665+
<span>
666+
{copiedMessageId === message.id ? 'Copied' : 'Copy'}
667+
</span>
668+
</button>
669+
</div>
670+
)}
604671
</div>
605672
</div>
606673
) : (
@@ -660,17 +727,22 @@ export function MessageList({
660727
</div>
661728
))}
662729

663-
{isLoading && (
730+
{isLoading && !streamingContent && (
664731
<div
665732
ref={loadingRef}
666733
className="group px-6 py-4"
667734
data-testid="loading-indicator"
668735
>
669736
<div className="max-w-3xl mx-auto">
670-
<div className="flex items-center gap-1.5">
671-
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce" />
672-
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce [animation-delay:0.15s]" />
673-
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce [animation-delay:0.3s]" />
737+
<div className="flex items-center gap-2">
738+
<div className="flex items-center gap-1">
739+
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce" />
740+
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce [animation-delay:0.15s]" />
741+
<span className="inline-block w-1.5 h-1.5 bg-gray-400 rounded-full animate-bounce [animation-delay:0.3s]" />
742+
</div>
743+
<span className="text-xs text-gray-400 italic">
744+
{streamingStatus || 'Generating response...'}
745+
</span>
674746
</div>
675747
</div>
676748
</div>

assets/js/collaborative-editor/hooks/useAIAssistant.ts

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,39 @@ export const useAIIsLoading = () => {
104104
return isLoading;
105105
};
106106

107+
/**
108+
* Get streaming content
109+
*/
110+
export const useAIStreamingContent = () => {
111+
const store = useAIStore();
112+
return useSyncExternalStore(
113+
store.subscribe,
114+
store.withSelector(state => state.streamingContent)
115+
);
116+
};
117+
118+
/**
119+
* Get streaming changes (code edits or workflow YAML sent before text streams)
120+
*/
121+
export const useAIStreamingChanges = () => {
122+
const store = useAIStore();
123+
return useSyncExternalStore(
124+
store.subscribe,
125+
store.withSelector(state => state.streamingChanges)
126+
);
127+
};
128+
129+
/**
130+
* Get streaming status
131+
*/
132+
export const useAIStreamingStatus = () => {
133+
const store = useAIStore();
134+
return useSyncExternalStore(
135+
store.subscribe,
136+
store.withSelector(state => state.streamingStatus)
137+
);
138+
};
139+
107140
/**
108141
* Get sending state
109142
*/

0 commit comments

Comments
 (0)