Skip to content

Commit dfd68db

Browse files
authored
Add limits to AgentLauncher (#302)
* AgentLauncher: implement max_sessions_per_call and max_concurrent_sessions limits * AgentLauncher: update tests * Fix ruff * Fix cancellation handling in `run` mode * Add docstrings * Fix tests * Add limits to Runner API * Fix possible limit overflow in `AgentLauncher.start_session` * Fix idleness check in `AgentLauncher`
1 parent 62c783a commit dfd68db

7 files changed

Lines changed: 724 additions & 120 deletions

File tree

agents-core/vision_agents/core/agents/agent_launcher.py

Lines changed: 202 additions & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
import asyncio
22
import logging
3-
import weakref
43
from dataclasses import dataclass
54
from datetime import datetime, timezone
5+
from functools import partial
66
from typing import (
77
TYPE_CHECKING,
88
Any,
@@ -14,6 +14,8 @@
1414
from vision_agents.core.utils.utils import await_or_run, cancel_and_wait
1515
from vision_agents.core.warmup import Warmable, WarmupCache
1616

17+
from .exceptions import MaxConcurrentSessionsExceeded, MaxSessionsPerCallExceeded
18+
1719
if TYPE_CHECKING:
1820
from .agents import Agent
1921

@@ -22,6 +24,14 @@
2224

2325
@dataclass
2426
class AgentSession:
27+
"""
28+
Represents an active agent session within a call.
29+
30+
An AgentSession wraps an Agent instance along with metadata about the session,
31+
including when it started, which call it belongs to, and the async task running
32+
the agent's call handler.
33+
"""
34+
2535
agent: "Agent"
2636
call_id: str
2737
started_at: datetime
@@ -30,10 +40,12 @@ class AgentSession:
3040

3141
@property
3242
def finished(self) -> bool:
43+
"""Return True if the session task has completed."""
3344
return self.task.done()
3445

3546
@property
3647
def id(self) -> str:
48+
"""Return the session ID (same as the agent ID)."""
3749
return self.agent.id
3850

3951
async def wait(self):
@@ -42,6 +54,25 @@ async def wait(self):
4254
"""
4355
return await self.task
4456

57+
def on_call_for(self) -> float:
58+
"""
59+
Return the number of seconds for how long the agent has been on the call.
60+
Returns 0.0 if the agent has not joined a call yet.
61+
62+
Returns:
63+
Duration in seconds since the agent joined the call, or 0.0 if not on a call.
64+
"""
65+
return self.agent.on_call_for()
66+
67+
def idle_for(self) -> float:
68+
"""
69+
Return the idle time for this session if there are no other participants except the agent.
70+
71+
Returns:
72+
Idle time in seconds, or 0.0 if the session is active.
73+
"""
74+
return self.agent.idle_for()
75+
4576

4677
# TODO: Rename to `AgentManager`.
4778
class AgentLauncher:
@@ -57,48 +88,87 @@ def __init__(
5788
create_agent: Callable[..., "Agent" | Coroutine[Any, Any, "Agent"]],
5889
join_call: Callable[["Agent", str, str], Coroutine],
5990
agent_idle_timeout: float = 60.0,
60-
agent_idle_cleanup_interval: float = 5.0,
91+
max_concurrent_sessions: Optional[int] = None,
92+
max_sessions_per_call: Optional[int] = None,
93+
max_session_duration_seconds: Optional[float] = None,
94+
cleanup_interval: float = 5.0,
6195
):
6296
"""
6397
Initialize the agent launcher.
6498
6599
Args:
66-
create_agent: A function that creates and returns an Agent instance
67-
join_call: Optional function that handles joining a call with the agent
68-
agent_idle_timeout: Optional timeout in seconds for agent to stay alone on the call. Default - `60.0`.
69-
`0` means idle agents won't leave the call until it's ended.
70-
100+
create_agent: A function that creates and returns an Agent instance.
101+
join_call: A coroutine function that handles joining a call with the agent.
102+
agent_idle_timeout: Timeout in seconds for an agent to stay alone on a call
103+
before being automatically closed. Default is 60.0 seconds.
104+
Set to 0 to disable idle timeout (agents won't leave until the call ends).
105+
max_concurrent_sessions: Maximum number of concurrent sessions allowed across
106+
all calls. Default is None (unlimited).
107+
max_sessions_per_call: Maximum number of sessions allowed per call_id.
108+
Default is None (unlimited).
109+
max_session_duration_seconds: Maximum duration in seconds for a session
110+
before it is automatically closed. Default is None (unlimited).
111+
cleanup_interval: Interval in seconds between cleanup checks for idle
112+
or expired sessions. Default is 5.0 seconds.
71113
"""
72114
self._create_agent = create_agent
73115
self._join_call = join_call
74116
self._warmup_lock = asyncio.Lock()
75117
self._warmup_cache = WarmupCache()
118+
self._start_lock = asyncio.Lock()
119+
120+
if max_concurrent_sessions is not None and max_concurrent_sessions <= 0:
121+
raise ValueError("max_concurrent_sessions must be > 0 or None")
122+
self._max_concurrent_sessions = max_concurrent_sessions
123+
if max_sessions_per_call is not None and max_sessions_per_call <= 0:
124+
raise ValueError("max_sessions_per_call must be > 0 or None")
125+
self._max_sessions_per_call = max_sessions_per_call
126+
if (
127+
max_session_duration_seconds is not None
128+
and max_session_duration_seconds <= 0
129+
):
130+
raise ValueError("max_session_duration_seconds must be > 0 or None")
131+
self._max_session_duration_seconds = max_session_duration_seconds
76132

77133
if agent_idle_timeout < 0:
78134
raise ValueError("agent_idle_timeout must be >= 0")
79135
self._agent_idle_timeout = agent_idle_timeout
80136

81-
if agent_idle_cleanup_interval <= 0:
82-
raise ValueError("agent_idle_cleanup_interval must be > 0")
83-
self._agent_idle_cleanup_interval = agent_idle_cleanup_interval
84-
85-
self._active_agents: weakref.WeakSet[Agent] = weakref.WeakSet()
137+
if cleanup_interval <= 0:
138+
raise ValueError("cleanup_interval must be > 0")
139+
self._cleanup_interval: float = cleanup_interval
86140

87141
self._running = False
88142
self._cleanup_task: Optional[asyncio.Task] = None
89143
self._warmed_up: bool = False
90144
self._sessions: dict[str, AgentSession] = {}
145+
self._calls: dict[str, set[str]] = {}
91146

92-
async def start(self):
147+
async def start(self) -> None:
148+
"""
149+
Start the agent launcher.
150+
151+
This method warms up the agent components and starts the background
152+
cleanup task for managing idle and expired sessions.
153+
154+
Raises:
155+
RuntimeError: If the launcher is already running.
156+
"""
93157
if self._running:
94158
raise RuntimeError("AgentLauncher is already running")
95159
logger.debug("Starting AgentLauncher")
96160
self._running = True
97161
await self.warmup()
98-
self._cleanup_task = asyncio.create_task(self._cleanup_idle_agents())
162+
self._cleanup_task = asyncio.create_task(self._cleanup_idle_sessions())
99163
logger.debug("AgentLauncher started")
100164

101-
async def stop(self):
165+
async def stop(self) -> None:
166+
"""
167+
Stop the agent launcher and close all active sessions.
168+
169+
This method cancels the cleanup task, then cancels and waits for
170+
all active session tasks to complete.
171+
"""
102172
logger.debug("Stopping AgentLauncher")
103173
self._running = False
104174
if self._cleanup_task:
@@ -136,14 +206,17 @@ async def warmup(self) -> None:
136206

137207
@property
138208
def warmed_up(self) -> bool:
209+
"""Return True if the agent components have been warmed up."""
139210
return self._warmed_up
140211

141212
@property
142213
def running(self) -> bool:
214+
"""Return True if the launcher is currently running."""
143215
return self._running
144216

145217
@property
146218
def ready(self) -> bool:
219+
"""Return True if the launcher is warmed up and running."""
147220
return self.warmed_up and self.running
148221

149222
async def launch(self, **kwargs) -> "Agent":
@@ -158,7 +231,6 @@ async def launch(self, **kwargs) -> "Agent":
158231
"""
159232
agent: "Agent" = await await_or_run(self._create_agent, **kwargs)
160233
await self._warmup_agent(agent)
161-
self._active_agents.add(agent)
162234
return agent
163235

164236
async def start_session(
@@ -168,28 +240,75 @@ async def start_session(
168240
created_by: Optional[Any] = None,
169241
video_track_override_path: Optional[str] = None,
170242
) -> AgentSession:
171-
agent: "Agent" = await self.launch()
172-
if video_track_override_path:
173-
agent.set_video_track_override_path(video_track_override_path)
243+
"""
244+
Start a new agent session for a call.
174245
175-
task = asyncio.create_task(
176-
self._join_call(agent, call_type, call_id), name=f"agent-{agent.id}"
177-
)
246+
Creates a new agent, joins the specified call, and returns an AgentSession
247+
object to track the session.
178248
179-
# Remove the session when the task is done
180-
def _done_cb(_, agent_id_=agent.id):
181-
self._sessions.pop(agent_id_, None)
182-
183-
task.add_done_callback(_done_cb)
184-
session = AgentSession(
185-
agent=agent,
186-
task=task,
187-
started_at=datetime.now(timezone.utc),
188-
call_id=call_id,
189-
created_by=created_by,
190-
)
191-
self._sessions[agent.id] = session
192-
logger.info(f"Start agent session with id {session.id}")
249+
Args:
250+
call_id: Unique identifier for the call to join.
251+
call_type: Type of call. Default is "default".
252+
created_by: Optional metadata about who/what created this session.
253+
video_track_override_path: Optional path to a video file to use
254+
instead of a live video track.
255+
256+
Returns:
257+
An AgentSession object representing the new session.
258+
259+
Raises:
260+
MaxConcurrentSessionsExceeded: If the maximum number of concurrent
261+
sessions has been reached.
262+
MaxSessionsPerCallExceeded: If the maximum number of sessions for
263+
this call_id has been reached.
264+
"""
265+
async with self._start_lock:
266+
if (
267+
self._max_concurrent_sessions
268+
and len(self._sessions) >= self._max_concurrent_sessions
269+
):
270+
raise MaxConcurrentSessionsExceeded(
271+
f"Reached maximum concurrent sessions of {self._max_concurrent_sessions}"
272+
)
273+
274+
call_sessions_total = len(self._calls.get(call_id, set()))
275+
if (
276+
self._max_sessions_per_call
277+
and call_sessions_total >= self._max_sessions_per_call
278+
):
279+
raise MaxSessionsPerCallExceeded(
280+
f"Reached maximum sessions per call of {self._max_sessions_per_call}"
281+
)
282+
283+
agent: "Agent" = await self.launch()
284+
if video_track_override_path:
285+
agent.set_video_track_override_path(video_track_override_path)
286+
287+
task = asyncio.create_task(
288+
self._join_call(agent, call_type, call_id), name=f"agent-{agent.id}"
289+
)
290+
291+
# Remove the session when the task is done
292+
# or when the AgentSession is garbage-collected
293+
# in case the done callback wasn't fired
294+
def _finalizer(session_id_: str, call_id_: str, *_):
295+
session_ = self._sessions.pop(session_id_, None)
296+
if session_ is not None:
297+
call_sessions = self._calls.get(call_id_, set())
298+
if call_sessions:
299+
call_sessions.discard(session_id_)
300+
301+
task.add_done_callback(partial(_finalizer, agent.id, call_id))
302+
session = AgentSession(
303+
agent=agent,
304+
task=task,
305+
started_at=datetime.now(timezone.utc),
306+
call_id=call_id,
307+
created_by=created_by,
308+
)
309+
self._sessions[agent.id] = session
310+
self._calls.setdefault(call_id, set()).add(agent.id)
311+
logger.info(f"Started agent session with id {session.id}")
193312
return session
194313

195314
async def close_session(self, session_id: str, wait: bool = False) -> bool:
@@ -209,6 +328,9 @@ async def close_session(self, session_id: str, wait: bool = False) -> bool:
209328
if session is None:
210329
# The session is either closed or doesn't exist, exit early
211330
return False
331+
call_sessions = self._calls.get(session.call_id)
332+
if call_sessions:
333+
call_sessions.discard(session.id)
212334

213335
logger.info(f"Closing agent session with id {session.id}")
214336
if wait:
@@ -218,6 +340,15 @@ async def close_session(self, session_id: str, wait: bool = False) -> bool:
218340
return True
219341

220342
def get_session(self, session_id: str) -> Optional[AgentSession]:
343+
"""
344+
Get a session by its ID.
345+
346+
Args:
347+
session_id: The session ID to look up.
348+
349+
Returns:
350+
The AgentSession if found, None otherwise.
351+
"""
221352
return self._sessions.get(session_id)
222353

223354
async def _warmup_agent(self, agent: "Agent") -> None:
@@ -259,39 +390,58 @@ async def _warmup_agent(self, agent: "Agent") -> None:
259390
if warmup_tasks:
260391
await asyncio.gather(*warmup_tasks)
261392

262-
async def _cleanup_idle_agents(self) -> None:
263-
if not self._agent_idle_timeout:
393+
async def _cleanup_idle_sessions(self) -> None:
394+
if not self._agent_idle_timeout and not self._max_session_duration_seconds:
264395
return
396+
max_session_duration_seconds = self._max_session_duration_seconds or float(
397+
"inf"
398+
)
265399

266400
while self._running:
267401
# Collect idle agents first to close them all at once
268-
idle_agents = []
269-
for agent in self._active_agents:
270-
agent_idle_for = agent.idle_for()
271-
if agent_idle_for >= self._agent_idle_timeout:
402+
to_close = []
403+
for session in self._sessions.values():
404+
agent = session.agent
405+
on_call_for = agent.on_call_for()
406+
idle_for = agent.idle_for()
407+
if 0 < self._agent_idle_timeout <= idle_for:
408+
logger.info(
409+
f'Closing session "{session.id}" with '
410+
f'user_id "{agent.agent_user.id}" after being '
411+
f"idle for {round(idle_for, 2)}s "
412+
f"(idle timeout is {self._agent_idle_timeout}s)"
413+
)
414+
to_close.append(agent)
415+
elif on_call_for >= max_session_duration_seconds:
272416
logger.info(
273-
f'Agent with user_id "{agent.agent_user.id}" is idle for {round(agent_idle_for, 2)}s, '
274-
f"closing it after {self._agent_idle_timeout}s timeout"
417+
f'Closing session "{session.id}" with user_id "{agent.agent_user.id}" '
418+
f"after reaching the maximum session "
419+
f"duration of {max_session_duration_seconds}s"
275420
)
276-
idle_agents.append(agent)
421+
to_close.append(agent)
277422

278-
if idle_agents:
279-
coros = [asyncio.shield(a.close()) for a in idle_agents]
423+
if to_close:
424+
coros = [
425+
asyncio.shield(self.close_session(s.id, wait=False))
426+
for s in to_close
427+
]
280428
result = await asyncio.shield(
281429
asyncio.gather(*coros, return_exceptions=True)
282430
)
283-
for agent, r in zip(idle_agents, result):
431+
for agent, r in zip(to_close, result):
284432
if isinstance(r, Exception):
285433
logger.error(
286-
f"Failed to close idle agent with user_id {agent.agent_user.id}",
434+
f"Failed to close agent with user_id {agent.agent_user.id}",
287435
exc_info=r,
288436
)
289437

290-
await asyncio.sleep(self._agent_idle_cleanup_interval)
438+
await asyncio.sleep(self._cleanup_interval)
291439

292-
async def __aenter__(self):
440+
async def __aenter__(self) -> "AgentLauncher":
441+
"""Enter the async context manager, starting the launcher."""
293442
await self.start()
294443
return self
295444

296-
async def __aexit__(self, exc_type, exc_val, exc_tb):
445+
async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
446+
"""Exit the async context manager, stopping the launcher."""
297447
await self.stop()

0 commit comments

Comments
 (0)