Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ public APIs may still change while the backend design stabilizes.
- Added `hardware_latency` to stream statistics. macOS reports Core Audio
device latency plus safety offset; Linux reports `None` until a reliable
miniaudio latency value is available.
- Added blocking convenience helpers `OutputStream.write_all()` and
`InputStream.read_exactly()`.

## [0.2.0a2] - 2026-06-10

Expand Down
7 changes: 7 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,13 @@ an underrun.
Use `tachyaudio.play()` for finite stimuli that should start, drain, and close
as a single operation.

Use blocking helpers when callers need complete buffer transfer:

- `OutputStream.write_all(frames, timeout=None)`: wait until all frames are
accepted by the output ring
- `InputStream.read_exactly(frame_count, timeout=None)`: wait until exactly the
requested number of frames has been captured

Lifecycle semantics:

- `stop()`: stop playback without discarding queued frames
Expand Down
5 changes: 5 additions & 0 deletions docs/architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ Input capture mirrors output buffering in reverse: Core Audio callbacks write
captured float32 frames into a native ring buffer, and Python reads available
frames without invoking Python from the audio callback.

The core stream methods remain nonblocking: `write()` accepts as many frames as
the output ring can hold, and `read()` returns currently available captured
frames. Blocking helpers (`write_all()` and `read_exactly()`) are Python-level
conveniences layered on top of those primitives.

Stream statistics distinguish queue state from hardware behavior. `queued_frames`
and `queued_latency` describe the native ring buffer. `hardware_latency`
describes backend-reported device latency when available. `buffer_size`
Expand Down
94 changes: 91 additions & 3 deletions src/tachyaudio/_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,29 @@
from __future__ import annotations

from dataclasses import dataclass
import time
from typing import Any

from tachyaudio._backend import get_backend
from tachyaudio._errors import StreamClosed


_POLL_INTERVAL = 0.001


def _validate_timeout(timeout: float | None) -> None:
if timeout is not None and timeout < 0:
raise ValueError("timeout cannot be negative")


def _deadline(timeout: float | None) -> float | None:
return None if timeout is None else time.monotonic() + timeout


def _timed_out(deadline: float | None) -> bool:
return deadline is not None and time.monotonic() >= deadline


@dataclass(frozen=True, slots=True)
class StreamConfig:
"""Audio stream configuration."""
Expand Down Expand Up @@ -127,8 +144,7 @@ def drain(self, timeout: float | None = None) -> bool:
"""

self._require_open()
if timeout is not None and timeout < 0:
raise ValueError("timeout cannot be negative")
_validate_timeout(timeout)
return self._handle.drain(timeout)

def flush(self) -> None:
Expand All @@ -150,6 +166,40 @@ def write(self, frames: Any) -> int:
self._require_open()
return self._handle.write(frames)

def write_all(self, frames: Any, timeout: float | None = None) -> int:
"""Write all interleaved frames, waiting for ring-buffer capacity.

Returns the number of frames accepted. Raises `TimeoutError` if the
stream does not accept the complete buffer before `timeout`.
"""

self._require_open()
_validate_timeout(timeout)
view = memoryview(frames).cast("B")
frame_bytes = self.config.channels * 4
if len(view) == 0 or len(view) % frame_bytes != 0:
raise ValueError("frames must contain whole interleaved float32 frames")

deadline = _deadline(timeout)
total_frames = len(view) // frame_bytes
written_frames = 0
while written_frames < total_frames:
start = written_frames * frame_bytes
remaining_frames = total_frames - written_frames
accepted = self.write(view[start:])
if accepted < 0:
raise RuntimeError("backend returned a negative frame count")
if accepted > remaining_frames:
raise RuntimeError("backend accepted more frames than requested")
written_frames += accepted
if written_frames >= total_frames:
return written_frames
if _timed_out(deadline):
raise TimeoutError("audio write did not complete before timeout")
time.sleep(_POLL_INTERVAL)

return written_frames

def stats(self) -> StreamStats:
"""Return current stream counters."""

Expand Down Expand Up @@ -183,8 +233,8 @@ def play(
dtype=dtype,
)
try:
stream.write(frames)
stream.start()
stream.write_all(frames, timeout)
if not stream.drain(timeout):
raise TimeoutError("audio playback did not drain before timeout")
return stream.stats()
Expand Down Expand Up @@ -262,6 +312,44 @@ def read(self, frame_count: int) -> memoryview:
raise ValueError("frame_count must be positive")
return self._handle.read(frame_count)

def read_exactly(self, frame_count: int, timeout: float | None = None) -> memoryview:
"""Read exactly `frame_count` frames, waiting for captured input.

Raises `TimeoutError` if the requested frame count is not available
before `timeout`.
"""

self._require_open()
if frame_count < 1:
raise ValueError("frame_count must be positive")
_validate_timeout(timeout)

frame_bytes = self.config.channels * 4
deadline = _deadline(timeout)
chunks: list[memoryview] = []
captured_frames = 0
while captured_frames < frame_count:
chunk = self.read(frame_count - captured_frames)
if len(chunk) % frame_bytes != 0:
raise RuntimeError("backend returned partial frame bytes")
chunk_frames = len(chunk) // frame_bytes
if chunk_frames > frame_count - captured_frames:
raise RuntimeError("backend returned more frames than requested")
if chunk_frames:
chunks.append(chunk)
captured_frames += chunk_frames
continue
if _timed_out(deadline):
raise TimeoutError("audio read did not complete before timeout")
time.sleep(_POLL_INTERVAL)

output = bytearray(frame_count * frame_bytes)
offset = 0
for chunk in chunks:
output[offset : offset + len(chunk)] = chunk
offset += len(chunk)
return memoryview(output)

def stats(self) -> StreamStats:
"""Return current stream counters."""

Expand Down
192 changes: 189 additions & 3 deletions tests/test_public_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,15 +145,17 @@ def open_input_stream(self, config: object) -> object:
def test_play_helper_returns_stats(self) -> None:
class OutputHandle:
def __init__(self) -> None:
self.events: list[str] = []
self.frames = 0

def start(self) -> None:
pass
self.events.append("start")

def stop(self) -> None:
pass

def drain(self, timeout: float | None = None) -> bool:
self.events.append("drain")
return True

def flush(self) -> None:
Expand All @@ -163,6 +165,7 @@ def close(self) -> None:
pass

def write(self, frames: object) -> int:
self.events.append("write")
self.frames = 4
return self.frames

Expand All @@ -184,9 +187,105 @@ def open_output_stream(self, config: object) -> OutputHandle:
def open_input_stream(self, config: object) -> object:
raise NotImplementedError

ta.set_backend(Backend())
stats = ta.play(b"frames", timeout=0.1)
backend = Backend()
ta.set_backend(backend)
stats = ta.play(b"\x00" * 16, channels=1, timeout=0.1)
self.assertEqual(stats.frames_processed, 4)
self.assertEqual(backend.handle.events, ["start", "write", "drain"])

def test_output_stream_write_all_handles_partial_writes(self) -> None:
class OutputHandle:
def __init__(self) -> None:
self.frames = 0

def start(self) -> None:
pass

def stop(self) -> None:
pass

def drain(self, timeout: float | None = None) -> bool:
return True

def flush(self) -> None:
pass

def close(self) -> None:
pass

def write(self, frames: object) -> int:
del frames
self.frames += 1
return 1

def stats(self) -> ta.StreamStats:
return ta.StreamStats(frames_processed=self.frames)

class Backend:
name = "test"

def __init__(self) -> None:
self.handle = OutputHandle()

def list_devices(self) -> tuple[ta.DeviceInfo, ...]:
return ()

def open_output_stream(self, config: object) -> OutputHandle:
return self.handle

def open_input_stream(self, config: object) -> object:
raise NotImplementedError

ta.set_backend(Backend())
stream = ta.OutputStream(channels=1)
try:
self.assertEqual(stream.write_all(b"\x00" * 12, timeout=0.1), 3)
self.assertEqual(stream.stats().frames_processed, 3)
finally:
stream.close()

def test_output_stream_write_all_times_out(self) -> None:
class OutputHandle:
def start(self) -> None:
pass

def stop(self) -> None:
pass

def drain(self, timeout: float | None = None) -> bool:
return True

def flush(self) -> None:
pass

def close(self) -> None:
pass

def write(self, frames: object) -> int:
return 0

def stats(self) -> ta.StreamStats:
return ta.StreamStats()

class Backend:
name = "test"

def list_devices(self) -> tuple[ta.DeviceInfo, ...]:
return ()

def open_output_stream(self, config: object) -> OutputHandle:
return OutputHandle()

def open_input_stream(self, config: object) -> object:
raise NotImplementedError

ta.set_backend(Backend())
stream = ta.OutputStream(channels=1)
try:
with self.assertRaises(TimeoutError):
stream.write_all(b"\x00" * 4, timeout=0.001)
finally:
stream.close()

def test_input_stream_lifecycle_delegates_to_backend(self) -> None:
class InputHandle:
Expand Down Expand Up @@ -247,6 +346,93 @@ def open_input_stream(self, config: object) -> InputHandle:
with self.assertRaises(ta.StreamClosed):
stream.read(1)

def test_input_stream_read_exactly_combines_chunks(self) -> None:
class InputHandle:
def __init__(self) -> None:
self.chunks = [b"\x01" * 8, b"\x02" * 8]

def start(self) -> None:
pass

def stop(self) -> None:
pass

def close(self) -> None:
pass

def flush(self) -> None:
pass

def read(self, frame_count: int) -> memoryview:
del frame_count
return memoryview(self.chunks.pop(0) if self.chunks else b"")

def stats(self) -> ta.StreamStats:
return ta.StreamStats()

class Backend:
name = "test"

def __init__(self) -> None:
self.handle = InputHandle()

def list_devices(self) -> tuple[ta.DeviceInfo, ...]:
return ()

def open_output_stream(self, config: object) -> object:
raise NotImplementedError

def open_input_stream(self, config: object) -> InputHandle:
return self.handle

ta.set_backend(Backend())
stream = ta.InputStream(channels=1)
try:
data = stream.read_exactly(4, timeout=0.1)
self.assertEqual(bytes(data), b"\x01" * 8 + b"\x02" * 8)
finally:
stream.close()

def test_input_stream_read_exactly_times_out(self) -> None:
class InputHandle:
def start(self) -> None:
pass

def stop(self) -> None:
pass

def close(self) -> None:
pass

def flush(self) -> None:
pass

def read(self, frame_count: int) -> memoryview:
return memoryview(b"")

def stats(self) -> ta.StreamStats:
return ta.StreamStats()

class Backend:
name = "test"

def list_devices(self) -> tuple[ta.DeviceInfo, ...]:
return ()

def open_output_stream(self, config: object) -> object:
raise NotImplementedError

def open_input_stream(self, config: object) -> InputHandle:
return InputHandle()

ta.set_backend(Backend())
stream = ta.InputStream(channels=1)
try:
with self.assertRaises(TimeoutError):
stream.read_exactly(1, timeout=0.001)
finally:
stream.close()

def test_stream_stats_preserves_zero_latency(self) -> None:
stats = ta.StreamStats(estimated_latency=0.0)
self.assertEqual(stats.estimated_latency, 0.0)
Expand Down
Loading