Skip to content

Commit 9f2cc02

Browse files
committed
fix subagent lane fallback issue
1 parent ed9d5ba commit 9f2cc02

2 files changed

Lines changed: 81 additions & 0 deletions

File tree

apps/sim/app/workspace/[workspaceId]/home/hooks/stream/turn-model-serialize.test.ts

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -133,6 +133,49 @@ describe('modelToContentBlocks', () => {
133133
expect(snap2[0].content).toBe('one')
134134
})
135135

136+
it('attributes subagent content that streams before its subagent_start (parallel-burst inversion)', () => {
137+
const sub: Scope = {
138+
lane: 'subagent',
139+
spanId: 'R1',
140+
parentSpanId: 'main',
141+
parentToolCallId: 'tc-r1',
142+
agentId: 'research',
143+
}
144+
const m = createTurnModel()
145+
reduceEvent(m, env(1, 'text', { channel: 'assistant', text: 'Spawning research.' }))
146+
// Under an 8-way burst the subagent's thinking + text can be reduced before
147+
// its subagent_start lands. The content already carries the lane identity.
148+
reduceEvent(m, env(2, 'text', { channel: 'thinking', text: 'Considering odds.' }, sub))
149+
reduceEvent(m, env(3, 'text', { channel: 'assistant', text: 'Team analysis.' }, sub))
150+
151+
// Snapshot mid-burst (before the start): the research content must already be
152+
// its own lane, never leaked into the main ("Sim") lane with its thinking dropped.
153+
const mid = modelToContentBlocks(m)
154+
const midSub = mid.find((b) => b.type === 'subagent')
155+
expect(midSub?.content).toBe('research')
156+
expect(midSub?.spanId).toBe('R1')
157+
expect(mid.find((b) => b.type === 'subagent_thinking')?.spanId).toBe('R1')
158+
expect(mid.filter((b) => b.type === 'text' && b.spanId === 'R1')).toHaveLength(1)
159+
// The main lane holds only the pre-spawn text — nothing leaked in.
160+
const mainText = mid.filter((b) => b.type === 'text' && !b.spanId)
161+
expect(mainText).toHaveLength(1)
162+
expect(mainText[0].content).toBe('Spawning research.')
163+
164+
// The real subagent_start lands afterward and no-ops: still one research lane.
165+
reduceEvent(
166+
m,
167+
env(
168+
4,
169+
'span',
170+
{ kind: 'subagent', event: 'start', agent: 'research', data: { tool_call_id: 'tc-r1' } },
171+
sub
172+
)
173+
)
174+
const after = modelToContentBlocks(m)
175+
expect(after.filter((b) => b.type === 'subagent')).toHaveLength(1)
176+
expect(after.find((b) => b.type === 'subagent')?.content).toBe('research')
177+
})
178+
136179
it('places subagent_end at its end seq (after the lane work), never reordering siblings', () => {
137180
const sub: Scope = {
138181
lane: 'subagent',

apps/sim/app/workspace/[workspaceId]/home/hooks/stream/turn-model.ts

Lines changed: 38 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -344,6 +344,42 @@ function applyToolResult(
344344
model.bufferedResults.set(id, { success, output, status, ...(error ? { error } : {}) })
345345
}
346346

347+
/**
348+
* Materializes a subagent lane on first reference. Subagent-scoped content
349+
* (text/thinking/tool) can be reduced before its `subagent_start` under heavy
350+
* parallel bursts (many subagents streaming into one ordered channel); without
351+
* the owning `AgentNode` the serializer can't attribute the content, so it leaks
352+
* into the main lane and the subagent's thinking is dropped until the start
353+
* lands. The wire scope already carries the lane identity (Go tags every
354+
* forwarded subagent event with its agent id/span), so the lane is rebuilt
355+
* deterministically from the content event itself — the symmetric counterpart to
356+
* buffering a result before its call. The later `subagent_start` finds this node
357+
* and no-ops.
358+
*/
359+
function ensureSubagentLane(
360+
model: TurnModel,
361+
spanId: string,
362+
scope: { agentId?: string; parentSpanId?: string; parentToolCallId?: string } | undefined,
363+
seq: number,
364+
atMs?: number
365+
): void {
366+
if (spanId === MAIN_SPAN || model.agentBySpanId.has(spanId)) return
367+
const node: AgentNode = {
368+
kind: 'agent',
369+
id: spanId,
370+
spanId,
371+
parentSpanId: scope?.parentSpanId ?? MAIN_SPAN,
372+
agentId: scope?.agentId ?? '',
373+
status: 'running',
374+
seq,
375+
...(atMs !== undefined ? { startedAtMs: atMs } : {}),
376+
...(scope?.parentToolCallId ? { triggerToolCallId: scope.parentToolCallId } : {}),
377+
}
378+
model.nodes.set(node.id, node)
379+
model.order.push(node.id)
380+
model.agentBySpanId.set(spanId, node.id)
381+
}
382+
347383
/**
348384
* Folds one wire envelope into the model. Pure accumulator: it mutates and
349385
* returns the same `model` (the streaming hot path keeps one model per turn).
@@ -364,6 +400,7 @@ export function reduceEvent(model: TurnModel, envelope: PersistedStreamEventEnve
364400
switch (envelope.type) {
365401
case MothershipStreamV1EventType.text: {
366402
const payload = envelope.payload
403+
ensureSubagentLane(model, spanId, scope, seq, tsMs)
367404
appendText(model, spanId, payload.channel as TextChannel, payload.text, seq, tsMs)
368405
break
369406
}
@@ -374,6 +411,7 @@ export function reduceEvent(model: TurnModel, envelope: PersistedStreamEventEnve
374411
const rawToolCallId = asString(payload.toolCallId)
375412
if (!rawToolCallId) break
376413
const toolName = asString(payload.toolName) ?? ''
414+
ensureSubagentLane(model, spanId, scope, seq, tsMs)
377415
const phase = payload.phase
378416
if (phase === MothershipStreamV1ToolPhase.call) {
379417
// edit_content folds into its span's workspace_file row (the write

0 commit comments

Comments
 (0)