From dec0dbc0a322aad14eb7a3e976c5cd829a818b56 Mon Sep 17 00:00:00 2001 From: iancharest Date: Fri, 12 Jun 2026 10:00:14 -0400 Subject: [PATCH 1/2] Add native macOS duplex stream --- CHANGELOG.md | 2 + README.md | 4 +- docs/architecture.md | 3 +- src/tachyaudio/_native.c | 309 ++++++++++++++++++++++++++++++ src/tachyaudio/_native_backend.py | 89 ++++++--- 5 files changed, 377 insertions(+), 30 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 4a0a217..93dd45b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,8 @@ public APIs may still change while the backend design stabilizes. - Added the public `DuplexStream` API contract for backend-level full-duplex capture and playback. +- Added native macOS `DuplexStream` support backed by Core Audio input and + output queues owned by one native stream object. ## [0.2.0a3] - 2026-06-12 diff --git a/README.md b/README.md index 461b711..3f2984a 100644 --- a/README.md +++ b/README.md @@ -57,8 +57,8 @@ Use blocking helpers when callers need complete buffer transfer: - `InputStream.read_exactly(frame_count, timeout=None)`: wait until exactly the requested number of frames has been captured -Full-duplex capture/playback is exposed as `DuplexStream`. The public API is in -place, but native backend implementations are still under development. +Full-duplex capture/playback is exposed as `DuplexStream`. Native macOS support +is available; Linux duplex support is under development. Lifecycle semantics: diff --git a/docs/architecture.md b/docs/architecture.md index b6b43ea..48e2e71 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -42,7 +42,8 @@ conveniences layered on top of those primitives. Full-duplex support is modeled as a backend-level `DuplexStream`, not as a Python wrapper around one `OutputStream` and one `InputStream`. Backends should use a single native duplex callback where available so capture and playback share -one scheduling clock. +one scheduling clock. The current macOS implementation owns both native Core +Audio queues inside one native stream object; Linux duplex support remains next. Stream statistics distinguish queue state from hardware behavior. `queued_frames` and `queued_latency` describe the native ring buffer. `hardware_latency` diff --git a/src/tachyaudio/_native.c b/src/tachyaudio/_native.c index 37f1bb9..c0db8a4 100644 --- a/src/tachyaudio/_native.c +++ b/src/tachyaudio/_native.c @@ -1097,6 +1097,259 @@ static PyTypeObject TachyInputStreamType = { .tp_new = tachy_input_new, }; +typedef struct { + PyObject_HEAD + TachyInputStream *input; + TachyOutputStream *output; + int closed; +} TachyDuplexStream; + +static PyObject *tachy_duplex_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + static char *keywords[] = { + "sample_rate", + "input_channels", + "output_channels", + "block_size", + "input_device_id", + "output_device_id", + "latency", + NULL + }; + int sample_rate = 0; + int input_channels = 0; + int output_channels = 0; + int block_size = 0; + const char *input_device_id = NULL; + const char *output_device_id = NULL; + double latency = 0.0; + + if (!PyArg_ParseTupleAndKeywords( + args, + kwargs, + "iiiizzd", + keywords, + &sample_rate, + &input_channels, + &output_channels, + &block_size, + &input_device_id, + &output_device_id, + &latency)) { + return NULL; + } + + if (sample_rate < 1 || input_channels < 1 || output_channels < 1) { + PyErr_SetString(PyExc_ValueError, "sample_rate and channel counts must be positive"); + return NULL; + } + + TachyDuplexStream *self = (TachyDuplexStream *)type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + self->input = NULL; + self->output = NULL; + self->closed = 0; + + PyObject *input_args = Py_BuildValue( + "(iiizd)", + sample_rate, + input_channels, + block_size, + input_device_id, + latency); + if (input_args == NULL) { + Py_DECREF(self); + return NULL; + } + self->input = (TachyInputStream *)PyObject_CallObject((PyObject *)&TachyInputStreamType, input_args); + Py_DECREF(input_args); + if (self->input == NULL) { + Py_DECREF(self); + return NULL; + } + + PyObject *output_args = Py_BuildValue( + "(iiizd)", + sample_rate, + output_channels, + block_size, + output_device_id, + latency); + if (output_args == NULL) { + Py_DECREF(self); + return NULL; + } + self->output = (TachyOutputStream *)PyObject_CallObject((PyObject *)&TachyOutputStreamType, output_args); + Py_DECREF(output_args); + if (self->output == NULL) { + Py_DECREF(self); + return NULL; + } + + return (PyObject *)self; +} + +static void tachy_duplex_dealloc(TachyDuplexStream *self) +{ + if (!self->closed) { + self->closed = 1; + if (self->output != NULL) { + (void)tachy_output_close(self->output, NULL); + } + if (self->input != NULL) { + (void)tachy_input_close(self->input, NULL); + } + } + Py_CLEAR(self->output); + Py_CLEAR(self->input); + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *tachy_duplex_start(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || self->input == NULL || self->output == NULL) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + PyObject *input_result = tachy_input_start(self->input, NULL); + if (input_result == NULL) { + return NULL; + } + Py_DECREF(input_result); + + PyObject *output_result = tachy_output_start(self->output, NULL); + if (output_result == NULL) { + (void)tachy_input_stop(self->input, NULL); + return NULL; + } + Py_DECREF(output_result); + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_stop(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || self->input == NULL || self->output == NULL) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + PyObject *output_result = tachy_output_stop(self->output, NULL); + if (output_result == NULL) { + return NULL; + } + Py_DECREF(output_result); + + PyObject *input_result = tachy_input_stop(self->input, NULL); + if (input_result == NULL) { + return NULL; + } + Py_DECREF(input_result); + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_flush(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || self->input == NULL || self->output == NULL) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + PyObject *output_result = tachy_output_flush(self->output, NULL); + if (output_result == NULL) { + return NULL; + } + Py_DECREF(output_result); + + PyObject *input_result = tachy_input_flush(self->input, NULL); + if (input_result == NULL) { + return NULL; + } + Py_DECREF(input_result); + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_close(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (!self->closed) { + self->closed = 1; + if (self->output != NULL) { + PyObject *output_result = tachy_output_close(self->output, NULL); + Py_XDECREF(output_result); + } + if (self->input != NULL) { + PyObject *input_result = tachy_input_close(self->input, NULL); + Py_XDECREF(input_result); + } + } + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_write(TachyDuplexStream *self, PyObject *frames) +{ + if (self->closed || self->output == NULL) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + return tachy_output_write(self->output, frames); +} + +static PyObject *tachy_duplex_read(TachyDuplexStream *self, PyObject *args) +{ + if (self->closed || self->input == NULL) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + return tachy_input_read(self->input, args); +} + +static PyObject *tachy_duplex_stats(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || self->input == NULL || self->output == NULL) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + PyObject *input_stats = tachy_input_stats(self->input, NULL); + if (input_stats == NULL) { + return NULL; + } + PyObject *output_stats = tachy_output_stats(self->output, NULL); + if (output_stats == NULL) { + Py_DECREF(input_stats); + return NULL; + } + PyObject *stats = Py_BuildValue("{s:O,s:O}", "input", input_stats, "output", output_stats); + Py_DECREF(input_stats); + Py_DECREF(output_stats); + return stats; +} + +static PyMethodDef tachy_duplex_methods[] = { + {"start", (PyCFunction)tachy_duplex_start, METH_NOARGS, "Start duplex capture and playback."}, + {"stop", (PyCFunction)tachy_duplex_stop, METH_NOARGS, "Stop duplex capture and playback."}, + {"flush", (PyCFunction)tachy_duplex_flush, METH_NOARGS, "Discard queued duplex frames."}, + {"close", (PyCFunction)tachy_duplex_close, METH_NOARGS, "Close duplex stream."}, + {"write", (PyCFunction)tachy_duplex_write, METH_O, "Write interleaved output float32 frames."}, + {"read", (PyCFunction)tachy_duplex_read, METH_VARARGS, "Read available interleaved input float32 frames."}, + {"stats", (PyCFunction)tachy_duplex_stats, METH_NOARGS, "Return duplex stream statistics."}, + {NULL, NULL, 0, NULL} +}; + +static PyTypeObject TachyDuplexStreamType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "tachyaudio._native.DuplexStream", + .tp_basicsize = sizeof(TachyDuplexStream), + .tp_itemsize = 0, + .tp_dealloc = (destructor)tachy_duplex_dealloc, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "Native Core Audio duplex stream.", + .tp_methods = tachy_duplex_methods, + .tp_new = tachy_duplex_new, +}; + static int tachy_get_cf_string(AudioObjectID object_id, AudioObjectPropertySelector selector, char *buffer, CFIndex buffer_size) { AudioObjectPropertyAddress address = { @@ -2446,6 +2699,29 @@ static PyObject *tachy_list_devices(PyObject *self, PyObject *args) return devices; } +typedef struct { + PyObject_HEAD +} TachyDuplexStream; + +static PyObject *tachy_duplex_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + (void)type; + (void)args; + (void)kwargs; + PyErr_SetString(PyExc_RuntimeError, "native duplex streams are not available on this platform"); + return NULL; +} + +static PyTypeObject TachyDuplexStreamType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "tachyaudio._native.DuplexStream", + .tp_basicsize = sizeof(TachyDuplexStream), + .tp_itemsize = 0, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "Unavailable native duplex stream.", + .tp_new = tachy_duplex_new, +}; + #else typedef struct { @@ -2494,6 +2770,29 @@ static PyTypeObject TachyInputStreamType = { .tp_new = tachy_input_new, }; +typedef struct { + PyObject_HEAD +} TachyDuplexStream; + +static PyObject *tachy_duplex_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) +{ + (void)type; + (void)args; + (void)kwargs; + PyErr_SetString(PyExc_RuntimeError, "native duplex streams are not available on this platform"); + return NULL; +} + +static PyTypeObject TachyDuplexStreamType = { + PyVarObject_HEAD_INIT(NULL, 0) + .tp_name = "tachyaudio._native.DuplexStream", + .tp_basicsize = sizeof(TachyDuplexStream), + .tp_itemsize = 0, + .tp_flags = Py_TPFLAGS_DEFAULT, + .tp_doc = "Unavailable native duplex stream.", + .tp_new = tachy_duplex_new, +}; + static PyObject *tachy_list_devices(PyObject *self, PyObject *args) { (void)self; @@ -2541,6 +2840,9 @@ PyMODINIT_FUNC PyInit__native(void) if (PyType_Ready(&TachyInputStreamType) < 0) { return NULL; } + if (PyType_Ready(&TachyDuplexStreamType) < 0) { + return NULL; + } module = PyModule_Create(&tachy_module); if (module == NULL) { @@ -2561,5 +2863,12 @@ PyMODINIT_FUNC PyInit__native(void) return NULL; } + Py_INCREF(&TachyDuplexStreamType); + if (PyModule_AddObject(module, "DuplexStream", (PyObject *)&TachyDuplexStreamType) < 0) { + Py_DECREF(&TachyDuplexStreamType); + Py_DECREF(module); + return NULL; + } + return module; } diff --git a/src/tachyaudio/_native_backend.py b/src/tachyaudio/_native_backend.py index e671d46..ddfccb9 100644 --- a/src/tachyaudio/_native_backend.py +++ b/src/tachyaudio/_native_backend.py @@ -2,13 +2,29 @@ from __future__ import annotations -from typing import NoReturn +from collections.abc import Mapping +from typing import Any from tachyaudio import _native from tachyaudio._device import DeviceInfo, DeviceKind from tachyaudio._errors import BackendUnavailable +def _stream_stats_from_native(item: Mapping[str, Any]) -> object: + from tachyaudio._stream import StreamStats + + return StreamStats( + frames_processed=item["frames_processed"], + underruns=item["underruns"], + overruns=item["overruns"], + estimated_latency=item["estimated_latency"], + hardware_latency=item["hardware_latency"], + queued_frames=item["queued_frames"], + queued_latency=item["queued_latency"], + buffer_size=item["buffer_size"], + ) + + class NativeOutputStream: """Python-facing wrapper around the native output stream handle.""" @@ -40,19 +56,7 @@ def write(self, frames: object) -> int: return self._handle.write(frames) def stats(self) -> object: - from tachyaudio._stream import StreamStats - - item = self._handle.stats() - return StreamStats( - frames_processed=item["frames_processed"], - underruns=item["underruns"], - overruns=item["overruns"], - estimated_latency=item["estimated_latency"], - hardware_latency=item["hardware_latency"], - queued_frames=item["queued_frames"], - queued_latency=item["queued_latency"], - buffer_size=item["buffer_size"], - ) + return _stream_stats_from_native(self._handle.stats()) class NativeInputStream: @@ -83,18 +87,48 @@ def read(self, frame_count: int) -> memoryview: return memoryview(self._handle.read(frame_count)) def stats(self) -> object: - from tachyaudio._stream import StreamStats + return _stream_stats_from_native(self._handle.stats()) + + +class NativeDuplexStream: + """Python-facing wrapper around the native duplex stream handle.""" + + def __init__(self, config: object) -> None: + self._handle = _native.DuplexStream( + config.sample_rate, # type: ignore[attr-defined] + config.input_channels, # type: ignore[attr-defined] + config.output_channels, # type: ignore[attr-defined] + config.block_size or 0, # type: ignore[attr-defined] + config.input_device_id, # type: ignore[attr-defined] + config.output_device_id, # type: ignore[attr-defined] + config.latency or 0.0, # type: ignore[attr-defined] + ) + + def start(self) -> None: + self._handle.start() + + def stop(self) -> None: + self._handle.stop() + + def flush(self) -> None: + self._handle.flush() + + def close(self) -> None: + self._handle.close() + + def write(self, frames: object) -> int: + return self._handle.write(frames) + + def read(self, frame_count: int) -> memoryview: + return memoryview(self._handle.read(frame_count)) + + def stats(self) -> object: + from tachyaudio._stream import DuplexStreamStats item = self._handle.stats() - return StreamStats( - frames_processed=item["frames_processed"], - underruns=item["underruns"], - overruns=item["overruns"], - estimated_latency=item["estimated_latency"], - hardware_latency=item["hardware_latency"], - queued_frames=item["queued_frames"], - queued_latency=item["queued_latency"], - buffer_size=item["buffer_size"], + return DuplexStreamStats( + input=_stream_stats_from_native(item["input"]), + output=_stream_stats_from_native(item["output"]), ) @@ -125,6 +159,7 @@ def open_output_stream(self, config: object) -> NativeOutputStream: def open_input_stream(self, config: object) -> NativeInputStream: return NativeInputStream(config) - def open_duplex_stream(self, config: object) -> NoReturn: - del config - raise BackendUnavailable("native duplex streams are not implemented yet") + def open_duplex_stream(self, config: object) -> NativeDuplexStream: + if self.name != "coreaudio": + raise BackendUnavailable("native duplex streams are not implemented for this backend yet") + return NativeDuplexStream(config) From f933a5bafb2c73fbcc648834a2f62736614f78e2 Mon Sep 17 00:00:00 2001 From: Ian Charest Date: Fri, 12 Jun 2026 10:32:01 -0400 Subject: [PATCH 2/2] Add miniaudio Linux duplex stream --- README.md | 4 +- docs/architecture.md | 3 +- src/tachyaudio/_native.c | 570 +++++++++++++++++++++++++++++- src/tachyaudio/_native_backend.py | 2 +- 4 files changed, 569 insertions(+), 10 deletions(-) diff --git a/README.md b/README.md index 3f2984a..19021c8 100644 --- a/README.md +++ b/README.md @@ -57,8 +57,8 @@ Use blocking helpers when callers need complete buffer transfer: - `InputStream.read_exactly(frame_count, timeout=None)`: wait until exactly the requested number of frames has been captured -Full-duplex capture/playback is exposed as `DuplexStream`. Native macOS support -is available; Linux duplex support is under development. +Full-duplex capture/playback is exposed as `DuplexStream`. Native macOS and +Linux support is available. Lifecycle semantics: diff --git a/docs/architecture.md b/docs/architecture.md index 48e2e71..566b74d 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -43,7 +43,8 @@ Full-duplex support is modeled as a backend-level `DuplexStream`, not as a Python wrapper around one `OutputStream` and one `InputStream`. Backends should use a single native duplex callback where available so capture and playback share one scheduling clock. The current macOS implementation owns both native Core -Audio queues inside one native stream object; Linux duplex support remains next. +Audio queues inside one native stream object. The Linux miniaudio implementation +uses one native duplex device callback. Stream statistics distinguish queue state from hardware behavior. `queued_frames` and `queued_latency` describe the native ring buffer. `hardware_latency` diff --git a/src/tachyaudio/_native.c b/src/tachyaudio/_native.c index c0db8a4..00ad5d7 100644 --- a/src/tachyaudio/_native.c +++ b/src/tachyaudio/_native.c @@ -2701,24 +2701,582 @@ static PyObject *tachy_list_devices(PyObject *self, PyObject *args) typedef struct { PyObject_HEAD + ma_context context; + ma_device device; + ma_device_id input_device_id; + ma_device_id output_device_id; + ma_uint32 sample_rate; + ma_uint32 input_channels; + ma_uint32 output_channels; + ma_uint32 buffer_frames; + ma_uint32 input_bytes_per_frame; + ma_uint32 output_bytes_per_frame; + ma_uint64 input_frames_processed; + ma_uint64 output_frames_processed; + ma_uint32 input_underruns; + ma_uint32 input_overruns; + ma_uint32 output_underruns; + ma_uint32 output_overruns; + uint8_t *input_ring; + size_t input_ring_capacity; + size_t input_ring_read; + size_t input_ring_write; + size_t input_ring_size; + uint8_t *output_ring; + size_t output_ring_capacity; + size_t output_ring_read; + size_t output_ring_write; + size_t output_ring_size; + pthread_mutex_t lock; + int lock_initialized; + int context_initialized; + int device_initialized; + int started; + int closed; } TachyDuplexStream; +static void tachy_miniaudio_duplex_input_ring_copy_in( + TachyDuplexStream *stream, + const uint8_t *source, + size_t byte_count) +{ + tachy_ring_copy_in_raw( + stream->input_ring, + stream->input_ring_capacity, + &stream->input_ring_write, + &stream->input_ring_size, + source, + byte_count); +} + +static size_t tachy_miniaudio_duplex_input_ring_copy_out( + TachyDuplexStream *stream, + uint8_t *target, + size_t byte_count) +{ + return tachy_ring_copy_out_raw( + stream->input_ring, + stream->input_ring_capacity, + &stream->input_ring_read, + &stream->input_ring_size, + target, + byte_count); +} + +static void tachy_miniaudio_duplex_output_ring_copy_in( + TachyDuplexStream *stream, + const uint8_t *source, + size_t byte_count) +{ + tachy_ring_copy_in_raw( + stream->output_ring, + stream->output_ring_capacity, + &stream->output_ring_write, + &stream->output_ring_size, + source, + byte_count); +} + +static size_t tachy_miniaudio_duplex_output_ring_copy_out( + TachyDuplexStream *stream, + uint8_t *target, + size_t byte_count) +{ + return tachy_ring_copy_out_raw( + stream->output_ring, + stream->output_ring_capacity, + &stream->output_ring_read, + &stream->output_ring_size, + target, + byte_count); +} + +static int tachy_miniaudio_fill_device_id( + ma_backend backend, + ma_device_id *target, + const char *device_id, + const char *prefix, + const char *error_message) +{ + if (device_id == NULL || device_id[0] == '\0') { + return 1; + } + + size_t prefix_length = strlen(prefix); + if (strncmp(device_id, prefix, prefix_length) != 0) { + PyErr_SetString(PyExc_ValueError, error_message); + return 0; + } + + const char *backend_id = device_id + prefix_length; + memset(target, 0, sizeof(*target)); + + switch (backend) { + case ma_backend_alsa: + snprintf(target->alsa, sizeof(target->alsa), "%s", backend_id); + break; + case ma_backend_pulseaudio: + snprintf(target->pulse, sizeof(target->pulse), "%s", backend_id); + break; + case ma_backend_jack: + target->jack = 0; + break; + case ma_backend_sndio: + snprintf(target->sndio, sizeof(target->sndio), "%s", backend_id); + break; + case ma_backend_audio4: + snprintf(target->audio4, sizeof(target->audio4), "%s", backend_id); + break; + case ma_backend_oss: + snprintf(target->oss, sizeof(target->oss), "%s", backend_id); + break; + default: + PyErr_SetString( + PyExc_ValueError, + "selected Linux audio backend does not support explicit device_id yet"); + return 0; + } + + return 1; +} + +static void tachy_miniaudio_duplex_callback( + ma_device *device, + void *output, + const void *input, + ma_uint32 frame_count) +{ + TachyDuplexStream *stream = (TachyDuplexStream *)device->pUserData; + size_t input_byte_count = (size_t)frame_count * stream->input_bytes_per_frame; + size_t output_byte_count = (size_t)frame_count * stream->output_bytes_per_frame; + + pthread_mutex_lock(&stream->lock); + if (input != NULL) { + size_t available = stream->input_ring_capacity - stream->input_ring_size; + size_t accepted = tachy_min_size(input_byte_count, available); + accepted -= accepted % stream->input_bytes_per_frame; + if (accepted < input_byte_count) { + stream->input_overruns += 1; + } + if (accepted > 0) { + tachy_miniaudio_duplex_input_ring_copy_in(stream, (const uint8_t *)input, accepted); + stream->input_frames_processed += accepted / stream->input_bytes_per_frame; + } + } + + if (output != NULL) { + size_t copied = tachy_miniaudio_duplex_output_ring_copy_out( + stream, + (uint8_t *)output, + output_byte_count); + if (copied < output_byte_count) { + memset((uint8_t *)output + copied, 0, output_byte_count - copied); + stream->output_underruns += 1; + } + stream->output_frames_processed += frame_count; + } + pthread_mutex_unlock(&stream->lock); +} + static PyObject *tachy_duplex_new(PyTypeObject *type, PyObject *args, PyObject *kwargs) { - (void)type; - (void)args; - (void)kwargs; - PyErr_SetString(PyExc_RuntimeError, "native duplex streams are not available on this platform"); - return NULL; + static char *keywords[] = { + "sample_rate", + "input_channels", + "output_channels", + "block_size", + "input_device_id", + "output_device_id", + "latency", + NULL + }; + int sample_rate = 0; + int input_channels = 0; + int output_channels = 0; + int block_size = 0; + const char *input_device_id = NULL; + const char *output_device_id = NULL; + double latency = 0.0; + + if (!PyArg_ParseTupleAndKeywords( + args, + kwargs, + "iiiizzd", + keywords, + &sample_rate, + &input_channels, + &output_channels, + &block_size, + &input_device_id, + &output_device_id, + &latency)) { + return NULL; + } + + if (sample_rate < 1 || input_channels < 1 || output_channels < 1) { + PyErr_SetString(PyExc_ValueError, "sample_rate and channel counts must be positive"); + return NULL; + } + + TachyDuplexStream *self = (TachyDuplexStream *)type->tp_alloc(type, 0); + if (self == NULL) { + return NULL; + } + + self->sample_rate = (ma_uint32)sample_rate; + self->input_channels = (ma_uint32)input_channels; + self->output_channels = (ma_uint32)output_channels; + self->input_bytes_per_frame = (ma_uint32)input_channels * sizeof(float); + self->output_bytes_per_frame = (ma_uint32)output_channels * sizeof(float); + self->buffer_frames = (ma_uint32)(sample_rate * TACHY_DEFAULT_BUFFER_MS / 1000); + if (block_size > 0) { + self->buffer_frames = (ma_uint32)block_size; + } + if (latency > 0.0) { + ma_uint32 latency_frames = (ma_uint32)(((double)sample_rate * latency) / TACHY_INPUT_BUFFER_COUNT); + if (latency_frames > 0) { + self->buffer_frames = latency_frames; + } + } + if (self->buffer_frames < 64) { + self->buffer_frames = 64; + } + + self->input_ring = NULL; + self->output_ring = NULL; + self->input_ring_capacity = (size_t)sample_rate * TACHY_RING_SECONDS * self->input_bytes_per_frame; + self->output_ring_capacity = (size_t)sample_rate * TACHY_RING_SECONDS * self->output_bytes_per_frame; + if (self->input_ring_capacity < (size_t)self->buffer_frames * self->input_bytes_per_frame * TACHY_INPUT_BUFFER_COUNT * 2) { + self->input_ring_capacity = (size_t)self->buffer_frames * self->input_bytes_per_frame * TACHY_INPUT_BUFFER_COUNT * 2; + } + if (self->output_ring_capacity < (size_t)self->buffer_frames * self->output_bytes_per_frame * TACHY_OUTPUT_BUFFER_COUNT * 2) { + self->output_ring_capacity = (size_t)self->buffer_frames * self->output_bytes_per_frame * TACHY_OUTPUT_BUFFER_COUNT * 2; + } + self->lock_initialized = 0; + self->context_initialized = 0; + self->device_initialized = 0; + self->started = 0; + self->closed = 0; + + self->input_ring = (uint8_t *)PyMem_RawMalloc(self->input_ring_capacity); + self->output_ring = (uint8_t *)PyMem_RawMalloc(self->output_ring_capacity); + if (self->input_ring == NULL || self->output_ring == NULL) { + Py_DECREF(self); + return PyErr_NoMemory(); + } + + if (pthread_mutex_init(&self->lock, NULL) != 0) { + Py_DECREF(self); + PyErr_SetString(PyExc_RuntimeError, "failed to initialize duplex stream lock"); + return NULL; + } + self->lock_initialized = 1; + + ma_result result = tachy_miniaudio_context_init(&self->context); + if (result != MA_SUCCESS) { + Py_DECREF(self); + PyErr_SetString(PyExc_RuntimeError, "failed to initialize miniaudio context"); + return NULL; + } + self->context_initialized = 1; + + ma_device_config config = ma_device_config_init(ma_device_type_duplex); + config.sampleRate = self->sample_rate; + config.periodSizeInFrames = self->buffer_frames; + config.periods = TACHY_INPUT_BUFFER_COUNT; + config.capture.format = ma_format_f32; + config.capture.channels = self->input_channels; + config.playback.format = ma_format_f32; + config.playback.channels = self->output_channels; + config.dataCallback = tachy_miniaudio_duplex_callback; + config.pUserData = self; + + if (!tachy_miniaudio_fill_device_id( + self->context.backend, + &self->input_device_id, + input_device_id, + "input-", + "Linux input_device_id must come from an input device")) { + Py_DECREF(self); + return NULL; + } + if (input_device_id != NULL && input_device_id[0] != '\0') { + config.capture.pDeviceID = &self->input_device_id; + } + + if (!tachy_miniaudio_fill_device_id( + self->context.backend, + &self->output_device_id, + output_device_id, + "output-", + "Linux output_device_id must come from an output device")) { + Py_DECREF(self); + return NULL; + } + if (output_device_id != NULL && output_device_id[0] != '\0') { + config.playback.pDeviceID = &self->output_device_id; + } + + result = ma_device_init(&self->context, &config, &self->device); + if (result != MA_SUCCESS) { + Py_DECREF(self); + PyErr_SetString(PyExc_RuntimeError, "failed to initialize miniaudio duplex device"); + return NULL; + } + self->device_initialized = 1; + + return (PyObject *)self; +} + +static void tachy_duplex_dealloc(TachyDuplexStream *self) +{ + self->closed = 1; + self->started = 0; + if (self->device_initialized) { + ma_device_uninit(&self->device); + self->device_initialized = 0; + } + if (self->context_initialized) { + ma_context_uninit(&self->context); + self->context_initialized = 0; + } + if (self->lock_initialized) { + pthread_mutex_destroy(&self->lock); + self->lock_initialized = 0; + } + if (self->input_ring != NULL) { + PyMem_RawFree(self->input_ring); + self->input_ring = NULL; + } + if (self->output_ring != NULL) { + PyMem_RawFree(self->output_ring); + self->output_ring = NULL; + } + Py_TYPE(self)->tp_free((PyObject *)self); +} + +static PyObject *tachy_duplex_start(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || !self->device_initialized) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + if (self->started) { + Py_RETURN_NONE; + } + + ma_result result = ma_device_start(&self->device); + if (result != MA_SUCCESS) { + PyErr_SetString(PyExc_RuntimeError, "failed to start miniaudio duplex device"); + return NULL; + } + + self->started = 1; + Py_RETURN_NONE; } +static PyObject *tachy_duplex_stop(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || !self->device_initialized) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + ma_result result = ma_device_stop(&self->device); + if (result != MA_SUCCESS) { + PyErr_SetString(PyExc_RuntimeError, "failed to stop miniaudio duplex device"); + return NULL; + } + + self->started = 0; + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_flush(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || !self->device_initialized) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + pthread_mutex_lock(&self->lock); + self->input_ring_read = 0; + self->input_ring_write = 0; + self->input_ring_size = 0; + self->output_ring_read = 0; + self->output_ring_write = 0; + self->output_ring_size = 0; + pthread_mutex_unlock(&self->lock); + + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_close(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (!self->closed) { + self->closed = 1; + self->started = 0; + if (self->device_initialized) { + ma_device_uninit(&self->device); + self->device_initialized = 0; + } + if (self->context_initialized) { + ma_context_uninit(&self->context); + self->context_initialized = 0; + } + } + + Py_RETURN_NONE; +} + +static PyObject *tachy_duplex_write(TachyDuplexStream *self, PyObject *frames) +{ + if (self->closed || !self->device_initialized) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + Py_buffer view; + if (PyObject_GetBuffer(frames, &view, PyBUF_CONTIG_RO) != 0) { + return NULL; + } + + if (view.len == 0 || view.len % self->output_bytes_per_frame != 0) { + PyBuffer_Release(&view); + PyErr_SetString(PyExc_ValueError, "frames must contain whole interleaved float32 frames"); + return NULL; + } + + pthread_mutex_lock(&self->lock); + size_t available = self->output_ring_capacity - self->output_ring_size; + size_t accepted = tachy_min_size((size_t)view.len, available); + accepted -= accepted % self->output_bytes_per_frame; + if (accepted < (size_t)view.len) { + self->output_overruns += 1; + } + if (accepted > 0) { + tachy_miniaudio_duplex_output_ring_copy_in(self, (const uint8_t *)view.buf, accepted); + } + pthread_mutex_unlock(&self->lock); + PyBuffer_Release(&view); + + ma_uint64 frames_written = accepted / self->output_bytes_per_frame; + return PyLong_FromUnsignedLongLong(frames_written); +} + +static PyObject *tachy_duplex_read(TachyDuplexStream *self, PyObject *args) +{ + int frame_count = 0; + if (!PyArg_ParseTuple(args, "i", &frame_count)) { + return NULL; + } + if (frame_count < 1) { + PyErr_SetString(PyExc_ValueError, "frame_count must be positive"); + return NULL; + } + if (self->closed || !self->device_initialized) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + size_t requested = (size_t)frame_count * self->input_bytes_per_frame; + + pthread_mutex_lock(&self->lock); + size_t copied = tachy_min_size(requested, self->input_ring_size); + copied -= copied % self->input_bytes_per_frame; + if (copied < requested) { + self->input_underruns += 1; + } + PyObject *result = PyBytes_FromStringAndSize(NULL, (Py_ssize_t)copied); + if (result != NULL && copied > 0) { + char *target = PyBytes_AS_STRING(result); + (void)tachy_miniaudio_duplex_input_ring_copy_out(self, (uint8_t *)target, copied); + } + pthread_mutex_unlock(&self->lock); + + return result; +} + +static PyObject *tachy_duplex_stats(TachyDuplexStream *self, PyObject *Py_UNUSED(ignored)) +{ + if (self->closed || !self->device_initialized) { + PyErr_SetString(PyExc_RuntimeError, "duplex stream is closed"); + return NULL; + } + + pthread_mutex_lock(&self->lock); + ma_uint64 input_frames_processed = self->input_frames_processed; + ma_uint32 input_underruns = self->input_underruns; + ma_uint32 input_overruns = self->input_overruns; + ma_uint64 input_queued_frames = 0; + double input_queued_latency = 0.0; + if (self->sample_rate > 0 && self->input_bytes_per_frame > 0) { + input_queued_frames = (ma_uint64)(self->input_ring_size / self->input_bytes_per_frame); + input_queued_latency = (double)input_queued_frames / self->sample_rate; + } + + ma_uint64 output_frames_processed = self->output_frames_processed; + ma_uint32 output_underruns = self->output_underruns; + ma_uint32 output_overruns = self->output_overruns; + ma_uint64 output_queued_frames = 0; + double output_queued_latency = 0.0; + if (self->sample_rate > 0 && self->output_bytes_per_frame > 0) { + output_queued_frames = (ma_uint64)(self->output_ring_size / self->output_bytes_per_frame); + output_queued_latency = (double)output_queued_frames / self->sample_rate; + } + ma_uint32 buffer_size = self->buffer_frames; + pthread_mutex_unlock(&self->lock); + + PyObject *input_stats = tachy_build_stream_stats_without_hardware_latency( + input_frames_processed, + input_underruns, + input_overruns, + input_queued_frames, + input_queued_latency, + buffer_size); + if (input_stats == NULL) { + return NULL; + } + + PyObject *output_stats = tachy_build_stream_stats_without_hardware_latency( + output_frames_processed, + output_underruns, + output_overruns, + output_queued_frames, + output_queued_latency, + buffer_size); + if (output_stats == NULL) { + Py_DECREF(input_stats); + return NULL; + } + + PyObject *stats = Py_BuildValue("{s:O,s:O}", "input", input_stats, "output", output_stats); + Py_DECREF(input_stats); + Py_DECREF(output_stats); + return stats; +} + +static PyMethodDef tachy_duplex_methods[] = { + {"start", (PyCFunction)tachy_duplex_start, METH_NOARGS, "Start duplex capture and playback."}, + {"stop", (PyCFunction)tachy_duplex_stop, METH_NOARGS, "Stop duplex capture and playback."}, + {"flush", (PyCFunction)tachy_duplex_flush, METH_NOARGS, "Discard queued duplex frames."}, + {"close", (PyCFunction)tachy_duplex_close, METH_NOARGS, "Close duplex stream."}, + {"write", (PyCFunction)tachy_duplex_write, METH_O, "Write interleaved output float32 frames."}, + {"read", (PyCFunction)tachy_duplex_read, METH_VARARGS, "Read available interleaved input float32 frames."}, + {"stats", (PyCFunction)tachy_duplex_stats, METH_NOARGS, "Return duplex stream statistics."}, + {NULL, NULL, 0, NULL} +}; + static PyTypeObject TachyDuplexStreamType = { PyVarObject_HEAD_INIT(NULL, 0) .tp_name = "tachyaudio._native.DuplexStream", .tp_basicsize = sizeof(TachyDuplexStream), .tp_itemsize = 0, + .tp_dealloc = (destructor)tachy_duplex_dealloc, .tp_flags = Py_TPFLAGS_DEFAULT, - .tp_doc = "Unavailable native duplex stream.", + .tp_doc = "Native miniaudio duplex stream.", + .tp_methods = tachy_duplex_methods, .tp_new = tachy_duplex_new, }; diff --git a/src/tachyaudio/_native_backend.py b/src/tachyaudio/_native_backend.py index ddfccb9..4b3e5a7 100644 --- a/src/tachyaudio/_native_backend.py +++ b/src/tachyaudio/_native_backend.py @@ -160,6 +160,6 @@ def open_input_stream(self, config: object) -> NativeInputStream: return NativeInputStream(config) def open_duplex_stream(self, config: object) -> NativeDuplexStream: - if self.name != "coreaudio": + if self.name not in {"coreaudio", "miniaudio"}: raise BackendUnavailable("native duplex streams are not implemented for this backend yet") return NativeDuplexStream(config)