Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions livekit-agents/livekit/agents/cli/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
from ..plugin import Plugin
from ..utils import aio, shortuuid
from ..voice import AgentSession, io
from ..voice.run_result import RunEvent
from ..voice.run_result import RunEvent, RunResult
from ..voice.transcription import TranscriptSynchronizer
from ..worker import AgentServer, WorkerOptions
from . import proto
Expand Down Expand Up @@ -291,6 +291,8 @@ def __init__(self) -> None:
self._lock = threading.Lock()
self._io_acquired = False
self._io_acquired_event = threading.Event()
self._io_initial_run_fut: asyncio.Future[RunResult] | None = None
self._exit_flag = threading.Event()

self._enabled = False
self._record = False
Expand Down Expand Up @@ -322,6 +324,9 @@ def acquire_io(self, *, loop: asyncio.AbstractEventLoop, session: AgentSession)
next_in_chain_audio=self._io_audio_output,
next_in_chain_text=None,
)
# create a future for the initial run result in text mode
if self._console_mode == "text":
self._io_initial_run_fut = loop.create_future()
self._io_acquired_event.set()
self._io_session = session

Expand Down Expand Up @@ -383,6 +388,17 @@ def io_context(self) -> contextvars.Context:
def wait_for_io_acquisition(self) -> None:
self._io_acquired_event.wait()

def _set_initial_run(self, run: RunResult) -> None:
if self._io_initial_run_fut is not None and not self._io_initial_run_fut.done():
self._io_loop.call_soon_threadsafe(self._io_initial_run_fut.set_result, run)

def signal_exit(self) -> None:
self._exit_flag.set()

@property
def should_exit(self) -> bool:
return self._exit_flag.is_set()

@property
def input_name(self) -> str | None:
return self._input_name
Expand Down Expand Up @@ -1070,7 +1086,43 @@ def _key_read(ch: str) -> None:
if ch == key.CTRL_T:
raise _ToggleMode()

initial_run_fut = c._io_initial_run_fut
if initial_run_fut is not None:

async def _await_initial_run() -> list[RunEvent]:
run_result = await initial_run_fut
await run_result
return run_result.events.copy()

cf = asyncio.run_coroutine_threadsafe(_await_initial_run(), c.io_loop)
try:
with live_status(c.console, Text.from_markup(" [dim]Initializing...[/dim]")):
while True:
if c.should_exit:
cf.cancel()
with contextlib.suppress(Exception):
cf.result()
raise _ExitCli()
try:
events = cf.result(timeout=0.1)
c._io_initial_run_fut = None
for event in events:
_print_run_event(c, event)
break
except TimeoutError:
continue
except KeyboardInterrupt:
cf.cancel()
with contextlib.suppress(Exception):
cf.result()
raise _ExitCli() from None
except Exception:
pass

Comment on lines +1089 to +1121
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

⚠️ Potential issue | 🟠 Major

Potential hang if the initial run never completes.
If the agent doesn’t emit any initial speech/events, RunResult may never complete, leaving the console stuck in “Initializing...” and blocking user input. Consider a timeout/fallback to proceed with whatever events are available.

💡 Suggested guard to avoid indefinite blocking
         async def _await_initial_run() -> list[RunEvent]:
             run_result = await initial_run_fut
-            await run_result
+            try:
+                await asyncio.wait_for(run_result, timeout=5.0)
+            except asyncio.TimeoutError:
+                # Proceed even if the initial run doesn't finish (e.g., no initial output).
+                pass
             return run_result.events.copy()
🤖 Prompt for AI Agents
In `@livekit-agents/livekit/agents/cli/cli.py` around lines 1089 - 1121, The
current loop awaiting c._io_initial_run_fut/_await_initial_run via cf can block
forever if RunResult never completes; add a bounded wait fallback: record a
configurable max wait (e.g. MAX_INIT_WAIT), poll cf.result(timeout=short_poll)
up to that limit, and if exceeded cancel cf, call cf.result() inside
contextlib.suppress to drain, set c._io_initial_run_fut = None, then process any
available events (or proceed without events) by calling _print_run_event only
for events returned if any, and ensure CancelledError/TimeoutError/Exception are
properly suppressed and translated into continuing startup rather than hanging
(affecting symbols: initial_run_fut, _await_initial_run, cf,
c._io_initial_run_fut, _print_run_event, _ExitCli).

while True:
if c.should_exit:
raise _ExitCli()

try:
text = prompt(
Text.from_markup(" [bold]User input[/bold]: "),
Expand All @@ -1079,7 +1131,7 @@ def _key_read(ch: str) -> None:
placeholder="Type to talk to your agent",
)
except KeyboardInterrupt:
break
raise _ExitCli() from None

if not text.strip():
c.console.bell()
Expand Down Expand Up @@ -1114,6 +1166,8 @@ def _done_callback(task: asyncio.Task[list[RunEvent]]) -> None:

with live_status(c.console, Text.from_markup(" [dim]Thinking...[/dim]")):
while not h.done():
if c.should_exit:
raise _ExitCli()
time.sleep(0.1)

for event in h.result():
Expand Down Expand Up @@ -1355,6 +1409,7 @@ def _on_worker_shutdown() -> None:

def _handle_exit(sig: int, frame: FrameType | None) -> None:
nonlocal exit_triggered
c.signal_exit()
if not exit_triggered:
exit_triggered = True
raise _ExitCli()
Expand Down
11 changes: 9 additions & 2 deletions livekit-agents/livekit/agents/voice/agent_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -625,7 +625,9 @@ async def start(
self._job_context_cb_registered = True

run_state: RunResult | None = None
if capture_run:
# capture_run for text mode console to get the initial agent response
text_mode_capture = c._io_initial_run_fut is not None
if capture_run or text_mode_capture:
if self._global_run_state is not None and not self._global_run_state.done():
raise RuntimeError("nested runs are not supported")

Expand Down Expand Up @@ -710,7 +712,12 @@ def _collect_chain(
)

if run_state:
await run_state
if text_mode_capture:
# pass the run_state to the console so it can display initial events
c._set_initial_run(run_state)

if capture_run:
await run_state

return run_state

Expand Down