Skip to content

Commit 8a73523

Browse files
committed
cleanup audio devices upon shutdown
1 parent f91e0f3 commit 8a73523

1 file changed

Lines changed: 59 additions & 3 deletions

File tree

examples/wakeword_agent_dispatch/client.py

Lines changed: 59 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@
44
import contextlib
55
import logging
66
import os
7+
import signal
78
from collections import deque
89
from dataclasses import dataclass
910
from enum import Enum
@@ -246,6 +247,28 @@ def _drain_audio_queue(audio_queue: asyncio.Queue[MicChunk]) -> int:
246247
return drained
247248

248249

250+
async def _wait_mic_chunk_or_shutdown(
251+
audio_queue: asyncio.Queue[MicChunk],
252+
shutdown: asyncio.Event,
253+
) -> MicChunk | None:
254+
"""Return the next mic chunk, or None when shutdown was requested."""
255+
if shutdown.is_set():
256+
return None
257+
get_chunk = asyncio.create_task(audio_queue.get())
258+
wait_shutdown = asyncio.create_task(shutdown.wait())
259+
done, pending = await asyncio.wait(
260+
{get_chunk, wait_shutdown},
261+
return_when=asyncio.FIRST_COMPLETED,
262+
)
263+
for task in pending:
264+
task.cancel()
265+
with contextlib.suppress(asyncio.CancelledError):
266+
await task
267+
if wait_shutdown in done:
268+
return None
269+
return get_chunk.result()
270+
271+
249272
def _reset_wakeword_model(model: Any) -> None:
250273
reset = getattr(model, "reset", None)
251274
if callable(reset):
@@ -284,6 +307,7 @@ async def _run_audio_loop(
284307
room: rtc.Room,
285308
lkapi: api.LiveKitAPI,
286309
config: Config,
310+
shutdown: asyncio.Event,
287311
) -> None:
288312
preroll = PrerollBuffer(config.wakeword_preroll_seconds)
289313
wakeword_window = WakeWordAudioWindow(WAKEWORD_WINDOW_SAMPLES)
@@ -370,7 +394,10 @@ def _on_dispatch_done(task: asyncio.Task[str]) -> None:
370394

371395
try:
372396
while True:
373-
chunk = await audio_queue.get()
397+
chunk = await _wait_mic_chunk_or_shutdown(audio_queue, shutdown)
398+
if chunk is None:
399+
logger.info("audio loop stopping (shutdown requested)")
400+
break
374401

375402
if state.mode == ClientMode.IDLE:
376403
preroll.append(chunk)
@@ -429,6 +456,8 @@ async def main() -> None:
429456

430457
from livekit.wakeword import WakeWordModel
431458

459+
shutdown_event = asyncio.Event()
460+
432461
model = WakeWordModel(models=[str(config.wakeword_model)])
433462
room = rtc.Room()
434463
lkapi = api.LiveKitAPI(config.url, config.api_key, config.api_secret)
@@ -533,10 +562,26 @@ def _on_track_unsubscribed(
533562
.to_jwt()
534563
)
535564

565+
loop = asyncio.get_running_loop()
566+
signal_handlers_registered = False
567+
568+
def _request_shutdown() -> None:
569+
if not shutdown_event.is_set():
570+
logger.info("shutdown requested; stopping wake word client…")
571+
shutdown_event.set()
572+
536573
try:
537574
await room.connect(config.url, token)
538575
logger.info("connected to room %s", room.name)
539576

577+
try:
578+
for sig in (signal.SIGINT, signal.SIGTERM):
579+
loop.add_signal_handler(sig, _request_shutdown)
580+
signal_handlers_registered = True
581+
except (NotImplementedError, RuntimeError):
582+
# Some platforms (e.g. Windows) may not support signal handlers on the event loop.
583+
pass
584+
540585
track = rtc.LocalAudioTrack.create_audio_track("wakeword-mic", mic.source)
541586

542587
options = rtc.TrackPublishOptions()
@@ -556,15 +601,26 @@ def _on_track_unsubscribed(
556601
room,
557602
lkapi,
558603
config,
604+
shutdown_event,
559605
)
560606
finally:
607+
if signal_handlers_registered:
608+
for sig in (signal.SIGINT, signal.SIGTERM):
609+
with contextlib.suppress(Exception):
610+
loop.remove_signal_handler(sig)
611+
561612
for task in list(background_tasks):
562613
task.cancel()
563614
for task in list(background_tasks):
564615
with contextlib.suppress(asyncio.CancelledError):
565616
await task
566-
await mic.aclose()
567-
await player.aclose()
617+
618+
# Close playback before capture so PortAudio + AEC reverse path tear down cleanly.
619+
with contextlib.suppress(Exception):
620+
await player.aclose()
621+
with contextlib.suppress(Exception):
622+
await mic.aclose()
623+
568624
room.unregister_text_stream_handler("lk.transcription")
569625
await lkapi.aclose()
570626
await room.disconnect()

0 commit comments

Comments
 (0)