Skip to content

Commit 6db8bc4

Browse files
committed
Add piping
1 parent 9ca539b commit 6db8bc4

File tree

9 files changed

+288
-3
lines changed

9 files changed

+288
-3
lines changed

apps/sim/app/api/function/execute/route.ts

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -610,6 +610,7 @@ export async function POST(req: NextRequest) {
610610
workflowVariables = {},
611611
workflowId,
612612
isCustomTool = false,
613+
_sandboxFiles,
613614
} = body
614615

615616
const executionParams = { ...params }
@@ -722,6 +723,7 @@ export async function POST(req: NextRequest) {
722723
code: codeForE2B,
723724
language: CodeLanguage.JavaScript,
724725
timeoutMs: timeout,
726+
sandboxFiles: _sandboxFiles,
725727
})
726728
const executionTime = Date.now() - execStart
727729
stdout += e2bStdout
@@ -785,6 +787,7 @@ export async function POST(req: NextRequest) {
785787
code: codeForE2B,
786788
language: CodeLanguage.Python,
787789
timeoutMs: timeout,
790+
sandboxFiles: _sandboxFiles,
788791
})
789792
const executionTime = Date.now() - execStart
790793
stdout += e2bStdout

apps/sim/app/workspace/[workspaceId]/home/hooks/use-chat.ts

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ import type {
2626
import { SUBAGENT_LABELS } from '../types'
2727
import {
2828
extractFileResource,
29+
extractFunctionExecuteResource,
2930
extractResourcesFromHistory,
3031
extractTableResource,
3132
extractWorkflowResource,
@@ -361,6 +362,22 @@ export function useChat(workspaceId: string, initialChatId?: string): UseChatRet
361362
queryKey: workspaceFilesKeys.content(workspaceId, resource.id),
362363
})
363364
}
365+
} else if (toolName === 'function_execute') {
366+
resource = extractFunctionExecuteResource(parsed, storedArgs)
367+
if (resource) {
368+
if (resource.type === 'table') {
369+
lastTableId = resource.id
370+
queryClient.invalidateQueries({ queryKey: tableKeys.detail(resource.id) })
371+
queryClient.invalidateQueries({ queryKey: tableKeys.rowsRoot(resource.id) })
372+
} else if (resource.type === 'file') {
373+
queryClient.invalidateQueries({
374+
queryKey: workspaceFilesKeys.list(workspaceId),
375+
})
376+
queryClient.invalidateQueries({
377+
queryKey: workspaceFilesKeys.content(workspaceId, resource.id),
378+
})
379+
}
380+
}
364381
} else if (toolName === 'create_workflow' || toolName === 'edit_workflow') {
365382
resource = extractWorkflowResource(parsed, lastWorkflowId)
366383
if (resource) {

apps/sim/app/workspace/[workspaceId]/home/utils.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ export const RESOURCE_TOOL_NAMES = new Set([
66
'workspace_file',
77
'create_workflow',
88
'edit_workflow',
9+
'function_execute',
910
])
1011

1112
function getResultData(parsed: SSEPayload): Record<string, unknown> | undefined {
@@ -60,6 +61,34 @@ export function extractFileResource(
6061
return null
6162
}
6263

64+
export function extractFunctionExecuteResource(
65+
parsed: SSEPayload,
66+
storedArgs: Record<string, unknown> | undefined
67+
): MothershipResource | null {
68+
const topResult = (parsed.result ??
69+
(typeof parsed.data === 'object' ? parsed.data?.result : undefined)) as
70+
| Record<string, unknown>
71+
| undefined
72+
73+
if (topResult?.tableId) {
74+
return {
75+
type: 'table',
76+
id: topResult.tableId as string,
77+
title: (topResult.tableName as string) || 'Table',
78+
}
79+
}
80+
81+
if (topResult?.fileId) {
82+
return {
83+
type: 'file',
84+
id: topResult.fileId as string,
85+
title: (topResult.fileName as string) || 'File',
86+
}
87+
}
88+
89+
return null
90+
}
91+
6392
export function extractWorkflowResource(
6493
parsed: SSEPayload,
6594
fallbackWorkflowId: string | null
@@ -113,6 +142,9 @@ export function extractResourcesFromHistory(messages: TaskStoredMessage[]): Moth
113142
if (resource) lastTableId = resource.id
114143
} else if (tc.name === 'workspace_file') {
115144
resource = extractFileResource(payload, args)
145+
} else if (tc.name === 'function_execute') {
146+
resource = extractFunctionExecuteResource(payload, args)
147+
if (resource?.type === 'table') lastTableId = resource.id
116148
} else if (tc.name === 'create_workflow' || tc.name === 'edit_workflow') {
117149
resource = extractWorkflowResource(payload, lastWorkflowId)
118150
if (resource) lastWorkflowId = resource.id

apps/sim/lib/copilot/orchestrator/sse/handlers/tool-execution.ts

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1+
import { db } from '@sim/db'
2+
import { userTableRows } from '@sim/db/schema'
13
import { createLogger } from '@sim/logger'
4+
import { eq } from 'drizzle-orm'
25
import {
36
TOOL_DECISION_INITIAL_POLL_MS,
47
TOOL_DECISION_MAX_POLL_MS,
@@ -18,6 +21,7 @@ import type {
1821
StreamingContext,
1922
ToolCallResult,
2023
} from '@/lib/copilot/orchestrator/types'
24+
import { getTableById } from '@/lib/table/service'
2125
import { uploadWorkspaceFile } from '@/lib/uploads/contexts/workspace/workspace-file-manager'
2226

2327
const logger = createLogger('CopilotSseToolExecution')
@@ -94,6 +98,114 @@ async function maybeWriteOutputToFile(
9498
}
9599
}
96100

101+
const MAX_OUTPUT_TABLE_ROWS = 10_000
102+
const BATCH_CHUNK_SIZE = 500
103+
104+
async function maybeWriteOutputToTable(
105+
toolName: string,
106+
params: Record<string, unknown> | undefined,
107+
result: ToolCallResult,
108+
context: ExecutionContext
109+
): Promise<ToolCallResult> {
110+
if (toolName !== 'function_execute') return result
111+
if (!result.success || !result.output) return result
112+
if (!context.workspaceId || !context.userId) return result
113+
114+
const outputTable = params?.outputTable as string | undefined
115+
if (!outputTable) return result
116+
117+
try {
118+
const table = await getTableById(outputTable)
119+
if (!table) {
120+
return {
121+
success: false,
122+
error: `Table "${outputTable}" not found`,
123+
}
124+
}
125+
126+
const rawOutput = result.output
127+
let rows: Array<Record<string, unknown>>
128+
129+
if (rawOutput && typeof rawOutput === 'object' && 'result' in rawOutput) {
130+
const inner = (rawOutput as Record<string, unknown>).result
131+
if (Array.isArray(inner)) {
132+
rows = inner
133+
} else {
134+
return {
135+
success: false,
136+
error: 'outputTable requires the code to return an array of objects',
137+
}
138+
}
139+
} else if (Array.isArray(rawOutput)) {
140+
rows = rawOutput
141+
} else {
142+
return {
143+
success: false,
144+
error: 'outputTable requires the code to return an array of objects',
145+
}
146+
}
147+
148+
if (rows.length > MAX_OUTPUT_TABLE_ROWS) {
149+
return {
150+
success: false,
151+
error: `outputTable row limit exceeded: got ${rows.length}, max is ${MAX_OUTPUT_TABLE_ROWS}`,
152+
}
153+
}
154+
155+
if (rows.length === 0) {
156+
return {
157+
success: false,
158+
error: 'outputTable requires at least one row — code returned an empty array',
159+
}
160+
}
161+
162+
await db.transaction(async (tx) => {
163+
await tx.delete(userTableRows).where(eq(userTableRows.tableId, outputTable))
164+
165+
const now = new Date()
166+
for (let i = 0; i < rows.length; i += BATCH_CHUNK_SIZE) {
167+
const chunk = rows.slice(i, i + BATCH_CHUNK_SIZE)
168+
const values = chunk.map((rowData, j) => ({
169+
id: `row_${crypto.randomUUID().replace(/-/g, '')}`,
170+
tableId: outputTable,
171+
workspaceId: context.workspaceId!,
172+
data: rowData,
173+
position: i + j,
174+
createdAt: now,
175+
updatedAt: now,
176+
createdBy: context.userId,
177+
}))
178+
await tx.insert(userTableRows).values(values)
179+
}
180+
})
181+
182+
logger.info('Tool output written to table', {
183+
toolName,
184+
tableId: outputTable,
185+
rowCount: rows.length,
186+
})
187+
188+
return {
189+
success: true,
190+
output: {
191+
message: `Wrote ${rows.length} rows to table ${outputTable}`,
192+
tableId: outputTable,
193+
rowCount: rows.length,
194+
},
195+
}
196+
} catch (err) {
197+
logger.warn('Failed to write tool output to table', {
198+
toolName,
199+
outputTable,
200+
error: err instanceof Error ? err.message : String(err),
201+
})
202+
return {
203+
success: false,
204+
error: `Failed to write to table: ${err instanceof Error ? err.message : String(err)}`,
205+
}
206+
}
207+
}
208+
97209
export async function executeToolAndReport(
98210
toolCallId: string,
99211
context: StreamingContext,
@@ -117,6 +229,7 @@ export async function executeToolAndReport(
117229
try {
118230
let result = await executeToolServerSide(toolCall, execContext)
119231
result = await maybeWriteOutputToFile(toolCall.name, toolCall.params, result, execContext)
232+
result = await maybeWriteOutputToTable(toolCall.name, toolCall.params, result, execContext)
120233
toolCall.status = result.success ? 'success' : 'error'
121234
toolCall.result = result
122235
toolCall.error = result.error

apps/sim/lib/copilot/orchestrator/tool-executor/integration-tools.ts

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,11 @@ import { generateRequestId } from '@/lib/core/utils/request'
1212
import { getCredentialActorContext } from '@/lib/credentials/access'
1313
import { getAccessibleOAuthCredentials } from '@/lib/credentials/environment'
1414
import { getEffectiveDecryptedEnv } from '@/lib/environment/utils'
15+
import { getTableById, queryRows } from '@/lib/table/service'
16+
import {
17+
downloadWorkspaceFile,
18+
listWorkspaceFiles,
19+
} from '@/lib/uploads/contexts/workspace/workspace-file-manager'
1520
import { getWorkflowById } from '@/lib/workflows/utils'
1621
import { refreshTokenIfNeeded } from '@/app/api/auth/oauth/utils'
1722
import { resolveEnvVarReferences } from '@/executor/utils/reference-validation'
@@ -143,6 +148,97 @@ export async function executeIntegrationToolDirect(
143148
executionParams.blockNameMapping = {}
144149
executionParams.language = executionParams.language || 'javascript'
145150
executionParams.timeout = executionParams.timeout || 30000
151+
152+
if (isHosted && workspaceId) {
153+
const sandboxFiles: Array<{ path: string; content: string }> = []
154+
const MAX_FILE_SIZE = 10 * 1024 * 1024
155+
const MAX_TOTAL_SIZE = 50 * 1024 * 1024
156+
const TEXT_EXTENSIONS = new Set([
157+
'csv',
158+
'json',
159+
'txt',
160+
'md',
161+
'html',
162+
'xml',
163+
'tsv',
164+
'yaml',
165+
'yml',
166+
])
167+
let totalSize = 0
168+
169+
const inputFilePaths = executionParams.inputFiles as string[] | undefined
170+
if (inputFilePaths?.length) {
171+
const allFiles = await listWorkspaceFiles(workspaceId)
172+
for (const filePath of inputFilePaths) {
173+
const fileName = filePath.replace(/^files\//, '')
174+
const ext = fileName.split('.').pop()?.toLowerCase() ?? ''
175+
if (!TEXT_EXTENSIONS.has(ext)) {
176+
logger.warn('Skipping non-text sandbox input file', { fileName, ext })
177+
continue
178+
}
179+
const record = allFiles.find(
180+
(f) => f.name === fileName || f.name.normalize('NFC') === fileName.normalize('NFC')
181+
)
182+
if (!record) {
183+
logger.warn('Sandbox input file not found', { fileName })
184+
continue
185+
}
186+
if (record.size > MAX_FILE_SIZE) {
187+
logger.warn('Sandbox input file exceeds size limit', { fileName, size: record.size })
188+
continue
189+
}
190+
if (totalSize + record.size > MAX_TOTAL_SIZE) {
191+
logger.warn('Sandbox input total size limit reached, skipping remaining files')
192+
break
193+
}
194+
const buffer = await downloadWorkspaceFile(record)
195+
totalSize += buffer.length
196+
sandboxFiles.push({ path: `/home/user/${fileName}`, content: buffer.toString('utf-8') })
197+
}
198+
}
199+
200+
const inputTableIds = executionParams.inputTables as string[] | undefined
201+
if (inputTableIds?.length) {
202+
for (const tableId of inputTableIds) {
203+
const table = await getTableById(tableId)
204+
if (!table) {
205+
logger.warn('Sandbox input table not found', { tableId })
206+
continue
207+
}
208+
const { rows } = await queryRows(tableId, workspaceId, { limit: 10000 }, 'sandbox-input')
209+
const cols = (table.schema as { columns: Array<{ name: string }> }).columns.map(
210+
(c) => c.name
211+
)
212+
const csvLines = [cols.join(',')]
213+
for (const row of rows) {
214+
csvLines.push(
215+
cols
216+
.map((c) => JSON.stringify((row.data as Record<string, unknown>)[c] ?? ''))
217+
.join(',')
218+
)
219+
}
220+
const csvContent = csvLines.join('\n')
221+
if (totalSize + csvContent.length > MAX_TOTAL_SIZE) {
222+
logger.warn('Sandbox input total size limit reached, skipping remaining tables')
223+
break
224+
}
225+
totalSize += csvContent.length
226+
sandboxFiles.push({ path: `/home/user/tables/${tableId}.csv`, content: csvContent })
227+
}
228+
}
229+
230+
if (sandboxFiles.length > 0) {
231+
executionParams._sandboxFiles = sandboxFiles
232+
logger.info('Prepared sandbox input files', {
233+
fileCount: sandboxFiles.length,
234+
totalSize,
235+
paths: sandboxFiles.map((f) => f.path),
236+
})
237+
}
238+
239+
executionParams.inputFiles = undefined
240+
executionParams.inputTables = undefined
241+
}
146242
}
147243

148244
const result = await executeTool(toolName, executionParams)

apps/sim/lib/core/config/feature-flags.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,8 +22,8 @@ export const isTest = env.NODE_ENV === 'test'
2222
* Is this the hosted version of the application
2323
*/
2424
export const isHosted =
25-
getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.sim.ai' ||
26-
getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.staging.sim.ai'
25+
getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.sim.ai' ||
26+
getEnv('NEXT_PUBLIC_APP_URL') === 'https://www.staging.sim.ai'
2727

2828
/**
2929
* Is billing enforcement enabled

0 commit comments

Comments
 (0)