Skip to content

Commit ce264e9

Browse files
committed
AgentLauncher: track the active agents and stop those that are idle
1 parent d714562 commit ce264e9

6 files changed

Lines changed: 257 additions & 9 deletions

File tree

agents-core/vision_agents/core/agents/__init__.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@
77
from .agents import Agent as Agent
88
from .conversation import Conversation as Conversation
99
from .agent_launcher import AgentLauncher as AgentLauncher
10+
from .agent_types import AgentOptions as AgentOptions
1011

1112
__all__ = [
1213
"Agent",
1314
"Conversation",
1415
"AgentLauncher",
16+
"AgentOptions",
1517
]

agents-core/vision_agents/core/agents/agent_launcher.py

Lines changed: 84 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,10 @@
11
import asyncio
22
import logging
3-
from typing import TYPE_CHECKING, Awaitable, Callable
3+
import weakref
4+
from typing import TYPE_CHECKING, Awaitable, Callable, Optional
45

5-
from vision_agents.core.utils.utils import await_or_run
6-
from vision_agents.core.warmup import WarmupCache, Warmable
6+
from vision_agents.core.utils.utils import await_or_run, cancel_and_wait
7+
from vision_agents.core.warmup import Warmable, WarmupCache
78

89
if TYPE_CHECKING:
910
from .agents import Agent
@@ -23,33 +24,72 @@ def __init__(
2324
self,
2425
create_agent: Callable[..., "Agent" | Awaitable["Agent"]],
2526
join_call: Callable[..., None | Awaitable[None]] | None = None,
27+
agent_idle_timeout: float = 10.0,
28+
agent_idle_cleanup_interval: float = 5.0,
2629
):
2730
"""
2831
Initialize the agent launcher.
2932
3033
Args:
3134
create_agent: A function that creates and returns an Agent instance
3235
join_call: Optional function that handles joining a call with the agent
36+
agent_idle_timeout: Optional timeout in seconds for agent to stay alone on the call. Default - `10.0`.
37+
`0` means idle agents won't leave the call until it's ended.
38+
3339
"""
3440
self.create_agent = create_agent
3541
self.join_call = join_call
3642
self._warmup_lock = asyncio.Lock()
3743
self._warmup_cache = WarmupCache()
3844

45+
if agent_idle_timeout < 0:
46+
raise ValueError("agent_idle_timeout must be >= 0")
47+
self._agent_idle_timeout = agent_idle_timeout
48+
49+
if agent_idle_cleanup_interval <= 0:
50+
raise ValueError("agent_idle_cleanup_interval must be > 0")
51+
self._agent_idle_cleanup_interval = agent_idle_cleanup_interval
52+
53+
self._active_agents: weakref.WeakSet[Agent] = weakref.WeakSet()
54+
55+
self._running = False
56+
self._cleanup_task: Optional[asyncio.Task] = None
57+
self._warmed_up: bool = False
58+
59+
async def start(self):
60+
if self._running:
61+
raise RuntimeError("AgentLauncher is already running")
62+
logger.debug("Starting AgentLauncher")
63+
self._running = True
64+
await self.warmup()
65+
self._cleanup_task = asyncio.create_task(self._cleanup_idle_agents())
66+
logger.debug("AgentLauncher started")
67+
68+
async def stop(self):
69+
logger.debug("Stopping AgentLauncher")
70+
self._running = False
71+
if self._cleanup_task:
72+
await cancel_and_wait(self._cleanup_task)
73+
logger.debug("AgentLauncher stopped")
74+
3975
async def warmup(self) -> None:
4076
"""
4177
Warm up all agent components.
4278
4379
This method creates the agent and calls warmup() on LLM, TTS, STT,
4480
and turn detection components if they exist.
4581
"""
82+
if self._warmed_up or self._warmup_lock.locked():
83+
return
84+
4685
async with self._warmup_lock:
4786
logger.info("Creating agent...")
4887

4988
# Create a dry-run Agent instance and warmup its components for the first time.
5089
agent: "Agent" = await await_or_run(self.create_agent)
5190
logger.info("Warming up agent components...")
5291
await self._warmup_agent(agent)
92+
self._warmed_up = True
5393

5494
logger.info("Agent warmup completed")
5595

@@ -65,6 +105,7 @@ async def launch(self, **kwargs) -> "Agent":
65105
"""
66106
agent: "Agent" = await await_or_run(self.create_agent, **kwargs)
67107
await self._warmup_agent(agent)
108+
self._active_agents.add(agent)
68109
return agent
69110

70111
async def _warmup_agent(self, agent: "Agent") -> None:
@@ -99,10 +140,46 @@ async def _warmup_agent(self, agent: "Agent") -> None:
99140
warmup_tasks.append(agent.turn_detection.warmup(self._warmup_cache))
100141

101142
# Warmup processors
102-
if agent.processors:
103-
for processor in agent.processors:
104-
if isinstance(processor, Warmable):
105-
warmup_tasks.append(processor.warmup(self._warmup_cache))
143+
for processor in agent.processors:
144+
if isinstance(processor, Warmable):
145+
warmup_tasks.append(processor.warmup(self._warmup_cache))
106146

107147
if warmup_tasks:
108148
await asyncio.gather(*warmup_tasks)
149+
150+
async def _cleanup_idle_agents(self) -> None:
151+
if not self._agent_idle_timeout:
152+
return
153+
154+
while self._running:
155+
# Collect idle agents first to close them all at once
156+
idle_agents = []
157+
for agent in self._active_agents:
158+
agent_idle_for = agent.idle_for()
159+
if agent_idle_for >= self._agent_idle_timeout:
160+
logger.info(
161+
f'Agent with user_id "{agent.agent_user.id}" is idle for {round(agent_idle_for, 2)}s, '
162+
f"closing it after {self._agent_idle_timeout}s timeout"
163+
)
164+
idle_agents.append(agent)
165+
166+
if idle_agents:
167+
coros = [asyncio.shield(a.close()) for a in idle_agents]
168+
result = await asyncio.shield(
169+
asyncio.gather(*coros, return_exceptions=True)
170+
)
171+
for agent, r in zip(idle_agents, result):
172+
if isinstance(r, Exception):
173+
logger.error(
174+
f"Failed to close idle agent with user_id {agent.agent_user.id}",
175+
exc_info=r,
176+
)
177+
178+
await asyncio.sleep(self._agent_idle_cleanup_interval)
179+
180+
async def __aenter__(self):
181+
await self.start()
182+
return self
183+
184+
async def __aexit__(self, exc_type, exc_val, exc_tb):
185+
await self.stop()

agents-core/vision_agents/core/agents/agents.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -690,6 +690,10 @@ def _end_tracing(self):
690690
otel_context.detach(self._context_token)
691691
self._context_token = None
692692

693+
@property
694+
def closed(self) -> bool:
695+
return self._closed
696+
693697
async def close(self):
694698
"""
695699
Clean up all connections and resources.

agents-core/vision_agents/core/cli/cli_runner.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -101,8 +101,8 @@ async def _run():
101101
logger.info("🚀 Launching agent...")
102102

103103
try:
104-
# Warmup the agent's dependencies
105-
await launcher.warmup()
104+
# Start the agent launcher.
105+
await launcher.start()
106106

107107
# Create the agent
108108
agent = await launcher.launch()
@@ -135,6 +135,8 @@ async def _run():
135135
except Exception as e:
136136
logger.error(f"❌ Error running agent: {e}", exc_info=True)
137137
raise
138+
finally:
139+
await launcher.stop()
138140

139141
asyncio_logger_level = asyncio_logger.level
140142

tests/test_agents/__init__.py

Whitespace-only changes.
Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
import asyncio
2+
from typing import Any
3+
from unittest.mock import MagicMock, patch
4+
5+
import pytest
6+
from vision_agents.core import Agent, AgentLauncher, User
7+
from vision_agents.core.events import EventManager
8+
from vision_agents.core.llm import LLM
9+
from vision_agents.core.llm.llm import LLMResponseEvent
10+
from vision_agents.core.tts import TTS
11+
from vision_agents.core.warmup import Warmable
12+
13+
14+
class DummyTTS(TTS):
15+
async def stream_audio(self, *_, **__):
16+
return b""
17+
18+
async def stop_audio(self) -> None: ...
19+
20+
21+
class DummyLLM(LLM, Warmable[bool]):
22+
def __init__(self):
23+
super(DummyLLM, self).__init__()
24+
self.warmed_up = False
25+
26+
async def simple_response(self, *_, **__) -> LLMResponseEvent[Any]:
27+
return LLMResponseEvent(text="Simple response", original=None)
28+
29+
async def on_warmup(self) -> bool:
30+
return True
31+
32+
async def on_warmed_up(self, *_) -> None:
33+
self.warmed_up = True
34+
35+
36+
@pytest.fixture()
37+
async def stream_edge_mock() -> MagicMock:
38+
mock = MagicMock()
39+
mock.events.return_value = EventManager()
40+
return mock
41+
42+
43+
class TestAgentLauncher:
44+
async def test_warmup(self, stream_edge_mock):
45+
llm = DummyLLM()
46+
tts = DummyTTS()
47+
48+
async def create_agent(**kwargs) -> Agent:
49+
return Agent(
50+
llm=llm,
51+
tts=tts,
52+
edge=stream_edge_mock,
53+
agent_user=User(name="test"),
54+
)
55+
56+
launcher = AgentLauncher(create_agent=create_agent)
57+
await launcher.warmup()
58+
assert llm.warmed_up
59+
60+
async def test_launch(self, stream_edge_mock):
61+
llm = DummyLLM()
62+
tts = DummyTTS()
63+
64+
async def create_agent(**kwargs) -> Agent:
65+
return Agent(
66+
llm=llm,
67+
tts=tts,
68+
edge=stream_edge_mock,
69+
agent_user=User(name="test"),
70+
)
71+
72+
launcher = AgentLauncher(create_agent=create_agent)
73+
agent = await launcher.launch()
74+
assert agent
75+
76+
async def test_idle_agents_stopped(self, stream_edge_mock):
77+
llm = DummyLLM()
78+
tts = DummyTTS()
79+
80+
async def create_agent(**kwargs) -> Agent:
81+
return Agent(
82+
llm=llm,
83+
tts=tts,
84+
edge=stream_edge_mock,
85+
agent_user=User(name="test"),
86+
)
87+
88+
launcher = AgentLauncher(
89+
create_agent=create_agent,
90+
agent_idle_timeout=1.0,
91+
agent_idle_cleanup_interval=0.5,
92+
)
93+
with patch.object(Agent, "idle_for", return_value=10):
94+
# Start the launcher internals
95+
async with launcher:
96+
# Launch a couple of idle agents
97+
agent1 = await launcher.launch()
98+
agent2 = await launcher.launch()
99+
# Sleep 2s to let the launcher clean up the agents
100+
await asyncio.sleep(2)
101+
102+
# The agents must be closed
103+
assert agent1.closed
104+
assert agent2.closed
105+
106+
async def test_idle_agents_alive_with_idle_timeout_zero(self, stream_edge_mock):
107+
llm = DummyLLM()
108+
tts = DummyTTS()
109+
110+
async def create_agent(**kwargs) -> Agent:
111+
return Agent(
112+
llm=llm,
113+
tts=tts,
114+
edge=stream_edge_mock,
115+
agent_user=User(name="test"),
116+
)
117+
118+
launcher = AgentLauncher(
119+
create_agent=create_agent,
120+
agent_idle_timeout=0,
121+
)
122+
with patch.object(Agent, "idle_for", return_value=10):
123+
# Start the launcher internals
124+
async with launcher:
125+
# Launch a couple of idle agents
126+
agent1 = await launcher.launch()
127+
agent2 = await launcher.launch()
128+
# Sleep 2s to let the launcher clean up the agents
129+
await asyncio.sleep(2)
130+
131+
# The agents must not be closed because agent_idle_timeout=0
132+
assert not agent1.closed
133+
assert not agent2.closed
134+
135+
async def test_active_agents_alive(self, stream_edge_mock):
136+
llm = DummyLLM()
137+
tts = DummyTTS()
138+
139+
async def create_agent(**kwargs) -> Agent:
140+
return Agent(
141+
llm=llm,
142+
tts=tts,
143+
edge=stream_edge_mock,
144+
agent_user=User(name="test"),
145+
)
146+
147+
launcher = AgentLauncher(
148+
create_agent=create_agent,
149+
agent_idle_timeout=1.0,
150+
agent_idle_cleanup_interval=0.5,
151+
)
152+
with patch.object(Agent, "idle_for", return_value=0):
153+
# Start the launcher internals
154+
async with launcher:
155+
# Launch a couple of active agents (idle_for=0)
156+
agent1 = await launcher.launch()
157+
agent2 = await launcher.launch()
158+
# Sleep 2s to let the launcher clean up the agents
159+
await asyncio.sleep(2)
160+
161+
# The agents must not be closed because they are active (idle_for=0)
162+
assert not agent1.closed
163+
assert not agent2.closed

0 commit comments

Comments
 (0)