Skip to content

Commit b6c3018

Browse files
committed
fix(subagents): address parallel-subagent bugs
1 parent 1a4518a commit b6c3018

8 files changed

Lines changed: 239 additions & 90 deletions

File tree

apps/sim/lib/copilot/request/context/request-context.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,6 @@ export function createStreamingContext(overrides?: Partial<StreamingContext>): S
2020
currentThinkingBlock: null,
2121
subagentThinkingBlocks: new Map(),
2222
isInThinkingBlock: false,
23-
subAgentParentToolCallId: undefined,
24-
subAgentParentStack: [],
2523
subAgentContent: {},
2624
subAgentToolCalls: {},
2725
pendingContent: '',

apps/sim/lib/copilot/request/context/result.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,8 +25,6 @@ function makeContext(): StreamingContext {
2525
currentThinkingBlock: null,
2626
subagentThinkingBlocks: new Map(),
2727
isInThinkingBlock: false,
28-
subAgentParentToolCallId: undefined,
29-
subAgentParentStack: [],
3028
subAgentContent: {},
3129
subAgentToolCalls: {},
3230
pendingContent: '',

apps/sim/lib/copilot/request/go/stream.test.ts

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -96,8 +96,6 @@ function createStreamingContext(): StreamingContext {
9696
currentThinkingBlock: null,
9797
subagentThinkingBlocks: new Map(),
9898
isInThinkingBlock: false,
99-
subAgentParentToolCallId: undefined,
100-
subAgentParentStack: [],
10199
subAgentContent: {},
102100
subAgentToolCalls: {},
103101
pendingContent: '',

apps/sim/lib/copilot/request/go/stream.ts

Lines changed: 1 addition & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -392,10 +392,6 @@ export async function runStreamLoop(
392392
flushThinkingBlock(context)
393393
if (spanEvt === MothershipStreamV1SpanLifecycleEvent.start) {
394394
if (toolCallId) {
395-
if (!context.subAgentParentStack.includes(toolCallId)) {
396-
context.subAgentParentStack.push(toolCallId)
397-
}
398-
context.subAgentParentToolCallId = toolCallId
399395
context.subAgentContent[toolCallId] ??= ''
400396
context.subAgentToolCalls[toolCallId] ??= []
401397
}
@@ -424,20 +420,9 @@ export async function runStreamLoop(
424420
if (isPendingPause) {
425421
return
426422
}
427-
if (toolCallId) {
428-
const idx = context.subAgentParentStack.lastIndexOf(toolCallId)
429-
if (idx >= 0) {
430-
context.subAgentParentStack.splice(idx, 1)
431-
} else {
432-
logger.warn('subagent end without matching start', { toolCallId })
433-
}
434-
} else {
423+
if (!toolCallId) {
435424
logger.warn('subagent end missing toolCallId')
436425
}
437-
context.subAgentParentToolCallId =
438-
context.subAgentParentStack.length > 0
439-
? context.subAgentParentStack[context.subAgentParentStack.length - 1]
440-
: undefined
441426
if (toolCallId) {
442427
for (let i = context.contentBlocks.length - 1; i >= 0; i--) {
443428
const b = context.contentBlocks[i]

apps/sim/lib/copilot/request/handlers/handlers.test.ts

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -93,8 +93,6 @@ describe('sse-handlers tool lifecycle', () => {
9393
currentThinkingBlock: null,
9494
subagentThinkingBlocks: new Map(),
9595
isInThinkingBlock: false,
96-
subAgentParentToolCallId: undefined,
97-
subAgentParentStack: [],
9896
subAgentContent: {},
9997
subAgentToolCalls: {},
10098
pendingContent: '',
@@ -462,8 +460,6 @@ describe('sse-handlers tool lifecycle', () => {
462460

463461
it('updates stored params when a subagent generating event is followed by the final tool call', async () => {
464462
executeTool.mockResolvedValueOnce({ success: true, output: { ok: true } })
465-
context.subAgentParentToolCallId = 'parent-1'
466-
context.subAgentParentStack = ['parent-1']
467463
context.toolCalls.set('parent-1', {
468464
id: 'parent-1',
469465
name: 'workflow',
@@ -522,7 +518,6 @@ describe('sse-handlers tool lifecycle', () => {
522518
})
523519

524520
it('routes subagent text using the event scope parent tool call id', async () => {
525-
context.subAgentParentToolCallId = 'wrong-parent'
526521
context.subAgentContent['parent-1'] = ''
527522

528523
await subAgentHandlers.text(
@@ -573,7 +568,6 @@ describe('sse-handlers tool lifecycle', () => {
573568

574569
it('routes subagent tool calls using the event scope parent tool call id', async () => {
575570
executeTool.mockResolvedValueOnce({ success: true, output: { ok: true } })
576-
context.subAgentParentToolCallId = 'wrong-parent'
577571
context.toolCalls.set('parent-1', {
578572
id: 'parent-1',
579573
name: 'deploy',
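(new test file — its path header was not captured in this extract; per the imports below it covers makeResumeLegContext / mergeResumeLegOutputs from '@/lib/copilot/request/lifecycle/run')

Lines changed: 82 additions & 0 deletions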
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,82 @@
1+
import { describe, expect, it } from 'vitest'
2+
import { createStreamingContext } from '@/lib/copilot/request/context/request-context'
3+
import { makeResumeLegContext, mergeResumeLegOutputs } from '@/lib/copilot/request/lifecycle/run'
4+
5+
// Guards the makeResumeLegContext / mergeResumeLegOutputs contract: the two MUST
6+
// stay in lockstep (every per-leg-isolated scalar is reset on leg creation and
7+
// folded back on merge), and the heavy accumulators stay shared by reference so
8+
// all concurrent legs build one chat. This is the regression the inline comment
9+
// warns about — without per-leg isolation the orchestrator's pre-fanout content
10+
// gets multiplied by the leg count on merge.
11+
describe('resume leg context isolate/merge contract', () => {
12+
it('isolates the per-leg scalars while sharing the heavy accumulators by reference', () => {
13+
const base = createStreamingContext({
14+
accumulatedContent: 'PRE',
15+
finalAssistantContent: 'PRE-FINAL',
16+
usage: { prompt: 10, completion: 5 },
17+
cost: { input: 1, output: 2, total: 3 },
18+
errors: ['pre-existing'],
19+
})
20+
21+
const leg = makeResumeLegContext(base)
22+
23+
// Per-leg scalars reset so a leg accumulates only its OWN output.
24+
expect(leg.accumulatedContent).toBe('')
25+
expect(leg.finalAssistantContent).toBe('')
26+
expect(leg.usage).toBeUndefined()
27+
expect(leg.cost).toBeUndefined()
28+
expect(leg.errors).toEqual([])
29+
expect(leg.streamComplete).toBe(false)
30+
expect(leg.awaitingAsyncContinuation).toBeUndefined()
31+
32+
// A leg's own errors array is a fresh array (not the shared one) so a leg's
33+
// retry rollback can't truncate a sibling's errors.
34+
expect(leg.errors).not.toBe(base.errors)
35+
36+
// Heavy accumulators stay shared by reference (one merged chat).
37+
expect(leg.contentBlocks).toBe(base.contentBlocks)
38+
expect(leg.toolCalls).toBe(base.toolCalls)
39+
expect(leg.pendingToolPromises).toBe(base.pendingToolPromises)
40+
expect(leg.subAgentContent).toBe(base.subAgentContent)
41+
})
42+
43+
it('folds a leg back exactly once (no double-count of the orchestrator content)', () => {
44+
const base = createStreamingContext({ accumulatedContent: 'PRE', errors: ['pre'] })
45+
46+
const leg = makeResumeLegContext(base)
47+
leg.accumulatedContent = 'JOIN'
48+
leg.finalAssistantContent = 'JOIN-FINAL'
49+
leg.usage = { prompt: 100, completion: 50 }
50+
leg.cost = { input: 4, output: 5, total: 9 }
51+
leg.errors.push('leg-err')
52+
53+
mergeResumeLegOutputs(base, leg)
54+
55+
// PRE seeded once + the leg's own output appended once — not PRE+PRE+JOIN.
56+
expect(base.accumulatedContent).toBe('PREJOIN')
57+
expect(base.finalAssistantContent).toBe('JOIN-FINAL')
58+
expect(base.usage).toEqual({ prompt: 100, completion: 50 })
59+
expect(base.cost).toEqual({ input: 4, output: 5, total: 9 })
60+
expect(base.errors).toEqual(['pre', 'leg-err'])
61+
})
62+
63+
it('does not multiply pre-fanout content across many legs (N children + one join leg)', () => {
64+
const base = createStreamingContext({ accumulatedContent: 'PRE' })
65+
66+
// Seven child legs that stream subagent content (not main accumulatedContent)
67+
// contribute nothing to the join scalars; only the join-carrying leg does.
68+
for (let i = 0; i < 7; i++) {
69+
const childLeg = makeResumeLegContext(base)
70+
mergeResumeLegOutputs(base, childLeg)
71+
}
72+
const joinLeg = makeResumeLegContext(base)
73+
joinLeg.accumulatedContent = 'SUMMARY'
74+
joinLeg.usage = { prompt: 1, completion: 1 }
75+
mergeResumeLegOutputs(base, joinLeg)
76+
77+
// Exactly the pre-fanout content + the one join leg's summary — the 7 child
78+
// legs must not each re-append 'PRE'.
79+
expect(base.accumulatedContent).toBe('PRESUMMARY')
80+
expect(base.usage).toEqual({ prompt: 1, completion: 1 })
81+
})
82+
})

0 commit comments

Comments
 (0)