Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 32 additions & 0 deletions src/google/adk/agents/loop_agent.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,10 @@
from typing_extensions import override

from ..events.event import Event
from ..events.event_actions import EventActions
from ..features import experimental
from ..features import FeatureName
from ..termination.termination_condition import TerminationCondition
from ..utils.context_utils import Aclosing
from .base_agent import BaseAgent
from .base_agent import BaseAgentState
Expand Down Expand Up @@ -66,13 +68,28 @@ class LoopAgent(BaseAgent):
escalates.
"""

termination_condition: Optional[TerminationCondition] = None
"""An optional termination condition that controls when the loop stops.

The condition is evaluated after each event emitted by a sub-agent. When
it fires, the loop yields a final event with
``actions.termination_reason`` set and ``actions.escalate`` set to
``True``, then stops.

The condition is automatically reset at the start of each ``_run_async_impl``
call, so the same instance can be reused across multiple runs.
"""

@override
async def _run_async_impl(
self, ctx: InvocationContext
) -> AsyncGenerator[Event, None]:
if not self.sub_agents:
return

if self.termination_condition:
await self.termination_condition.reset()

agent_state = self._load_agent_state(ctx, LoopAgentState)
is_resuming_at_current_agent = agent_state is not None
times_looped, start_index = self._get_start_state(agent_state)
Expand Down Expand Up @@ -102,6 +119,21 @@ async def _run_async_impl(
yield event
if event.actions.escalate:
should_exit = True

if self.termination_condition and not should_exit:
result = await self.termination_condition.check([event])
if result:
termination_event = Event(
invocation_id=ctx.invocation_id,
author=self.name,
actions=EventActions(
escalate=True,
termination_reason=result.reason,
),
)
yield termination_event
return

if ctx.should_pause_invocation(event):
pause_invocation = True

Expand Down
4 changes: 4 additions & 0 deletions src/google/adk/events/event_actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -112,3 +112,7 @@ class EventActions(BaseModel):

render_ui_widgets: Optional[list[UiWidget]] = None
"""List of UI widgets to be rendered by the UI."""

termination_reason: Optional[str] = None
"""The human-readable reason the conversation was terminated by a
TerminationCondition. Only set on synthetic termination events."""
19 changes: 19 additions & 0 deletions src/google/adk/runners.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@
from .sessions.in_memory_session_service import InMemorySessionService
from .sessions.session import Session
from .telemetry.tracing import tracer
from .termination.termination_condition import TerminationCondition
from .tools.base_toolset import BaseToolset
from .utils._debug_output import print_event
from .utils.context_utils import Aclosing
Expand Down Expand Up @@ -509,6 +510,7 @@ async def run_async(
new_message: Optional[types.Content] = None,
state_delta: Optional[dict[str, Any]] = None,
run_config: Optional[RunConfig] = None,
termination_condition: Optional[TerminationCondition] = None,
) -> AsyncGenerator[Event, None]:
"""Main entry method to run the agent in this runner.

Expand All @@ -526,6 +528,8 @@ async def run_async(
new_message: A new message to append to the session.
state_delta: Optional state changes to apply to the session.
run_config: The run config for the agent.
termination_condition: An optional condition that stops the run when
triggered. Reset automatically before the run begins.

Yields:
The events generated by the agent.
Expand Down Expand Up @@ -602,9 +606,24 @@ async def _run_with_trace(
return

async def execute(ctx: InvocationContext) -> AsyncGenerator[Event]:
if termination_condition:
await termination_condition.reset()
async with Aclosing(ctx.agent.run_async(ctx)) as agen:
async for event in agen:
yield event
if termination_condition:
termination_result = await termination_condition.check([event])
if termination_result:
termination_event = Event(
invocation_id=ctx.invocation_id,
author=ctx.agent.name,
actions=EventActions(
escalate=True,
termination_reason=termination_result.reason,
),
)
yield termination_event
return

async with Aclosing(
self._exec_with_plugin(
Expand Down
37 changes: 37 additions & 0 deletions src/google/adk/termination/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

from .external_termination import ExternalTermination
from .function_call_termination import FunctionCallTermination
from .max_iterations_termination import MaxIterationsTermination
from .termination_condition import AndTerminationCondition
from .termination_condition import OrTerminationCondition
from .termination_condition import TerminationCondition
from .termination_condition import TerminationResult
from .text_mention_termination import TextMentionTermination
from .timeout_termination import TimeoutTermination
from .token_usage_termination import TokenUsageTermination

__all__ = [
'AndTerminationCondition',
'ExternalTermination',
'FunctionCallTermination',
'MaxIterationsTermination',
'OrTerminationCondition',
'TerminationCondition',
'TerminationResult',
'TextMentionTermination',
'TimeoutTermination',
'TokenUsageTermination',
]
64 changes: 64 additions & 0 deletions src/google/adk/termination/external_termination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""A termination condition controlled programmatically via ``set()``."""

from __future__ import annotations

from typing import Optional
from typing import Sequence

from ..events.event import Event
from .termination_condition import TerminationCondition
from .termination_condition import TerminationResult


class ExternalTermination(TerminationCondition):
"""A termination condition that is controlled externally by calling ``set()``.

Useful for integrating external stop signals such as a UI "Stop" button
or application-level logic.

Example::

stop_button = ExternalTermination()

agent = LoopAgent(
name='my_loop',
sub_agents=[...],
termination_condition=stop_button,
)

# Elsewhere (e.g. from a UI event handler):
stop_button.set()
"""

def __init__(self) -> None:
self._terminated = False

@property
def terminated(self) -> bool:
return self._terminated

def set(self) -> None:
"""Signals that the conversation should terminate at the next check."""
self._terminated = True

async def check(self, events: Sequence[Event]) -> Optional[TerminationResult]:
if self._terminated:
return TerminationResult(reason='Externally terminated')
return None

async def reset(self) -> None:
self._terminated = False
60 changes: 60 additions & 0 deletions src/google/adk/termination/function_call_termination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Terminates when a specific function (tool) has been executed."""

from __future__ import annotations

from typing import Optional
from typing import Sequence

from ..events.event import Event
from .termination_condition import TerminationCondition
from .termination_condition import TerminationResult


class FunctionCallTermination(TerminationCondition):
"""Terminates when a tool with a specific name has been executed.

The condition checks ``FunctionResponse`` parts in events.

Example::

# Stop when the "approve" tool is called
condition = FunctionCallTermination('approve')
"""

def __init__(self, function_name: str) -> None:
self._function_name = function_name
self._terminated = False

@property
def terminated(self) -> bool:
return self._terminated

async def check(self, events: Sequence[Event]) -> Optional[TerminationResult]:
if self._terminated:
return None

for event in events:
for response in event.get_function_responses():
if response.name == self._function_name:
self._terminated = True
return TerminationResult(
reason=f"Function '{self._function_name}' was executed"
)
return None

async def reset(self) -> None:
self._terminated = False
64 changes: 64 additions & 0 deletions src/google/adk/termination/max_iterations_termination.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
# Copyright 2026 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

"""Terminates after a maximum number of events have been processed."""

from __future__ import annotations

from typing import Optional
from typing import Sequence

from ..events.event import Event
from .termination_condition import TerminationCondition
from .termination_condition import TerminationResult


class MaxIterationsTermination(TerminationCondition):
"""Terminates the conversation after a maximum number of events.

Example::

# Stop after 10 events
condition = MaxIterationsTermination(10)
"""

def __init__(self, max_iterations: int) -> None:
if max_iterations <= 0:
raise ValueError('max_iterations must be a positive integer.')
self._max_iterations = max_iterations
self._count = 0
self._terminated = False

@property
def terminated(self) -> bool:
return self._terminated

async def check(self, events: Sequence[Event]) -> Optional[TerminationResult]:
if self._terminated:
return None
self._count += len(events)

if self._count >= self._max_iterations:
self._terminated = True
return TerminationResult(
reason=(
f'Maximum iterations of {self._max_iterations} reached,'
f' current count: {self._count}'
)
)
return None

async def reset(self) -> None:
self._terminated = False
self._count = 0
Loading
Loading