Skip to content

feat(6571): add Concurrency option to Iteration node for parallel subagents#6572

Open
prd-hoang-doan wants to merge 1 commit into
FlowiseAI:mainfrom
prd-hoang-doan:feat/6571_concurrency_in_iteration
Open

feat(6571): add Concurrency option to Iteration node for parallel subagents#6572
prd-hoang-doan wants to merge 1 commit into
FlowiseAI:mainfrom
prd-hoang-doan:feat/6571_concurrency_in_iteration

Conversation

@prd-hoang-doan

Copy link
Copy Markdown
Contributor

Ticket:

#6571

Summary

Adds an optional Concurrency setting to the Agentflow V2 Iteration node so independent iteration items can run in parallel instead of strictly one-at-a-time. Defaults to 1 (sequential), so existing flows are unchanged.

Motivated by multi-subagent fan-out flows like Deep Research: a Planner → Iteration → SubAgent → Summarizer → Loop flow runs one subagent per task, and since each subagent does its own retrieval + LLM work, total runtime grows linearly with the number of tasks. When the tasks are independent, serializing them is pure wasted wall-clock.

What changed

packages/components/nodes/agentflow/Iteration/Iteration.ts (node version 1.0 → 1.1)

  • New optional numeric input Concurrency (iterationConcurrency, Additional Parameters), default: 1, clamped to [1, 20].
  • Resolved value is surfaced on the node output (input.iterationConcurrency) for the engine to consume.

packages/server/src/utils/buildAgentflow.ts

  • The iteration loop now processes items in batches of effective = min(concurrency, itemCount) via Promise.all, instead of a sequential for … await.
  • Correctness preserved:
    • Result order — each item writes into an index slot; results are flattened in index order after each batch (completion order no longer affects output).
    • Executed data / persistence — per-item executed data is appended in order and the parent execution is persisted once per batch instead of per item, avoiding racing updateExecution writes on the same row.
    • Flow state — merged back in index order between batches; subsequent batches observe the merged state.
    • Per-item error handling is preserved (errors land in the correct slot).

docs/selftest-concurrency/README.md

  • A self-contained, LLM-only test flow (Planner → Iteration → SubAgent → Merger) to demonstrate both the parallel speedup and the order-preserving merge.

Behavior & compatibility

  • Concurrency = 1 (default) reproduces the previous sequential Iteration exactly — fully backward compatible.
  • Value is clamped at both ends ([1, 20]) and additionally to the array length, so it can never exceed the cap or item count.
  • Concurrency > 1 is intended for independent iteration bodies (subagents that read {{ $iteration.* }} and return output). Bodies that write to Flow State should stay at 1, since concurrent merges are last-writer-wins (documented on the input).

Testing

@gemini-code-assist gemini-code-assist Bot left a comment

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces concurrent execution support for the Iteration Agentflow node, allowing users to process multiple items in parallel up to a maximum concurrency of 20. The backend is updated to execute items in batches, preserving index order when merging results and states, and optimizing database writes by persisting execution data once per batch. Feedback suggests checking for an abort signal at the start of each batch loop to handle cancellations gracefully, and removing a redundant parseInt call on the concurrency input.

Important

The consumer version of Gemini Code Assist on GitHub is being sunset. Starting June 18, 2026, new organization installations will be blocked, and all code review activity will officially cease on July 17, 2026.
For more details on the timeline and next steps, please review the Help Documentation.

}

// Process items in batches of `concurrency`, preserving index order on merge
for (let start = 0; start < totalItems; start += concurrency) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

high

When executing iterations in batches, if the execution is aborted (e.g., cancelled by the user), the loop will currently continue to spawn and execute all remaining batches. Checking the abort signal at the start of each batch loop and breaking early prevents unnecessary resource consumption and ensures the cancellation request is honored promptly.

                for (let start = 0; start < totalItems; start += concurrency) {
                    if (abortController?.signal?.aborted) {
                        break
                    }

const totalItems = results.input.iterationInput.length

// Resolve concurrency: 1 = sequential (default), clamped to the number of items
const requestedConcurrency = Math.max(1, parseInt(results.input.iterationConcurrency) || 1)

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Since results.input.iterationConcurrency is already parsed and validated as a number in Iteration.ts, calling parseInt on it is redundant. For older saved flows where this property is missing, it will be undefined, which can be safely handled with a fallback.

                const requestedConcurrency = Math.max(1, results.input.iterationConcurrency || 1)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant