Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core_engine/src/outputs/asr_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -396,7 +396,7 @@ impl AsrSink {
let idle_sleep = Duration::from_micros(200);

loop {
if stop_thread.load(Ordering::Relaxed) {
if stop_thread.load(Ordering::Acquire) {
for state in &mut states {
state.stop_now();
}
Expand Down Expand Up @@ -442,7 +442,7 @@ impl AsrSink {
}

pub fn stop(&mut self) -> Result<Vec<AsrSinkInput>, AsrSinkError> {
self.stop.store(true, Ordering::Relaxed);
self.stop.store(true, Ordering::Release);
let Some(handle) = self.handle.take() else {
return Err(AsrSinkError::AlreadyStopped);
};
Expand Down
4 changes: 2 additions & 2 deletions core_engine/src/outputs/wav_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl WavFileOutput {
let mut mixed_buffer = Vec::<f32>::new();

loop {
let stopping = stop_thread.load(Ordering::Relaxed);
let stopping = stop_thread.load(Ordering::Acquire);
let mut drained_any = false;
for (consumer, buffer) in consumers.iter_mut().zip(input_buffers.iter_mut()) {
let drain_limit = consumer.occupied_len() / frame_channels * frame_channels;
Expand Down Expand Up @@ -302,7 +302,7 @@ impl WavFileOutput {
}

pub fn stop(&mut self) -> Result<Vec<RouteConsumer>, WavOutputError> {
self.stop.store(true, Ordering::Relaxed);
self.stop.store(true, Ordering::Release);
let Some(handle) = self.handle.take() else {
return Err(WavOutputError::AlreadyStopped);
};
Expand Down
12 changes: 11 additions & 1 deletion macloop/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import os
import queue
import uuid
import warnings
import weakref
from dataclasses import dataclass
from pathlib import Path
Expand Down Expand Up @@ -68,6 +69,14 @@ def _raise_on_unexpected_kwargs(name: str, kwargs: dict[str, Any]) -> None:
raise TypeError(f"{name} got unexpected keyword arguments: {unexpected}")


def _warn_deferred_native_cleanup(exc: TimeoutError) -> None:
warnings.warn(
f"AudioEngine.close() deferred native cleanup: {exc}",
RuntimeWarning,
stacklevel=3,
)


def _close_backend_with_optional_engine(backend: Any, engine_backend: Any | None) -> None:
if engine_backend is not None:
try:
Expand Down Expand Up @@ -375,7 +384,8 @@ def close(self) -> None:
sink_err: Optional[Exception] = None
try:
self._backend.close()
except TimeoutError:
except TimeoutError as exc:
_warn_deferred_native_cleanup(exc)
backend_err = None
except Exception as exc:
backend_err = exc
Expand Down
161 changes: 143 additions & 18 deletions python_ffi/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@ use std::collections::HashMap;
use std::fs::File;
use std::os::fd::FromRawFd;
use std::os::raw::c_int;
use std::sync::mpsc::{self, RecvTimeoutError};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::mpsc::{self, RecvTimeoutError, TrySendError};
use std::sync::{Arc, Mutex};
use std::thread;
use std::thread::{self, JoinHandle};
use std::time::{Duration, Instant};

const DEFAULT_COMMAND_CAPACITY: usize = 32;
Expand Down Expand Up @@ -912,25 +913,134 @@ type DetachedAsrStartResult = Result<AsrSink, (String, Vec<(String, RouteConsume

type DetachedWavStartResult = Result<WavFileOutput, (String, Vec<(String, RouteConsumer)>)>;

const ASR_WORKER_QUEUE_CAPACITY: usize = 32;
const ASR_WORKER_JOIN_TIMEOUT: Duration = Duration::from_millis(500);
const ASR_WORKER_JOIN_POLL: Duration = Duration::from_millis(5);

enum AsrWorkerPayload {
F32 {
input_id: String,
frames: usize,
samples: Vec<f32>,
},
I16 {
input_id: String,
frames: usize,
samples: Vec<i16>,
},
}

struct PythonAsrCallback {
callback: Py<PyAny>,
tx: Option<mpsc::SyncSender<AsrWorkerPayload>>,
worker: Option<JoinHandle<()>>,
dropped_chunks: Arc<AtomicU64>,
}

impl PythonAsrCallback {
fn spawn(callback: Py<PyAny>) -> Self {
let (tx, rx) = mpsc::sync_channel::<AsrWorkerPayload>(ASR_WORKER_QUEUE_CAPACITY);
let worker = thread::spawn(move || {
// `callback` is owned by this thread. Its final drop happens when this closure
// returns; PyO3 queues the decref and the next GIL holder releases it, so no
// extra Python attach is required here.
while let Ok(payload) = rx.recv() {
let _ = Python::try_attach(|py| {
let (input_id, frames, samples_obj) = match payload {
AsrWorkerPayload::F32 {
input_id,
frames,
samples,
} => (
input_id,
frames,
samples.to_pyarray(py).into_any().unbind(),
),
AsrWorkerPayload::I16 {
input_id,
frames,
samples,
} => (
input_id,
frames,
samples.to_pyarray(py).into_any().unbind(),
),
};

if let Err(err) = callback.call1(py, (input_id, frames, samples_obj)) {
err.print(py);
}
});
}
});

Self {
tx: Some(tx),
worker: Some(worker),
dropped_chunks: Arc::new(AtomicU64::new(0)),
}
}
}

impl AsrSinkCallback for PythonAsrCallback {
fn on_chunk(&mut self, chunk: AsrChunkView<'_>) {
let _ = Python::try_attach(|py| {
let samples = match chunk.samples {
AsrSampleSlice::F32(values) => values.to_pyarray(py).into_any().unbind(),
AsrSampleSlice::I16(values) => values.to_pyarray(py).into_any().unbind(),
};
// Sink thread never blocks on user Python code: we hand chunks off to a worker via
// a bounded queue. If the user callback cannot keep up we drop the newest chunk so
// that shutdown (which drops `tx` to signal the worker) cannot be wedged by a slow
// or hung consumer.
let Some(tx) = self.tx.as_ref() else {
return;
};

if let Err(err) = self
.callback
.call1(py, (chunk.input_id, chunk.frames, samples))
{
err.print(py);
let payload = match chunk.samples {
AsrSampleSlice::F32(values) => AsrWorkerPayload::F32 {
input_id: chunk.input_id.to_string(),
frames: chunk.frames,
samples: values.to_vec(),
},
AsrSampleSlice::I16(values) => AsrWorkerPayload::I16 {
input_id: chunk.input_id.to_string(),
frames: chunk.frames,
samples: values.to_vec(),
},
};

match tx.try_send(payload) {
Ok(()) => {}
Err(TrySendError::Full(_)) => {
self.dropped_chunks.fetch_add(1, Ordering::Relaxed);
}
});
Err(TrySendError::Disconnected(_)) => {}
}
}
}

impl Drop for PythonAsrCallback {
fn drop(&mut self) {
// Drop the sender first so the worker's `rx.recv()` returns `Err(Disconnected)` and
// the loop can exit naturally once it finishes the chunk it is currently handling.
drop(self.tx.take());

let Some(handle) = self.worker.take() else {
return;
};

let deadline = Instant::now() + ASR_WORKER_JOIN_TIMEOUT;
while !handle.is_finished() && Instant::now() < deadline {
thread::sleep(ASR_WORKER_JOIN_POLL);
}

if handle.is_finished() {
let _ = handle.join();
} else {
// The user callback is stuck. We can't unblock it, but we refuse to wedge the
// sink shutdown: detach the worker (dropping the JoinHandle) and move on. The
// leaked thread will exit whenever the user code finally returns.
eprintln!(
"macloop: ASR callback worker thread did not exit within {:?}; detaching to avoid blocking shutdown",
ASR_WORKER_JOIN_TIMEOUT
);
drop(handle);
}
}
}

Expand Down Expand Up @@ -1009,7 +1119,14 @@ impl PyAsrSinkBackend {

impl Drop for PyAsrSinkBackend {
fn drop(&mut self) {
let _ = Python::try_attach(|py| self.close_no_restore(py));
if Python::try_attach(|py| self.close_no_restore(py)).is_none() {
// The Python runtime is being torn down; the interpreter is unreachable.
// We cannot stop the sink worker or restore route consumers here, so log and leak
// rather than silently wedging or corrupting native state.
eprintln!(
"macloop: AsrSinkBackend dropped with Python runtime unavailable; sink cleanup skipped"
);
}
}
}

Expand Down Expand Up @@ -1077,7 +1194,11 @@ impl PyWavSinkBackend {

impl Drop for PyWavSinkBackend {
fn drop(&mut self) {
let _ = Python::try_attach(|py| self.close_no_restore(py));
if Python::try_attach(|py| self.close_no_restore(py)).is_none() {
eprintln!(
"macloop: WavSinkBackend dropped with Python runtime unavailable; sink cleanup skipped"
);
}
}
}

Expand Down Expand Up @@ -1604,7 +1725,7 @@ impl PyAudioEngineBackend {
format,
chunk_frames,
},
Box::new(PythonAsrCallback { callback }),
Box::new(PythonAsrCallback::spawn(callback)),
) {
Ok(sink) => Ok(sink),
Err((err, inputs)) => Err((
Expand Down Expand Up @@ -1703,7 +1824,11 @@ impl PyAudioEngineBackend {

impl Drop for PyAudioEngineBackend {
fn drop(&mut self) {
let _ = Python::try_attach(|py| self.close(py));
if Python::try_attach(|py| self.close(py)).is_none() {
eprintln!(
"macloop: AudioEngineBackend dropped with Python runtime unavailable; native sources may outlive the process"
);
}
}
}

Expand Down
5 changes: 3 additions & 2 deletions tests/test_runtime_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ def close(self) -> None:
assert sink.closed is True


def test_audio_engine_close_suppresses_backend_timeout_after_sink_cleanup(macloop_module) -> None:
def test_audio_engine_close_warns_on_backend_timeout_after_sink_cleanup(macloop_module) -> None:
class TrackingSink:
def __init__(self) -> None:
self.closed = False
Expand All @@ -213,7 +213,8 @@ def close(self) -> None:
engine._register_sink(sink)
engine._backend.close = lambda: (_ for _ in ()).throw(TimeoutError("native close timed out"))

engine.close()
with pytest.warns(RuntimeWarning, match="deferred native cleanup"):
engine.close()

assert sink.closed is True
with pytest.raises(RuntimeError, match="audio engine is closed"):
Expand Down
Loading