Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
162 changes: 162 additions & 0 deletions ovoscope/wakeword_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
"""Lightweight per-clip wake-word detection probe.

Drives a **real** OVOS :class:`HotWordEngine` over a single audio clip the way
the live listening loop does — a few seconds of leading silence to warm the
engine's streaming feature buffers, then the clip streamed frame by frame —
and returns a per-clip detection decision plus latency.

Unlike :class:`ovoscope.voice_loop.MiniVoiceLoop` (which runs the full
``DinkumVoiceLoop`` state machine and needs the ``[listener]`` extra), this is
self-contained: no bus, no VAD, no state machine — just ``engine.update()`` /
``engine.found_wake_word()`` over primed audio. Ideal for plugin test suites
and benchmarks that score detection on labelled fixtures.

Why the long lead matters
-------------------------
Streaming detectors (openWakeWord, microWakeWord, …) only emit a prediction
once their rolling mel/embedding window is full (~2.5 s of frames). A clip fed
with too little leading silence never fills that window: the activation is
missed (a false reject), and on the shortest clips the half-full buffer raises
a shape mismatch that drops the sample entirely. Priming with a few seconds of
leading silence fills the window *before* the keyword arrives, exactly as a
live microphone keeps the loop warm. :data:`PRIME_SECONDS` defaults to 3 s.

Audio contract: mono ``float32`` in ``[-1, 1]`` at the engine's sample rate
(16 kHz for every OVOS hotword engine). Resample upstream if your source
differs. Needs the ``[bench]`` extra (numpy).
"""
from __future__ import annotations

import inspect
import time
from dataclasses import dataclass
from typing import Any, Dict, Optional

# Default streaming parameters for WakeWordProbe. All of them can be overridden
# per probe through the keyword-only arguments of ``WakeWordProbe.__init__``.

# Sample rate in Hz that clips are assumed to be recorded at (16 kHz).
SAMPLE_RATE = 16000
# Samples per streamed frame: 1280 samples / 16000 Hz = 80 ms per frame.
FRAME_SAMPLES = 1280 # 80 ms @ 16 kHz — the OVOS listener chunk size
# Seconds of zero-valued audio placed before the clip.
PRIME_SECONDS = 3.0 # leading silence to warm the feature window (see module docstring)
# Seconds of zero-valued audio placed after the clip.
TAIL_SECONDS = 0.5 # trailing silence so a late activation can settle


@dataclass
class WakeWordDetection:
    """Per-clip result produced by streaming audio through a hotword engine.

    Attributes:
        detected: ``True`` when the engine reported the wake word for the clip.
        latency_ms: elapsed time, in milliseconds, attached to the decision.
        frames_to_detection: number of frames streamed before the latch
            fired, or ``None`` when the wake word was not detected.
    """

    detected: bool
    latency_ms: float
    frames_to_detection: Optional[int]


def apply_hotword_compat() -> None:
    """Patch ``HotWordEngine.__init__`` so it tolerates a ``lang`` argument.

    Newer wake-word plugins call ``super().__init__(key_phrase, config, lang)``
    while older ``HotWordEngine`` bases only take ``(key_phrase, config)``.
    When the installed base has no ``lang`` parameter, its ``__init__`` is
    replaced by a wrapper that accepts (and discards) ``lang`` plus any further
    positional/keyword arguments, forwarding only ``key_phrase`` and ``config``.

    Does nothing when the base already declares ``lang``. That includes the
    case where this function has already patched it, because the wrapper
    itself declares ``lang``.
    """
    from ovos_plugin_manager.templates import hotwords as hw

    engine_cls = hw.HotWordEngine
    legacy_init = engine_cls.__init__
    declared = inspect.signature(legacy_init).parameters

    if "lang" not in declared:
        # NOTE: deliberately not decorated with functools.wraps — that would set
        # __wrapped__, and inspect.signature() would then report the legacy
        # signature again, defeating the idempotency check above.
        def _init_ignoring_lang(self, key_phrase="hey_mycroft", config=None,
                                lang=None, *args, **kwargs):
            legacy_init(self, key_phrase, config)

        engine_cls.__init__ = _init_ignoring_lang


def load_hotword_engine(plugin_id: str, key_phrase: str = "hey_mycroft",
                        config: Optional[Dict[str, Any]] = None,
                        lang: str = "en-us"):
    """Load and instantiate a real OVOS hotword engine by plugin id.

    The plugin is looked up under ``plugin_id`` as given; if that fails, the
    dash→underscore spelling and then the underscore→dash spelling are tried,
    so either separator style resolves the same plugin.

    Args:
        plugin_id: entry-point id of the wake-word plugin.
        key_phrase: wake phrase passed to the engine constructor.
        config: engine configuration; copied into a fresh ``dict`` so the
            caller's mapping is never mutated (``None`` means empty).
        lang: language code passed as the third constructor argument.

    Returns:
        The instantiated engine, ``clazz(key_phrase, config_copy, lang)``.

    Raises:
        ValueError: no plugin could be loaded under any spelling of the id.
    """
    from ovos_plugin_manager.wakewords import load_wake_word_plugin

    # Make sure older HotWordEngine bases accept the ``lang`` argument that
    # newer plugins forward to ``super().__init__``.
    apply_hotword_compat()

    # Candidate spellings, in priority order, without duplicates. The first two
    # match the historical behaviour; the third covers ids written with
    # underscores whose plugin is registered with dashes.
    candidates = [plugin_id]
    for variant in (plugin_id.replace("-", "_"), plugin_id.replace("_", "-")):
        if variant not in candidates:
            candidates.append(variant)

    clazz = None
    for candidate in candidates:
        clazz = load_wake_word_plugin(candidate)
        if clazz is not None:
            break
    if clazz is None:
        raise ValueError(f"no wake-word plugin {plugin_id!r}")
    return clazz(key_phrase, dict(config or {}), lang)


class WakeWordProbe:
    """Drive a real ``HotWordEngine`` over single clips with listener-style priming.

    Each clip is wrapped in leading and trailing silence, converted to 16-bit
    PCM and fed to the engine one frame at a time; the probe stops at the first
    frame for which the engine reports the wake word.

    Attributes:
        engine: object exposing ``found_wake_word`` and, optionally,
            ``update(chunk_bytes)`` and ``reset()``.
        sample_rate: samples per second of the clips handed to the probe.
        frame_samples: samples per streamed frame.
        prime_seconds: seconds of silence inserted before the clip.
        tail_seconds: seconds of silence appended after the clip.
    """

    def __init__(self, engine, *, sample_rate: int = SAMPLE_RATE,
                 frame_samples: int = FRAME_SAMPLES,
                 prime_seconds: float = PRIME_SECONDS,
                 tail_seconds: float = TAIL_SECONDS):
        """Store the engine and streaming parameters.

        Raises:
            ValueError: ``sample_rate`` or ``frame_samples`` is not positive,
                or ``prime_seconds`` / ``tail_seconds`` is negative. These
                values would otherwise fail later with an obscure
                ``ZeroDivisionError`` or numpy error while padding/streaming.
        """
        if sample_rate <= 0:
            raise ValueError(f"sample_rate must be positive, got {sample_rate!r}")
        if frame_samples <= 0:
            raise ValueError(f"frame_samples must be positive, got {frame_samples!r}")
        if prime_seconds < 0:
            raise ValueError(f"prime_seconds must be >= 0, got {prime_seconds!r}")
        if tail_seconds < 0:
            raise ValueError(f"tail_seconds must be >= 0, got {tail_seconds!r}")
        self.engine = engine
        self.sample_rate = sample_rate
        self.frame_samples = frame_samples
        self.prime_seconds = prime_seconds
        self.tail_seconds = tail_seconds

    @classmethod
    def from_plugin(cls, plugin_id: str, key_phrase: str = "hey_mycroft",
                    config: Optional[Dict[str, Any]] = None,
                    lang: str = "en-us", **kwargs) -> "WakeWordProbe":
        """Build a probe from a plugin id (loads + instantiates the engine).

        ``kwargs`` are forwarded to the probe constructor (``sample_rate``,
        ``frame_samples``, ``prime_seconds``, ``tail_seconds``).
        """
        engine = load_hotword_engine(plugin_id, key_phrase, config, lang)
        return cls(engine, **kwargs)

    def prime_pad(self, array):
        """Wrap a clip in leading + trailing silence, padded to whole frames.

        Args:
            array: mono clip as a 1-D sequence/array; converted to ``float32``.

        Returns:
            A new 1-D ``float32`` array: ``prime_seconds`` of zeros, the clip,
            ``tail_seconds`` of zeros, then enough extra zeros to make the
            length a multiple of ``frame_samples``.

        Raises:
            ValueError: the clip is not one-dimensional.
        """
        import numpy as np

        arr = np.asarray(array, dtype="float32")
        if arr.ndim != 1:
            raise ValueError(
                f"expected a mono 1-D clip, got array with shape {arr.shape}")
        lead = np.zeros(int(self.sample_rate * self.prime_seconds), dtype="float32")
        tail = np.zeros(int(self.sample_rate * self.tail_seconds), dtype="float32")
        out = np.concatenate([lead, arr, tail])
        rem = len(out) % self.frame_samples
        if rem:
            # Top up the final partial frame so every streamed chunk is full-size.
            out = np.concatenate(
                [out, np.zeros(self.frame_samples - rem, dtype="float32")])
        return out

    @staticmethod
    def to_pcm16(array) -> bytes:
        """Float32 ``[-1, 1]`` mono array → 16-bit little-endian PCM bytes.

        Out-of-range samples (including ±inf) are clipped to ``[-1, 1]``.
        NaN samples are turned into silence first, because casting NaN to an
        integer type is undefined and platform dependent.
        """
        import numpy as np

        arr = np.nan_to_num(np.asarray(array, dtype="float32"),
                            nan=0.0, posinf=1.0, neginf=-1.0)
        arr = np.clip(arr, -1.0, 1.0)
        return (arr * 32767.0).astype("<i2").tobytes()

    @staticmethod
    def _accepts_positional_arg(func) -> bool:
        """Return True when ``func`` can be called with one positional argument.

        Only positional-capable parameters count; keyword-only parameters and
        ``**kwargs`` cannot receive the chunk positionally. A callable whose
        signature cannot be introspected is treated as taking no argument.
        """
        try:
            params = inspect.signature(func).parameters.values()
        except (TypeError, ValueError):
            return False
        positional_kinds = (
            inspect.Parameter.POSITIONAL_ONLY,
            inspect.Parameter.POSITIONAL_OR_KEYWORD,
            inspect.Parameter.VAR_POSITIONAL,
        )
        return any(p.kind in positional_kinds for p in params)

    def _reset_engine(self) -> None:
        """Best-effort ``engine.reset()``; failures are logged, never raised."""
        if not hasattr(self.engine, "reset"):
            return
        try:
            self.engine.reset()
        except Exception:
            # Reset is optional housekeeping: keep probing, but leave a trace
            # so stale-state problems can be diagnosed.
            import logging

            logging.getLogger(__name__).debug(
                "hotword engine reset() failed; continuing", exc_info=True)

    def detect(self, array) -> WakeWordDetection:
        """Stream one clip through the engine; return the detection decision.

        The OVOS contract is ``update(chunk_bytes)`` to feed audio then
        ``found_wake_word()`` to read the latch. Some plugins keep a vestigial
        ``found_wake_word(frame_data)`` argument they ignore — the chunk is
        passed through whenever the method can take a positional argument, so
        the call matches either signature.

        Args:
            array: mono ``float32`` clip in ``[-1, 1]`` at ``sample_rate``.

        Returns:
            A ``WakeWordDetection``. ``latency_ms`` is the wall-clock time
            spent streaming (measured from the first frame, priming silence
            included) until detection, or until the clip was exhausted.
            ``frames_to_detection`` is the 1-based index of the frame on which
            the latch fired, counted from the start of the primed buffer, or
            ``None`` when nothing was detected.
        """
        primed = self.prime_pad(array)
        self._reset_engine()
        fww = self.engine.found_wake_word
        fww_takes_arg = self._accepts_positional_arg(fww)
        has_update = hasattr(self.engine, "update")
        pcm = self.to_pcm16(primed)
        step = self.frame_samples * 2  # 2 bytes / sample (int16)
        start = time.perf_counter()
        for i, off in enumerate(range(0, len(pcm), step), 1):
            chunk = pcm[off:off + step]
            if has_update:
                self.engine.update(chunk)
            if (fww(chunk) if fww_takes_arg else fww()):
                latency = (time.perf_counter() - start) * 1000
                return WakeWordDetection(True, round(latency, 3), i)
        latency = (time.perf_counter() - start) * 1000
        return WakeWordDetection(False, round(latency, 3), None)
4 changes: 4 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,9 @@ media = ["ovos-media>=0.0.2a3"]
# AudioTransformersService. >=0.7.2a1 is the first release that allows
# ovos-bus-client 2.x (older pins cap it <2.0.0 and conflict with ovos-core).
listener = ["ovos-dinkum-listener>=0.7.2a1"]
# Per-clip WakeWordProbe (ovoscope.wakeword_probe): streams audio arrays through
# a real HotWordEngine. Only needs numpy — the engine itself is environmental.
bench = ["numpy"]
# End-to-end TTS intelligibility scoring (WER/CER round-trip via reference STT).
# faster-whisper itself is pulled by the plugin — don't list it here to avoid
# version skew.
Expand All @@ -61,6 +64,7 @@ dev = [
"ovos-media>=0.0.2a3",
"ovos-dinkum-listener>=0.7.2a1",
"ovos-pydantic-models>=0.1.0",
"numpy",
"jiwer",
"ovos-utterance-normalizer",
"ovos-stt-plugin-fasterwhisper",
Expand Down
93 changes: 93 additions & 0 deletions test/unittests/test_wakeword_probe.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
"""Tests for the per-clip WakeWordProbe (no real engine / no heavy deps)."""
import inspect
import unittest

import numpy as np

from ovoscope.wakeword_probe import (
FRAME_SAMPLES,
PRIME_SECONDS,
SAMPLE_RATE,
WakeWordDetection,
WakeWordProbe,
)


class _FakeEngine:
    """Stand-in hotword engine for the probe tests.

    The latch is set by every ``update()`` call from the ``trigger_after``-th
    one onwards (never, when ``trigger_after`` is ``None``) and is cleared each
    time ``found_wake_word`` reads it. ``reset()`` clears the call counter and
    the latch and counts how often it was invoked.
    """

    def __init__(self, trigger_after=None, takes_arg=False):
        self.reset_count = 0
        self._fired = False
        self._calls = 0
        self.trigger_after = trigger_after

        # Two flavours of the read method, so the probe's signature sniffing
        # can be exercised with and without the vestigial frame argument.
        def _read_with_frame(frame_data=b""):
            return self._read()

        def _read_without_frame():
            return self._read()

        self.found_wake_word = (
            _read_with_frame if takes_arg else _read_without_frame)

    def update(self, chunk: bytes):
        self._calls += 1
        threshold = self.trigger_after
        if threshold is None:
            return
        if self._calls >= threshold:
            self._fired = True

    def _read(self):
        # Read-and-clear: report the latch, then drop it.
        latched = self._fired
        self._fired = False
        return latched

    def reset(self):
        self._fired = False
        self._calls = 0
        self.reset_count += 1


class TestPrimePad(unittest.TestCase):
    """Checks on the silence padding built by ``WakeWordProbe.prime_pad``."""

    def test_leads_with_silence_and_pads_to_frames(self):
        signal = np.ones(1000, dtype="float32")
        padded = WakeWordProbe(_FakeEngine()).prime_pad(signal)
        lead_samples = int(SAMPLE_RATE * PRIME_SECONDS)

        # The buffer is an exact multiple of the frame size.
        self.assertEqual(0, len(padded) % FRAME_SAMPLES)
        # Nothing but zeros during the priming window.
        self.assertFalse(np.any(padded[:lead_samples] != 0.0))
        # There is room for the whole clip after the priming window.
        self.assertGreaterEqual(len(padded), lead_samples + len(signal))

    def test_default_prime_is_a_few_seconds(self):
        # The default lead must cover the ~2.5 s streaming feature window.
        self.assertGreaterEqual(PRIME_SECONDS, 2.5)


class TestDetect(unittest.TestCase):
    """End-to-end checks of ``WakeWordProbe.detect`` against a fake engine."""

    @staticmethod
    def _one_second_of_silence():
        # One second of zeros at the default sample rate; the fake engine
        # triggers on update() counts, not on audio content.
        return np.zeros(SAMPLE_RATE, dtype="float32")

    def test_detects_and_reports_frames_and_latency(self):
        fake = _FakeEngine(trigger_after=5)
        outcome = WakeWordProbe(fake).detect(self._one_second_of_silence())

        self.assertIsInstance(outcome, WakeWordDetection)
        self.assertTrue(outcome.detected)
        self.assertEqual(outcome.frames_to_detection, 5)
        self.assertGreaterEqual(outcome.latency_ms, 0.0)
        # The engine is reset exactly once, before streaming starts.
        self.assertEqual(fake.reset_count, 1)

    def test_no_detection_returns_none_frames(self):
        silent_engine = _FakeEngine(trigger_after=None)
        outcome = WakeWordProbe(silent_engine).detect(
            self._one_second_of_silence())

        self.assertFalse(outcome.detected)
        self.assertIsNone(outcome.frames_to_detection)

    def test_handles_found_wake_word_with_frame_arg(self):
        fake = _FakeEngine(trigger_after=3, takes_arg=True)
        # Sanity check: this flavour really does expose a parameter.
        param_count = len(inspect.signature(fake.found_wake_word).parameters)
        self.assertGreaterEqual(param_count, 1)

        outcome = WakeWordProbe(fake).detect(self._one_second_of_silence())
        self.assertTrue(outcome.detected)


if __name__ == "__main__":
unittest.main()
Loading