Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions agents-core/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ dependencies = [
"numpy>=1.24.0",
"mcp>=1.16.0",
"colorlog>=6.10.1",
"fastapi>=0.128.0",
"uvicorn>=0.38.0",
]

[project.urls]
Expand Down
17 changes: 12 additions & 5 deletions agents-core/vision_agents/core/__init__.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
from vision_agents.core.edge.types import User

from vision_agents.core.agents import Agent

from vision_agents.core.agents.agent_launcher import AgentLauncher, AgentSession
from vision_agents.core.cli.cli_runner import cli
from vision_agents.core.agents.agent_launcher import AgentLauncher
from vision_agents.core.edge.types import User
from vision_agents.core.runner import Runner, ServeOptions

__all__ = ["Agent", "User", "cli", "AgentLauncher"]
__all__ = [
"Agent",
"User",
"cli",
"AgentLauncher",
"AgentSession",
"Runner",
"ServeOptions",
]
128 changes: 121 additions & 7 deletions agents-core/vision_agents/core/agents/agent_launcher.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,18 @@
import asyncio
import logging
import weakref
from typing import TYPE_CHECKING, Awaitable, Callable, Optional
from asyncio import Task
from dataclasses import dataclass, field
from datetime import datetime, timezone
from typing import (
TYPE_CHECKING,
Any,
AsyncIterator,
Callable,
Coroutine,
Optional,
cast,
)

from vision_agents.core.utils.utils import await_or_run, cancel_and_wait
from vision_agents.core.warmup import Warmable, WarmupCache
Expand All @@ -12,6 +23,31 @@
logger = logging.getLogger(__name__)


@dataclass
class AgentSession:
    """
    Bookkeeping record for a single agent run.

    Bundles the agent, the id of the call it was started for, the asyncio task
    that drives it and a few optional pieces of metadata.
    """

    # The agent instance this session belongs to.
    agent: "Agent"
    # Identifier of the call the session was started for.
    call_id: str
    # Moment the session was created (supplied by whoever builds the session).
    started_at: datetime
    # The asyncio task running the agent; its state defines `finished`.
    task: asyncio.Task
    # Free-form configuration; every session gets its own empty dict by default.
    config: dict = field(default_factory=dict)
    # Opaque marker describing who created the session, if known.
    created_by: Optional[Any] = None

    @property
    def id(self) -> str:
        """Session id, which is the id of the underlying agent."""
        owner = self.agent
        return owner.id

    @property
    def finished(self) -> bool:
        """`True` once the session task is done, `False` while it is still pending."""
        is_done = self.task.done()
        return is_done

    async def wait(self):
        """
        Block until the session task completes and hand back its result.

        Awaiting the task directly means any exception (or cancellation) of the
        task propagates to the caller.
        """
        outcome = await self.task
        return outcome


# TODO: Rename to `AgentManager`.
class AgentLauncher:
"""
Agent launcher that handles warmup and lifecycle management.
Expand All @@ -22,8 +58,8 @@ class AgentLauncher:

def __init__(
self,
create_agent: Callable[..., "Agent" | Awaitable["Agent"]],
join_call: Callable[..., None | Awaitable[None]] | None = None,
create_agent: Callable[..., "Agent" | Coroutine[Any, Any, "Agent"]],
join_call: Callable[["Agent", str, str], Coroutine],
agent_idle_timeout: float = 60.0,
agent_idle_cleanup_interval: float = 5.0,
):
Expand All @@ -37,8 +73,8 @@ def __init__(
`0` means idle agents won't leave the call until it's ended.

"""
self.create_agent = create_agent
self.join_call = join_call
self._create_agent = create_agent
self._join_call = join_call
self._warmup_lock = asyncio.Lock()
self._warmup_cache = WarmupCache()

Expand All @@ -55,6 +91,7 @@ def __init__(
self._running = False
self._cleanup_task: Optional[asyncio.Task] = None
self._warmed_up: bool = False
self._sessions: dict[str, AgentSession] = {}

async def start(self):
if self._running:
Expand All @@ -70,6 +107,12 @@ async def stop(self):
self._running = False
if self._cleanup_task:
await cancel_and_wait(self._cleanup_task)

coros = [cancel_and_wait(s.task) for s in self._sessions.values()]
async for result in cast(AsyncIterator[Task], asyncio.as_completed(coros)):
if result.done() and not result.cancelled() and result.exception():
logger.error(f"Failed to cancel the agent task: {result.exception()}")

logger.debug("AgentLauncher stopped")

async def warmup(self) -> None:
Expand All @@ -86,13 +129,25 @@ async def warmup(self) -> None:
logger.info("Creating agent...")

# Create a dry-run Agent instance and warmup its components for the first time.
agent: "Agent" = await await_or_run(self.create_agent)
agent: "Agent" = await await_or_run(self._create_agent)
logger.info("Warming up agent components...")
await self._warmup_agent(agent)
self._warmed_up = True

logger.info("Agent warmup completed")

@property
def warmed_up(self) -> bool:
    """Whether the warmup flag has been set, i.e. agent components were warmed up."""
    flag = self._warmed_up
    return flag

@property
def running(self) -> bool:
    """Current value of the launcher's internal `_running` flag."""
    flag = self._running
    return flag

@property
def ready(self) -> bool:
    """`True` only when the launcher is both warmed up and running."""
    is_warm = self.warmed_up
    # `running` is consulted only when the launcher is already warmed up.
    return is_warm and self.running

async def launch(self, **kwargs) -> "Agent":
"""
Launch the agent.
Expand All @@ -103,11 +158,70 @@ async def launch(self, **kwargs) -> "Agent":
Returns:
The Agent instance
"""
agent: "Agent" = await await_or_run(self.create_agent, **kwargs)
agent: "Agent" = await await_or_run(self._create_agent, **kwargs)
await self._warmup_agent(agent)
self._active_agents.add(agent)
return agent

async def start_session(
    self,
    call_id: str,
    call_type: str = "default",
    created_by: Optional[Any] = None,
    video_track_override_path: Optional[str] = None,
) -> AgentSession:
    """
    Launch a new agent, make it join the call in a background task and
    register the resulting session.

    Args:
        call_id: id of the call to join.
        call_type: type of the call to join.
        created_by: optional marker of who requested the session; stored on
            the session as-is.
        video_track_override_path: when set, passed to the agent via
            `set_video_track_override_path()` before joining.

    Returns:
        The `AgentSession` tracking the launched agent and its task.
    """
    agent: "Agent" = await self.launch()
    if video_track_override_path:
        agent.set_video_track_override_path(video_track_override_path)

    session_id = agent.id
    join_coro = self._join_call(agent, call_type, call_id)
    task = asyncio.create_task(join_coro, name=f"agent-{session_id}")

    # Forget the session as soon as its task finishes, however it ends.
    # `pop(..., None)` keeps this safe when the session was already removed.
    task.add_done_callback(lambda _task: self._sessions.pop(session_id, None))

    session = AgentSession(
        agent=agent,
        call_id=call_id,
        started_at=datetime.now(timezone.utc),
        task=task,
        created_by=created_by,
    )
    self._sessions[session_id] = session
    logger.info(f"Start agent session with id {session.id}")
    return session

async def close_session(self, session_id: str, wait: bool = False) -> bool:
    """
    Close the session registered under `session_id`.

    The session is removed from the registry first and its task is cancelled
    afterwards.

    Args:
        session_id: id of the session to close.
        wait: when True, cancel the task and wait for it to finish.
            Otherwise, only request cancellation and return immediately.

    Returns:
        `True` if the session was found and closed, `False` if no session
        with this id is registered (already closed or never existed).
    """
    session = self._sessions.pop(session_id, None)
    if session is not None:
        logger.info(f"Closing agent session with id {session.id}")
        if not wait:
            # Fire-and-forget: the task unwinds on its own later.
            session.task.cancel()
        else:
            await cancel_and_wait(session.task)
        return True

    # Nothing registered under this id.
    return False

def get_session(self, session_id: str) -> Optional[AgentSession]:
    """Look up a registered session by id; returns `None` when there is no such session."""
    found = self._sessions.get(session_id)
    return found

async def _warmup_agent(self, agent: "Agent") -> None:
"""
Go over the Agent's dependencies and trigger `.warmup()` on them.
Expand Down
7 changes: 7 additions & 0 deletions agents-core/vision_agents/core/agents/agents.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
from ..llm.llm import LLM, AudioLLM, VideoLLM
from ..llm.realtime import Realtime
from ..mcp import MCPBaseServer, MCPManager
from ..observability import MetricsCollector
from ..observability.agent import AgentMetrics
from ..processors.base_processor import (
AudioProcessor,
Expand Down Expand Up @@ -148,6 +149,7 @@ def __init__(
if not self.agent_user.id:
self.agent_user.id = f"agent-{uuid4()}"

self._id = str(uuid4())
self._pending_turn: Optional[LLMTurn] = None
self.call: Optional[Call] = None

Expand Down Expand Up @@ -254,6 +256,11 @@ def __init__(
self._close_lock = asyncio.Lock()
self._closed = False
self._metrics = AgentMetrics()
self._collector = MetricsCollector(self)

@property
def id(self) -> str:
    """Identifier of this agent instance (the UUID4 string generated in `__init__`)."""
    agent_id = self._id
    return agent_id

async def _finish_llm_turn(self):
if self._pending_turn is None or self._pending_turn.response is None:
Expand Down
14 changes: 5 additions & 9 deletions agents-core/vision_agents/core/cli/cli_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,15 +121,11 @@ async def _run():
await agent.edge.open_demo_for_agent(agent, call_type, call_id)

# Join call if join_call function is provided
if launcher.join_call:
logger.info(f"📞 Joining call: {call_type}/{call_id}")
result = launcher.join_call(agent, call_type, call_id)
if asyncio.iscoroutine(result):
await result
else:
logger.warning(
'⚠️ No "join_call" function provided; the agent is created but will not join the call'
)
logger.info(f"📞 Joining call: {call_type}/{call_id}")
session = await launcher.start_session(
call_id, call_type, video_track_override_path=video_track_override
)
await session.wait()
except KeyboardInterrupt:
logger.info("🛑 Received interrupt signal, shutting down gracefully...")
except Exception as e:
Expand Down
24 changes: 24 additions & 0 deletions agents-core/vision_agents/core/observability/agent.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import abc
import dataclasses
from dataclasses import dataclass, field
from typing import Iterable


class _Metric(abc.ABC):
Expand Down Expand Up @@ -143,3 +145,25 @@ class AgentMetrics:
video_processing_latency_ms__avg: Average = field(
default_factory=lambda: Average("Average video frame processing latency")
)

def to_dict(self, fields: Iterable[str] = ()) -> dict[str, int | float | None]:
"""
Convert metrics into a dictionary {<metric>: <value>}.

Args:
fields: optional list of fields to extract. If empty, extract all fields.

Returns:
a dictionary {<metric>: <value>}

"""
all_fields = dataclasses.asdict(self)
result = {}
fields = fields or list(all_fields.keys())

for field_name in fields:
field_ = all_fields.get(field_name)
if field_ is None:
raise ValueError(f"Unknown field: {field_name}")
result[field_name] = field_.value()
return result
2 changes: 2 additions & 0 deletions agents-core/vision_agents/core/runner/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
from .http.options import ServeOptions as ServeOptions
from .runner import Runner as Runner
Empty file.
Loading
Loading