
Commit feca5fa

improvement(execution, connectors): offload large function inputs, increase connector limits + better error propagation (#5089)
* fix(execution,connectors): offload large function inputs; harden KB connector size limits

  Addresses a class of 10 MB limit failures:

  - executor/variables: offload over-budget function block-output context values to durable large-value refs (lazy `sim.values.read`) so JS function blocks can merge medium files without exceeding the 10 MB inter-block request-body cap.
  - connectors: stream downloads via `readBodyWithLimit` (memory-safe), and surface oversized files as visible `failed` KB documents instead of silently dropping them: listing-time for github/s3/dropbox/onedrive/sharepoint, fetch-time for gitlab/azure/google-drive via a shared `ConnectorFileTooLargeError`. Raise the per-file cap from a hardcoded 10 MB to the canonical 100 MB KB document limit (`CONNECTOR_MAX_FILE_BYTES`), except Google Drive's export path (Google's hard 10 MB export-API limit).
  - sync-engine: `classifyExternalDoc` + bulk `skipDocuments` (failed rows with a reason, excluded from retry), byte-bounded batch concurrency to cap peak worker memory at the raised cap, and a `metadata.fileSize ?? size` fallback.

* fix zoom
* update skill
* address comments + fix terminal event in sse stream
* fix accounting issue
1 parent cc56408 commit feca5fa

19 files changed

Lines changed: 1340 additions & 163 deletions


.agents/skills/memory-load-check/SKILL.md

Lines changed: 26 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -49,10 +49,35 @@ Read these when doing a deeper pass:
4949
- cap downloads and parsed output separately
5050
- preserve partial results when a later item exceeds the cap
5151
- never read untrusted response bodies without a byte cap
52+
- KB connector file downloads in `apps/sim/connectors/utils.ts`
53+
- `CONNECTOR_MAX_FILE_BYTES`: shared per-file cap (aligned with the manual KB upload limit)
54+
- `readBodyWithLimit`: stream a download body to a Buffer with a hard byte cap (null on overflow)
55+
- `stubOrSkipBySize`: listing-time skip when the reported size exceeds the cap
56+
- `markSkipped` / `sizeLimitSkipReason`: surface oversized files as failed (skipped) KB rows
57+
- `ConnectorFileTooLargeError`: thrown mid-download when the listing under-reported size
5258
- Large workflow value payloads
5359
- prefer durable references/manifests over inlining large arrays or files
5460
- materialize refs only behind an explicit byte budget
5561

62+
## KB Connector File Size Handling
63+
64+
The connector size pattern in `apps/sim/connectors/utils.ts` (`CONNECTOR_MAX_FILE_BYTES` + `readBodyWithLimit` + `stubOrSkipBySize`/`markSkipped`) exists for one risk: a knowledge-base connector downloading **arbitrary, user-controlled file bytes** that the source does not hard-cap. Apply it by that risk, not by the connector's name.
65+
66+
Use the pattern when the connector downloads file content via a stream/`download_url` where the user controls the size:
67+
- file-storage connectors: Dropbox, OneDrive, SharePoint, Google Drive, S3, GitHub, GitLab, Azure DevOps
68+
- any connector that fetches a file via a download URL even if it is not a "storage" service (e.g. the Zoom transcript `.vtt`)
69+
70+
For those, require all three:
71+
- stream the body with `readBodyWithLimit(resp, CONNECTOR_MAX_FILE_BYTES)` — never raw `response.text()`/`response.arrayBuffer()`
72+
- skip oversize at listing (`stubOrSkipBySize` with the reported size) and again at fetch time (overflow -> `markSkipped`), since the listing size can be missing or under-reported
73+
- never drop/truncate silently — oversized files become content-less failed rows carrying `skippedReason`, so they stay visible in the KB UI instead of vanishing from the index
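As a rough illustration of the first requirement, the sketch below streams a body through a hard byte cap and returns null on overflow, which is the behavior this skill attributes to `readBodyWithLimit`. It is not the implementation in `apps/sim/connectors/utils.ts`: the `ChunkReader` interface and the `readWithLimitSketch` name are hypothetical, and it returns a `Uint8Array` rather than a Buffer to avoid depending on Node types.

```typescript
// Hypothetical minimal reader shape, modeled on a stream reader's read()/cancel().
interface ChunkReader {
  read(): Promise<{ done: boolean; value?: Uint8Array }>
  cancel(): Promise<void>
}

/**
 * Reads chunks until the stream ends or the running total exceeds `maxBytes`.
 * Returns the concatenated bytes, or null on overflow. The size check runs
 * before a chunk is stored, so no more than `maxBytes` bytes are retained.
 */
async function readWithLimitSketch(
  reader: ChunkReader,
  maxBytes: number
): Promise<Uint8Array | null> {
  const chunks: Uint8Array[] = []
  let total = 0

  while (true) {
    const { done, value } = await reader.read()
    if (done) break
    if (!value) continue

    total += value.byteLength
    if (total > maxBytes) {
      // Stop the download instead of draining the rest of the body.
      await reader.cancel()
      return null
    }
    chunks.push(value)
  }

  // Concatenate only after the total is known to be within the cap.
  const out = new Uint8Array(total)
  let offset = 0
  for (const chunk of chunks) {
    out.set(chunk, offset)
    offset += chunk.byteLength
  }
  return out
}
```

A reader obtained from `response.body.getReader()` should satisfy this shape. A caller would treat a null result as "oversized" and record a skipped row rather than retrying or truncating.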
74+
75+
Skip the pattern when the source already bounds the payload:
76+
- pure API/structured-data connectors (Jira, Linear, Notion, Confluence, Sentry, Slack, Zendesk, Gmail, ...) — paginated JSON/text; apply normal pagination + concurrency bounds instead of a per-file byte cap
77+
- native-document connectors capped by the platform (Google Docs ~50 MB, Google Sheets via `MAX_ROWS`, Evernote ~25 MB/note) — a 100 MB cap can never fire, and wrapping a `response.json()`/Thrift parse in `readBodyWithLimit` is cargo-culting
78+
79+
Litmus test: "Can a user make this one fetch arbitrarily large, with nothing upstream stopping it?" Yes -> use the pattern. No (platform hard-cap, or already paginated) -> a per-file byte cap adds noise, not safety. Borderline: a user-configured/self-hosted endpoint with no platform cap (e.g. Obsidian) — bound it only if the content is genuinely unbounded.
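For a connector that passes the litmus test, the listing-time half of the pattern might look like the sketch below. It only mirrors the call shapes visible in this commit (`stubOrSkipBySize(stub, size, limit)` and `markSkipped(stub, reason)`). The `DocSketch` type, the `*Sketch` function names, the placement of `skippedReason` as a top-level field, and the reason wording are all assumptions, not the real helpers.

```typescript
// Hypothetical, trimmed-down document shape; the real ExternalDocument has more fields.
interface DocSketch {
  externalId: string
  title: string
  content: string
  skippedReason?: string
}

// 100 MB, the per-file cap named in the commit message.
const CAP_BYTES_SKETCH = 100 * 1024 * 1024

// Assumed wording; the real reason string is not shown in this diff.
function sizeLimitSkipReasonSketch(limitBytes: number): string {
  const limitMb = Math.floor(limitBytes / (1024 * 1024))
  return `File exceeds the ${limitMb} MB size limit`
}

// Turns a stub into a content-less row that carries the reason it was skipped.
function markSkippedSketch(doc: DocSketch, reason: string): DocSketch {
  return { ...doc, content: '', skippedReason: reason }
}

// Skips only when the source reported a size and that size is over the cap.
// A missing size passes through, which is why the fetch-time check is still needed.
function stubOrSkipBySizeSketch(
  stub: DocSketch,
  reportedSize: number | undefined,
  limitBytes: number
): DocSketch {
  if (typeof reportedSize === 'number' && reportedSize > limitBytes) {
    return markSkippedSketch(stub, sizeLimitSkipReasonSketch(limitBytes))
  }
  return stub
}
```

The key design point is that an oversized file still produces a row. The sync engine can then store it as failed with a visible reason instead of the file silently never appearing.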
80+
5681
## Review Workflow
5782

5883
1. Identify every changed data source:
@@ -96,6 +121,7 @@ Read these when doing a deeper pass:
96121
- fetches all pages from an external API before processing
97122
- reads an entire file, HTTP response, or stream without a max byte budget
98123
- checks size only after `Buffer.concat`, `arrayBuffer`, `text`, `JSON.parse`, or parse expansion
124+
- a KB connector silently drops or truncates an oversized file instead of recording it as a failed (skipped) row
99125
- chunks only after loading the complete dataset
100126
- paginates with unbounded/deep `OFFSET` on a mutable or large table
101127
- creates one queue job per row without batching or a queue-level concurrency key

apps/sim/app/api/workflows/[id]/execute/route.ts

Lines changed: 22 additions & 6 deletions
@@ -1235,12 +1235,28 @@ async function handleExecutePost(
12351235
const isBuffered = event.type !== 'stream:chunk' && event.type !== 'stream:done'
12361236
let eventToSend = event
12371237
if (isBuffered) {
1238-
const entry = terminalStatus
1239-
? await eventWriter.writeTerminal(event, terminalStatus)
1240-
: await eventWriter.write(event)
1241-
eventToSend = entry.event
1242-
eventToSend.eventId = entry.eventId
1243-
terminalEventPublished ||= Boolean(terminalStatus)
1238+
try {
1239+
const entry = terminalStatus
1240+
? await eventWriter.writeTerminal(event, terminalStatus)
1241+
: await eventWriter.write(event)
1242+
eventToSend = entry.event
1243+
eventToSend.eventId = entry.eventId
1244+
terminalEventPublished ||= Boolean(terminalStatus)
1245+
} catch (e) {
1246+
// The event buffer (Redis replay store) rejected this event — e.g. the flush
1247+
// batch exceeds the per-write byte cap for large block outputs. The buffer only
1248+
// backs reconnect/replay; the live SSE stream is the primary delivery. Fall
1249+
// through to enqueue the event live (below) instead of throwing, so terminal
1250+
// events still reach the active client and the UI doesn't hang on "running".
1251+
// Marking a terminal event delivered-live as published lets finalization close
1252+
// the stream cleanly instead of aborting it with controller.error().
1253+
reqLogger.warn('Event buffer write failed; delivering event over live stream only', {
1254+
eventType: event.type,
1255+
terminal: Boolean(terminalStatus),
1256+
error: toError(e).message,
1257+
})
1258+
terminalEventPublished ||= Boolean(terminalStatus)
1259+
}
12441260
}
12451261
if (!isStreamClosed) {
12461262
try {
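The fallback described in the comment above can be reduced to a small, self-contained sketch: attempt the replay-buffer write, and if it throws, log a warning and still deliver the event over the live stream. All types and names here (`ExecEventSketch`, `EventWriterSketch`, `deliverWithBufferFallback`) are hypothetical. The real handler also distinguishes `writeTerminal` from `write` and checks whether the stream is already closed, which this sketch omits.

```typescript
interface ExecEventSketch {
  type: string
  eventId?: number
}

interface BufferEntrySketch {
  event: ExecEventSketch
  eventId: number
}

// Stand-in for the replay buffer; `write` may reject, e.g. on an oversized batch.
interface EventWriterSketch {
  write(event: ExecEventSketch): Promise<BufferEntrySketch>
}

async function deliverWithBufferFallback(
  event: ExecEventSketch,
  isTerminal: boolean,
  writer: EventWriterSketch,
  sendLive: (event: ExecEventSketch) => void,
  warn: (message: string) => void
): Promise<{ buffered: boolean; terminalPublished: boolean }> {
  let eventToSend = event
  let buffered = false

  try {
    const entry = await writer.write(event)
    // On success, send the buffered copy so the client sees the assigned eventId.
    eventToSend = { ...entry.event, eventId: entry.eventId }
    buffered = true
  } catch (e) {
    // The buffer only backs reconnect/replay, so a failed write is not fatal.
    const message = e instanceof Error ? e.message : String(e)
    warn(`Event buffer write failed; delivering over live stream only: ${message}`)
  }

  // Live delivery happens on both paths.
  sendLive(eventToSend)

  // A terminal event counts as published even if only delivered live.
  return { buffered, terminalPublished: isTerminal }
}
```

The trade-off is that an event delivered live only cannot be replayed to a client that reconnects later. The diff accepts this so that the active client never hangs waiting for a terminal event.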

apps/sim/connectors/azure-devops/azure-devops.ts

Lines changed: 31 additions & 3 deletions
@@ -3,7 +3,15 @@ import { getErrorMessage, toError } from '@sim/utils/errors'
33
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
44
import { azureDevopsConnectorMeta } from '@/connectors/azure-devops/meta'
55
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
6-
import { htmlToPlainText, joinTagArray, parseTagDate, readBodyWithLimit } from '@/connectors/utils'
6+
import {
7+
CONNECTOR_MAX_FILE_BYTES,
8+
htmlToPlainText,
9+
joinTagArray,
10+
markSkipped,
11+
parseTagDate,
12+
readBodyWithLimit,
13+
sizeLimitSkipReason,
14+
} from '@/connectors/utils'
715

816
const logger = createLogger('AzureDevOpsConnector')
917

@@ -30,7 +38,7 @@ const FILE_BATCH_SIZE = 100
3038
* and aborts (returning null) the moment the cap is exceeded. Larger files are
3139
* skipped without being fully buffered.
3240
*/
33-
const MAX_FILE_SIZE = 10 * 1024 * 1024
41+
const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES
3442
/** Bytes sniffed for a NUL byte when detecting binary files (matches git's heuristic). */
3543
const BINARY_SNIFF_BYTES = 8000
3644
/**
@@ -1090,7 +1098,27 @@ async function getFileDocument(
10901098
const buffer = await readBodyWithLimit(contentResponse, MAX_FILE_SIZE)
10911099
if (buffer === null) {
10921100
logger.info('Skipping oversized Azure DevOps file', { path })
1093-
return null
1101+
const skippedTitle = path.split('/').filter(Boolean).pop() || path
1102+
return markSkipped(
1103+
{
1104+
externalId,
1105+
title: skippedTitle,
1106+
content: '',
1107+
mimeType: 'text/plain',
1108+
sourceUrl: buildFileSourceUrl(repo?.webUrl, branch, path),
1109+
contentHash: buildFileContentHash(repoId, item.objectId),
1110+
metadata: {
1111+
kind: 'file',
1112+
organization,
1113+
project,
1114+
repository: repo?.name ?? '',
1115+
repositoryId: repoId,
1116+
branch,
1117+
path,
1118+
},
1119+
},
1120+
sizeLimitSkipReason(MAX_FILE_SIZE)
1121+
)
10941122
}
10951123
if (isBinaryBuffer(buffer)) {
10961124
logger.info('Skipping binary Azure DevOps file', { path })

apps/sim/connectors/dropbox/dropbox.ts

Lines changed: 61 additions & 24 deletions
@@ -3,7 +3,18 @@ import { getErrorMessage, toError } from '@sim/utils/errors'
33
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
44
import { dropboxConnectorMeta } from '@/connectors/dropbox/meta'
55
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
6-
import { htmlToPlainText, parseTagDate } from '@/connectors/utils'
6+
import {
7+
CONNECTOR_MAX_FILE_BYTES,
8+
ConnectorFileTooLargeError,
9+
htmlToPlainText,
10+
isSkippedDocument,
11+
markSkipped,
12+
parseTagDate,
13+
readBodyWithLimit,
14+
sizeLimitSkipReason,
15+
stubOrSkipBySize,
16+
takeIndexableWithinCap,
17+
} from '@/connectors/utils'
718

819
const logger = createLogger('DropboxConnector')
920

@@ -23,7 +34,7 @@ const SUPPORTED_EXTENSIONS = new Set([
2334
'.tsv',
2435
])
2536

26-
const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB
37+
const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES
2738

2839
interface DropboxFileEntry {
2940
'.tag': 'file' | 'folder' | 'deleted'
@@ -44,16 +55,18 @@ interface DropboxListFolderResponse {
4455
has_more: boolean
4556
}
4657

47-
function isSupportedFile(entry: DropboxFileEntry): boolean {
48-
if (entry['.tag'] !== 'file') return false
49-
if (entry.is_downloadable === false) return false
50-
if (entry.size && entry.size > MAX_FILE_SIZE) return false
51-
52-
const name = entry.name.toLowerCase()
53-
const dotIndex = name.lastIndexOf('.')
58+
function hasSupportedExtension(name: string): boolean {
59+
const lower = name.toLowerCase()
60+
const dotIndex = lower.lastIndexOf('.')
5461
if (dotIndex === -1) return false
62+
return SUPPORTED_EXTENSIONS.has(lower.slice(dotIndex))
63+
}
5564

56-
return SUPPORTED_EXTENSIONS.has(name.slice(dotIndex))
65+
/** A downloadable file with a supported extension, regardless of size. */
66+
function isDownloadableFile(entry: DropboxFileEntry): boolean {
67+
return (
68+
entry['.tag'] === 'file' && entry.is_downloadable !== false && hasSupportedExtension(entry.name)
69+
)
5770
}
5871

5972
async function downloadFileContent(accessToken: string, filePath: string): Promise<string> {
@@ -69,7 +82,15 @@ async function downloadFileContent(accessToken: string, filePath: string): Promi
6982
throw new Error(`Failed to download file ${filePath}: ${response.status}`)
7083
}
7184

72-
const text = await response.text()
85+
// Stream with a hard byte cap so a file whose listing metadata under-reported
86+
// (or omitted) its size can never be fully buffered into memory. Oversize raises
87+
// so getDocument can surface it as a skipped (failed) row rather than dropping it.
88+
const buffer = await readBodyWithLimit(response, MAX_FILE_SIZE)
89+
if (!buffer) {
90+
throw new ConnectorFileTooLargeError(MAX_FILE_SIZE)
91+
}
92+
93+
const text = buffer.toString('utf8')
7394

7495
if (filePath.endsWith('.html') || filePath.endsWith('.htm')) {
7596
return htmlToPlainText(text)
@@ -162,23 +183,27 @@ export const dropboxConnector: ConnectorConfig = {
162183
data = await response.json()
163184
}
164185

165-
const supportedFiles = data.entries.filter(isSupportedFile)
186+
// Keep oversized files and surface them as skipped (failed) documents instead
187+
// of dropping them silently at listing time.
188+
const candidateFiles = data.entries.filter(isDownloadableFile)
166189

167190
const maxFiles = sourceConfig.maxFiles ? Number(sourceConfig.maxFiles) : 0
168191
const previouslyFetched = (syncContext?.totalDocsFetched as number) ?? 0
169192

170-
let documents = supportedFiles.map(fileToStub)
193+
const stubs = candidateFiles.map((entry) =>
194+
stubOrSkipBySize(fileToStub(entry), entry.size, MAX_FILE_SIZE)
195+
)
171196

172-
if (maxFiles > 0) {
173-
const remaining = maxFiles - previouslyFetched
174-
if (documents.length > remaining) {
175-
documents = documents.slice(0, remaining)
176-
}
177-
}
197+
const { documents, indexableCount, capReached } = takeIndexableWithinCap(
198+
stubs,
199+
isSkippedDocument,
200+
maxFiles,
201+
previouslyFetched
202+
)
178203

179-
const totalFetched = previouslyFetched + documents.length
204+
const totalFetched = previouslyFetched + indexableCount
180205
if (syncContext) syncContext.totalDocsFetched = totalFetched
181-
const hitLimit = maxFiles > 0 && totalFetched >= maxFiles
206+
const hitLimit = capReached
182207
if (hitLimit && syncContext) syncContext.listingCapped = true
183208

184209
return {
@@ -210,12 +235,24 @@ export const dropboxConnector: ConnectorConfig = {
210235

211236
const entry = (await response.json()) as DropboxFileEntry
212237

213-
if (!isSupportedFile(entry)) return null
238+
if (!isDownloadableFile(entry)) return null
214239

215-
const content = await downloadFileContent(accessToken, entry.path_lower)
240+
const stub = fileToStub(entry)
241+
if (entry.size && entry.size > MAX_FILE_SIZE) {
242+
return markSkipped(stub, sizeLimitSkipReason(MAX_FILE_SIZE))
243+
}
244+
245+
let content: string
246+
try {
247+
content = await downloadFileContent(accessToken, entry.path_lower)
248+
} catch (error) {
249+
if (error instanceof ConnectorFileTooLargeError) {
250+
return markSkipped(stub, sizeLimitSkipReason(error.limitBytes))
251+
}
252+
throw error
253+
}
216254
if (!content.trim()) return null
217255

218-
const stub = fileToStub(entry)
219256
return { ...stub, content, contentDeferred: false }
220257
} catch (error) {
221258
logger.warn(`Failed to fetch document ${externalId}`, {

apps/sim/connectors/github/github.ts

Lines changed: 43 additions & 8 deletions
@@ -3,14 +3,21 @@ import { getErrorMessage, toError } from '@sim/utils/errors'
33
import { fetchWithRetry, VALIDATE_RETRY_OPTIONS } from '@/lib/knowledge/documents/utils'
44
import { githubConnectorMeta } from '@/connectors/github/meta'
55
import type { ConnectorConfig, ExternalDocument, ExternalDocumentList } from '@/connectors/types'
6-
import { parseTagDate } from '@/connectors/utils'
6+
import {
7+
CONNECTOR_MAX_FILE_BYTES,
8+
markSkipped,
9+
parseTagDate,
10+
sizeLimitSkipReason,
11+
stubOrSkipBySize,
12+
takeIndexableWithinCap,
13+
} from '@/connectors/utils'
714

815
const logger = createLogger('GitHubConnector')
916

1017
const GITHUB_API_URL = 'https://api.github.com'
1118
const BATCH_SIZE = 30
1219
const GIT_SHA_PREFIX = 'git-sha:'
13-
const MAX_FILE_SIZE = 10 * 1024 * 1024 // 10 MB
20+
const MAX_FILE_SIZE = CONNECTOR_MAX_FILE_BYTES
1421
const BINARY_SNIFF_BYTES = 8000
1522

1623
/**
@@ -197,16 +204,25 @@ export const githubConnector: ConnectorConfig = {
197204
} else {
198205
const tree = await fetchTree(accessToken, owner, repo, branch)
199206

200-
// Filter by path prefix, extensions, and size
207+
// Filter by path prefix and extensions. Oversized files are kept here and
208+
// surfaced as skipped (failed) documents at stub time so they stay visible.
201209
const filtered = tree.filter((item) => {
202210
if (pathPrefix && !item.path.startsWith(pathPrefix)) return false
203211
if (!matchesExtension(item.path, extSet)) return false
204-
if (typeof item.size === 'number' && item.size > MAX_FILE_SIZE) return false
205212
return true
206213
})
207214

208-
// Apply max files limit
209-
capped = maxFiles > 0 ? filtered.slice(0, maxFiles) : filtered
215+
// Apply the max-files limit to indexable files only; oversized files within
216+
// the capped window are kept (and surfaced as skipped) but never consume the cap.
217+
capped =
218+
maxFiles > 0
219+
? takeIndexableWithinCap(
220+
filtered,
221+
(item) => Boolean(item.size && item.size > MAX_FILE_SIZE),
222+
maxFiles,
223+
0
224+
).documents
225+
: filtered
210226
if (syncContext) syncContext.filteredTree = capped
211227
}
212228

@@ -223,7 +239,9 @@ export const githubConnector: ConnectorConfig = {
223239
batchSize: batch.length,
224240
})
225241

226-
const documents = batch.map((item) => treeItemToStub(owner, repo, branch, item))
242+
const documents = batch.map((item) =>
243+
stubOrSkipBySize(treeItemToStub(owner, repo, branch, item), item.size, MAX_FILE_SIZE)
244+
)
227245

228246
const nextOffset = offset + BATCH_SIZE
229247
const hasMore = nextOffset < capped.length
@@ -281,7 +299,24 @@ export const githubConnector: ConnectorConfig = {
281299
size,
282300
limit: MAX_FILE_SIZE,
283301
})
284-
return null
302+
return markSkipped(
303+
{
304+
externalId,
305+
title: path.split('/').pop() || path,
306+
content: '',
307+
mimeType: 'text/plain',
308+
sourceUrl: `https://github.com/${owner}/${repo}/blob/${branch.split('/').map(encodeURIComponent).join('/')}/${path.split('/').map(encodeURIComponent).join('/')}`,
309+
contentHash: `${GIT_SHA_PREFIX}${data.sha as string}`,
310+
metadata: {
311+
path,
312+
sha: data.sha as string,
313+
size,
314+
branch,
315+
repository: `${owner}/${repo}`,
316+
},
317+
},
318+
sizeLimitSkipReason(MAX_FILE_SIZE)
319+
)
285320
}
286321

287322
const rawContent = (data.content as string) || ''
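The Dropbox and GitHub hunks both call `takeIndexableWithinCap`, whose body is not part of this excerpt. Based only on the call sites and the comment "oversized files ... are kept (and surfaced as skipped) but never consume the cap", one plausible reading is sketched below. The function name with the `Sketch` suffix, the exact stopping rule, and the treatment of `maxFiles <= 0` as "no limit" are assumptions drawn from how the callers use the result.

```typescript
interface CapResultSketch<T> {
  documents: T[]
  indexableCount: number
  capReached: boolean
}

function takeIndexableWithinCapSketch<T>(
  items: T[],
  isSkipped: (item: T) => boolean,
  maxFiles: number,
  previouslyFetched: number
): CapResultSketch<T> {
  // maxFiles <= 0 is treated as "no limit", matching the `maxFiles > 0` checks in the callers.
  const remaining = maxFiles > 0 ? Math.max(0, maxFiles - previouslyFetched) : Infinity
  const documents: T[] = []
  let indexableCount = 0

  for (const item of items) {
    if (isSkipped(item)) {
      // Skipped (oversized) items stay visible but do not count toward the cap.
      documents.push(item)
      continue
    }
    // Stop at the first indexable item that would exceed the remaining budget.
    if (indexableCount >= remaining) break
    documents.push(item)
    indexableCount++
  }

  const capReached = maxFiles > 0 && previouslyFetched + indexableCount >= maxFiles
  return { documents, indexableCount, capReached }
}
```

Under this reading, a listing with sizes `[1, 999, 2, 3]`, a skip rule of `size > 100`, and `maxFiles = 2` keeps the first three items: two indexable files plus the oversized one between them. Counting only `indexableCount` toward `totalDocsFetched`, as the Dropbox hunk does, keeps oversized files from using up the user's file quota.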
