Skip to content

Commit efeca6a

Browse files
authored
Fix Agent warnings (#310)
1 parent b4efa9a commit efeca6a

7 files changed

Lines changed: 65 additions & 148 deletions

File tree

agents-core/vision_agents/core/agents/agent_launcher.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -198,11 +198,13 @@ async def warmup(self) -> None:
198198

199199
# Create a dry-run Agent instance and warmup its components for the first time.
200200
agent: "Agent" = await await_or_run(self._create_agent)
201-
logger.info("Warming up agent components...")
202-
await self._warmup_agent(agent)
203-
self._warmed_up = True
204-
205-
logger.info("Agent warmup completed")
201+
try:
202+
logger.info("Warming up agent components...")
203+
await self._warmup_agent(agent)
204+
self._warmed_up = True
205+
logger.info("Agent warmup completed")
206+
finally:
207+
await agent.close()
206208

207209
@property
208210
def warmed_up(self) -> bool:

agents-core/vision_agents/core/agents/agents.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -799,6 +799,7 @@ async def _close(self):
799799
self._call_ended_event = None
800800
self._joined_at = 0.0
801801
self.clear_call_logging_context()
802+
self.events.stop()
802803
self._closed = True
803804
self.logger.info("🤖 Agent stopped")
804805

agents-core/vision_agents/core/events/__init__.py

Lines changed: 9 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,3 @@
1-
from .base import (
2-
ConnectionState,
3-
AudioFormat,
4-
BaseEvent,
5-
PluginBaseEvent,
6-
PluginInitializedEvent,
7-
PluginClosedEvent,
8-
PluginErrorEvent,
9-
VideoProcessorDetectionEvent,
10-
)
11-
from .manager import EventManager
12-
131
from getstream.models import (
142
BlockedUserEvent,
153
CallAcceptedEvent,
@@ -65,6 +53,15 @@
6553
UpdatedCallPermissionsEvent,
6654
)
6755

56+
from .base import (
57+
AudioFormat,
58+
BaseEvent,
59+
ConnectionState,
60+
PluginBaseEvent,
61+
VideoProcessorDetectionEvent,
62+
)
63+
from .manager import EventManager
64+
6865
__all__ = [
6966
"BlockedUserEvent",
7067
"CallAcceptedEvent",
@@ -125,9 +122,6 @@
125122
"AudioFormat",
126123
"BaseEvent",
127124
"PluginBaseEvent",
128-
"PluginInitializedEvent",
129-
"PluginClosedEvent",
130-
"PluginErrorEvent",
131125
"VideoProcessorDetectionEvent",
132126
"EventManager",
133127
]

agents-core/vision_agents/core/events/base.py

Lines changed: 3 additions & 81 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
1-
import uuid
21
import dataclasses
2+
import uuid
33
from dataclasses import dataclass, field
44
from datetime import datetime, timezone
55
from enum import Enum
6-
from typing import Any, Dict, List, Optional
76
from types import FunctionType
8-
from dataclasses_json import DataClassJsonMixin
7+
from typing import Any, Optional
98

9+
from dataclasses_json import DataClassJsonMixin
1010
from getstream.video.rtc.pb.stream.video.sfu.models.models_pb2 import Participant
1111

1212

@@ -54,91 +54,13 @@ class PluginBaseEvent(BaseEvent):
5454
plugin_version: str | None = None
5555

5656

57-
@dataclass
58-
class PluginInitializedEvent(PluginBaseEvent):
59-
"""Event emitted when a plugin is successfully initialized."""
60-
61-
type: str = field(default="plugin.initialized", init=False)
62-
plugin_type: Optional[str] = None
63-
provider: Optional[str] = None
64-
configuration: Optional[Dict[str, Any]] = None
65-
capabilities: Optional[List[str]] = None
66-
67-
68-
@dataclass
69-
class PluginClosedEvent(PluginBaseEvent):
70-
"""Event emitted when a plugin is closed."""
71-
72-
type: str = field(default="plugin.closed", init=False)
73-
plugin_type: Optional[str] = None # "STT", "STS", "VAD"
74-
provider: Optional[str] = None
75-
reason: Optional[str] = None
76-
cleanup_successful: bool = True
77-
78-
79-
@dataclass
80-
class PluginErrorEvent(PluginBaseEvent):
81-
"""Event emitted when a generic plugin error occurs."""
82-
83-
type: str = field(default="plugin.error", init=False)
84-
plugin_type: Optional[str] = None # "STT", "TTS", "STS", "VAD"
85-
provider: Optional[str] = None
86-
error: Optional[Exception] = None
87-
error_code: Optional[str] = None
88-
context: Optional[str] = None
89-
is_fatal: bool = False
90-
91-
@property
92-
def error_message(self) -> str:
93-
return str(self.error) if self.error else "Unknown error"
94-
95-
9657
@dataclasses.dataclass
9758
class ExceptionEvent:
9859
exc: Exception
9960
handler: FunctionType
10061
type: str = "base.exception"
10162

10263

103-
@dataclasses.dataclass
104-
class HealthCheckEvent(DataClassJsonMixin):
105-
connection_id: str
106-
created_at: int
107-
custom: dict
108-
type: str = "health.check"
109-
110-
111-
@dataclass
112-
class ConnectionOkEvent(BaseEvent):
113-
"""Event emitted when WebSocket connection is established."""
114-
115-
type: str = field(default="connection.ok", init=False)
116-
connection_id: Optional[str] = None
117-
server_time: Optional[str] = None
118-
api_key: Optional[str] = None
119-
user_id: Optional[str] = None # type: ignore[assignment]
120-
121-
122-
@dataclass
123-
class ConnectionErrorEvent(BaseEvent):
124-
"""Event emitted when WebSocket connection encounters an error."""
125-
126-
type: str = field(default="connection.error", init=False)
127-
error_code: Optional[str] = None
128-
error_message: Optional[str] = None
129-
reconnect_attempt: Optional[int] = None
130-
131-
132-
@dataclass
133-
class ConnectionClosedEvent(BaseEvent):
134-
"""Event emitted when WebSocket connection is closed."""
135-
136-
type: str = field(default="connection.closed", init=False)
137-
code: Optional[int] = None
138-
reason: Optional[str] = None
139-
was_clean: bool = False
140-
141-
14264
@dataclass
14365
class VideoProcessorDetectionEvent(PluginBaseEvent):
14466
"""Base event for video processor detection results.

agents-core/vision_agents/core/events/manager.py

Lines changed: 8 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,19 +1,12 @@
11
import asyncio
2-
import uuid
32
import collections
43
import logging
54
import types
65
import typing
6+
import uuid
77
from typing import Any, Deque, Dict, Optional, Union, get_args, get_origin
88

9-
from .base import (
10-
ConnectionClosedEvent,
11-
ConnectionErrorEvent,
12-
ConnectionOkEvent,
13-
ExceptionEvent,
14-
HealthCheckEvent,
15-
)
16-
9+
from .base import ExceptionEvent
1710

1811
logger = logging.getLogger(__name__)
1912

@@ -145,10 +138,6 @@ def __init__(self, ignore_unknown_events: bool = True):
145138
self._received_event = asyncio.Event()
146139

147140
self.register(ExceptionEvent)
148-
self.register(HealthCheckEvent)
149-
self.register(ConnectionOkEvent)
150-
self.register(ConnectionErrorEvent)
151-
self.register(ConnectionClosedEvent)
152141

153142
# Start background processing task
154143
self._start_processing_task()
@@ -195,8 +184,7 @@ def register(self, event_class, ignore_not_compatible=False):
195184

196185
def merge(self, em: "EventManager"):
197186
# Stop the processing task in the merged manager
198-
if em._processing_task and not em._processing_task.done():
199-
em._processing_task.cancel()
187+
em.stop()
200188

201189
# Merge all data from the other manager
202190
self._events.update(em._events)
@@ -559,3 +547,8 @@ async def _process_single_event(self, event):
559547
loop = asyncio.get_running_loop()
560548
handler_task = loop.create_task(self._run_handler(handler, event))
561549
self._handler_tasks[uuid.uuid4()] = handler_task
550+
551+
def stop(self):
552+
if self._processing_task and not self._processing_task.done():
553+
self._processing_task.cancel()
554+
self._processing_task = None

agents-core/vision_agents/core/runner/runner.py

Lines changed: 37 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
import asyncio
22
import logging
3+
import os
34
import warnings
45
from typing import Optional
56
from uuid import uuid4
@@ -26,10 +27,6 @@
2627

2728
logger = logging.getLogger(__name__)
2829

29-
# TODO:
30-
# - Figure out how to serialize the agent config into some dict
31-
# - Docs
32-
3330
asyncio_logger = logging.getLogger("asyncio")
3431

3532

@@ -127,12 +124,15 @@ async def _run():
127124
# Start the agent launcher.
128125
await self._launcher.start()
129126

130-
# Create the agent
131-
agent = await self._launcher.launch()
132-
133127
logger.info("✅ Agent warmed up and ready")
134128

129+
# Join call if join_call function is provided
130+
logger.info(f"📞 Joining call: {call_type}/{call_id}")
131+
session = await self._launcher.start_session(
132+
call_id, call_type, video_track_override_path=video_track_override
133+
)
135134
# Open demo UI by default
135+
agent = session.agent
136136
if (
137137
not no_demo
138138
and hasattr(agent, "edge")
@@ -141,11 +141,6 @@ async def _run():
141141
logger.info("🌐 Opening demo UI...")
142142
await agent.edge.open_demo_for_agent(agent, call_type, call_id)
143143

144-
# Join call if join_call function is provided
145-
logger.info(f"📞 Joining call: {call_type}/{call_id}")
146-
session = await self._launcher.start_session(
147-
call_id, call_type, video_track_override_path=video_track_override
148-
)
149144
await session.wait()
150145
except asyncio.CancelledError:
151146
logger.info("The session is cancelled, shutting down gracefully...")
@@ -177,18 +172,17 @@ def serve(
177172
port: int = 8000,
178173
agents_log_level: str = "INFO",
179174
http_log_level: str = "INFO",
180-
):
175+
debug: bool = False,
176+
) -> None:
181177
"""
182178
Start the HTTP server that spawns agents to the calls.
183179
184180
Args:
185-
host:
186-
port:
187-
agents_log_level:
188-
http_log_level:
189-
190-
Returns:
191-
181+
host: Host address to bind the server to.
182+
port: Port number for the server.
183+
agents_log_level: Logging level for agent-related logs.
184+
http_log_level: Logging level for FastAPI and uvicorn logs.
185+
debug: Enable asyncio debug mode.
192186
"""
193187
# Configure loggers if they're not already configured
194188
configure_sdk_logger(
@@ -203,9 +197,22 @@ def serve(
203197
warnings.filterwarnings(
204198
"ignore", category=RuntimeWarning, module="dataclasses_json.core"
205199
)
200+
201+
# Enable asyncio debug via environment variable before uvicorn creates its loop
202+
if debug:
203+
os.environ.setdefault("PYTHONASYNCIODEBUG", "1")
206204
uvicorn.run(self.fast_api, host=host, port=port, log_config=None)
207205

208206
def _create_fastapi_app(self, options: ServeOptions) -> FastAPI:
207+
"""
208+
Create and configure a FastAPI application for serving agents.
209+
210+
Args:
211+
options: Configuration options for the server.
212+
213+
Returns:
214+
Configured FastAPI application instance.
215+
"""
209216
app = FastAPI(lifespan=lifespan)
210217
app.state.launcher = self._launcher
211218
app.state.options = self._serve_options
@@ -228,9 +235,9 @@ def _create_fastapi_app(self, options: ServeOptions) -> FastAPI:
228235
)
229236
return app
230237

231-
def cli(self):
238+
def cli(self) -> None:
232239
"""
233-
Run the CLI
240+
Run the command-line interface with `run` and `serve` subcommands.
234241
"""
235242

236243
@click.group()
@@ -326,11 +333,18 @@ def run_cmd(
326333
default="INFO",
327334
help="Set the logging level for FastAPI and uvicorn",
328335
)
336+
@click.option(
337+
"--debug",
338+
is_flag=True,
339+
default=False,
340+
help="Enable asyncio debug mode",
341+
)
329342
def serve_cmd(
330343
host: str,
331344
port: int,
332345
agents_log_level: str,
333346
http_log_level: str,
347+
debug: bool,
334348
) -> None:
335349
"""
336350
Start the HTTP server that spawns agents to the calls.
@@ -340,6 +354,7 @@ def serve_cmd(
340354
port=port,
341355
agents_log_level=agents_log_level.upper(),
342356
http_log_level=http_log_level.upper(),
357+
debug=debug,
343358
)
344359

345360
cli_()

agents-core/vision_agents/core/tts/tts.py

Lines changed: 0 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,6 @@
77
import av
88
from vision_agents.core.events import (
99
AudioFormat,
10-
PluginClosedEvent,
1110
)
1211
from vision_agents.core.events.manager import EventManager
1312

@@ -282,12 +281,3 @@ async def send(
282281

283282
async def close(self):
284283
"""Close the TTS service and release any resources."""
285-
self.events.send(
286-
PluginClosedEvent(
287-
session_id=self.session_id,
288-
plugin_name=self.provider_name,
289-
plugin_type="TTS",
290-
provider=self.provider_name,
291-
cleanup_successful=True,
292-
)
293-
)

0 commit comments

Comments
 (0)