diff --git a/packages/components/nodes/agentflow/Iteration/Iteration.ts b/packages/components/nodes/agentflow/Iteration/Iteration.ts index 145602b93e7..f8f9867297d 100644 --- a/packages/components/nodes/agentflow/Iteration/Iteration.ts +++ b/packages/components/nodes/agentflow/Iteration/Iteration.ts @@ -1,6 +1,8 @@ import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface' import { parseJsonBody } from '../../../src/utils' +const MAX_ITERATION_CONCURRENCY = 20 + class Iteration_Agentflow implements INode { label: string name: string @@ -18,7 +20,7 @@ class Iteration_Agentflow implements INode { constructor() { this.label = 'Iteration' this.name = 'iterationAgentflow' - this.version = 1.0 + this.version = 1.1 this.type = 'Iteration' this.category = 'Agent Flows' this.description = 'Execute the nodes within the iteration block through N iterations' @@ -32,6 +34,16 @@ class Iteration_Agentflow implements INode { description: 'The input array to iterate over', acceptVariable: true, rows: 4 + }, + { + label: 'Concurrency', + name: 'iterationConcurrency', + type: 'number', + description: `How many items to process in parallel. Set to 1 for sequential execution (default), up to a maximum of ${MAX_ITERATION_CONCURRENCY}. Higher values speed up independent tasks but multiply load on model/tool providers. Avoid values > 1 when the iteration body updates Flow State, as concurrent merges become nondeterministic.`, + default: 1, + step: 1, + optional: true, + additionalParams: true } ] } @@ -58,11 +70,17 @@ class Iteration_Agentflow implements INode { const state = options.agentflowRuntime?.state as ICommonObject + const parsedConcurrency = parseInt(nodeData.inputs?.iterationConcurrency) + const iterationConcurrency = Number.isNaN(parsedConcurrency) + ? 1 + : Math.min(MAX_ITERATION_CONCURRENCY, Math.max(1, parsedConcurrency)) + const returnOutput = { id: nodeData.id, name: this.name, input: { - iterationInput: iterationInputArray + iterationInput: iterationInputArray, + iterationConcurrency }, output: {}, state diff --git a/packages/server/src/utils/buildAgentflow.ts b/packages/server/src/utils/buildAgentflow.ts index e38ccbec092..91db70bb2d4 100644 --- a/packages/server/src/utils/buildAgentflow.ts +++ b/packages/server/src/utils/buildAgentflow.ts @@ -101,6 +101,13 @@ interface IAgentFlowRuntime { webhook?: Record } +type IterationOutcome = { + text?: string + executedData?: IAgentflowExecutedData[] + state?: ICommonObject + error?: string +} + /** * Resolves {{ $webhook.body.* }}, {{ $webhook.headers.* }}, {{ $webhook.query.* }} references in a * template string against an incoming webhook payload. Used to pre-resolve webhookDefaultInput @@ -1292,17 +1299,25 @@ const executeNode = async ({ // Initialize array to collect results from iterations const iterationResults: string[] = [] - // Execute sub-flow for each item in the iteration array - for (let i = 0; i < results.input.iterationInput.length; i++) { + const totalItems = results.input.iterationInput.length + + // Resolve concurrency: 1 = sequential (default), clamped to the number of items + const requestedConcurrency = Math.max(1, parseInt(results.input.iterationConcurrency) || 1) + const concurrency = Math.min(requestedConcurrency, totalItems || 1) + + // Per-item outcome, stored by index so merging stays deterministic regardless of completion order + const perItem: (IterationOutcome | undefined)[] = new Array(totalItems) + + // Execute a single iteration item into perItem[i] without mutating shared parent state + const runIterationItem = async (i: number) => { const item = results.input.iterationInput[i] - logger.debug(` 🔄 Processing iteration ${i + 1}/${results.input.iterationInput.length} recursively`) // Create iteration context const iterationContext = { index: i, value: item, isFirst: i === 0, - isLast: i === results.input.iterationInput.length - 1, + isLast: i === totalItems - 1, sessionId: sessionId } @@ -1336,14 +1351,14 @@ const executeNode = async ({ productId }) - // Store the result + const outcome: IterationOutcome = {} + if (subFlowResult?.text) { - iterationResults.push(subFlowResult.text) + outcome.text = subFlowResult.text } - // Add executed data from sub-flow to main execution data with appropriate iteration context if (subFlowResult?.agentFlowExecutedData) { - const subflowExecutedData = subFlowResult.agentFlowExecutedData.map((data: IAgentflowExecutedData) => ({ + outcome.executedData = subFlowResult.agentFlowExecutedData.map((data: IAgentflowExecutedData) => ({ ...data, data: { ...data.data, @@ -1352,55 +1367,90 @@ const executeNode = async ({ parentNodeId: reactFlowNode.data.id } })) - - // Add executed data to parent execution - agentFlowExecutedData.push(...subflowExecutedData) - - // Update parent execution record with combined data if we have a parent execution ID - if (parentExecutionId) { - try { - logger.debug(` 📝 Updating parent execution ${parentExecutionId} with iteration ${i + 1} data`) - await updateExecution(appDataSource, parentExecutionId, workspaceId, { - executionData: JSON.stringify(agentFlowExecutedData) - }) - } catch (error) { - console.error(` ❌ Error updating parent execution: ${getErrorMessage(error)}`) - } - } } - // Merge the child iteration's runtime state back to parent if ( subFlowResult?.agentflowRuntime && subFlowResult.agentflowRuntime.state && Object.keys(subFlowResult.agentflowRuntime.state).length > 0 ) { + outcome.state = subFlowResult.agentflowRuntime.state + } + + perItem[i] = outcome + } catch (error) { + console.error(` ❌ Error in iteration ${i + 1}: ${getErrorMessage(error)}`) + perItem[i] = { error: `Error in iteration ${i + 1}: ${getErrorMessage(error)}` } + } + } + + // Merge a contiguous, completed batch [start, end) back into shared parent state in index order + const mergeBatch = async (start: number, end: number) => { + let executionDataChanged = false + + for (let i = start; i < end; i++) { + const outcome = perItem[i] + if (!outcome) continue + + if (outcome.error) { + iterationResults.push(outcome.error) + continue + } + + if (outcome.text) { + iterationResults.push(outcome.text) + } + + if (outcome.executedData?.length) { + agentFlowExecutedData.push(...outcome.executedData) + executionDataChanged = true + } + + if (outcome.state) { logger.debug(` 🔄 Merging iteration ${i + 1} runtime state back to parent`) updatedState = { ...updatedState, - ...subFlowResult.agentflowRuntime.state + ...outcome.state } - // Update next iteration's runtime state + // Subsequent batches read the merged state agentflowRuntime.state = updatedState // Update parent execution's runtime state results.state = updatedState } - } catch (error) { - console.error(` ❌ Error in iteration ${i + 1}: ${getErrorMessage(error)}`) - iterationResults.push(`Error in iteration ${i + 1}: ${getErrorMessage(error)}`) + } + + // Persist the parent execution once per batch instead of once per item to avoid racing DB writes + if (executionDataChanged && parentExecutionId) { + try { + await updateExecution(appDataSource, parentExecutionId, workspaceId, { + executionData: JSON.stringify(agentFlowExecutedData) + }) + } catch (error) { + console.error(` ❌ Error updating parent execution: ${getErrorMessage(error)}`) + } } } + // Process items in batches of `concurrency`, preserving index order on merge + for (let start = 0; start < totalItems; start += concurrency) { + const end = Math.min(start + concurrency, totalItems) + const batch: Promise[] = [] + for (let i = start; i < end; i++) { + batch.push(runIterationItem(i)) + } + await Promise.all(batch) + await mergeBatch(start, end) + } + // Update the output with combined results results.output = { ...(results.output || {}), iterationResults, content: iterationResults.join('\n') } - logger.debug(` 📊 Completed all iterations. Total results: ${iterationResults.length}`) } }