Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion DEVELOPMENT.md
Original file line number Diff line number Diff line change
Expand Up @@ -469,7 +469,7 @@ async def start_agent() -> None:
)

call = agent.edge.client.video.call("default", str(uuid4()))
with await agent.join(call):
async with agent.join(call):
await agent.simple_response("Hello!")
await agent.finish()
```
Expand Down
4 changes: 4 additions & 0 deletions agents-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,8 @@ qwen = ["vision-agents-plugins-qwen"]
fish = ["vision-agents-plugins-fish"]
fast-whisper = ["vision-agents-plugins-fast-whisper"]
decart = ["vision-agents-plugins-decart"]
twilio = ["vision-agents-plugins-twilio"]
turbopuffer = ["vision-agents-plugins-turbopuffer"]

all-plugins = [
"vision-agents-plugins-anthropic",
Expand Down Expand Up @@ -88,6 +90,8 @@ all-plugins = [
"vision-agents-plugins-vogent",
"vision-agents-plugins-fast-whisper",
"vision-agents-plugins-qwen",
"vision-agents-plugins-twilio",
"vision-agents-plugins-turbopuffer",
]

[tool.hatch.metadata]
Expand Down
2 changes: 2 additions & 0 deletions agents-core/vision_agents/core/agents/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,11 @@
from .agents import Agent as Agent
from .conversation import Conversation as Conversation
from .agent_launcher import AgentLauncher as AgentLauncher
from .agent_types import AgentOptions as AgentOptions

__all__ = [
"Agent",
"Conversation",
"AgentLauncher",
"AgentOptions",
]
91 changes: 84 additions & 7 deletions agents-core/vision_agents/core/agents/agent_launcher.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import asyncio
import logging
from typing import TYPE_CHECKING, Awaitable, Callable
import weakref
from typing import TYPE_CHECKING, Awaitable, Callable, Optional

from vision_agents.core.utils.utils import await_or_run
from vision_agents.core.warmup import WarmupCache, Warmable
from vision_agents.core.utils.utils import await_or_run, cancel_and_wait
from vision_agents.core.warmup import Warmable, WarmupCache

if TYPE_CHECKING:
from .agents import Agent
Expand All @@ -23,33 +24,72 @@ def __init__(
self,
create_agent: Callable[..., "Agent" | Awaitable["Agent"]],
join_call: Callable[..., None | Awaitable[None]] | None = None,
agent_idle_timeout: float = 60.0,
agent_idle_cleanup_interval: float = 5.0,
):
"""
Initialize the agent launcher.

Args:
create_agent: A function that creates and returns an Agent instance
join_call: Optional function that handles joining a call with the agent
agent_idle_timeout: Optional timeout in seconds for agent to stay alone on the call. Default - `60.0`.
`0` means idle agents won't leave the call until it's ended.

"""
self.create_agent = create_agent
self.join_call = join_call
self._warmup_lock = asyncio.Lock()
self._warmup_cache = WarmupCache()

if agent_idle_timeout < 0:
raise ValueError("agent_idle_timeout must be >= 0")
self._agent_idle_timeout = agent_idle_timeout

if agent_idle_cleanup_interval <= 0:
raise ValueError("agent_idle_cleanup_interval must be > 0")
self._agent_idle_cleanup_interval = agent_idle_cleanup_interval

self._active_agents: weakref.WeakSet[Agent] = weakref.WeakSet()

self._running = False
self._cleanup_task: Optional[asyncio.Task] = None
self._warmed_up: bool = False

async def start(self):
if self._running:
raise RuntimeError("AgentLauncher is already running")
logger.debug("Starting AgentLauncher")
self._running = True
await self.warmup()
self._cleanup_task = asyncio.create_task(self._cleanup_idle_agents())
logger.debug("AgentLauncher started")

async def stop(self):
logger.debug("Stopping AgentLauncher")
self._running = False
if self._cleanup_task:
await cancel_and_wait(self._cleanup_task)
logger.debug("AgentLauncher stopped")

async def warmup(self) -> None:
"""
Warm up all agent components.

This method creates the agent and calls warmup() on LLM, TTS, STT,
and turn detection components if they exist.
"""
if self._warmed_up or self._warmup_lock.locked():
return

async with self._warmup_lock:
logger.info("Creating agent...")

# Create a dry-run Agent instance and warmup its components for the first time.
agent: "Agent" = await await_or_run(self.create_agent)
logger.info("Warming up agent components...")
await self._warmup_agent(agent)
self._warmed_up = True

logger.info("Agent warmup completed")

Expand All @@ -65,6 +105,7 @@ async def launch(self, **kwargs) -> "Agent":
"""
agent: "Agent" = await await_or_run(self.create_agent, **kwargs)
await self._warmup_agent(agent)
self._active_agents.add(agent)
return agent

async def _warmup_agent(self, agent: "Agent") -> None:
Expand Down Expand Up @@ -99,10 +140,46 @@ async def _warmup_agent(self, agent: "Agent") -> None:
warmup_tasks.append(agent.turn_detection.warmup(self._warmup_cache))

# Warmup processors
if agent.processors:
for processor in agent.processors:
if isinstance(processor, Warmable):
warmup_tasks.append(processor.warmup(self._warmup_cache))
for processor in agent.processors:
if isinstance(processor, Warmable):
warmup_tasks.append(processor.warmup(self._warmup_cache))

if warmup_tasks:
await asyncio.gather(*warmup_tasks)

async def _cleanup_idle_agents(self) -> None:
if not self._agent_idle_timeout:
return

while self._running:
# Collect idle agents first to close them all at once
idle_agents = []
for agent in self._active_agents:
agent_idle_for = agent.idle_for()
if agent_idle_for >= self._agent_idle_timeout:
logger.info(
f'Agent with user_id "{agent.agent_user.id}" is idle for {round(agent_idle_for, 2)}s, '
f"closing it after {self._agent_idle_timeout}s timeout"
)
idle_agents.append(agent)

if idle_agents:
coros = [asyncio.shield(a.close()) for a in idle_agents]
result = await asyncio.shield(
asyncio.gather(*coros, return_exceptions=True)
)
for agent, r in zip(idle_agents, result):
if isinstance(r, Exception):
logger.error(
f"Failed to close idle agent with user_id {agent.agent_user.id}",
exc_info=r,
)

await asyncio.sleep(self._agent_idle_cleanup_interval)

async def __aenter__(self):
await self.start()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.stop()
91 changes: 0 additions & 91 deletions agents-core/vision_agents/core/agents/agent_session.py

This file was deleted.

Loading
Loading