From f81de6ae859c593e96094dc8ebc4f5fd256b5451 Mon Sep 17 00:00:00 2001 From: iancharest Date: Fri, 12 Jun 2026 09:01:29 -0400 Subject: [PATCH] Add duplex stream API contract --- CHANGELOG.md | 5 + README.md | 3 + docs/architecture.md | 5 + src/tachyaudio/__init__.py | 14 +- src/tachyaudio/_backend.py | 25 +++ src/tachyaudio/_native_backend.py | 7 + src/tachyaudio/_stream.py | 293 ++++++++++++++++++++++++------ tests/test_public_api.py | 170 +++++++++++++++++ 8 files changed, 468 insertions(+), 54 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ab217ef..4a0a217 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -7,6 +7,11 @@ public APIs may still change while the backend design stabilizes. ## [Unreleased] +### Added + +- Added the public `DuplexStream` API contract for backend-level full-duplex + capture and playback. + ## [0.2.0a3] - 2026-06-12 ### Changed diff --git a/README.md b/README.md index cbfd22d..e3fa6f5 100644 --- a/README.md +++ b/README.md @@ -53,6 +53,9 @@ Use blocking helpers when callers need complete buffer transfer: - `InputStream.read_exactly(frame_count, timeout=None)`: wait until exactly the requested number of frames has been captured +Full-duplex capture/playback is exposed as `DuplexStream`. The public API is in +place, but native backend implementations are still under development. + Lifecycle semantics: - `stop()`: stop playback without discarding queued frames diff --git a/docs/architecture.md b/docs/architecture.md index 6e10b8c..b6b43ea 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -39,6 +39,11 @@ the output ring can hold, and `read()` returns currently available captured frames. Blocking helpers (`write_all()` and `read_exactly()`) are Python-level conveniences layered on top of those primitives. +Full-duplex support is modeled as a backend-level `DuplexStream`, not as a +Python wrapper around one `OutputStream` and one `InputStream`. Backends should +use a single native duplex callback where available so capture and playback share +one scheduling clock. + Stream statistics distinguish queue state from hardware behavior. `queued_frames` and `queued_latency` describe the native ring buffer. `hardware_latency` describes backend-reported device latency when available. `buffer_size` diff --git a/src/tachyaudio/__init__.py b/src/tachyaudio/__init__.py index 338365f..5dab3d8 100644 --- a/src/tachyaudio/__init__.py +++ b/src/tachyaudio/__init__.py @@ -3,13 +3,25 @@ from tachyaudio._backend import get_backend, list_devices, set_backend from tachyaudio._device import DeviceInfo, DeviceKind from tachyaudio._errors import BackendUnavailable, StreamClosed, TachyAudioError -from tachyaudio._stream import InputStream, OutputStream, StreamConfig, StreamStats, play +from tachyaudio._stream import ( + DuplexStream, + DuplexStreamConfig, + DuplexStreamStats, + InputStream, + OutputStream, + StreamConfig, + StreamStats, + play, +) from tachyaudio._version import __version__ __all__ = [ "BackendUnavailable", "DeviceInfo", "DeviceKind", + "DuplexStream", + "DuplexStreamConfig", + "DuplexStreamStats", "InputStream", "OutputStream", "StreamClosed", diff --git a/src/tachyaudio/_backend.py b/src/tachyaudio/_backend.py index d6f4024..32d0da1 100644 --- a/src/tachyaudio/_backend.py +++ b/src/tachyaudio/_backend.py @@ -42,6 +42,24 @@ def read(self, frame_count: int) -> memoryview: ... def stats(self) -> object: ... +class DuplexStreamHandle(Protocol): + """Backend-owned full-duplex stream handle.""" + + def start(self) -> None: ... + + def stop(self) -> None: ... + + def flush(self) -> None: ... + + def close(self) -> None: ... + + def write(self, frames: Any) -> int: ... + + def read(self, frame_count: int) -> memoryview: ... + + def stats(self) -> object: ... + + class AudioBackend(Protocol): """Protocol implemented by concrete audio backends.""" @@ -53,6 +71,8 @@ def open_output_stream(self, config: object) -> OutputStreamHandle: ... def open_input_stream(self, config: object) -> InputStreamHandle: ... + def open_duplex_stream(self, config: object) -> DuplexStreamHandle: ... + class _UnavailableBackend: name = "unavailable" @@ -70,6 +90,11 @@ def open_input_stream(self, config: object) -> InputStreamHandle: "no tachyaudio backend is available yet; install or build the native backend" ) + def open_duplex_stream(self, config: object) -> DuplexStreamHandle: + raise BackendUnavailable( + "no tachyaudio backend is available yet; install or build the native backend" + ) + def _load_default_backend() -> AudioBackend: try: diff --git a/src/tachyaudio/_native_backend.py b/src/tachyaudio/_native_backend.py index 6e80b91..e671d46 100644 --- a/src/tachyaudio/_native_backend.py +++ b/src/tachyaudio/_native_backend.py @@ -2,8 +2,11 @@ from __future__ import annotations +from typing import NoReturn + from tachyaudio import _native from tachyaudio._device import DeviceInfo, DeviceKind +from tachyaudio._errors import BackendUnavailable class NativeOutputStream: @@ -121,3 +124,7 @@ def open_output_stream(self, config: object) -> NativeOutputStream: def open_input_stream(self, config: object) -> NativeInputStream: return NativeInputStream(config) + + def open_duplex_stream(self, config: object) -> NoReturn: + del config + raise BackendUnavailable("native duplex streams are not implemented yet") diff --git a/src/tachyaudio/_stream.py b/src/tachyaudio/_stream.py index f22ae23..a2846a7 100644 --- a/src/tachyaudio/_stream.py +++ b/src/tachyaudio/_stream.py @@ -2,6 +2,7 @@ from __future__ import annotations +from collections.abc import Callable from dataclasses import dataclass import time from typing import Any @@ -50,6 +51,34 @@ def __post_init__(self) -> None: raise ValueError("only float32 streams are currently part of the public API") +@dataclass(frozen=True, slots=True) +class DuplexStreamConfig: + """Full-duplex audio stream configuration.""" + + sample_rate: int = 48_000 + input_channels: int = 1 + output_channels: int = 2 + block_size: int | None = None + input_device_id: str | None = None + output_device_id: str | None = None + latency: float | None = None + dtype: str = "float32" + + def __post_init__(self) -> None: + if self.sample_rate < 1: + raise ValueError("sample_rate must be positive") + if self.input_channels < 1: + raise ValueError("input_channels must be positive") + if self.output_channels < 1: + raise ValueError("output_channels must be positive") + if self.block_size is not None and self.block_size < 1: + raise ValueError("block_size must be positive") + if self.latency is not None and self.latency <= 0: + raise ValueError("latency must be positive") + if self.dtype != "float32": + raise ValueError("only float32 streams are currently part of the public API") + + @dataclass(frozen=True, slots=True) class StreamStats: """Runtime counters reported by a stream backend.""" @@ -82,6 +111,89 @@ def __post_init__(self) -> None: raise ValueError("buffer_size must be positive") +@dataclass(frozen=True, slots=True) +class DuplexStreamStats: + """Runtime counters for a full-duplex stream.""" + + input: StreamStats + output: StreamStats + + +def _validate_frames(frames: Any, channels: int) -> tuple[memoryview, int, int]: + view = memoryview(frames).cast("B") + frame_bytes = channels * 4 + if len(view) == 0 or len(view) % frame_bytes != 0: + raise ValueError("frames must contain whole interleaved float32 frames") + return view, frame_bytes, len(view) // frame_bytes + + +def _write_all_to_handle( + write: Callable[[memoryview], int], + frames: Any, + *, + channels: int, + timeout: float | None, +) -> int: + _validate_timeout(timeout) + view, frame_bytes, total_frames = _validate_frames(frames, channels) + deadline = _deadline(timeout) + written_frames = 0 + while written_frames < total_frames: + start = written_frames * frame_bytes + remaining_frames = total_frames - written_frames + accepted = write(view[start:]) + if accepted < 0: + raise RuntimeError("backend returned a negative frame count") + if accepted > remaining_frames: + raise RuntimeError("backend accepted more frames than requested") + written_frames += accepted + if written_frames >= total_frames: + return written_frames + if _timed_out(deadline): + raise TimeoutError("audio write did not complete before timeout") + time.sleep(_POLL_INTERVAL) + + return written_frames + + +def _read_exactly_from_handle( + read: Callable[[int], memoryview], + frame_count: int, + *, + channels: int, + timeout: float | None, +) -> memoryview: + if frame_count < 1: + raise ValueError("frame_count must be positive") + _validate_timeout(timeout) + + frame_bytes = channels * 4 + deadline = _deadline(timeout) + chunks: list[memoryview] = [] + captured_frames = 0 + while captured_frames < frame_count: + chunk = read(frame_count - captured_frames) + if len(chunk) % frame_bytes != 0: + raise RuntimeError("backend returned partial frame bytes") + chunk_frames = len(chunk) // frame_bytes + if chunk_frames > frame_count - captured_frames: + raise RuntimeError("backend returned more frames than requested") + if chunk_frames: + chunks.append(chunk) + captured_frames += chunk_frames + continue + if _timed_out(deadline): + raise TimeoutError("audio read did not complete before timeout") + time.sleep(_POLL_INTERVAL) + + output = bytearray(frame_count * frame_bytes) + offset = 0 + for chunk in chunks: + output[offset : offset + len(chunk)] = chunk + offset += len(chunk) + return memoryview(output) + + class OutputStream: """Playback stream. @@ -174,31 +286,12 @@ def write_all(self, frames: Any, timeout: float | None = None) -> int: """ self._require_open() - _validate_timeout(timeout) - view = memoryview(frames).cast("B") - frame_bytes = self.config.channels * 4 - if len(view) == 0 or len(view) % frame_bytes != 0: - raise ValueError("frames must contain whole interleaved float32 frames") - - deadline = _deadline(timeout) - total_frames = len(view) // frame_bytes - written_frames = 0 - while written_frames < total_frames: - start = written_frames * frame_bytes - remaining_frames = total_frames - written_frames - accepted = self.write(view[start:]) - if accepted < 0: - raise RuntimeError("backend returned a negative frame count") - if accepted > remaining_frames: - raise RuntimeError("backend accepted more frames than requested") - written_frames += accepted - if written_frames >= total_frames: - return written_frames - if _timed_out(deadline): - raise TimeoutError("audio write did not complete before timeout") - time.sleep(_POLL_INTERVAL) - - return written_frames + return _write_all_to_handle( + self.write, + frames, + channels=self.config.channels, + timeout=timeout, + ) def stats(self) -> StreamStats: """Return current stream counters.""" @@ -319,39 +412,133 @@ def read_exactly(self, frame_count: int, timeout: float | None = None) -> memory before `timeout`. """ + self._require_open() + return _read_exactly_from_handle( + self.read, + frame_count, + channels=self.config.channels, + timeout=timeout, + ) + + def stats(self) -> StreamStats: + """Return current stream counters.""" + + self._require_open() + return self._handle.stats() + + def _require_open(self) -> None: + if self._closed: + raise StreamClosed("stream is closed") + + +class DuplexStream: + """Full-duplex stream with synchronized capture and playback. + + Backends should implement this as a single native duplex stream where the + platform supports it, not as two independently scheduled streams. + """ + + def __init__( + self, + *, + sample_rate: int = 48_000, + input_channels: int = 1, + output_channels: int = 2, + block_size: int | None = None, + input_device_id: str | None = None, + output_device_id: str | None = None, + latency: float | None = None, + dtype: str = "float32", + ) -> None: + self.config = DuplexStreamConfig( + sample_rate=sample_rate, + input_channels=input_channels, + output_channels=output_channels, + block_size=block_size, + input_device_id=input_device_id, + output_device_id=output_device_id, + latency=latency, + dtype=dtype, + ) + self._handle = get_backend().open_duplex_stream(self.config) + self._closed = False + + def __enter__(self) -> DuplexStream: + self.start() + return self + + def __exit__(self, exc_type: object, exc: object, traceback: object) -> None: + self.close() + + @property + def closed(self) -> bool: + """Whether the stream has been closed.""" + + return self._closed + + def start(self) -> None: + """Start capture and playback together.""" + + self._require_open() + self._handle.start() + + def stop(self) -> None: + """Stop capture and playback without releasing resources.""" + + self._require_open() + self._handle.stop() + + def flush(self) -> None: + """Discard queued input and output frames.""" + + self._require_open() + self._handle.flush() + + def close(self) -> None: + """Release stream resources.""" + + if not self._closed: + self._handle.close() + self._closed = True + + def write(self, frames: Any) -> int: + """Write interleaved output frames to the duplex stream.""" + + self._require_open() + return self._handle.write(frames) + + def write_all(self, frames: Any, timeout: float | None = None) -> int: + """Write all interleaved output frames, waiting for capacity.""" + + self._require_open() + return _write_all_to_handle( + self.write, + frames, + channels=self.config.output_channels, + timeout=timeout, + ) + + def read(self, frame_count: int) -> memoryview: + """Read interleaved captured input frames.""" + self._require_open() if frame_count < 1: raise ValueError("frame_count must be positive") - _validate_timeout(timeout) + return self._handle.read(frame_count) - frame_bytes = self.config.channels * 4 - deadline = _deadline(timeout) - chunks: list[memoryview] = [] - captured_frames = 0 - while captured_frames < frame_count: - chunk = self.read(frame_count - captured_frames) - if len(chunk) % frame_bytes != 0: - raise RuntimeError("backend returned partial frame bytes") - chunk_frames = len(chunk) // frame_bytes - if chunk_frames > frame_count - captured_frames: - raise RuntimeError("backend returned more frames than requested") - if chunk_frames: - chunks.append(chunk) - captured_frames += chunk_frames - continue - if _timed_out(deadline): - raise TimeoutError("audio read did not complete before timeout") - time.sleep(_POLL_INTERVAL) - - output = bytearray(frame_count * frame_bytes) - offset = 0 - for chunk in chunks: - output[offset : offset + len(chunk)] = chunk - offset += len(chunk) - return memoryview(output) + def read_exactly(self, frame_count: int, timeout: float | None = None) -> memoryview: + """Read exactly `frame_count` input frames, waiting for capture.""" - def stats(self) -> StreamStats: - """Return current stream counters.""" + self._require_open() + return _read_exactly_from_handle( + self.read, + frame_count, + channels=self.config.input_channels, + timeout=timeout, + ) + + def stats(self) -> DuplexStreamStats: + """Return current duplex stream counters.""" self._require_open() return self._handle.stats() diff --git a/tests/test_public_api.py b/tests/test_public_api.py index ed60dd3..30001e9 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -40,6 +40,20 @@ def test_stream_config_validates_values(self) -> None: with self.assertRaises(ValueError): ta.StreamConfig(dtype="int16") + def test_duplex_stream_config_validates_values(self) -> None: + with self.assertRaises(ValueError): + ta.DuplexStreamConfig(sample_rate=0) + with self.assertRaises(ValueError): + ta.DuplexStreamConfig(input_channels=0) + with self.assertRaises(ValueError): + ta.DuplexStreamConfig(output_channels=0) + with self.assertRaises(ValueError): + ta.DuplexStreamConfig(block_size=0) + with self.assertRaises(ValueError): + ta.DuplexStreamConfig(latency=0) + with self.assertRaises(ValueError): + ta.DuplexStreamConfig(dtype="int16") + def test_device_info_validates_values(self) -> None: with self.assertRaises(ValueError): ta.DeviceInfo(id="", name="Speakers", kind=ta.DeviceKind.OUTPUT, channels=2) @@ -433,6 +447,162 @@ def open_input_stream(self, config: object) -> InputHandle: finally: stream.close() + def test_duplex_stream_lifecycle_delegates_to_backend(self) -> None: + class DuplexHandle: + def __init__(self) -> None: + self.started = False + self.closed = False + self.flushed = False + self.written = 0 + + def start(self) -> None: + self.started = True + + def stop(self) -> None: + self.started = False + + def flush(self) -> None: + self.flushed = True + + def close(self) -> None: + self.closed = True + + def write(self, frames: object) -> int: + del frames + self.written += 1 + return 1 + + def read(self, frame_count: int) -> memoryview: + return memoryview(b"\x00" * frame_count * 4) + + def stats(self) -> ta.DuplexStreamStats: + return ta.DuplexStreamStats( + input=ta.StreamStats(frames_processed=4), + output=ta.StreamStats(frames_processed=self.written), + ) + + class Backend: + name = "test" + + def __init__(self) -> None: + self.handle = DuplexHandle() + self.config: object | None = None + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> object: + raise NotImplementedError + + def open_input_stream(self, config: object) -> object: + raise NotImplementedError + + def open_duplex_stream(self, config: object) -> DuplexHandle: + self.config = config + return self.handle + + backend = Backend() + ta.set_backend(backend) + + stream = ta.DuplexStream(input_channels=1, output_channels=1) + self.assertEqual(stream.config.input_channels, 1) + self.assertEqual(stream.config.output_channels, 1) + self.assertIs(backend.config, stream.config) + stream.start() + self.assertTrue(backend.handle.started) + self.assertEqual(stream.write(b"\x00" * 4), 1) + self.assertEqual(len(stream.read(2)), 8) + stats = stream.stats() + self.assertEqual(stats.input.frames_processed, 4) + self.assertEqual(stats.output.frames_processed, 1) + stream.flush() + self.assertTrue(backend.handle.flushed) + stream.stop() + self.assertFalse(backend.handle.started) + stream.close() + self.assertTrue(stream.closed) + self.assertTrue(backend.handle.closed) + + with self.assertRaises(ta.StreamClosed): + stream.read(1) + + def test_duplex_stream_blocking_helpers(self) -> None: + class DuplexHandle: + def __init__(self) -> None: + self.output_frames = 0 + self.input_chunks = [b"\x01" * 8, b"\x02" * 8] + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def flush(self) -> None: + pass + + def close(self) -> None: + pass + + def write(self, frames: object) -> int: + del frames + self.output_frames += 1 + return 1 + + def read(self, frame_count: int) -> memoryview: + del frame_count + return memoryview(self.input_chunks.pop(0) if self.input_chunks else b"") + + def stats(self) -> ta.DuplexStreamStats: + return ta.DuplexStreamStats(input=ta.StreamStats(), output=ta.StreamStats()) + + class Backend: + name = "test" + + def __init__(self) -> None: + self.handle = DuplexHandle() + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> object: + raise NotImplementedError + + def open_input_stream(self, config: object) -> object: + raise NotImplementedError + + def open_duplex_stream(self, config: object) -> DuplexHandle: + return self.handle + + ta.set_backend(Backend()) + stream = ta.DuplexStream(input_channels=1, output_channels=1) + try: + self.assertEqual(stream.write_all(b"\x00" * 12, timeout=0.1), 3) + data = stream.read_exactly(4, timeout=0.1) + self.assertEqual(bytes(data), b"\x01" * 8 + b"\x02" * 8) + finally: + stream.close() + + def test_duplex_stream_uses_backend_level_open(self) -> None: + class Backend: + name = "test" + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> object: + raise AssertionError("duplex must not compose output streams") + + def open_input_stream(self, config: object) -> object: + raise AssertionError("duplex must not compose input streams") + + def open_duplex_stream(self, config: object) -> object: + raise ta.BackendUnavailable("duplex unavailable") + + ta.set_backend(Backend()) + with self.assertRaises(ta.BackendUnavailable): + ta.DuplexStream() + def test_stream_stats_preserves_zero_latency(self) -> None: stats = ta.StreamStats(estimated_latency=0.0) self.assertEqual(stats.estimated_latency, 0.0)