Skip to content

Commit f448134

Browse files
committed
Subagent tool call persistence
1 parent 7b53137 commit f448134

File tree

9 files changed

+170
-11
lines changed

9 files changed

+170
-11
lines changed

apps/sim/app/api/mothership/chat/route.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,7 @@ export async function POST(req: NextRequest) {
206206
: 'error'
207207
: block.toolCall.status,
208208
result: block.toolCall.result,
209+
...(block.calledBy ? { calledBy: block.calledBy } : {}),
209210
}
210211
}
211212
return stored

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 24 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ function mapStoredBlock(block: TaskStoredContentBlock): ContentBlock {
7070
name: block.toolCall.name ?? 'unknown',
7171
status: STATE_TO_STATUS[block.toolCall.state ?? ''] ?? 'success',
7272
displayTitle: block.toolCall.display?.text,
73+
calledBy: block.toolCall.calledBy,
7374
result: block.toolCall.result,
7475
}
7576
}
@@ -383,11 +384,22 @@ export function useChat(workspaceId: string, initialChatId?: string): UseChatRet
383384
let resource: MothershipResource | null = null
384385

385386
if (toolName === 'user_table') {
386-
resource = extractTableResource(parsed, storedArgs, lastTableId)
387-
if (resource) {
388-
lastTableId = resource.id
389-
queryClient.invalidateQueries({ queryKey: tableKeys.detail(resource.id) })
390-
queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(resource.id) })
387+
const redirected = extractFunctionExecuteResource(parsed, storedArgs)
388+
if (redirected?.type === 'file') {
389+
resource = redirected
390+
queryClient.invalidateQueries({
391+
queryKey: workspaceFilesKeys.list(workspaceId),
392+
})
393+
queryClient.invalidateQueries({
394+
queryKey: workspaceFilesKeys.content(workspaceId, resource.id),
395+
})
396+
} else {
397+
resource = extractTableResource(parsed, storedArgs, lastTableId)
398+
if (resource) {
399+
lastTableId = resource.id
400+
queryClient.invalidateQueries({ queryKey: tableKeys.detail(resource.id) })
401+
queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(resource.id) })
402+
}
391403
}
392404
} else if (toolName === 'workspace_file') {
393405
resource = extractFileResource(parsed, storedArgs)
@@ -415,6 +427,13 @@ export function useChat(workspaceId: string, initialChatId?: string): UseChatRet
415427
})
416428
}
417429
}
430+
} else if (toolName === 'read') {
431+
resource = extractFunctionExecuteResource(parsed, storedArgs)
432+
if (resource?.type === 'table') {
433+
lastTableId = resource.id
434+
queryClient.invalidateQueries({ queryKey: tableKeys.detail(resource.id) })
435+
queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(resource.id) })
436+
}
418437
} else if (toolName === 'create_workflow' || toolName === 'edit_workflow') {
419438
resource = extractWorkflowResource(parsed, lastWorkflowId)
420439
if (resource) {

apps/sim/app/workspace/[workspaceId]/home/utils.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ export const RESOURCE_TOOL_NAMES = new Set([
77
'create_workflow',
88
'edit_workflow',
99
'function_execute',
10+
'read',
1011
])
1112

1213
function getResultData(parsed: SSEPayload): Record<string, unknown> | undefined {
@@ -138,13 +139,21 @@ export function extractResourcesFromHistory(messages: TaskStoredMessage[]): Moth
138139

139140
let resource: MothershipResource | null = null
140141
if (tc.name === 'user_table') {
141-
resource = extractTableResource(payload, args, lastTableId)
142-
if (resource) lastTableId = resource.id
142+
const redirected = extractFunctionExecuteResource(payload, args)
143+
if (redirected?.type === 'file') {
144+
resource = redirected
145+
} else {
146+
resource = extractTableResource(payload, args, lastTableId)
147+
if (resource) lastTableId = resource.id
148+
}
143149
} else if (tc.name === 'workspace_file') {
144150
resource = extractFileResource(payload, args)
145151
} else if (tc.name === 'function_execute') {
146152
resource = extractFunctionExecuteResource(payload, args)
147153
if (resource?.type === 'table') lastTableId = resource.id
154+
} else if (tc.name === 'read') {
155+
resource = extractFunctionExecuteResource(payload, args)
156+
if (resource?.type === 'table') lastTableId = resource.id
148157
} else if (tc.name === 'create_workflow' || tc.name === 'edit_workflow') {
149158
resource = extractWorkflowResource(payload, lastWorkflowId)
150159
if (resource) lastWorkflowId = resource.id

apps/sim/hooks/queries/tasks.ts

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ export interface TaskStoredContentBlock {
4141
params?: Record<string, unknown>
4242
result?: { success: boolean; output?: unknown; error?: string }
4343
display?: { text?: string }
44+
calledBy?: string
4445
} | null
4546
}
4647

apps/sim/lib/copilot/orchestrator/sse/handlers/handlers.ts

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -484,6 +484,12 @@ export const subAgentHandlers: Record<string, SSEHandler> = {
484484
}
485485
if (!context.toolCalls.has(toolCallId)) {
486486
context.toolCalls.set(toolCallId, toolCall)
487+
const parentToolCall = context.toolCalls.get(parentToolCallId)
488+
addContentBlock(context, {
489+
type: 'tool_call',
490+
toolCall,
491+
calledBy: parentToolCall?.name,
492+
})
487493
}
488494

489495
if (isPartial) return

apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts

Lines changed: 114 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -206,6 +206,119 @@ async function maybeWriteOutputToTable(
206206
}
207207
}
208208

209+
async function maybeWriteReadCsvToTable(
210+
toolName: string,
211+
params: Record<string, unknown> | undefined,
212+
result: ToolCallResult,
213+
context: ExecutionContext
214+
): Promise<ToolCallResult> {
215+
if (toolName !== 'read') return result
216+
if (!result.success || !result.output) return result
217+
if (!context.workspaceId || !context.userId) return result
218+
219+
const outputTable = params?.outputTable as string | undefined
220+
if (!outputTable) return result
221+
222+
try {
223+
const table = await getTableById(outputTable)
224+
if (!table) {
225+
return { success: false, error: `Table "${outputTable}" not found` }
226+
}
227+
228+
const output = result.output as Record<string, unknown>
229+
const content = (output.content as string) || ''
230+
if (!content.trim()) {
231+
return { success: false, error: 'File has no content to import into table' }
232+
}
233+
234+
const filePath = (params?.path as string) || ''
235+
const ext = filePath.split('.').pop()?.toLowerCase()
236+
237+
let rows: Record<string, unknown>[]
238+
239+
if (ext === 'json') {
240+
const parsed = JSON.parse(content)
241+
if (!Array.isArray(parsed)) {
242+
return {
243+
success: false,
244+
error: 'JSON file must contain an array of objects for table import',
245+
}
246+
}
247+
rows = parsed
248+
} else {
249+
const { parse } = await import('csv-parse/sync')
250+
rows = parse(content, {
251+
columns: true,
252+
skip_empty_lines: true,
253+
trim: true,
254+
relax_column_count: true,
255+
relax_quotes: true,
256+
skip_records_with_error: true,
257+
cast: false,
258+
}) as Record<string, unknown>[]
259+
}
260+
261+
if (rows.length === 0) {
262+
return { success: false, error: 'File has no data rows to import' }
263+
}
264+
265+
if (rows.length > MAX_OUTPUT_TABLE_ROWS) {
266+
return {
267+
success: false,
268+
error: `Row limit exceeded: got ${rows.length}, max is ${MAX_OUTPUT_TABLE_ROWS}`,
269+
}
270+
}
271+
272+
await db.transaction(async (tx) => {
273+
await tx.delete(userTableRows).where(eq(userTableRows.tableId, outputTable))
274+
275+
const now = new Date()
276+
for (let i = 0; i < rows.length; i += BATCH_CHUNK_SIZE) {
277+
const chunk = rows.slice(i, i + BATCH_CHUNK_SIZE)
278+
const values = chunk.map((rowData, j) => ({
279+
id: `row_${crypto.randomUUID().replace(/-/g, '')}`,
280+
tableId: outputTable,
281+
workspaceId: context.workspaceId!,
282+
data: rowData,
283+
position: i + j,
284+
createdAt: now,
285+
updatedAt: now,
286+
createdBy: context.userId,
287+
}))
288+
await tx.insert(userTableRows).values(values)
289+
}
290+
})
291+
292+
logger.info('Read output written to table', {
293+
toolName,
294+
tableId: outputTable,
295+
tableName: table.name,
296+
rowCount: rows.length,
297+
filePath,
298+
})
299+
300+
return {
301+
success: true,
302+
output: {
303+
message: `Imported ${rows.length} rows from "${filePath}" into table "${table.name}"`,
304+
tableId: outputTable,
305+
tableName: table.name,
306+
rowCount: rows.length,
307+
},
308+
}
309+
} catch (err) {
310+
logger.warn('Failed to write read output to table', {
311+
toolName,
312+
outputTable,
313+
error: err instanceof Error ? err.message : String(err),
314+
})
315+
return {
316+
success: false,
317+
error: `Failed to import into table: ${err instanceof Error ? err.message : String(err)}`,
318+
}
319+
}
320+
}
321+
209322
export async function executeToolAndReport(
210323
toolCallId: string,
211324
context: StreamingContext,
@@ -230,6 +343,7 @@ export async function executeToolAndReport(
230343
let result = await executeToolServerSide(toolCall, execContext)
231344
result = await maybeWriteOutputToFile(toolCall.name, toolCall.params, result, execContext)
232345
result = await maybeWriteOutputToTable(toolCall.name, toolCall.params, result, execContext)
346+
result = await maybeWriteReadCsvToTable(toolCall.name, toolCall.params, result, execContext)
233347
toolCall.status = result.success ? 'success' : 'error'
234348
toolCall.result = result
235349
toolCall.error = result.error

apps/sim/lib/copilot/orchestrator/stream/core.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -176,12 +176,20 @@ export async function runStreamLoop(
176176
if (normalizedEvent.type === 'subagent_start') {
177177
const eventData = normalizedEvent.data as Record<string, unknown> | undefined
178178
const toolCallId = eventData?.tool_call_id as string | undefined
179+
const subagentName = normalizedEvent.subagent || (eventData?.agent as string | undefined)
179180
if (toolCallId) {
180181
context.subAgentParentStack.push(toolCallId)
181182
context.subAgentParentToolCallId = toolCallId
182183
context.subAgentContent[toolCallId] = ''
183184
context.subAgentToolCalls[toolCallId] = []
184185
}
186+
if (subagentName) {
187+
context.contentBlocks.push({
188+
type: 'subagent',
189+
content: subagentName,
190+
timestamp: Date.now(),
191+
})
192+
}
185193
continue
186194
}
187195

apps/sim/lib/copilot/orchestrator/types.ts

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -59,12 +59,13 @@ export interface ToolCallResult<T = unknown> {
5959
error?: string
6060
}
6161

62-
export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'subagent_text'
62+
export type ContentBlockType = 'text' | 'thinking' | 'tool_call' | 'subagent_text' | 'subagent'
6363

6464
export interface ContentBlock {
6565
type: ContentBlockType
6666
content?: string
6767
toolCall?: ToolCallState
68+
calledBy?: string
6869
timestamp: number
6970
}
7071

apps/sim/lib/file-parsers/csv-parser.ts

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -37,9 +37,9 @@ export class CsvParser implements FileParser {
3737
`Parsing CSV buffer, size: ${bufferSize} bytes (${(bufferSize / 1024 / 1024).toFixed(2)} MB)`
3838
)
3939

40-
const stream = Readable.from(buffer, {
41-
highWaterMark: CONFIG.STREAM_CHUNK_SIZE,
42-
})
40+
const stream = new Readable({ read() {} })
41+
stream.push(buffer)
42+
stream.push(null)
4343

4444
return this.parseStream(stream)
4545
}

0 commit comments

Comments
 (0)