Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@ as a single operation.

See `examples/play_wav.py` for dependency-free playback of WAV files exported
from an editor such as Audacity. The repository example uses a 48 kHz float WAV
to avoid runtime resampling.
to avoid runtime resampling. Pass `--eq` to show a lightweight five-band
terminal meter during playback.

Use blocking helpers when callers need complete buffer transfer:

Expand Down
119 changes: 119 additions & 0 deletions examples/play_wav.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import argparse
from array import array
from collections.abc import Iterator
import math
from pathlib import Path
import struct
import sys
Expand All @@ -16,6 +17,13 @@
_WAVE_FORMAT_EXTENSIBLE = 0xFFFE
_PCM_SUBFORMAT = bytes.fromhex("0100000000001000800000aa00389b71")
_IEEE_FLOAT_SUBFORMAT = bytes.fromhex("0300000000001000800000aa00389b71")
_EQ_BANDS = (
("Low", 80.0),
("LowMid", 250.0),
("Mid", 750.0),
("HighMid", 2_000.0),
("High", 6_000.0),
)


def _pcm_sample_to_float(sample: int, sample_width: int) -> float:
Expand Down Expand Up @@ -138,6 +146,90 @@ def upmix_mono_to_stereo(samples: array) -> array:
return output


class EqualizerMeter:
"""Small terminal five-band level meter for already-buffered samples."""

def __init__(
self,
*,
sample_rate: int,
channels: int,
width: int,
refresh: float,
) -> None:
self.sample_rate = sample_rate
self.channels = channels
self.width = width
self.refresh = refresh
self.last_update = 0.0
self.started = time.monotonic()
self.drawn_lines = 0
self.dynamic = sys.stdout.isatty()

def maybe_draw(self, samples: array, *, frame_cursor: int, total_frames: int) -> None:
now = time.monotonic()
if now - self.last_update < self.refresh:
return
self.last_update = now
levels = self._band_levels(samples)
position = frame_cursor / self.sample_rate
duration = total_frames / self.sample_rate
if self.dynamic:
self._draw_dynamic(levels, position=position, duration=duration)
else:
self._draw_log_line(levels, position=position, duration=duration)

def finish(self) -> None:
if self.drawn_lines:
print()

def _bar(self, level: float) -> str:
scaled = min(1.0, math.sqrt(max(0.0, level)) * 2.5)
filled = min(self.width, int(round(scaled * self.width)))
return "█" * filled + "░" * (self.width - filled)

def _draw_dynamic(self, levels: list[float], *, position: float, duration: float) -> None:
lines = [f"EQ {position:6.1f}/{duration:6.1f}s"]
for (name, _frequency), level in zip(_EQ_BANDS, levels, strict=True):
lines.append(f"{name:7} {self._bar(level)}")

if self.drawn_lines:
sys.stdout.write(f"\033[{self.drawn_lines}F")
for line in lines:
sys.stdout.write(f"\033[2K{line}\n")
sys.stdout.flush()
self.drawn_lines = len(lines)

def _draw_log_line(self, levels: list[float], *, position: float, duration: float) -> None:
bars = " ".join(
f"{name}:{self._bar(level)}"
for (name, _frequency), level in zip(_EQ_BANDS, levels, strict=True)
)
print(f"\r{position:6.1f}/{duration:6.1f}s {bars}", end="", flush=True)
self.drawn_lines = 1

def _band_levels(self, samples: array) -> list[float]:
frame_count = len(samples) // self.channels
if frame_count == 0:
return [0.0 for _band in _EQ_BANDS]

levels: list[float] = []
for _name, frequency in _EQ_BANDS:
real = 0.0
imag = 0.0
for frame in range(frame_count):
mono = 0.0
frame_offset = frame * self.channels
for channel in range(self.channels):
mono += samples[frame_offset + channel]
mono /= self.channels
phase = 2.0 * math.pi * frequency * frame / self.sample_rate
real += mono * math.cos(phase)
imag -= mono * math.sin(phase)
levels.append(math.sqrt(real * real + imag * imag) / frame_count)
return levels


def stream_wav(
samples: array,
*,
Expand All @@ -147,6 +239,7 @@ def stream_wav(
device_id: str | None,
prebuffer: float,
chunk_frames: int,
meter: EqualizerMeter | None,
) -> ta.StreamStats:
"""Play samples through a prebuffered stream to avoid underruns."""

Expand Down Expand Up @@ -183,10 +276,18 @@ def stream_wav(
samples[frame_cursor * channels : end_frame * channels],
timeout=2.0,
)
if meter is not None:
meter.maybe_draw(
samples[frame_cursor * channels : end_frame * channels],
frame_cursor=end_frame,
total_frames=total_frames,
)
frame_cursor = end_frame

if not stream.drain(max(2.0, prebuffer + 2.0)):
raise TimeoutError("audio playback did not drain before timeout")
if meter is not None:
meter.finish()
return stream.stats()
finally:
stream.close()
Expand All @@ -206,6 +307,9 @@ def main() -> None:
parser.add_argument("--block-size", type=int, default=1024)
parser.add_argument("--chunk-frames", type=int, default=4096)
parser.add_argument("--prebuffer", type=float, default=0.25)
parser.add_argument("--eq", action="store_true", help="Show a fun five-band terminal meter")
parser.add_argument("--eq-width", type=int, default=8)
parser.add_argument("--eq-refresh", type=float, default=0.05)
parser.add_argument(
"--mono",
action="store_true",
Expand All @@ -224,6 +328,10 @@ def main() -> None:
raise SystemExit("--chunk-frames must be positive")
if args.prebuffer < 0.0:
raise SystemExit("--prebuffer must be non-negative")
if args.eq_width < 1:
raise SystemExit("--eq-width must be positive")
if args.eq_refresh <= 0.0:
raise SystemExit("--eq-refresh must be positive")

samples, sample_rate, channels, duration = load_wav(
args.path,
Expand All @@ -240,6 +348,16 @@ def main() -> None:
f"duration={duration:.3f}s prebuffer={args.prebuffer:.3f}s"
)
started = time.monotonic()
meter = (
EqualizerMeter(
sample_rate=sample_rate,
channels=output_channels,
width=args.eq_width,
refresh=args.eq_refresh,
)
if args.eq
else None
)
stats = stream_wav(
samples,
sample_rate=sample_rate,
Expand All @@ -248,6 +366,7 @@ def main() -> None:
device_id=args.device_id,
prebuffer=args.prebuffer,
chunk_frames=args.chunk_frames,
meter=meter,
)
elapsed = time.monotonic() - started
print(f"elapsed={elapsed:.3f}s")
Expand Down
Loading