Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions packages/components/nodes/agentflow/Iteration/Iteration.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import { ICommonObject, INode, INodeData, INodeParams } from '../../../src/Interface'
import { parseJsonBody } from '../../../src/utils'

const MAX_ITERATION_CONCURRENCY = 20

class Iteration_Agentflow implements INode {
label: string
name: string
Expand All @@ -18,7 +20,7 @@ class Iteration_Agentflow implements INode {
constructor() {
this.label = 'Iteration'
this.name = 'iterationAgentflow'
this.version = 1.0
this.version = 1.1
this.type = 'Iteration'
this.category = 'Agent Flows'
this.description = 'Execute the nodes within the iteration block through N iterations'
Expand All @@ -32,6 +34,16 @@ class Iteration_Agentflow implements INode {
description: 'The input array to iterate over',
acceptVariable: true,
rows: 4
},
{
label: 'Concurrency',
name: 'iterationConcurrency',
type: 'number',
description: `How many items to process in parallel. Set to 1 for sequential execution (default), up to a maximum of ${MAX_ITERATION_CONCURRENCY}. Higher values speed up independent tasks but multiply load on model/tool providers. Avoid values > 1 when the iteration body updates Flow State, as concurrent merges become nondeterministic.`,
default: 1,
step: 1,
optional: true,
additionalParams: true
}
]
}
Expand All @@ -58,11 +70,17 @@ class Iteration_Agentflow implements INode {

const state = options.agentflowRuntime?.state as ICommonObject

const parsedConcurrency = parseInt(nodeData.inputs?.iterationConcurrency)
const iterationConcurrency = Number.isNaN(parsedConcurrency)
? 1
: Math.min(MAX_ITERATION_CONCURRENCY, Math.max(1, parsedConcurrency))

const returnOutput = {
id: nodeData.id,
name: this.name,
input: {
iterationInput: iterationInputArray
iterationInput: iterationInputArray,
iterationConcurrency
},
output: {},
state
Expand Down
110 changes: 80 additions & 30 deletions packages/server/src/utils/buildAgentflow.ts
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,13 @@ interface IAgentFlowRuntime {
webhook?: Record<string, any>
}

type IterationOutcome = {
text?: string
executedData?: IAgentflowExecutedData[]
state?: ICommonObject
error?: string
}

/**
* Resolves {{ $webhook.body.* }}, {{ $webhook.headers.* }}, {{ $webhook.query.* }} references in a
* template string against an incoming webhook payload. Used to pre-resolve webhookDefaultInput
Expand Down Expand Up @@ -1292,17 +1299,25 @@ const executeNode = async ({
// Initialize array to collect results from iterations
const iterationResults: string[] = []

// Execute sub-flow for each item in the iteration array
for (let i = 0; i < results.input.iterationInput.length; i++) {
const totalItems = results.input.iterationInput.length

// Resolve concurrency: 1 = sequential (default), clamped to the number of items
const requestedConcurrency = Math.max(1, parseInt(results.input.iterationConcurrency) || 1)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since results.input.iterationConcurrency is already parsed and validated as a number in Iteration.ts, calling parseInt on it is redundant. For older saved flows where this property is missing, it will be undefined, which can be safely handled with a fallback.

                const requestedConcurrency = Math.max(1, results.input.iterationConcurrency || 1)

const concurrency = Math.min(requestedConcurrency, totalItems || 1)

// Per-item outcome, stored by index so merging stays deterministic regardless of completion order
const perItem: (IterationOutcome | undefined)[] = new Array(totalItems)

// Execute a single iteration item into perItem[i] without mutating shared parent state
const runIterationItem = async (i: number) => {
const item = results.input.iterationInput[i]
logger.debug(` 🔄 Processing iteration ${i + 1}/${results.input.iterationInput.length} recursively`)

// Create iteration context
const iterationContext = {
index: i,
value: item,
isFirst: i === 0,
isLast: i === results.input.iterationInput.length - 1,
isLast: i === totalItems - 1,
sessionId: sessionId
}

Expand Down Expand Up @@ -1336,14 +1351,14 @@ const executeNode = async ({
productId
})

// Store the result
const outcome: IterationOutcome = {}

if (subFlowResult?.text) {
iterationResults.push(subFlowResult.text)
outcome.text = subFlowResult.text
}

// Add executed data from sub-flow to main execution data with appropriate iteration context
if (subFlowResult?.agentFlowExecutedData) {
const subflowExecutedData = subFlowResult.agentFlowExecutedData.map((data: IAgentflowExecutedData) => ({
outcome.executedData = subFlowResult.agentFlowExecutedData.map((data: IAgentflowExecutedData) => ({
...data,
data: {
...data.data,
Expand All @@ -1352,55 +1367,90 @@ const executeNode = async ({
parentNodeId: reactFlowNode.data.id
}
}))

// Add executed data to parent execution
agentFlowExecutedData.push(...subflowExecutedData)

// Update parent execution record with combined data if we have a parent execution ID
if (parentExecutionId) {
try {
logger.debug(` 📝 Updating parent execution ${parentExecutionId} with iteration ${i + 1} data`)
await updateExecution(appDataSource, parentExecutionId, workspaceId, {
executionData: JSON.stringify(agentFlowExecutedData)
})
} catch (error) {
console.error(` ❌ Error updating parent execution: ${getErrorMessage(error)}`)
}
}
}

// Merge the child iteration's runtime state back to parent
if (
subFlowResult?.agentflowRuntime &&
subFlowResult.agentflowRuntime.state &&
Object.keys(subFlowResult.agentflowRuntime.state).length > 0
) {
outcome.state = subFlowResult.agentflowRuntime.state
}

perItem[i] = outcome
} catch (error) {
console.error(` ❌ Error in iteration ${i + 1}: ${getErrorMessage(error)}`)
perItem[i] = { error: `Error in iteration ${i + 1}: ${getErrorMessage(error)}` }
}
}

// Merge a contiguous, completed batch [start, end) back into shared parent state in index order
const mergeBatch = async (start: number, end: number) => {
let executionDataChanged = false

for (let i = start; i < end; i++) {
const outcome = perItem[i]
if (!outcome) continue

if (outcome.error) {
iterationResults.push(outcome.error)
continue
}

if (outcome.text) {
iterationResults.push(outcome.text)
}

if (outcome.executedData?.length) {
agentFlowExecutedData.push(...outcome.executedData)
executionDataChanged = true
}

if (outcome.state) {
logger.debug(` 🔄 Merging iteration ${i + 1} runtime state back to parent`)

updatedState = {
...updatedState,
...subFlowResult.agentflowRuntime.state
...outcome.state
}

// Update next iteration's runtime state
// Subsequent batches read the merged state
agentflowRuntime.state = updatedState

// Update parent execution's runtime state
results.state = updatedState
}
} catch (error) {
console.error(` ❌ Error in iteration ${i + 1}: ${getErrorMessage(error)}`)
iterationResults.push(`Error in iteration ${i + 1}: ${getErrorMessage(error)}`)
}

// Persist the parent execution once per batch instead of once per item to avoid racing DB writes
if (executionDataChanged && parentExecutionId) {
try {
await updateExecution(appDataSource, parentExecutionId, workspaceId, {
executionData: JSON.stringify(agentFlowExecutedData)
})
} catch (error) {
console.error(` ❌ Error updating parent execution: ${getErrorMessage(error)}`)
}
}
}

// Process items in batches of `concurrency`, preserving index order on merge
for (let start = 0; start < totalItems; start += concurrency) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When executing iterations in batches, if the execution is aborted (e.g., cancelled by the user), the loop will currently continue to spawn and execute all remaining batches. Checking the abort signal at the start of each batch loop and breaking early prevents unnecessary resource consumption and ensures the cancellation request is honored promptly.

                for (let start = 0; start < totalItems; start += concurrency) {
                    if (abortController?.signal?.aborted) {
                        break
                    }

const end = Math.min(start + concurrency, totalItems)
const batch: Promise<void>[] = []
for (let i = start; i < end; i++) {
batch.push(runIterationItem(i))
}
await Promise.all(batch)
await mergeBatch(start, end)
}

// Update the output with combined results
results.output = {
...(results.output || {}),
iterationResults,
content: iterationResults.join('\n')
}

logger.debug(` 📊 Completed all iterations. Total results: ${iterationResults.length}`)
}
}
Expand Down
Loading