From 7fe05cea0f69ba479066901d412fbacdd2173922 Mon Sep 17 00:00:00 2001 From: Stanislaw Kem Date: Thu, 23 Apr 2026 17:53:55 +0200 Subject: [PATCH] Fix shutdown hangs in stop path --- core_engine/src/outputs/asr_sink.rs | 4 +- core_engine/src/outputs/wav_file.rs | 4 +- macloop/__init__.py | 12 ++- python_ffi/src/lib.rs | 161 ++++++++++++++++++++++++---- tests/test_runtime_helpers.py | 5 +- 5 files changed, 161 insertions(+), 25 deletions(-) diff --git a/core_engine/src/outputs/asr_sink.rs b/core_engine/src/outputs/asr_sink.rs index 253be7b..b6e4d49 100644 --- a/core_engine/src/outputs/asr_sink.rs +++ b/core_engine/src/outputs/asr_sink.rs @@ -396,7 +396,7 @@ impl AsrSink { let idle_sleep = Duration::from_micros(200); loop { - if stop_thread.load(Ordering::Relaxed) { + if stop_thread.load(Ordering::Acquire) { for state in &mut states { state.stop_now(); } @@ -442,7 +442,7 @@ impl AsrSink { } pub fn stop(&mut self) -> Result, AsrSinkError> { - self.stop.store(true, Ordering::Relaxed); + self.stop.store(true, Ordering::Release); let Some(handle) = self.handle.take() else { return Err(AsrSinkError::AlreadyStopped); }; diff --git a/core_engine/src/outputs/wav_file.rs b/core_engine/src/outputs/wav_file.rs index a1f66fc..3959506 100644 --- a/core_engine/src/outputs/wav_file.rs +++ b/core_engine/src/outputs/wav_file.rs @@ -131,7 +131,7 @@ impl WavFileOutput { let mut mixed_buffer = Vec::::new(); loop { - let stopping = stop_thread.load(Ordering::Relaxed); + let stopping = stop_thread.load(Ordering::Acquire); let mut drained_any = false; for (consumer, buffer) in consumers.iter_mut().zip(input_buffers.iter_mut()) { let drain_limit = consumer.occupied_len() / frame_channels * frame_channels; @@ -302,7 +302,7 @@ impl WavFileOutput { } pub fn stop(&mut self) -> Result, WavOutputError> { - self.stop.store(true, Ordering::Relaxed); + self.stop.store(true, Ordering::Release); let Some(handle) = self.handle.take() else { return Err(WavOutputError::AlreadyStopped); }; diff --git a/macloop/__init__.py b/macloop/__init__.py index c3b2142..aebe3c8 100644 --- a/macloop/__init__.py +++ b/macloop/__init__.py @@ -4,6 +4,7 @@ import os import queue import uuid +import warnings import weakref from dataclasses import dataclass from pathlib import Path @@ -68,6 +69,14 @@ def _raise_on_unexpected_kwargs(name: str, kwargs: dict[str, Any]) -> None: raise TypeError(f"{name} got unexpected keyword arguments: {unexpected}") +def _warn_deferred_native_cleanup(exc: TimeoutError) -> None: + warnings.warn( + f"AudioEngine.close() deferred native cleanup: {exc}", + RuntimeWarning, + stacklevel=3, + ) + + def _close_backend_with_optional_engine(backend: Any, engine_backend: Any | None) -> None: if engine_backend is not None: try: @@ -375,7 +384,8 @@ def close(self) -> None: sink_err: Optional[Exception] = None try: self._backend.close() - except TimeoutError: + except TimeoutError as exc: + _warn_deferred_native_cleanup(exc) backend_err = None except Exception as exc: backend_err = exc diff --git a/python_ffi/src/lib.rs b/python_ffi/src/lib.rs index da9299e..f8ebfb2 100644 --- a/python_ffi/src/lib.rs +++ b/python_ffi/src/lib.rs @@ -20,9 +20,10 @@ use std::collections::HashMap; use std::fs::File; use std::os::fd::FromRawFd; use std::os::raw::c_int; -use std::sync::mpsc::{self, RecvTimeoutError}; +use std::sync::atomic::{AtomicU64, Ordering}; +use std::sync::mpsc::{self, RecvTimeoutError, TrySendError}; use std::sync::{Arc, Mutex}; -use std::thread; +use std::thread::{self, JoinHandle}; use std::time::{Duration, Instant}; const DEFAULT_COMMAND_CAPACITY: usize = 32; @@ -912,25 +913,134 @@ type DetachedAsrStartResult = Result)>; +const ASR_WORKER_QUEUE_CAPACITY: usize = 32; +const ASR_WORKER_JOIN_TIMEOUT: Duration = Duration::from_millis(500); +const ASR_WORKER_JOIN_POLL: Duration = Duration::from_millis(5); + +enum AsrWorkerPayload { + F32 { + input_id: String, + frames: usize, + samples: Vec, + }, + I16 { + input_id: String, + frames: usize, + samples: Vec, + }, +} + struct PythonAsrCallback { - callback: Py, + tx: Option>, + worker: Option>, + dropped_chunks: Arc, +} + +impl PythonAsrCallback { + fn spawn(callback: Py) -> Self { + let (tx, rx) = mpsc::sync_channel::(ASR_WORKER_QUEUE_CAPACITY); + let worker = thread::spawn(move || { + // `callback` is owned by this thread. Its final drop happens when this closure + // returns; PyO3 queues the decref and the next GIL holder releases it, so no + // extra Python attach is required here. + while let Ok(payload) = rx.recv() { + let _ = Python::try_attach(|py| { + let (input_id, frames, samples_obj) = match payload { + AsrWorkerPayload::F32 { + input_id, + frames, + samples, + } => ( + input_id, + frames, + samples.to_pyarray(py).into_any().unbind(), + ), + AsrWorkerPayload::I16 { + input_id, + frames, + samples, + } => ( + input_id, + frames, + samples.to_pyarray(py).into_any().unbind(), + ), + }; + + if let Err(err) = callback.call1(py, (input_id, frames, samples_obj)) { + err.print(py); + } + }); + } + }); + + Self { + tx: Some(tx), + worker: Some(worker), + dropped_chunks: Arc::new(AtomicU64::new(0)), + } + } } impl AsrSinkCallback for PythonAsrCallback { fn on_chunk(&mut self, chunk: AsrChunkView<'_>) { - let _ = Python::try_attach(|py| { - let samples = match chunk.samples { - AsrSampleSlice::F32(values) => values.to_pyarray(py).into_any().unbind(), - AsrSampleSlice::I16(values) => values.to_pyarray(py).into_any().unbind(), - }; + // Sink thread never blocks on user Python code: we hand chunks off to a worker via + // a bounded queue. If the user callback cannot keep up we drop the newest chunk so + // that shutdown (which drops `tx` to signal the worker) cannot be wedged by a slow + // or hung consumer. + let Some(tx) = self.tx.as_ref() else { + return; + }; - if let Err(err) = self - .callback - .call1(py, (chunk.input_id, chunk.frames, samples)) - { - err.print(py); + let payload = match chunk.samples { + AsrSampleSlice::F32(values) => AsrWorkerPayload::F32 { + input_id: chunk.input_id.to_string(), + frames: chunk.frames, + samples: values.to_vec(), + }, + AsrSampleSlice::I16(values) => AsrWorkerPayload::I16 { + input_id: chunk.input_id.to_string(), + frames: chunk.frames, + samples: values.to_vec(), + }, + }; + + match tx.try_send(payload) { + Ok(()) => {} + Err(TrySendError::Full(_)) => { + self.dropped_chunks.fetch_add(1, Ordering::Relaxed); } - }); + Err(TrySendError::Disconnected(_)) => {} + } + } +} + +impl Drop for PythonAsrCallback { + fn drop(&mut self) { + // Drop the sender first so the worker's `rx.recv()` returns `Err(Disconnected)` and + // the loop can exit naturally once it finishes the chunk it is currently handling. + drop(self.tx.take()); + + let Some(handle) = self.worker.take() else { + return; + }; + + let deadline = Instant::now() + ASR_WORKER_JOIN_TIMEOUT; + while !handle.is_finished() && Instant::now() < deadline { + thread::sleep(ASR_WORKER_JOIN_POLL); + } + + if handle.is_finished() { + let _ = handle.join(); + } else { + // The user callback is stuck. We can't unblock it, but we refuse to wedge the + // sink shutdown: detach the worker (dropping the JoinHandle) and move on. The + // leaked thread will exit whenever the user code finally returns. + eprintln!( + "macloop: ASR callback worker thread did not exit within {:?}; detaching to avoid blocking shutdown", + ASR_WORKER_JOIN_TIMEOUT + ); + drop(handle); + } } } @@ -1009,7 +1119,14 @@ impl PyAsrSinkBackend { impl Drop for PyAsrSinkBackend { fn drop(&mut self) { - let _ = Python::try_attach(|py| self.close_no_restore(py)); + if Python::try_attach(|py| self.close_no_restore(py)).is_none() { + // The Python runtime is being torn down; the interpreter is unreachable. + // We cannot stop the sink worker or restore route consumers here, so log and leak + // rather than silently wedging or corrupting native state. + eprintln!( + "macloop: AsrSinkBackend dropped with Python runtime unavailable; sink cleanup skipped" + ); + } } } @@ -1077,7 +1194,11 @@ impl PyWavSinkBackend { impl Drop for PyWavSinkBackend { fn drop(&mut self) { - let _ = Python::try_attach(|py| self.close_no_restore(py)); + if Python::try_attach(|py| self.close_no_restore(py)).is_none() { + eprintln!( + "macloop: WavSinkBackend dropped with Python runtime unavailable; sink cleanup skipped" + ); + } } } @@ -1604,7 +1725,7 @@ impl PyAudioEngineBackend { format, chunk_frames, }, - Box::new(PythonAsrCallback { callback }), + Box::new(PythonAsrCallback::spawn(callback)), ) { Ok(sink) => Ok(sink), Err((err, inputs)) => Err(( @@ -1703,7 +1824,11 @@ impl PyAudioEngineBackend { impl Drop for PyAudioEngineBackend { fn drop(&mut self) { - let _ = Python::try_attach(|py| self.close(py)); + if Python::try_attach(|py| self.close(py)).is_none() { + eprintln!( + "macloop: AudioEngineBackend dropped with Python runtime unavailable; native sources may outlive the process" + ); + } } } diff --git a/tests/test_runtime_helpers.py b/tests/test_runtime_helpers.py index 4dabfa9..93d811c 100644 --- a/tests/test_runtime_helpers.py +++ b/tests/test_runtime_helpers.py @@ -200,7 +200,7 @@ def close(self) -> None: assert sink.closed is True -def test_audio_engine_close_suppresses_backend_timeout_after_sink_cleanup(macloop_module) -> None: +def test_audio_engine_close_warns_on_backend_timeout_after_sink_cleanup(macloop_module) -> None: class TrackingSink: def __init__(self) -> None: self.closed = False @@ -213,7 +213,8 @@ def close(self) -> None: engine._register_sink(sink) engine._backend.close = lambda: (_ for _ in ()).throw(TimeoutError("native close timed out")) - engine.close() + with pytest.warns(RuntimeWarning, match="deferred native cleanup"): + engine.close() assert sink.closed is True with pytest.raises(RuntimeError, match="audio engine is closed"):