From d74b8aa75bbb819f0190741d9c71d5f01fabb3d6 Mon Sep 17 00:00:00 2001 From: iancharest Date: Fri, 12 Jun 2026 10:13:41 -0400 Subject: [PATCH 1/2] Add terminal EQ meter to WAV example --- README.md | 3 +- examples/play_wav.py | 97 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 99 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 461b711..250d8d7 100644 --- a/README.md +++ b/README.md @@ -48,7 +48,8 @@ as a single operation. See `examples/play_wav.py` for dependency-free playback of WAV files exported from an editor such as Audacity. The repository example uses a 48 kHz float WAV -to avoid runtime resampling. +to avoid runtime resampling. Pass `--eq` to show a lightweight five-band +terminal meter during playback. Use blocking helpers when callers need complete buffer transfer: diff --git a/examples/play_wav.py b/examples/play_wav.py index 181b82b..b7738a9 100644 --- a/examples/play_wav.py +++ b/examples/play_wav.py @@ -3,6 +3,7 @@ import argparse from array import array from collections.abc import Iterator +import math from pathlib import Path import struct import sys @@ -16,6 +17,13 @@ _WAVE_FORMAT_EXTENSIBLE = 0xFFFE _PCM_SUBFORMAT = bytes.fromhex("0100000000001000800000aa00389b71") _IEEE_FLOAT_SUBFORMAT = bytes.fromhex("0300000000001000800000aa00389b71") +_EQ_BANDS = ( + ("Low", 80.0), + ("LowMid", 250.0), + ("Mid", 750.0), + ("HighMid", 2_000.0), + ("High", 6_000.0), +) def _pcm_sample_to_float(sample: int, sample_width: int) -> float: @@ -138,6 +146,68 @@ def upmix_mono_to_stereo(samples: array) -> array: return output +class EqualizerMeter: + """Small terminal five-band level meter for already-buffered samples.""" + + def __init__( + self, + *, + sample_rate: int, + channels: int, + width: int, + refresh: float, + ) -> None: + self.sample_rate = sample_rate + self.channels = channels + self.width = width + self.refresh = refresh + self.last_update = 0.0 + self.started = time.monotonic() + + def maybe_draw(self, samples: array, *, frame_cursor: int, total_frames: int) -> None: + now = time.monotonic() + if now - self.last_update < self.refresh: + return + self.last_update = now + levels = self._band_levels(samples) + position = frame_cursor / self.sample_rate + duration = total_frames / self.sample_rate + bars = " ".join( + f"{name}:{self._bar(level)}" + for (name, _frequency), level in zip(_EQ_BANDS, levels, strict=True) + ) + print(f"\r{position:6.1f}/{duration:6.1f}s {bars}", end="", flush=True) + + def finish(self) -> None: + print() + + def _bar(self, level: float) -> str: + scaled = min(1.0, math.sqrt(max(0.0, level)) * 2.5) + filled = min(self.width, int(round(scaled * self.width))) + return "█" * filled + "░" * (self.width - filled) + + def _band_levels(self, samples: array) -> list[float]: + frame_count = len(samples) // self.channels + if frame_count == 0: + return [0.0 for _band in _EQ_BANDS] + + levels: list[float] = [] + for _name, frequency in _EQ_BANDS: + real = 0.0 + imag = 0.0 + for frame in range(frame_count): + mono = 0.0 + frame_offset = frame * self.channels + for channel in range(self.channels): + mono += samples[frame_offset + channel] + mono /= self.channels + phase = 2.0 * math.pi * frequency * frame / self.sample_rate + real += mono * math.cos(phase) + imag -= mono * math.sin(phase) + levels.append(math.sqrt(real * real + imag * imag) / frame_count) + return levels + + def stream_wav( samples: array, *, @@ -147,6 +217,7 @@ def stream_wav( device_id: str | None, prebuffer: float, chunk_frames: int, + meter: EqualizerMeter | None, ) -> ta.StreamStats: """Play samples through a prebuffered stream to avoid underruns.""" @@ -183,10 +254,18 @@ def stream_wav( samples[frame_cursor * channels : end_frame * channels], timeout=2.0, ) + if meter is not None: + meter.maybe_draw( + samples[frame_cursor * channels : end_frame * channels], + frame_cursor=end_frame, + total_frames=total_frames, + ) frame_cursor = end_frame if not stream.drain(max(2.0, prebuffer + 2.0)): raise TimeoutError("audio playback did not drain before timeout") + if meter is not None: + meter.finish() return stream.stats() finally: stream.close() @@ -206,6 +285,9 @@ def main() -> None: parser.add_argument("--block-size", type=int, default=1024) parser.add_argument("--chunk-frames", type=int, default=4096) parser.add_argument("--prebuffer", type=float, default=0.25) + parser.add_argument("--eq", action="store_true", help="Show a fun five-band terminal meter") + parser.add_argument("--eq-width", type=int, default=8) + parser.add_argument("--eq-refresh", type=float, default=0.05) parser.add_argument( "--mono", action="store_true", @@ -224,6 +306,10 @@ def main() -> None: raise SystemExit("--chunk-frames must be positive") if args.prebuffer < 0.0: raise SystemExit("--prebuffer must be non-negative") + if args.eq_width < 1: + raise SystemExit("--eq-width must be positive") + if args.eq_refresh <= 0.0: + raise SystemExit("--eq-refresh must be positive") samples, sample_rate, channels, duration = load_wav( args.path, @@ -240,6 +326,16 @@ def main() -> None: f"duration={duration:.3f}s prebuffer={args.prebuffer:.3f}s" ) started = time.monotonic() + meter = ( + EqualizerMeter( + sample_rate=sample_rate, + channels=output_channels, + width=args.eq_width, + refresh=args.eq_refresh, + ) + if args.eq + else None + ) stats = stream_wav( samples, sample_rate=sample_rate, @@ -248,6 +344,7 @@ def main() -> None: device_id=args.device_id, prebuffer=args.prebuffer, chunk_frames=args.chunk_frames, + meter=meter, ) elapsed = time.monotonic() - started print(f"elapsed={elapsed:.3f}s") From db0f88d932d2634de7eef1d246b76b422c3bf7ff Mon Sep 17 00:00:00 2001 From: iancharest Date: Fri, 12 Jun 2026 10:17:02 -0400 Subject: [PATCH 2/2] Render WAV EQ meter dynamically --- examples/play_wav.py | 34 ++++++++++++++++++++++++++++------ 1 file changed, 28 insertions(+), 6 deletions(-) diff --git a/examples/play_wav.py b/examples/play_wav.py index b7738a9..74fdf47 100644 --- a/examples/play_wav.py +++ b/examples/play_wav.py @@ -163,6 +163,8 @@ def __init__( self.refresh = refresh self.last_update = 0.0 self.started = time.monotonic() + self.drawn_lines = 0 + self.dynamic = sys.stdout.isatty() def maybe_draw(self, samples: array, *, frame_cursor: int, total_frames: int) -> None: now = time.monotonic() @@ -172,20 +174,40 @@ def maybe_draw(self, samples: array, *, frame_cursor: int, total_frames: int) -> levels = self._band_levels(samples) position = frame_cursor / self.sample_rate duration = total_frames / self.sample_rate - bars = " ".join( - f"{name}:{self._bar(level)}" - for (name, _frequency), level in zip(_EQ_BANDS, levels, strict=True) - ) - print(f"\r{position:6.1f}/{duration:6.1f}s {bars}", end="", flush=True) + if self.dynamic: + self._draw_dynamic(levels, position=position, duration=duration) + else: + self._draw_log_line(levels, position=position, duration=duration) def finish(self) -> None: - print() + if self.drawn_lines: + print() def _bar(self, level: float) -> str: scaled = min(1.0, math.sqrt(max(0.0, level)) * 2.5) filled = min(self.width, int(round(scaled * self.width))) return "█" * filled + "░" * (self.width - filled) + def _draw_dynamic(self, levels: list[float], *, position: float, duration: float) -> None: + lines = [f"EQ {position:6.1f}/{duration:6.1f}s"] + for (name, _frequency), level in zip(_EQ_BANDS, levels, strict=True): + lines.append(f"{name:7} {self._bar(level)}") + + if self.drawn_lines: + sys.stdout.write(f"\033[{self.drawn_lines}F") + for line in lines: + sys.stdout.write(f"\033[2K{line}\n") + sys.stdout.flush() + self.drawn_lines = len(lines) + + def _draw_log_line(self, levels: list[float], *, position: float, duration: float) -> None: + bars = " ".join( + f"{name}:{self._bar(level)}" + for (name, _frequency), level in zip(_EQ_BANDS, levels, strict=True) + ) + print(f"\r{position:6.1f}/{duration:6.1f}s {bars}", end="", flush=True) + self.drawn_lines = 1 + def _band_levels(self, samples: array) -> list[float]: frame_count = len(samples) // self.channels if frame_count == 0: