fix: parallel tool calls execute sequentially despite PARALLEL mode (#735) #1127

Open
YuqiGuo105 wants to merge 1 commit into google:main from YuqiGuo105:fix/parallel-tool-execution-735

@YuqiGuo105
Contributor

Problem

Fixes #735

When an LLM returns multiple tool calls simultaneously (parallel function calling), ADK Java executes them sequentially despite ToolExecutionMode.PARALLEL being set. Two tools each taking 1 second complete in ~2 seconds instead of ~1 second.

Root Cause

The bug is in Functions.callTool(). For FunctionTool (the most common tool type), runAsync() calls func.invoke() via Java reflection — a synchronous, blocking call that completes before returning a Single:

// FunctionTool.runAsync() calls this:
private Maybe<...> call(...) throws ... {
    Object result = func.invoke(instance, arguments);  // blocks here — HTTP/DB call
    return Maybe.just(result);                         // value already computed
}

The original callTool() was:

// BEFORE (broken)
return tool.runAsync(args, toolContext)   // func.invoke() runs NOW, blocks calling thread
    .toMaybe()
    // subscribeOn here would be too late — computation already done
    .doOnError(...)
    .onErrorResumeNext(...);

handleFunctionCalls uses concatMapEager for PARALLEL mode, which eagerly subscribes to all inner Observables. Eager subscription only helps when subscribing returns quickly, though. Here the subscription itself is synchronous and blocking, so concatMapEager cannot hand the work to another thread: it is stuck waiting for the first subscription to complete before it can start the next.

Why subscribeOn(Schedulers.io()) alone (without Single.defer) does not fix it:

subscribeOn only shifts the subscription signal to an IO thread, not the method call that produces the Single. By the time .subscribeOn() is reached in the chain, func.invoke() has already finished on the calling thread.

Calling thread:
  tool.runAsync()           ← func.invoke() runs synchronously here (~1000ms)
      ↓ returns Single.just(alreadyComputedValue)
  .toMaybe()
  .subscribeOn(IO)          ← too late, value was computed before this
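
The same effect can be reproduced without RxJava. The sketch below is a JDK-only analogy, not ADK code: CompletableFuture.completedFuture stands in for Single.just, thenApplyAsync stands in for a late subscribeOn, and blockingWork() is a hypothetical placeholder for func.invoke() that reports which thread ran it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class EagerVsDeferred {
    // Hypothetical stand-in for a blocking tool body such as func.invoke():
    // returns the name of the thread that actually ran it.
    static String blockingWork() {
        return Thread.currentThread().getName();
    }

    // Eager: blockingWork() runs on the caller before the future exists.
    // Shifting a later stage to the pool cannot move work that is already done.
    static CompletableFuture<String> eagerThenShift(ExecutorService pool) {
        String alreadyComputed = blockingWork();
        return CompletableFuture.completedFuture(alreadyComputed)
            .thenApplyAsync(value -> value, pool);
    }

    // Deferred: the supplier delays the call; invoking it hands the work to the pool.
    static Supplier<CompletableFuture<String>> deferred(ExecutorService pool) {
        return () -> CompletableFuture.supplyAsync(EagerVsDeferred::blockingWork, pool);
    }

    // Pool whose threads carry a recognizable name (assumed name, for illustration).
    static ExecutorService newPool() {
        return Executors.newFixedThreadPool(2, task -> {
            Thread t = new Thread(task, "tool-worker");
            t.setDaemon(true);
            return t;
        });
    }

    public static void main(String[] args) {
        ExecutorService pool = newPool();
        try {
            System.out.println("eager ran on:    " + eagerThenShift(pool).join());
            System.out.println("deferred ran on: " + deferred(pool).get().join());
        } finally {
            pool.shutdown();
        }
    }
}
```

Run from main, the eager path reports the main thread even though a later stage was moved to the pool, while the deferred path reports tool-worker.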

Contrast with ParallelAgent (ParallelAgent.java#L148), which correctly uses .subscribeOn(scheduler) because subAgent.runAsync() returns a lazy Flowable — the work has not started yet when it is returned.

Fix

Wrap tool.runAsync() in Single.defer() to make it lazy, then apply subscribeOn(Schedulers.io()):

// AFTER (fixed)
return Single.defer(() -> tool.runAsync(args, toolContext))  // deferred — not called yet
    .toMaybe()
    .subscribeOn(Schedulers.io())    // subscription (+ runAsync call) happens on IO thread
    .doOnError(...)
    .onErrorResumeNext(...);

Single.defer() packages tool.runAsync() as a lambda that only executes at subscription time. subscribeOn(Schedulers.io()) then causes that subscription — including func.invoke() — to happen on an IO thread. concatMapEager eagerly dispatches all tool subscriptions, each to its own IO thread, achieving true parallelism while preserving result order.
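
To illustrate the "start everything first, emit in original order" behavior described above, here is a minimal JDK-only sketch. It is an analogy for what concatMapEager plus subscribeOn achieves, not the ADK implementation; runAll and slowTool are hypothetical names, and Supplier plays the role of the deferred tool.runAsync() call.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

class OrderedParallelCalls {
    // Starts every deferred call on the pool before waiting on any of them,
    // then collects the results in the order the calls were given.
    static <T> List<T> runAll(List<Supplier<T>> calls, ExecutorService pool) {
        List<CompletableFuture<T>> started = new ArrayList<>();
        for (Supplier<T> call : calls) {
            started.add(CompletableFuture.supplyAsync(call, pool));
        }
        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : started) {
            results.add(future.join());
        }
        return results;
    }

    // Hypothetical slow tool: sleeps, then returns its own name.
    static Supplier<String> slowTool(String name, long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
            return name;
        };
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            List<Supplier<String>> calls =
                List.of(slowTool("get_weather", 300), slowTool("search_hotels", 100));
            // search_hotels finishes first, but the list keeps the call order.
            System.out.println(runAll(calls, pool)); // [get_weather, search_hotels]
        } finally {
            pool.shutdown();
        }
    }
}
```

The key design point is the two separate loops: waiting inside the first loop would serialize the calls again, which is the same trap as the blocking subscription in the original callTool().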

Execution Flow: Before vs After

Scenario: LLM returns two parallel tool calls — get_weather(city=Beijing) and search_hotels(city=Shanghai) — each taking ~1000ms.

Before (sequential despite PARALLEL mode)

Calling thread ─────────────────────────────────────────────────────────────►

concatMapEager subscribes to Observable1
│
├─ Maybe.defer(lambda) executes on calling thread
│  └─ callTool() -> tool.runAsync()
│        └─ func.invoke("get_weather") ████████████ 1000ms (HTTP call) ████
│              returns Single.just({"temperature":"25C"})  <- already computed
│  .toMaybe().subscribeOn(IO)  <- pointless, value already exists
│
│  <- thread released; concatMapEager can now subscribe to Observable2
│
├─ Maybe.defer(lambda) executes on calling thread
│  └─ callTool() -> tool.runAsync()
│        └─ func.invoke("search_hotels") █████████ 1000ms (HTTP call) ████
│              returns Single.just({"hotel":"Hilton"})
│
└─ merge results -> total ~2000ms  ✗

After (true parallel)

Calling thread ──────────►    IO-thread-1 ──────────────────────────────►
                               ┌─ Single.defer lambda executes
concatMapEager subscribes      │  └─ tool.runAsync() -> func.invoke("get_weather")
to Observable1                 │            ████████████ 1000ms ████
│                              └─ emits {"temperature":"25C"}
├─ callTool() returns
│  Single.defer(...).subscribeOn(IO) ──────┘  (non-blocking!)
│
concatMapEager subscribes      IO-thread-2 ──────────────────────────────►
to Observable2                 ┌─ Single.defer lambda executes
│                              │  └─ tool.runAsync() -> func.invoke("search_hotels")
├─ callTool() returns          │            ████████████ 1000ms ████
│  Single.defer(...).subscribeOn(IO) ──────┘  (non-blocking!)
│
│       <- ~1000ms: both IO threads finish ──────────────────────────────┤
└─ concatMapEager emits in original order: [weather_result, hotel_result]
   total ~1000ms  ✓

Changes

Functions.java

  • Added import io.reactivex.rxjava3.schedulers.Schedulers
  • Wrapped tool.runAsync(args, toolContext) in Single.defer() and added .subscribeOn(Schedulers.io()) in callTool()

FunctionsTest.java

  • Added SlowTool inner class — a BaseTool that sleeps for a configurable duration using Single.fromCallable
  • handleFunctionCalls_parallelMode_shouldExecuteConcurrently: two 1000ms tools complete in <1500ms under PARALLEL mode
  • handleFunctionCalls_sequentialMode_shouldExecuteSerially: two 1000ms tools take ≥2000ms under SEQUENTIAL mode (serial contract unchanged)
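
The shape of such a timing check can be sketched with the JDK alone. This is not the code in FunctionsTest.java: TimingCheck, slowTask, serialMillis, and parallelMillis are hypothetical names, and the sleeps are shortened to 200ms to keep the example quick.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class TimingCheck {
    // Hypothetical slow task: blocks the running thread for the given duration.
    static Runnable slowTask(long millis) {
        return () -> {
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    // Runs the tasks one after another on the calling thread.
    static long serialMillis(List<Runnable> tasks) {
        long start = System.nanoTime();
        tasks.forEach(Runnable::run);
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
    }

    // Runs each task on its own pool thread and waits for all of them.
    static long parallelMillis(List<Runnable> tasks) {
        ExecutorService pool = Executors.newFixedThreadPool(tasks.size());
        try {
            long start = System.nanoTime();
            CompletableFuture<?>[] running = tasks.stream()
                .map(task -> CompletableFuture.runAsync(task, pool))
                .toArray(CompletableFuture[]::new);
            CompletableFuture.allOf(running).join();
            return TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        List<Runnable> tasks = List.of(slowTask(200), slowTask(200));
        System.out.println("serial ms:   " + serialMillis(tasks));
        System.out.println("parallel ms: " + parallelMillis(tasks));
    }
}
```

Timing assertions are sensitive to machine load, so thresholds need generous slack, in the same spirit as the <1500ms bound the tests above use for two 1000ms tools.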

Testing

Tests run: 16, Failures: 0, Errors: 0, Skipped: 0

…sync

Fixes google#735

When ToolExecutionMode.PARALLEL is set and the LLM returns multiple
function calls, tools were still executing sequentially because
callTool() invoked tool.runAsync() as a plain method call, causing
FunctionTool to execute func.invoke() synchronously on the subscribing
thread before returning a Single. Since concatMapEager eagerly subscribes
to all inner Observables, it could not dispatch work to IO threads if the
subscription itself was blocking.

Fix: wrap tool.runAsync() in Single.defer() so the call is deferred until
subscription time, then apply subscribeOn(Schedulers.io()) to move the
actual subscription (and thus the synchronous func.invoke() call) onto an
IO thread. concatMapEager then subscribes to all tool Singles eagerly,
each on its own IO thread, achieving true parallelism while preserving
result order.

Added two new timing-based tests that verify:
- PARALLEL mode: two 1000ms tools complete in <1500ms (not ~2000ms)
- SEQUENTIAL mode: two 1000ms tools take >=2000ms (serial contract unchanged)
Development

Successfully merging this pull request may close these issues.

Support for Parallel Tool Execution with Custom Models