Skip to content

Commit a512ec8

Browse files
Fix streaming text deduplication in CLI and SDK
Implement intelligent text merging and delta calculation to prevent duplicate content in streaming responses. Add XML filter module with tests. 🤖 Generated with Codebuff Co-Authored-By: Codebuff <noreply@codebuff.com>
1 parent 2e36e86 commit a512ec8

File tree

4 files changed

+558
-214
lines changed

4 files changed

+558
-214
lines changed

cli/src/hooks/use-send-message.ts

Lines changed: 231 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,44 @@ const processToolCallBuffer = (
100100
}
101101
}
102102

103+
const mergeTextSegments = (
104+
previous: string,
105+
incoming: string,
106+
): { next: string; delta: string } => {
107+
if (!incoming) {
108+
return { next: previous, delta: '' }
109+
}
110+
if (!previous) {
111+
return { next: incoming, delta: incoming }
112+
}
113+
114+
if (incoming.startsWith(previous)) {
115+
return { next: incoming, delta: incoming.slice(previous.length) }
116+
}
117+
118+
if (previous.includes(incoming)) {
119+
return { next: previous, delta: '' }
120+
}
121+
122+
const maxOverlap = Math.min(previous.length, incoming.length)
123+
for (let overlap = maxOverlap; overlap > 0; overlap--) {
124+
if (
125+
previous.slice(previous.length - overlap) === incoming.slice(0, overlap)
126+
) {
127+
const delta = incoming.slice(overlap)
128+
return {
129+
next: previous + delta,
130+
delta,
131+
}
132+
}
133+
}
134+
135+
return {
136+
next: previous + incoming,
137+
delta: incoming,
138+
}
139+
}
140+
103141
interface UseSendMessageOptions {
104142
setMessages: React.Dispatch<React.SetStateAction<ChatMessage[]>>
105143
setFocusedAgentId: (id: string | null) => void
@@ -144,6 +182,9 @@ export const useSendMessage = ({
144182
const subagentBuffersRef = useRef<
145183
Map<string, { buffer: string; insideToolCall: boolean }>
146184
>(new Map())
185+
const rootStreamBufferRef = useRef('')
186+
const agentStreamAccumulatorsRef = useRef<Map<string, string>>(new Map())
187+
const rootStreamSeenRef = useRef(false)
147188

148189
const updateChainInProgress = useCallback(
149190
(value: boolean) => {
@@ -350,10 +391,18 @@ export const useSendMessage = ({
350391
timestamp: formatTimestamp(),
351392
}
352393

394+
rootStreamBufferRef.current = ''
395+
rootStreamSeenRef.current = false
396+
agentStreamAccumulatorsRef.current = new Map<string, string>()
397+
subagentBuffersRef.current = new Map<
398+
string,
399+
{ buffer: string; insideToolCall: boolean }
400+
>()
401+
353402
const updateAgentContent = (
354403
agentId: string,
355404
update:
356-
| { type: 'text'; content: string }
405+
| { type: 'text'; content: string; replace?: boolean }
357406
| Extract<ContentBlock, { type: 'tool' }>,
358407
) => {
359408
const preview =
@@ -379,13 +428,52 @@ export const useSendMessage = ({
379428
const agentBlocks: ContentBlock[] = block.blocks
380429
? [...block.blocks]
381430
: []
382-
if (update.type === 'text' && update.content) {
431+
if (update.type === 'text') {
432+
const text = update.content ?? ''
433+
const replace = update.replace ?? false
434+
435+
if (replace) {
436+
const updatedBlocks = [...agentBlocks]
437+
let replaced = false
438+
439+
for (let i = updatedBlocks.length - 1; i >= 0; i--) {
440+
const entry = updatedBlocks[i]
441+
if (entry.type === 'text') {
442+
replaced = true
443+
if (entry.content === text && block.content === text) {
444+
logger.info('Agent block text replacement skipped', {
445+
agentId,
446+
preview,
447+
})
448+
return block
449+
}
450+
updatedBlocks[i] = { ...entry, content: text }
451+
break
452+
}
453+
}
454+
455+
if (!replaced) {
456+
updatedBlocks.push({ type: 'text', content: text })
457+
}
458+
459+
logger.info('Agent block text replaced', {
460+
agentId,
461+
length: text.length,
462+
})
463+
return {
464+
...block,
465+
content: text,
466+
blocks: updatedBlocks,
467+
}
468+
}
469+
470+
if (!text) {
471+
return block
472+
}
473+
383474
const lastBlock = agentBlocks[agentBlocks.length - 1]
384475
if (lastBlock && lastBlock.type === 'text') {
385-
if (
386-
update.content &&
387-
lastBlock.content.endsWith(update.content)
388-
) {
476+
if (lastBlock.content.endsWith(text)) {
389477
logger.info('Skipping duplicate agent text append', {
390478
agentId,
391479
preview,
@@ -394,13 +482,13 @@ export const useSendMessage = ({
394482
}
395483
const updatedLastBlock: ContentBlock = {
396484
...lastBlock,
397-
content: lastBlock.content + update.content,
485+
content: lastBlock.content + text,
398486
}
399487
const updatedContent =
400-
(block.content ?? '') + update.content
488+
(block.content ?? '') + text
401489
logger.info('Agent block text appended', {
402490
agentId,
403-
appendedLength: update.content.length,
491+
appendedLength: text.length,
404492
totalLength: updatedContent.length,
405493
})
406494
return {
@@ -410,16 +498,19 @@ export const useSendMessage = ({
410498
}
411499
} else {
412500
const updatedContent =
413-
(block.content ?? '') + update.content
501+
(block.content ?? '') + text
414502
logger.info('Agent block text started', {
415503
agentId,
416-
appendedLength: update.content.length,
504+
appendedLength: text.length,
417505
totalLength: updatedContent.length,
418506
})
419507
return {
420508
...block,
421509
content: updatedContent,
422-
blocks: [...agentBlocks, update],
510+
blocks: [
511+
...agentBlocks,
512+
{ type: 'text', content: text },
513+
],
423514
}
424515
}
425516
} else if (update.type === 'tool') {
@@ -432,9 +523,49 @@ export const useSendMessage = ({
432523
return block
433524
},
434525
)
435-
return { ...msg, blocks: newBlocks }
526+
return { ...msg, blocks: newBlocks }
527+
}
528+
return msg
529+
}),
530+
)
531+
}
532+
533+
const appendRootTextChunk = (delta: string) => {
534+
if (!delta) {
535+
return
536+
}
537+
538+
const fullText = rootStreamBufferRef.current ?? ''
539+
logger.info('appendRootTextChunk invoked', {
540+
chunkLength: delta.length,
541+
fullLength: fullText.length,
542+
preview: delta.slice(0, 100),
543+
})
544+
545+
queueMessageUpdate((prev) =>
546+
prev.map((msg) => {
547+
if (msg.id !== aiMessageId) {
548+
return msg
549+
}
550+
551+
const blocks: ContentBlock[] = msg.blocks ? [...msg.blocks] : []
552+
const lastBlock = blocks[blocks.length - 1]
553+
554+
if (lastBlock && lastBlock.type === 'text') {
555+
const updatedBlock: ContentBlock = {
556+
...lastBlock,
557+
content: lastBlock.content + delta,
558+
}
559+
return {
560+
...msg,
561+
blocks: [...blocks.slice(0, -1), updatedBlock],
562+
}
563+
}
564+
565+
return {
566+
...msg,
567+
blocks: [...blocks, { type: 'text', content: delta }],
436568
}
437-
return msg
438569
}),
439570
)
440571
}
@@ -461,8 +592,31 @@ export const useSendMessage = ({
461592
signal: abortController.signal,
462593

463594
handleStreamChunk: (chunk: any) => {
464-
// Streaming chunks are also sent via text events, so we ignore them here to avoid duplication
465-
// Text events have better handling for tool call filtering
595+
if (typeof chunk !== 'string' || !chunk) {
596+
return
597+
}
598+
599+
if (!hasReceivedContent) {
600+
hasReceivedContent = true
601+
setIsWaitingForResponse(false)
602+
}
603+
604+
const previous = rootStreamBufferRef.current ?? ''
605+
const { next, delta } = mergeTextSegments(previous, chunk)
606+
if (!delta && next === previous) {
607+
return
608+
}
609+
logger.info('handleStreamChunk root delta', {
610+
chunkLength: chunk.length,
611+
previousLength: previous.length,
612+
nextLength: next.length,
613+
preview: chunk.slice(0, 100),
614+
})
615+
rootStreamBufferRef.current = next
616+
rootStreamSeenRef.current = true
617+
if (delta) {
618+
appendRootTextChunk(delta)
619+
}
466620
},
467621

468622
handleEvent: (event: any) => {
@@ -480,18 +634,33 @@ export const useSendMessage = ({
480634
bufferState.buffer += chunk
481635

482636
processToolCallBuffer(bufferState, (text) => {
483-
updateAgentContent(agentId, { type: 'text', content: text })
637+
if (!text) {
638+
return
639+
}
640+
const previous =
641+
agentStreamAccumulatorsRef.current.get(agentId) ?? ''
642+
const { next, delta } = mergeTextSegments(previous, text)
643+
if (!delta && next === previous) {
644+
return
645+
}
646+
agentStreamAccumulatorsRef.current.set(agentId, next)
647+
if (delta) {
648+
updateAgentContent(agentId, { type: 'text', content: delta })
649+
} else {
650+
updateAgentContent(agentId, {
651+
type: 'text',
652+
content: next,
653+
replace: true,
654+
})
655+
}
484656
})
485657
return
486658
}
487659

488660
if (event.type === 'text') {
489-
let text = event.text.replace(
490-
/<codebuff_tool_call>[\s\S]*?<\/codebuff_tool_call>/g,
491-
'',
492-
)
661+
const text = event.text
493662

494-
if (!text) return
663+
if (typeof text !== 'string' || !text) return
495664

496665
if (!hasReceivedContent) {
497666
hasReceivedContent = true
@@ -503,53 +672,50 @@ export const useSendMessage = ({
503672
agentId: event.agentId,
504673
textPreview: text.slice(0, 100),
505674
})
506-
updateAgentContent(event.agentId, {
507-
type: 'text',
508-
content: text,
509-
})
675+
const previous =
676+
agentStreamAccumulatorsRef.current.get(event.agentId) ?? ''
677+
const { next, delta } = mergeTextSegments(previous, text)
678+
if (!delta && next === previous) {
679+
return
680+
}
681+
agentStreamAccumulatorsRef.current.set(event.agentId, next)
682+
683+
if (delta) {
684+
updateAgentContent(event.agentId, {
685+
type: 'text',
686+
content: delta,
687+
})
688+
} else {
689+
updateAgentContent(event.agentId, {
690+
type: 'text',
691+
content: next,
692+
replace: true,
693+
})
694+
}
510695
} else {
696+
if (rootStreamSeenRef.current) {
697+
logger.info('Skipping root text event (stream already handled)', {
698+
textPreview: text.slice(0, 100),
699+
textLength: text.length,
700+
})
701+
return
702+
}
703+
const previous = rootStreamBufferRef.current ?? ''
704+
const { next, delta } = mergeTextSegments(previous, text)
705+
if (!delta && next === previous) {
706+
return
707+
}
511708
logger.info('setMessages: text event without agentId', {
512709
textPreview: text.slice(0, 100),
710+
previousLength: previous.length,
711+
textLength: text.length,
712+
appendedLength: delta.length,
513713
})
514-
queueMessageUpdate((prev) =>
515-
prev.map((msg) => {
516-
if (msg.id !== aiMessageId) {
517-
return msg
518-
}
714+
rootStreamBufferRef.current = next
519715

520-
const blocks: ContentBlock[] = msg.blocks
521-
? [...msg.blocks]
522-
: []
523-
const lastBlock = blocks[blocks.length - 1]
524-
525-
// Deduplicate: if the new text is already at the end of the last block, skip it
526-
if (lastBlock && lastBlock.type === 'text') {
527-
if (lastBlock.content.endsWith(text)) {
528-
logger.info('Skipping duplicate main agent text', {
529-
textPreview: text.slice(0, 100),
530-
})
531-
return msg
532-
}
533-
const updatedTextBlock: ContentBlock = {
534-
type: 'text',
535-
content: lastBlock.content + text,
536-
}
537-
return {
538-
...msg,
539-
blocks: [...blocks.slice(0, -1), updatedTextBlock],
540-
}
541-
}
542-
543-
const newTextBlock: ContentBlock = {
544-
type: 'text',
545-
content: text,
546-
}
547-
return {
548-
...msg,
549-
blocks: [...blocks, newTextBlock],
550-
}
551-
}),
552-
)
716+
if (delta) {
717+
appendRootTextChunk(delta)
718+
}
553719
}
554720
return
555721
}
@@ -810,6 +976,7 @@ export const useSendMessage = ({
810976
event.type === 'subagent-finish'
811977
) {
812978
if (event.agentId) {
979+
agentStreamAccumulatorsRef.current.delete(event.agentId)
813980
removeActiveSubagent(event.agentId)
814981

815982
applyMessageUpdate((prev) =>

0 commit comments

Comments
 (0)