diff --git a/CHANGELOG.md b/CHANGELOG.md index 19bf16f..914ef0a 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -14,6 +14,8 @@ public APIs may still change while the backend design stabilizes. - Added `hardware_latency` to stream statistics. macOS reports Core Audio device latency plus safety offset; Linux reports `None` until a reliable miniaudio latency value is available. +- Added blocking convenience helpers `OutputStream.write_all()` and + `InputStream.read_exactly()`. ## [0.2.0a2] - 2026-06-10 diff --git a/README.md b/README.md index 1c11b7a..cbfd22d 100644 --- a/README.md +++ b/README.md @@ -46,6 +46,13 @@ an underrun. Use `tachyaudio.play()` for finite stimuli that should start, drain, and close as a single operation. +Use blocking helpers when callers need complete buffer transfer: + +- `OutputStream.write_all(frames, timeout=None)`: wait until all frames are + accepted by the output ring +- `InputStream.read_exactly(frame_count, timeout=None)`: wait until exactly the + requested number of frames has been captured + Lifecycle semantics: - `stop()`: stop playback without discarding queued frames diff --git a/docs/architecture.md b/docs/architecture.md index 453ffbc..6e10b8c 100644 --- a/docs/architecture.md +++ b/docs/architecture.md @@ -34,6 +34,11 @@ Input capture mirrors output buffering in reverse: Core Audio callbacks write captured float32 frames into a native ring buffer, and Python reads available frames without invoking Python from the audio callback. +The core stream methods remain nonblocking: `write()` accepts as many frames as +the output ring can hold, and `read()` returns currently available captured +frames. Blocking helpers (`write_all()` and `read_exactly()`) are Python-level +conveniences layered on top of those primitives. + Stream statistics distinguish queue state from hardware behavior. `queued_frames` and `queued_latency` describe the native ring buffer. `hardware_latency` describes backend-reported device latency when available. `buffer_size` diff --git a/src/tachyaudio/_stream.py b/src/tachyaudio/_stream.py index 2dcf876..f22ae23 100644 --- a/src/tachyaudio/_stream.py +++ b/src/tachyaudio/_stream.py @@ -3,12 +3,29 @@ from __future__ import annotations from dataclasses import dataclass +import time from typing import Any from tachyaudio._backend import get_backend from tachyaudio._errors import StreamClosed +_POLL_INTERVAL = 0.001 + + +def _validate_timeout(timeout: float | None) -> None: + if timeout is not None and timeout < 0: + raise ValueError("timeout cannot be negative") + + +def _deadline(timeout: float | None) -> float | None: + return None if timeout is None else time.monotonic() + timeout + + +def _timed_out(deadline: float | None) -> bool: + return deadline is not None and time.monotonic() >= deadline + + @dataclass(frozen=True, slots=True) class StreamConfig: """Audio stream configuration.""" @@ -127,8 +144,7 @@ def drain(self, timeout: float | None = None) -> bool: """ self._require_open() - if timeout is not None and timeout < 0: - raise ValueError("timeout cannot be negative") + _validate_timeout(timeout) return self._handle.drain(timeout) def flush(self) -> None: @@ -150,6 +166,40 @@ def write(self, frames: Any) -> int: self._require_open() return self._handle.write(frames) + def write_all(self, frames: Any, timeout: float | None = None) -> int: + """Write all interleaved frames, waiting for ring-buffer capacity. + + Returns the number of frames accepted. Raises `TimeoutError` if the + stream does not accept the complete buffer before `timeout`. + """ + + self._require_open() + _validate_timeout(timeout) + view = memoryview(frames).cast("B") + frame_bytes = self.config.channels * 4 + if len(view) == 0 or len(view) % frame_bytes != 0: + raise ValueError("frames must contain whole interleaved float32 frames") + + deadline = _deadline(timeout) + total_frames = len(view) // frame_bytes + written_frames = 0 + while written_frames < total_frames: + start = written_frames * frame_bytes + remaining_frames = total_frames - written_frames + accepted = self.write(view[start:]) + if accepted < 0: + raise RuntimeError("backend returned a negative frame count") + if accepted > remaining_frames: + raise RuntimeError("backend accepted more frames than requested") + written_frames += accepted + if written_frames >= total_frames: + return written_frames + if _timed_out(deadline): + raise TimeoutError("audio write did not complete before timeout") + time.sleep(_POLL_INTERVAL) + + return written_frames + def stats(self) -> StreamStats: """Return current stream counters.""" @@ -183,8 +233,8 @@ def play( dtype=dtype, ) try: - stream.write(frames) stream.start() + stream.write_all(frames, timeout) if not stream.drain(timeout): raise TimeoutError("audio playback did not drain before timeout") return stream.stats() @@ -262,6 +312,44 @@ def read(self, frame_count: int) -> memoryview: raise ValueError("frame_count must be positive") return self._handle.read(frame_count) + def read_exactly(self, frame_count: int, timeout: float | None = None) -> memoryview: + """Read exactly `frame_count` frames, waiting for captured input. + + Raises `TimeoutError` if the requested frame count is not available + before `timeout`. + """ + + self._require_open() + if frame_count < 1: + raise ValueError("frame_count must be positive") + _validate_timeout(timeout) + + frame_bytes = self.config.channels * 4 + deadline = _deadline(timeout) + chunks: list[memoryview] = [] + captured_frames = 0 + while captured_frames < frame_count: + chunk = self.read(frame_count - captured_frames) + if len(chunk) % frame_bytes != 0: + raise RuntimeError("backend returned partial frame bytes") + chunk_frames = len(chunk) // frame_bytes + if chunk_frames > frame_count - captured_frames: + raise RuntimeError("backend returned more frames than requested") + if chunk_frames: + chunks.append(chunk) + captured_frames += chunk_frames + continue + if _timed_out(deadline): + raise TimeoutError("audio read did not complete before timeout") + time.sleep(_POLL_INTERVAL) + + output = bytearray(frame_count * frame_bytes) + offset = 0 + for chunk in chunks: + output[offset : offset + len(chunk)] = chunk + offset += len(chunk) + return memoryview(output) + def stats(self) -> StreamStats: """Return current stream counters.""" diff --git a/tests/test_public_api.py b/tests/test_public_api.py index dec46f4..ed60dd3 100644 --- a/tests/test_public_api.py +++ b/tests/test_public_api.py @@ -145,15 +145,17 @@ def open_input_stream(self, config: object) -> object: def test_play_helper_returns_stats(self) -> None: class OutputHandle: def __init__(self) -> None: + self.events: list[str] = [] self.frames = 0 def start(self) -> None: - pass + self.events.append("start") def stop(self) -> None: pass def drain(self, timeout: float | None = None) -> bool: + self.events.append("drain") return True def flush(self) -> None: @@ -163,6 +165,7 @@ def close(self) -> None: pass def write(self, frames: object) -> int: + self.events.append("write") self.frames = 4 return self.frames @@ -184,9 +187,105 @@ def open_output_stream(self, config: object) -> OutputHandle: def open_input_stream(self, config: object) -> object: raise NotImplementedError - ta.set_backend(Backend()) - stats = ta.play(b"frames", timeout=0.1) + backend = Backend() + ta.set_backend(backend) + stats = ta.play(b"\x00" * 16, channels=1, timeout=0.1) self.assertEqual(stats.frames_processed, 4) + self.assertEqual(backend.handle.events, ["start", "write", "drain"]) + + def test_output_stream_write_all_handles_partial_writes(self) -> None: + class OutputHandle: + def __init__(self) -> None: + self.frames = 0 + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def drain(self, timeout: float | None = None) -> bool: + return True + + def flush(self) -> None: + pass + + def close(self) -> None: + pass + + def write(self, frames: object) -> int: + del frames + self.frames += 1 + return 1 + + def stats(self) -> ta.StreamStats: + return ta.StreamStats(frames_processed=self.frames) + + class Backend: + name = "test" + + def __init__(self) -> None: + self.handle = OutputHandle() + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> OutputHandle: + return self.handle + + def open_input_stream(self, config: object) -> object: + raise NotImplementedError + + ta.set_backend(Backend()) + stream = ta.OutputStream(channels=1) + try: + self.assertEqual(stream.write_all(b"\x00" * 12, timeout=0.1), 3) + self.assertEqual(stream.stats().frames_processed, 3) + finally: + stream.close() + + def test_output_stream_write_all_times_out(self) -> None: + class OutputHandle: + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def drain(self, timeout: float | None = None) -> bool: + return True + + def flush(self) -> None: + pass + + def close(self) -> None: + pass + + def write(self, frames: object) -> int: + return 0 + + def stats(self) -> ta.StreamStats: + return ta.StreamStats() + + class Backend: + name = "test" + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> OutputHandle: + return OutputHandle() + + def open_input_stream(self, config: object) -> object: + raise NotImplementedError + + ta.set_backend(Backend()) + stream = ta.OutputStream(channels=1) + try: + with self.assertRaises(TimeoutError): + stream.write_all(b"\x00" * 4, timeout=0.001) + finally: + stream.close() def test_input_stream_lifecycle_delegates_to_backend(self) -> None: class InputHandle: @@ -247,6 +346,93 @@ def open_input_stream(self, config: object) -> InputHandle: with self.assertRaises(ta.StreamClosed): stream.read(1) + def test_input_stream_read_exactly_combines_chunks(self) -> None: + class InputHandle: + def __init__(self) -> None: + self.chunks = [b"\x01" * 8, b"\x02" * 8] + + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def close(self) -> None: + pass + + def flush(self) -> None: + pass + + def read(self, frame_count: int) -> memoryview: + del frame_count + return memoryview(self.chunks.pop(0) if self.chunks else b"") + + def stats(self) -> ta.StreamStats: + return ta.StreamStats() + + class Backend: + name = "test" + + def __init__(self) -> None: + self.handle = InputHandle() + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> object: + raise NotImplementedError + + def open_input_stream(self, config: object) -> InputHandle: + return self.handle + + ta.set_backend(Backend()) + stream = ta.InputStream(channels=1) + try: + data = stream.read_exactly(4, timeout=0.1) + self.assertEqual(bytes(data), b"\x01" * 8 + b"\x02" * 8) + finally: + stream.close() + + def test_input_stream_read_exactly_times_out(self) -> None: + class InputHandle: + def start(self) -> None: + pass + + def stop(self) -> None: + pass + + def close(self) -> None: + pass + + def flush(self) -> None: + pass + + def read(self, frame_count: int) -> memoryview: + return memoryview(b"") + + def stats(self) -> ta.StreamStats: + return ta.StreamStats() + + class Backend: + name = "test" + + def list_devices(self) -> tuple[ta.DeviceInfo, ...]: + return () + + def open_output_stream(self, config: object) -> object: + raise NotImplementedError + + def open_input_stream(self, config: object) -> InputHandle: + return InputHandle() + + ta.set_backend(Backend()) + stream = ta.InputStream(channels=1) + try: + with self.assertRaises(TimeoutError): + stream.read_exactly(1, timeout=0.001) + finally: + stream.close() + def test_stream_stats_preserves_zero_latency(self) -> None: stats = ta.StreamStats(estimated_latency=0.0) self.assertEqual(stats.estimated_latency, 0.0)