diff --git a/README.md b/README.md index f8cbb2b..06938ed 100644 --- a/README.md +++ b/README.md @@ -52,6 +52,8 @@ software timestamps alone. - `GLSystemText` (backward-compatible explicit name for `Text`). - Psychophysics helpers (`make_gabor`, gratings, normalization, dithering). - Audio playback utility (`Audio`) backed by `tachyaudio`. +- Optional Wooting analog-keyboard integration (`tachypy[wooting]`): on-screen + pressure feedback and `WOOTING_ACQUISITION` straight from `tachypy`. - Test suite for core logic and regressions. ## Installation @@ -79,10 +81,26 @@ Optional extras: ```bash pip install -e ".[test]" # pytest -pip install -e ".[text]" # Pillow text fallback -# Audio support (tachyaudio) is included in the base install +pip install -e ".[wooting]" # Wooting analog-keyboard integration +# Pillow, FreeType, HarfBuzz, GLFW, and audio are included in the base install ``` +### Wooting analog-keyboard integration + +`pip install "tachypy[wooting]"` adds support for Wooting analog keyboards +(pressure acquisition, logging, and on-screen visual feedback): + +```python +from tachypy import Screen, WOOTING_ACQUISITION + +acq = WOOTING_ACQUISITION(threshold=0.8) +acq.initialize_keyboard() +acq.wait_light_press_visual(target_keys=["c", "z"], screen=Screen(fullscreen=False)) +``` + +See the [Wooting docs page](https://tachypy.readthedocs.io/en/latest/wooting.html) +for details. + ### Audio dependency TachyPy audio now uses `tachyaudio>=0.2.0b1`. TachyPy no longer depends on @@ -164,7 +182,7 @@ TACHYPY_FONT="Avenir Next, Helvetica, Arial" python example_tachypy.py backend-independent. - `GLSystemText` supports system font selection by family name, fallback list (e.g. `"Avenir Next, Helvetica, Arial"`), or direct font file path. -- For production instruction text, prefer `Text` with `.[system_text]`. +- For production instruction text, prefer `Text`. - The old texture-backed constructor is backbenched as `tachypy.text.LegacyText`. ## API Naming @@ -198,6 +216,7 @@ Expanded docs live in `/docs` and include: - backend behavior and input routing - text rendering options - audio backend guidance +- Wooting analog-keyboard integration - examples and contribution workflow Hosted docs (Read the Docs): https://tachypy.readthedocs.io/ diff --git a/docs/api.rst b/docs/api.rst index 7007a69..0dd48a6 100644 --- a/docs/api.rst +++ b/docs/api.rst @@ -34,3 +34,13 @@ Core modules .. automodule:: tachypy.psychophysics :members: + +Wooting pressure feedback +------------------------- + +Keyboard-agnostic visual feedback toolkit (see :doc:`wooting`). These modules +never import a keyboard package; they render feedback for any object satisfying +:class:`tachypy.feedback.PressureSource`. + +.. automodule:: tachypy.feedback + :members: diff --git a/docs/getting_started.rst b/docs/getting_started.rst index 589bbdd..b352313 100644 --- a/docs/getting_started.rst +++ b/docs/getting_started.rst @@ -27,8 +27,10 @@ Optional extras .. code-block:: bash pip install -e ".[test]" # pytest, coverage, lint tooling - pip install -e ".[text]" # Pillow text fallback - # Audio support (tachyaudio) is included in the base install + pip install -e ".[wooting]" # Wooting analog-keyboard integration + # Pillow, FreeType, HarfBuzz, GLFW, and audio are included in the base install + +See :doc:`wooting` for the Wooting analog-keyboard integration. Minimal loop ------------ diff --git a/docs/gifs/wooting-mini-bw-experiment.gif b/docs/gifs/wooting-mini-bw-experiment.gif new file mode 100644 index 0000000..4682a79 Binary files /dev/null and b/docs/gifs/wooting-mini-bw-experiment.gif differ diff --git a/docs/gifs/wooting-visual-fixation-demo.gif b/docs/gifs/wooting-visual-fixation-demo.gif new file mode 100644 index 0000000..b892402 Binary files /dev/null and b/docs/gifs/wooting-visual-fixation-demo.gif differ diff --git a/docs/index.rst b/docs/index.rst index d763a18..17f1975 100644 --- a/docs/index.rst +++ b/docs/index.rst @@ -13,6 +13,7 @@ abstractions for display/input, and helper utilities for experiment workflows. timing_validation text_rendering audio + wooting examples contributing diff --git a/docs/wooting.rst b/docs/wooting.rst new file mode 100644 index 0000000..4063fa0 --- /dev/null +++ b/docs/wooting.rst @@ -0,0 +1,326 @@ +Wooting Analog Keyboards +======================== + +TachyPy integrates with **TachyWooting**, a hardware toolbox for Wooting analog +keyboards (analog pressure acquisition, +hierarchical HDF5 logging, light-press / release readiness checks). The hardware +toolbox is usable on its own; this page documents only what becomes available +**inside TachyPy** once the integration is installed — chiefly on-screen visual +pressure feedback. For the full keyboard/logging reference, see TachyWooting's own +documentation. + +Installation +------------ + +The integration ships as an optional extra. It pulls ``tachywooting`` and exposes +the keyboard through the top-level ``tachypy`` namespace: + +.. code-block:: bash + + pip install "tachypy[wooting]" + +(See `First-time setup`_ below for the one-time native/permissions step.) + +One import surface +------------------ + +The enriched ``WOOTING_ACQUISITION`` — the hardware acquisition class plus TachyPy +visual feedback — is available straight from the top-level package: + +.. code-block:: python + + from tachypy import WOOTING_ACQUISITION # keyboard + visual feedback + +You never import ``tachywooting`` directly. ``WOOTING_ACQUISITION`` is the only +keyboard symbol exposed at the top level (it is the common entry point and its +name is unambiguous). The rest of the keyboard surface — the helpers +(``convert_char_to_keycode``, ``load_trial``, ``load_session``, +``trial_to_dataframe``, ``visualize``, ``visualize_all_keys``) and the low-level +CFFI handles ``lib`` and ``ffi`` — lives under :mod:`tachypy.wooting`: + +.. code-block:: python + + from tachypy.wooting import visualize, load_trial, convert_char_to_keycode + +How the enrichment works +------------------------ + +``WOOTING_ACQUISITION`` is enriched in ``tachypy/wooting/__init__.py``: it is a +thin subclass that combines TachyWooting's hardware acquisition class with +:class:`~tachypy.feedback.VisualPressureFeedbackMixin`. The mixin is what adds the +``wait_light_press_visual`` method — nothing else changes: + +.. code-block:: python + + from tachywooting import WOOTING_ACQUISITION as _BaseAcquisition + from tachypy.feedback import VisualPressureFeedbackMixin + + class WOOTING_ACQUISITION(_BaseAcquisition, VisualPressureFeedbackMixin): + """Hardware acquisition + logging (base) + TachyPy visual feedback (mixin).""" + +This keeps the hardware package (TachyWooting) completely free of TachyPy — the +visual method is grafted on here, on TachyPy's side. Because the mixin only relies +on the :class:`~tachypy.feedback.PressureSource` contract (reading pressures plus +the light-press thresholds), the very same pattern enriches any future analog +keyboard: subclass its base acquisition class and mix in +``VisualPressureFeedbackMixin``. + +First-time setup +---------------- + +The first time you create a ``WOOTING_ACQUISITION``, TachyPy builds the native +interface automatically if it is missing (this needs only a C compiler, no admin +rights). The Wooting **SDK plugins and input permissions**, however, require a +one-time privileged step: + +.. code-block:: bash + + wooting-build-interface # installs SDK plugins + permissions (needs admin) + +If the keyboard is not detected, the error message tells you exactly to run this +command — you do not have to remember it. + +Basic keyboard use +------------------ + +The enriched class behaves exactly like the hardware acquisition class for +acquisition and logging: + +.. code-block:: python + + from tachypy import WOOTING_ACQUISITION + + acq = WOOTING_ACQUISITION(threshold=0.8) + acq.initialize_keyboard(verbose=True) + try: + # Instantaneous analog pressure (0.0–1.0) of one or more keys + print(acq.read_pressure("C")) + print(acq.read_pressures(["C", "Z"])) + + # Block until keys are held in the light-press range (no display needed) + acq.wait_keys_light_press(target_keys=["C", "Z"], quit_key="Q") + + # Record an analog trial and write it to an HDF5 shard + acq.setup_logging(name="tracking", path="logs", int_analog=2) + trial = acq.acquire_analog_values(target_keys=["C", "Z"]) + finally: + acq.uninitialize_keyboard() + +Visual pressure feedback +------------------------ + +.. note:: + + **Why this matters.** The whole point of an analog keyboard is to record the + *continuous pressure trajectory* of a response, not just a binary press — and + that only works if the participant's fingers are already resting on the keys + when the trial begins. Requiring a stable light press before each trial: + + - **guarantees the fingers are on the keys**, so the full movement is captured + from its earliest, lowest-pressure samples; + - **guides them to the right resting pressure** — high enough that softer + presses still fall inside the trackable range, yet low enough to leave + headroom for the fuller press that follows. + + The on-screen feedback turns this abstract requirement into something the + participant can simply *aim for*. + +``wait_light_press_visual`` shows an interactive fixation cross while waiting for +two keys to stay within the light-press interval for ``hold_seconds``. It needs a +TachyPy :class:`~tachypy.Screen`; pass a :class:`~tachypy.ResponseHandler` to let +the participant quit early: + +.. code-block:: python + + from tachypy import Screen, ResponseHandler, FixationCross + from tachypy import WOOTING_ACQUISITION + + acq = WOOTING_ACQUISITION(threshold=0.8, min_pressure_start=0.33, max_pressure_start=0.66) + acq.initialize_keyboard() + + screen = Screen(fullscreen=False) + rh = ResponseHandler(screen=screen) + fixation = FixationCross(center=(screen.width // 2, screen.height // 2), + half_width=18, half_height=18, thickness=8, color=(0, 0, 0)) + + ready = acq.wait_light_press_visual( + target_keys=["c", "z"], + screen=screen, + response_handler=rh, + fixation_cross=fixation, # geometry + color copied automatically + show_pressure_text=True, + show_goal_markers=True, + ) + +The horizontal bar grows and shrinks with pressure, turns toward the target color +as the hold completes, and (optionally) shows live pressure values for keys that +fall outside the acceptable interval. + +.. image:: gifs/wooting-visual-fixation-demo.gif + :alt: Interactive fixation cross with real-time pressure feedback + :width: 100% + +Logging and a full experiment loop +---------------------------------- + +Acquisition trials are logged to **HDF5**. Logging is opt-in and follows a simple +lifecycle: + +#. ``setup_logging(name, path, int_analog)`` enables logging. ``int_analog=2`` + stores analog pressure in ``[0, 1]``; ``int_analog=1`` stores integer values + in ``[0, 255]``. +#. Every :meth:`acquire_analog_values` call writes **one trial shard** to a + staging directory, so a crash mid-experiment never loses completed trials. +#. ``uninitialize_keyboard()`` merges all shards into the final ``.hdf5`` + and releases the SDK. **Always call it at the end** (a ``try/finally`` is the + safest pattern). + +Each ``values`` dataset stores three columns — ``position``, ``time_from_onset`` +and ``time_abs`` — under a per-trial, per-key hierarchy:: + + /trials/0001/keys/0004/values + +Beyond the three columns, **each trial also carries metadata** as HDF5 group +attributes, surfaced under the ``"_attrs"`` key by :func:`load_trial`: + +.. list-table:: + :header-rows: 1 + :widths: 24 12 64 + + * - Attribute + - Type + - Meaning + * - ``backend`` + - str + - Readout backend used for the trial (``"read_analog"`` or ``"read_full_buffer"``). + * - ``threshold`` + - float + - Actuation threshold (0–1) that defined the response on this trial. + * - ``threshold_time`` + - float + - Seconds from ``trial_start_ns`` (onset) to the threshold crossing. + * - ``threshold_key`` + - int + - HID keycode of the key that crossed the threshold first. + * - ``trial_start_perf_ns`` + - int + - Onset reference timestamp (``perf_counter_ns``) used to compute ``time_from_onset``. + * - ``trial_start_clock`` + - str + - Clock the onset was supplied on (``"perf"`` or ``"mono"``). + +The threshold-related attributes (``threshold_time``, ``threshold_key``) are +absent when the threshold was never reached during the trial. + +A minimal but complete experiment that ties the lifecycle together: + +.. code-block:: python + + from tachypy import FixationCross, ResponseHandler, Screen, WOOTING_ACQUISITION + from tachypy.wooting import convert_char_to_keycode + + YES, NO = "z", "c" + + acq = WOOTING_ACQUISITION(threshold=0.8, min_pressure_start=0.33, max_pressure_start=0.66) + acq.initialize_keyboard() # builds the native interface on first use + acq.setup_logging(name="participant_01", path="logs", int_analog=2) + + screen = Screen(fullscreen=False) + rh = ResponseHandler(screen=screen) + fixation = FixationCross(center=(screen.width // 2, screen.height // 2), + half_width=18, half_height=18, thickness=8, color=(0, 0, 0)) + + try: + for trial in range(1, 21): + # 1) readiness: wait for both fingers to rest lightly on the keys + if not acq.wait_light_press_visual([YES, NO], screen=screen, + response_handler=rh, fixation_cross=fixation): + break # participant pressed a quit key (Esc, etc.) + + # 2) present your stimulus, then time-lock the trial to the flip + screen.fill((128, 128, 128)) + # ... draw your stimulus here ... + onset = screen.flip() # flip() returns the post-swap timestamp = onset + + # 3) acquire the response trajectory (writes one HDF5 shard) + hier = acq.acquire_analog_values([YES, NO], trial_start_ns=onset, trial_start_clock="mono") + response = acq.get_response_key(hier, [YES, NO]) + print(f"trial {trial}: response = {convert_char_to_keycode([response])[0]}") + + finally: + acq.uninitialize_keyboard() # merges shards -> logs/participant_01.hdf5 + screen.close() + +The trial is time-locked through the flip: ``screen.flip()`` returns the post-swap +timestamp on TachyPy's monotonic clock, which is passed straight to +``acquire_analog_values`` as ``trial_start_ns`` with ``trial_start_clock="mono"``. +Every sample's ``time_from_onset`` is then measured from that exact onset. + +Reading the log back afterwards needs no keyboard: + +.. code-block:: python + + from tachypy.wooting import load_session, load_trial, trial_to_dataframe + + session = load_session("logs/participant_01.hdf5") # every trial + trial = load_trial("logs/participant_01.hdf5", 1) # one trial + df = trial_to_dataframe(trial) # tidy pandas DataFrame + + trial["0006"]["position"] # per-key trajectory (keycode "0006") + trial["_attrs"]["threshold_time"] # trial metadata (see the table above) + +Detecting finger removal +------------------------ + +For the trajectory to be meaningful, the participant must keep their fingers on +the keys *throughout* the trial. Lifting a finger mid-response breaks the +continuous signal the analog keyboard is meant to capture — it is the in-trial +counterpart to the pre-trial readiness check above. After each +:meth:`acquire_analog_values`, the acquisition object reports whether a removal +occurred, so you can warn the participant before it becomes a habit: + +.. code-block:: python + + acq.acquire_analog_values([YES, NO], trial_start_ns=onset) + + if acq.reached_consecutive_removal_limit(2): + # show your own on-screen message (e.g. a TachyPy Text) + warn(f"Keep your fingers on the keys — lifted {acq.current_removal_streak} trials in a row.") + elif acq.last_trial_had_removal: + warn("Try to keep your fingers resting on the keys.") + +Related counters let you monitor data quality across the whole session: +``last_trial_had_removal``, ``current_removal_streak``, +``reached_consecutive_removal_limit(n)``, ``removal_trials``, and +``removal_trial_proportion`` (the fraction of trials with at least one removal). +Logging these alongside your responses makes it easy to flag or exclude +compromised trials during analysis. + +Custom widgets (advanced) +------------------------- + +``wait_light_press_visual`` builds an +:class:`~tachypy.feedback.InteractiveFixationCross` by default. To render feedback +differently, subclass :class:`~tachypy.feedback.PressureFeedbackWidget` and pass it +via ``widget=``. The settings (:class:`~tachypy.feedback.PressureFeedbackConfig`, +which also computes the pressure-to-scale mapping via ``scale_for()``) and the +pressure state machine (:class:`~tachypy.feedback.PressureFeedbackState`) are +reusable building blocks. See :doc:`api` for the full ``tachypy.feedback`` reference. + +These tools are keyboard-agnostic: any object exposing the +:class:`~tachypy.feedback.PressureSource` contract (``read_pressures`` plus the +light-press thresholds) can drive the same feedback loop. + +Console demos +------------- + +The integration installs two on-screen demos (they require a display): + +.. code-block:: bash + + tachypy-wooting-fixation-demo # gamified interactive fixation cross + tachypy-wooting-mini-bw # minimal black/white response experiment + +.. image:: gifs/wooting-mini-bw-experiment.gif + :alt: Minimal black/white response experiment + :width: 100% diff --git a/setup.cfg b/setup.cfg index df3dc73..9e105e0 100644 --- a/setup.cfg +++ b/setup.cfg @@ -1,7 +1,7 @@ [metadata] name = tachypy version = 0.1.17 -author = Ian Charest and Frederic Gosselin +author = Ian Charest, Mathias Salvas-Hebert and Frederic Gosselin author_email = charest.ian@gmail.com description = A package for psychophysics in Python, using GLFW and OpenGL. long_description = file: README.md @@ -41,18 +41,14 @@ test = pytest>=7.0 pytest-cov>=5.0 ruff>=0.6 -glfw = - glfw>=2.7 -audio = -text = - Pillow>=10.0 -system_text = - freetype-py>=2.4 - uharfbuzz>=0.39 +wooting = + tachywooting>=0.2.1 [options.entry_points] console_scripts = tachypy-clock-demo = tachypy.examples.clock_timer_demo:main + tachypy-wooting-fixation-demo = tachypy.wooting.demos.visual_fixation_demo:main + tachypy-wooting-mini-bw = tachypy.wooting.demos.mini_bw_experiment:main [tool:pytest] addopts = -q @@ -71,6 +67,7 @@ omit = */tachypy/gltext_sdf.py */tachypy/glsystemtext.py */tachypy/examples/* + */tachypy/wooting/demos/* [coverage:report] skip_empty = True diff --git a/setup.py b/setup.py index 69b4a31..8863b7e 100644 --- a/setup.py +++ b/setup.py @@ -13,8 +13,16 @@ packages=find_packages(where='src'), package_dir={'': 'src'}, install_requires=requires, + extras_require={ + 'test': ['pytest>=7.0', 'pytest-cov>=5.0', 'ruff>=0.6'], + 'text': ['Pillow>=10.0'], + 'system_text': ['freetype-py>=2.4', 'uharfbuzz>=0.39'], + 'glfw': ['glfw>=2.7'], + 'audio': [], + 'wooting': ['tachywooting>=0.2.1'], + }, python_requires='>=3.10', - author='Ian Charest and Frederic Gosselin', + author='Ian Charest, Mathias Salvas-Hebert and Frederic Gosselin', author_email='charest.ian@gmail.com', description='A package for timing-focused psychophysics using GLFW and OpenGL.', long_description=open('README.md').read(), @@ -32,6 +40,8 @@ entry_points={ 'console_scripts': [ 'tachypy-clock-demo=tachypy.examples.clock_timer_demo:main', + 'tachypy-wooting-fixation-demo=tachypy.wooting.demos.visual_fixation_demo:main', + 'tachypy-wooting-mini-bw=tachypy.wooting.demos.mini_bw_experiment:main', ], }, ) diff --git a/src/tachypy/__init__.py b/src/tachypy/__init__.py index fffe331..858f0e5 100644 --- a/src/tachypy/__init__.py +++ b/src/tachypy/__init__.py @@ -40,6 +40,12 @@ "location_bubbles", "Audio", "QuestObject", + "PressureFeedbackWidget", + "InteractiveFixationCross", + "PressureFeedbackState", + "PressureFeedbackConfig", + "PressureSource", + "VisualPressureFeedbackMixin", ] @@ -81,6 +87,21 @@ "location_bubbles": ("tachypy.psychophysics", "location_bubbles"), "Audio": ("tachypy.audio", "Audio"), "QuestObject": ("tachypy.quest", "QuestObject"), + "PressureFeedbackWidget": ("tachypy.feedback", "PressureFeedbackWidget"), + "InteractiveFixationCross": ("tachypy.feedback", "InteractiveFixationCross"), + "PressureFeedbackState": ("tachypy.feedback", "PressureFeedbackState"), + "PressureFeedbackConfig": ("tachypy.feedback", "PressureFeedbackConfig"), + "PressureSource": ("tachypy.feedback", "PressureSource"), + "VisualPressureFeedbackMixin": ("tachypy.feedback", "VisualPressureFeedbackMixin"), + # Wooting keyboard integration — convenience shortcut for the one distinctive, + # high-frequency entry point: `from tachypy import WOOTING_ACQUISITION`. + # Resolved lazily from the `tachypy.wooting` facade, which requires the + # `tachypy[wooting]` extra (a clear ImportError is raised otherwise). Kept out + # of `__all__` so tachypy core stays keyboard-agnostic and doc builds / `import *` + # never pull in TachyWooting. Generic helpers (visualize, load_trial, lib, ffi, …) + # are intentionally NOT aliased here — they live under `tachypy.wooting` to keep + # the top-level namespace unambiguous and collision-free for future devices. + "WOOTING_ACQUISITION": ("tachypy.wooting", "WOOTING_ACQUISITION"), } diff --git a/src/tachypy/feedback/__init__.py b/src/tachypy/feedback/__init__.py new file mode 100644 index 0000000..f804b4b --- /dev/null +++ b/src/tachypy/feedback/__init__.py @@ -0,0 +1,32 @@ +"""Visual pressure-feedback toolkit for analog keyboards. + +Keyboard-agnostic: it renders feedback for any object satisfying +:class:`PressureSource` and never imports a keyboard package. Most users do not +import from here directly — they call ``wait_light_press_visual`` on an +acquisition class enriched with :class:`VisualPressureFeedbackMixin` +(see :mod:`tachypy.wooting`). The building blocks below are exposed for power +users who want custom widgets or to drive the loop manually. + +Layout +------ +- ``model`` — pure logic: ``PressureSource``, ``PressureFeedbackConfig`` + (thresholds, hold, and scaling), and ``PressureFeedbackState`` (no OpenGL). +- ``widgets`` — rendering: ``PressureFeedbackWidget`` (ABC) and the default + ``InteractiveFixationCross``. +- ``runner`` — the agnostic loop (``run_light_press_visual``) and the + user-facing ``VisualPressureFeedbackMixin``. +""" +from .model import PressureFeedbackConfig, PressureFeedbackState, PressureSource +from .runner import DEFAULT_EXIT_KEYS, VisualPressureFeedbackMixin, run_light_press_visual +from .widgets import InteractiveFixationCross, PressureFeedbackWidget + +__all__ = [ + "DEFAULT_EXIT_KEYS", + "InteractiveFixationCross", + "PressureFeedbackConfig", + "PressureFeedbackState", + "PressureFeedbackWidget", + "PressureSource", + "VisualPressureFeedbackMixin", + "run_light_press_visual", +] diff --git a/src/tachypy/feedback/model.py b/src/tachypy/feedback/model.py new file mode 100644 index 0000000..f1fbf2d --- /dev/null +++ b/src/tachypy/feedback/model.py @@ -0,0 +1,220 @@ +"""Pure pressure-feedback model: source contract, settings, and state machine. + +This module contains no OpenGL or drawing code, so it stays importable in +headless environments and is unit-testable on its own. +""" +from __future__ import annotations + +from dataclasses import dataclass +from typing import Dict, Literal, Protocol, Sequence, Union, runtime_checkable + +# Internal classification of a single key's pressure relative to the interval. +PressureStatus = Literal["too_weak", "ideal", "too_strong"] + + +@runtime_checkable +class PressureSource(Protocol): + """Minimal interface a keyboard must expose to drive visual feedback. + + The feedback engine is keyboard-agnostic: it only needs a way to read + pressures and the light-press thresholds. Any object satisfying this + protocol (for example ``tachywooting.WOOTING_ACQUISITION``) can drive the + visual feedback, without TachyPy ever importing the keyboard package. + + Attributes + ---------- + min_pressure_start, max_pressure_start : float + Bounds of the accepted light-press interval. + threshold : float + Response threshold of the acquisition task. + hold_seconds : float + Default continuous-hold duration for readiness checks. + + Methods + ------- + read_pressures(keys) + Return current analog pressures (``[0, 1]``) for the given keys, + as a mapping keyed by ``str(key)`` preserving input order. + """ + + min_pressure_start: float + max_pressure_start: float + threshold: float + hold_seconds: float + + def read_pressures(self, keys: Sequence[Union[str, int]]) -> Dict[str, float]: ... + + +@dataclass(frozen=True) +class PressureFeedbackConfig: + """Settings for pressure-readiness feedback: thresholds, hold, and scaling. + + Parameters + ---------- + min_pressure_start : float, default=0.01 + Lower bound for the accepted light-press interval. + max_pressure_start : float, default=0.35 + Upper bound for the accepted light-press interval. + threshold : float, default=0.8 + Response threshold used by the acquisition task. Must be greater than + ``max_pressure_start``. + hold_seconds : float, default=0.30 + Duration both pressures must remain inside the accepted interval before + readiness is reached. + min_scale, normal_scale, max_scale : float + Visual scale factors for the weakest non-zero pressure, the in-range + pressure, and strong over-pressure (used by :meth:`scale_for`). + """ + + min_pressure_start: float = 0.01 + max_pressure_start: float = 0.35 + threshold: float = 0.8 + hold_seconds: float = 0.30 + min_scale: float = 0.25 + normal_scale: float = 1.0 + max_scale: float = 2.0 + + def __post_init__(self) -> None: + if not (0 <= self.min_pressure_start < self.max_pressure_start < self.threshold <= 1): + raise ValueError("Require 0 <= min_pressure_start < max_pressure_start < threshold <= 1") + if self.hold_seconds <= 0: + raise ValueError("hold_seconds must be positive") + if not (0 < self.min_scale <= self.normal_scale <= self.max_scale): + raise ValueError("Require 0 < min_scale <= normal_scale <= max_scale") + + @classmethod + def from_source(cls, source, *, hold_seconds: float | None = None, **overrides): + """Build a config from a :class:`PressureSource`. + + Parameters + ---------- + source : PressureSource + Object exposing ``min_pressure_start``, ``max_pressure_start``, + ``threshold`` and ``hold_seconds`` (e.g. a keyboard acquisition). + hold_seconds : float, optional + Override the source's ``hold_seconds``. + **overrides + Any other field to override (scale factors, thresholds, ...). + + Returns + ------- + PressureFeedbackConfig + """ + values = dict( + min_pressure_start=source.min_pressure_start, + max_pressure_start=source.max_pressure_start, + threshold=source.threshold, + hold_seconds=source.hold_seconds if hold_seconds is None else hold_seconds, + ) + values.update(overrides) + return cls(**values) + + def scale_for(self, pressure: float) -> float: + """Return the visual scale factor for one pressure value. + + Returns ``0.0`` when ``pressure`` is exactly zero, ``normal_scale`` + inside the accepted interval, and a clamped continuous scale outside it. + """ + pressure = max(0.0, min(1.0, float(pressure))) + if pressure == 0.0: + return 0.0 + + low, high = self.min_pressure_start, self.max_pressure_start + if pressure < low: + if low <= 0: + return self.normal_scale + ratio = pressure / low + return self._clamp_scale(self.min_scale + ratio * (self.normal_scale - self.min_scale)) + + if pressure <= high: + return self.normal_scale + + if high >= 1.0: + return self.max_scale + ratio = (pressure - high) / (1.0 - high) + return self._clamp_scale(self.normal_scale + ratio * (self.max_scale - self.normal_scale)) + + def _clamp_scale(self, value: float) -> float: + return max(self.min_scale, min(self.max_scale, float(value))) + + +@dataclass +class PressureFeedbackState: + """State machine for real-time pressure feedback. + + Parameters + ---------- + config : PressureFeedbackConfig + Feedback thresholds, hold duration, and scale factors. + + Attributes + ---------- + left_pressure, right_pressure : float + Most recent pressure values. + left_scale, right_scale : float + Current visual scale values for the left and right horizontal segments. + left_status, right_status : {"too_weak", "ideal", "too_strong"} + Pressure classification for each side. + hold_progress : float + Fraction of the hold duration completed, clamped to ``[0, 1]``. + elapsed_hold_time : float + Seconds spent continuously inside the accepted interval. + is_ready : bool + ``True`` once both pressures have remained ideal for ``hold_seconds``. + """ + + config: PressureFeedbackConfig + left_pressure: float = 0.0 + right_pressure: float = 0.0 + left_scale: float = 1.0 + right_scale: float = 1.0 + left_status: PressureStatus = "too_weak" + right_status: PressureStatus = "too_weak" + hold_progress: float = 0.0 + elapsed_hold_time: float = 0.0 + is_ready: bool = False + _hold_started_at: float | None = None + + def update(self, left_pressure: float, right_pressure: float, now: float) -> None: + """Update pressure status, scale, hold timer, and readiness. + + Parameters + ---------- + left_pressure : float + Current pressure for the left monitored key. + right_pressure : float + Current pressure for the right monitored key. + now : float + Current monotonic timestamp, usually from ``time.perf_counter()``. + + Returns + ------- + None + The object is updated in place. + """ + self.left_pressure = float(left_pressure) + self.right_pressure = float(right_pressure) + self.left_status = self._status(self.left_pressure) + self.right_status = self._status(self.right_pressure) + self.left_scale = self.config.scale_for(self.left_pressure) + self.right_scale = self.config.scale_for(self.right_pressure) + + if self.left_status == "ideal" and self.right_status == "ideal": + if self._hold_started_at is None: + self._hold_started_at = float(now) + self.elapsed_hold_time = max(0.0, float(now) - self._hold_started_at) + self.hold_progress = min(1.0, self.elapsed_hold_time / self.config.hold_seconds) + self.is_ready = self.hold_progress >= 1.0 + return + + self._hold_started_at = None + self.elapsed_hold_time = 0.0 + self.hold_progress = 0.0 + self.is_ready = False + + def _status(self, pressure: float) -> PressureStatus: + if pressure < self.config.min_pressure_start: + return "too_weak" + if pressure > self.config.max_pressure_start: + return "too_strong" + return "ideal" diff --git a/src/tachypy/feedback/runner.py b/src/tachypy/feedback/runner.py new file mode 100644 index 0000000..bd4f250 --- /dev/null +++ b/src/tachypy/feedback/runner.py @@ -0,0 +1,303 @@ +"""Visual pressure-feedback loop and the user-facing mixin. + +``run_light_press_visual`` is the keyboard-agnostic render loop: it never imports +any keyboard package — pressures are supplied through a ``read_pair`` callable and +timing can be delegated to an injected ``wait_until``. + +``VisualPressureFeedbackMixin`` grafts ``wait_light_press_visual`` onto any object +satisfying :class:`~tachypy.feedback.model.PressureSource`; it wires the keyboard's +pressure reading and thresholds into the loop and builds the default widget. +""" +from __future__ import annotations + +import time +from typing import Any, Callable, Sequence + +from .model import PressureFeedbackConfig, PressureFeedbackState +from .widgets import InteractiveFixationCross, PressureFeedbackWidget + +DEFAULT_EXIT_KEYS: tuple[str, ...] = ("escape", "esc", "enter", "return", "space", "q") + +_TICK_INTERVAL = 1.0 / 1000.0 # 1000 Hz polling + + +def _exit_requested(response_handler, exit_keys: set[str]) -> bool: + """Return True when the user requested an exit through a response handler. + + Works with any TachyPy ``ResponseHandler``-like object (duck-typed): it may + expose ``get_events``, ``should_quit`` and ``get_key_presses``. + """ + if response_handler is None: + return False + if hasattr(response_handler, "get_events"): + response_handler.get_events() + if hasattr(response_handler, "should_quit") and response_handler.should_quit(): + return True + if hasattr(response_handler, "key_down_events"): + return any(str(key).lower() in exit_keys for key in response_handler.key_down_events) + if not hasattr(response_handler, "get_key_presses"): + return False + for event in response_handler.get_key_presses(): + if event.get("type") == "keydown" and str(event.get("key", "")).lower() in exit_keys: + return True + return False + + +def _default_wait_until(next_t: float) -> None: + remaining = next_t - time.perf_counter() + if remaining > 0: + time.sleep(remaining) + + +def run_light_press_visual( + *, + read_pair: Callable[[], tuple[float, float]], + state: PressureFeedbackState, + widget: PressureFeedbackWidget, + screen, + response_handler=None, + exit_keys: Sequence[str] = DEFAULT_EXIT_KEYS, + overlay_drawables: Sequence[object] | None = None, + background_color=(128, 128, 128), + timeout_seconds: float | None = None, + wait_until: Callable[[float], None] | None = None, + verbose: bool = False, +) -> bool: + """Run the visual light-press feedback loop until ready or aborted. + + Parameters + ---------- + read_pair : callable + Zero-argument callable returning ``(left_pressure, right_pressure)``. + state : PressureFeedbackState + Feedback state machine to drive each frame. + widget : PressureFeedbackWidget + Widget updated and drawn each frame. + screen : object + TachyPy ``Screen``-like object. Must expose ``flip()``; if it exposes + ``fill(color)``, the screen is cleared with ``background_color``. + response_handler : object, optional + ``ResponseHandler``-like object. Exit and quit requests return ``False``. + exit_keys : sequence of str + Keys that abort the wait when ``response_handler`` is active. + overlay_drawables : sequence, optional + Objects with ``.draw()`` called each frame after the widget. + background_color : tuple or callable + RGB color (or callable returning one) used to clear the screen. + timeout_seconds : float, optional + Maximum wait time. Raises ``TimeoutError`` if exceeded. + wait_until : callable, optional + ``wait_until(next_t)`` used to pace the loop. Defaults to a portable + sleep; pass a keyboard's precise tick for tighter timing. + verbose : bool, default=False + Reserved for future logging hooks. + + Returns + ------- + bool + ``True`` when both keys were held in range for the hold duration. + ``False`` when the user exits via ``response_handler``. + """ + if not hasattr(screen, "flip"): + raise AttributeError("screen must expose flip()") + wait_until = _default_wait_until if wait_until is None else wait_until + + exit_key_set = {str(key).lower() for key in exit_keys} + if response_handler is not None and hasattr(response_handler, "keys_to_listen"): + existing = getattr(response_handler, "keys_to_listen", None) or [] + merged = {str(key).lower() for key in existing} | exit_key_set + response_handler.keys_to_listen = sorted(merged) + if hasattr(response_handler, "_probed_keys"): + response_handler._probed_keys.update(merged) + + next_t = time.perf_counter() + deadline = None if timeout_seconds is None else next_t + timeout_seconds + + while True: + now = time.perf_counter() + if deadline is not None and now >= deadline: + raise TimeoutError("run_light_press_visual: timeout exceeded") + if _exit_requested(response_handler, exit_key_set): + return False + + frame_background_color = background_color() if callable(background_color) else background_color + if hasattr(screen, "fill"): + screen.fill(frame_background_color) + + left, right = read_pair() + state.update(left_pressure=float(left), right_pressure=float(right), now=now) + + widget.update(state) + widget.draw() + if overlay_drawables: + for drawable in overlay_drawables: + drawable.draw() + + screen.flip() + + if state.is_ready: + return True + + next_t += _TICK_INTERVAL + now2 = time.perf_counter() + if next_t < (now2 - 0.10): + next_t = now2 + _TICK_INTERVAL + wait_until(next_t) + + +class VisualPressureFeedbackMixin: + """Adds :meth:`wait_light_press_visual` to a :class:`PressureSource`. + + Any acquisition class that satisfies the ``PressureSource`` contract becomes + able to show visual feedback simply by mixing this in:: + + class WOOTING_ACQUISITION(BaseWooting, VisualPressureFeedbackMixin): + ... + """ + + def wait_light_press_visual( + self, + target_keys: Sequence[str | int], + screen, + response_handler=None, + fixation_cross=None, + overlay_drawables: Sequence[object] | None = None, + # ── timing ────────────────────────────────────────────────────────── + hold_seconds: float | None = None, + timeout_seconds: float | None = None, + # ── appearance ────────────────────────────────────────────────────── + background_color: tuple[int, int, int] = (128, 128, 128), + initial_color: tuple[int, int, int] | None = None, + show_pressure_text: bool | None = None, + show_goal_markers: bool | None = None, + # ── behaviour ─────────────────────────────────────────────────────── + exit_keys: Sequence[str] = DEFAULT_EXIT_KEYS, + # ── others ────────────────────────────────────────────────────────── + widget: Any | None = None, + verbose: bool = False, + ) -> bool: + """ + Wait for two keys to stay in the light-press range while showing visual feedback. + + Parameters + ---------- + target_keys : sequence of str or int + Exactly two keys. The first controls the left side of the widget, + the second the right side. + screen : TachyPy Screen object + Must expose ``flip()``. If it exposes ``fill(color)``, the screen is + cleared with ``background_color`` each frame. + response_handler : TachyPy ResponseHandler, optional + When provided, quit requests and ``exit_keys`` presses return ``False``. + fixation_cross : TachyPy FixationCross, optional + Existing fixation cross whose geometry and color are copied by the + auto-created widget. Invalid with ``widget``. + overlay_drawables : sequence, optional + Objects with a ``.draw()`` method called each frame after the widget. + hold_seconds : float, optional + Required continuous hold duration. Defaults to ``self.hold_seconds``. + timeout_seconds : float, optional + Maximum wait time. Raises ``TimeoutError`` if exceeded. + background_color : tuple[int, int, int], default=(128, 128, 128) + RGB color used to clear the screen each frame. + initial_color : tuple[int, int, int], optional + Starting color of the horizontal bar. Defaults to ``(100, 100, 100)``. + Invalid with ``widget``. + show_pressure_text : bool, optional + Show real-time pressure values above the cross for out-of-range keys. + Defaults to ``False``. Invalid with ``widget``. + show_goal_markers : bool, optional + Show thin ticks at the target positions. Defaults to ``False``. + Invalid with ``widget``. + exit_keys : sequence of str + Keys that abort the wait when ``response_handler`` is active. + widget : PressureFeedbackWidget, optional + Full custom widget override. When provided, ``fixation_cross``, + ``initial_color``, ``show_pressure_text`` and ``show_goal_markers`` + are invalid. + verbose : bool, default=False + Reserved for future logging hooks. + + Returns + ------- + bool + ``True`` when both keys were held in range for ``hold_seconds``. + ``False`` when the user exits via ``response_handler``. + + Raises + ------ + ValueError + Invalid arguments or incompatible parameter combinations. + TimeoutError + ``timeout_seconds`` exceeded before readiness. + + Examples + -------- + >>> acq.wait_light_press_visual(target_keys=["c", "z"], screen=screen) + + >>> acq.wait_light_press_visual( + ... target_keys=["c", "z"], screen=screen, + ... response_handler=rh, fixation_cross=fixation, + ... ) + """ + if not getattr(self, "initialized", False): + raise ValueError('Keyboard must be initialized through "initialize_keyboard()".') + if timeout_seconds is not None and timeout_seconds <= 0: + raise ValueError("timeout_seconds must be > 0 if provided") + + target_keys = list(target_keys) + if len(target_keys) != 2: + raise ValueError("wait_light_press_visual requires exactly two target keys") + + if widget is not None: + conflicting = [ + name for name, value in ( + ("fixation_cross", fixation_cross), + ("initial_color", initial_color), + ("show_pressure_text", show_pressure_text), + ("show_goal_markers", show_goal_markers), + ) + if value is not None + ] + if conflicting: + raise ValueError( + "Do not pass auto-widget configuration arguments when `widget` is provided: " + f"{', '.join(conflicting)}. Configure the widget directly instead." + ) + + config = PressureFeedbackConfig.from_source(self, hold_seconds=hold_seconds) + if config.hold_seconds <= 0: + raise ValueError("hold_seconds must be > 0") + + if widget is None: + widget = InteractiveFixationCross( + screen=screen, + fixation_cross=fixation_cross, + background_color=background_color, + initial_color=(100, 100, 100) if initial_color is None else initial_color, + acquisition=self, + show_pressure_text=False if show_pressure_text is None else show_pressure_text, + show_goal_markers=False if show_goal_markers is None else show_goal_markers, + ) + + state = PressureFeedbackState(config) + + left_key, right_key = str(target_keys[0]), str(target_keys[1]) + + def _read_pair() -> tuple[float, float]: + pressures = self.read_pressures(target_keys) + return pressures[left_key], pressures[right_key] + + return run_light_press_visual( + read_pair=_read_pair, + state=state, + widget=widget, + screen=screen, + response_handler=response_handler, + exit_keys=exit_keys, + overlay_drawables=overlay_drawables, + background_color=background_color, + timeout_seconds=timeout_seconds, + wait_until=getattr(self, "_wait_until_next_tick", None), + verbose=verbose, + ) diff --git a/src/tachypy/feedback/widgets.py b/src/tachypy/feedback/widgets.py new file mode 100644 index 0000000..109618e --- /dev/null +++ b/src/tachypy/feedback/widgets.py @@ -0,0 +1,413 @@ +"""Pressure-feedback rendering: the widget contract and the default widget. + +OpenGL primitives are imported lazily inside ``InteractiveFixationCross`` so this +module stays importable without a GL context (the abstract contract and any +pure-logic use remain headless-safe). +""" +from __future__ import annotations + +from abc import ABC, abstractmethod + +from .model import PressureFeedbackState + + +class PressureFeedbackWidget(ABC): + """Abstract drawing interface for pressure feedback widgets. + + Notes + ----- + Widgets consume a :class:`PressureFeedbackState` and render it using a + specific backend. The feedback runner only depends on this interface, so + new visual backends can be added without changing the loop logic. + """ + + @abstractmethod + def update(self, state: PressureFeedbackState) -> None: + """Receive the latest pressure feedback state. + + Parameters + ---------- + state : PressureFeedbackState + Current pressure, scale, status, hold progress, and readiness state. + """ + raise NotImplementedError + + @abstractmethod + def draw(self) -> None: + """Draw the widget using its rendering backend.""" + raise NotImplementedError + + +class InteractiveFixationCross(PressureFeedbackWidget): + def __init__( + self, + # Required TachyPy context + screen, + # Manual fixation-cross geometry + center=None, + half_width: float = 8.0, + half_height: float = 8.0, + thickness: float = 1.0, + # Visual colors + initial_color=(100, 100, 100), + target_color=(0, 0, 0), + vertical_color=None, + background_color=(128, 128, 128), + # Optional objects used to simplify setup + fixation_cross=None, + acquisition=None, + # Optional goal markers (thin vertical ticks at ±half_width) + show_goal_markers: bool = False, + # Optional real-time pressure text + show_pressure_text: bool = False, + left_pressure_label: str = "", + right_pressure_label: str = "", + pressure_text_color=None, + pressure_text_font_size: int | None = None, + pressure_text_width: float | None = None, + pressure_text_height: float | None = None, + pressure_text_gap: float = 10.0, + pressure_text_decimals: int = 2, + pressure_text_font_name: str | None = None, + ): + """ + Create an interactive TachyPy fixation cross for visual pressure feedback. + + This widget can be configured manually, or it can reuse values from + existing objects to reduce duplicated setup in experiments. + + Values taken from `screen` + -------------------------- + `screen` is required because TachyPy drawing needs an active screen. + If `center` is not provided and no `fixation_cross` is provided, the + widget centers itself from `screen.width` / `screen.height` (or `w` / `h` + if those names are used by the screen object). + + Values taken from `fixation_cross` + ---------------------------------- + Pass an existing TachyPy `FixationCross` when you already created one + for the experiment and want this widget to match it. The widget copies: + + - `center` -> widget center + - `half_width` -> horizontal half-size + - `half_height` -> vertical half-size + - `thickness` -> line thickness + - `color` -> `target_color` + + These copied values override the constructor defaults. Manual values are + used only when the existing fixation cross does not expose the matching + attribute. + + Values taken from `acquisition` + ------------------------------- + Pass a keyboard acquisition instance when the widget should be tied to + the same object that owns the pressure configuration. The widget stores: + + - `min_pressure_start` + - `max_pressure_start` + - `threshold` + - `hold_seconds` + + These values are kept on the widget for inspection and consistency with + `wait_light_press_visual()`. The actual real-time pressure state is + still provided through `update(state)`. + + Pressure text + ------------- + If `show_pressure_text=True`, TachyPy `Text` objects are shown above the + cross only for keys currently outside the acceptable pressure interval. + Text size and box dimensions scale from the fixation cross size unless + `pressure_text_font_size`, `pressure_text_width`, or + `pressure_text_height` are provided explicitly. + """ + if fixation_cross is not None: + center = self._as_tuple(getattr(fixation_cross, "center", center)) + half_width = getattr(fixation_cross, "half_width", half_width) + half_height = getattr(fixation_cross, "half_height", half_height) + thickness = getattr(fixation_cross, "thickness", thickness) + target_color = getattr(fixation_cross, "color", target_color) + + initial_color = self._as_rgb_color(initial_color, "initial_color") + target_color = self._as_rgb_color(target_color, "target_color") + background_color = self._as_rgb_color(background_color, "background_color") + vertical_color = target_color if vertical_color is None else self._as_rgb_color(vertical_color, "vertical_color") + pressure_text_color = ( + None if pressure_text_color is None else self._as_rgb_color(pressure_text_color, "pressure_text_color") + ) + + if initial_color == background_color: + raise ValueError("initial_color must differ from background_color") + if half_width <= 0 or half_height <= 0 or thickness <= 0: + raise ValueError("half_width, half_height, and thickness must be positive") + + # OpenGL drawing primitives are imported lazily so this module (and the + # pure-logic classes it depends on) stays importable in headless setups. + from tachypy import Line + if show_pressure_text: + from tachypy import Text + else: + Text = None + + self._set_tachypy_context(screen=screen, line_cls=Line, text_cls=Text) + self._set_acquisition_context(acquisition) + self._set_geometry(center=center, half_width=half_width, half_height=half_height, thickness=thickness) + self._set_colors( + initial_color=initial_color, + target_color=target_color, + vertical_color=vertical_color, + background_color=background_color, + ) + self._set_pressure_text( + show_pressure_text=show_pressure_text, + left_pressure_label=left_pressure_label, + right_pressure_label=right_pressure_label, + pressure_text_color=pressure_text_color, + pressure_text_font_size=pressure_text_font_size, + pressure_text_width=pressure_text_width, + pressure_text_height=pressure_text_height, + pressure_text_gap=pressure_text_gap, + pressure_text_decimals=pressure_text_decimals, + pressure_text_font_name=pressure_text_font_name, + ) + self.show_goal_markers = bool(show_goal_markers) + self._reset_runtime_state() + + def _set_tachypy_context(self, screen, line_cls, text_cls) -> None: + self.screen = screen + self._line_cls = line_cls + self._text_cls = text_cls + + def _set_acquisition_context(self, acquisition) -> None: + self.acquisition = acquisition + self.min_pressure_start = getattr(acquisition, "min_pressure_start", None) + self.max_pressure_start = getattr(acquisition, "max_pressure_start", None) + self.threshold = getattr(acquisition, "threshold", None) + self.hold_seconds = getattr(acquisition, "hold_seconds", None) + + def _set_geometry(self, center, half_width: float, half_height: float, thickness: float) -> None: + self.center = center + self.half_width = float(half_width) + self.half_height = float(half_height) + self.thickness = float(thickness) + + def _set_colors(self, initial_color, target_color, vertical_color, background_color) -> None: + self.initial_color = initial_color + self.target_color = target_color + self.vertical_color = vertical_color + self.background_color = background_color + + def _set_pressure_text( + self, + show_pressure_text: bool, + left_pressure_label: str, + right_pressure_label: str, + pressure_text_color, + pressure_text_font_size: int | None, + pressure_text_width: float | None, + pressure_text_height: float | None, + pressure_text_gap: float, + pressure_text_decimals: int, + pressure_text_font_name: str | None = None, + ) -> None: + self.show_pressure_text = bool(show_pressure_text) + self.left_pressure_label = str(left_pressure_label) + self.right_pressure_label = str(right_pressure_label) + self.pressure_text_color = tuple(pressure_text_color or self.target_color) + self.pressure_text_font_size = int(pressure_text_font_size or self._auto_text_font_size()) + self.pressure_text_width = float(pressure_text_width or self._auto_text_width()) + self.pressure_text_height = float(pressure_text_height or self._auto_text_height()) + self.pressure_text_gap = float(pressure_text_gap) + self.pressure_text_decimals = int(pressure_text_decimals) + self.pressure_text_font_name = pressure_text_font_name + + def _reset_runtime_state(self) -> None: + self.left_pressure = 0.0 + self.right_pressure = 0.0 + self.left_status = "too_weak" + self.right_status = "too_weak" + self.left_scale = 1.0 + self.right_scale = 1.0 + self.color = self.initial_color + self._left_line = None + self._right_line = None + self._vertical_line = None + self._left_marker = None + self._right_marker = None + self._left_text = None + self._right_text = None + + def update(self, state: PressureFeedbackState) -> None: + """Update the widget from a pressure feedback state. + + Parameters + ---------- + state : PressureFeedbackState + Latest pressure feedback state. The widget copies pressure values, + statuses, scales, and hold progress from this object. + + Returns + ------- + None + The widget state is updated in place. + """ + self.left_pressure = state.left_pressure + self.right_pressure = state.right_pressure + self.left_status = state.left_status + self.right_status = state.right_status + self.left_scale = state.left_scale + self.right_scale = state.right_scale + self.color = self._lerp_color(self.initial_color, self.target_color, state.hold_progress) + + def draw(self) -> None: + """Draw the interactive fixation cross and optional pressure text. + + Notes + ----- + The vertical line is always drawn. A horizontal side with scale ``0`` is + hidden, which corresponds to no detected pressure on that side. + """ + center_x, center_y = self._center() + left_x = center_x - self.half_width * self.left_scale + right_x = center_x + self.half_width * self.right_scale + top_y = center_y + self.half_height + bottom_y = center_y - self.half_height + + if self.show_goal_markers: + self._draw_goal_markers(center_x, center_y) + if self.left_scale > 0.0: + self._draw_line("_left_line", (left_x, center_y), (center_x, center_y), self.color) + if self.right_scale > 0.0: + self._draw_line("_right_line", (center_x, center_y), (right_x, center_y), self.color) + self._draw_line( + "_vertical_line", + (center_x, bottom_y), + (center_x, top_y), + self.vertical_color or self.color, + ) + if self.show_pressure_text and (self.left_status != "ideal" or self.right_status != "ideal"): + self._draw_pressure_text(center_x, top_y) + + def _center(self): + if self.center is not None: + return self.center + width = getattr(self.screen, "width", getattr(self.screen, "w", 0)) + height = getattr(self.screen, "height", getattr(self.screen, "h", 0)) + return width / 2, height / 2 + + def _draw_line(self, attr_name: str, start, end, color, thickness: float | None = None) -> None: + t = self.thickness if thickness is None else thickness + line = getattr(self, attr_name) + if line is None: + line = self._line_cls(start_point=start, end_point=end, thickness=t, color=color) + setattr(self, attr_name, line) + else: + line.set_start_point(start) + line.set_end_point(end) + line.set_color(color) + line.set_thickness(t) + line.draw() + + def _draw_goal_markers(self, center_x: float, center_y: float) -> None: + SIZE_FACTOR = 0.33 + marker_width = max(1.0, self.thickness * SIZE_FACTOR) + left_marker_x = center_x - self.half_width + right_marker_x = center_x + self.half_width + self._draw_line( + "_left_marker", + (left_marker_x - marker_width, center_y), + (left_marker_x, center_y), + self.target_color, + thickness=self.thickness, + ) + self._draw_line( + "_right_marker", + (right_marker_x, center_y), + (right_marker_x + marker_width, center_y), + self.target_color, + thickness=self.thickness, + ) + + def _draw_pressure_text(self, center_x: float, top_y: float) -> None: + text_y1 = top_y + self.pressure_text_gap + text_y2 = text_y1 + self.pressure_text_height + left_rect = ( + center_x - self.half_width - self.pressure_text_width, + text_y1, + center_x - self.half_width, + text_y2, + ) + right_rect = ( + center_x + self.half_width, + text_y1, + center_x + self.half_width + self.pressure_text_width, + text_y2, + ) + + if self.left_status != "ideal": + left_text = self._format_pressure(self.left_pressure_label, self.left_pressure) + self._left_text = self._draw_text(self._left_text, left_text, left_rect) + if self.right_status != "ideal": + right_text = self._format_pressure(self.right_pressure_label, self.right_pressure) + self._right_text = self._draw_text(self._right_text, right_text, right_rect) + + def _draw_text(self, text_obj, text: str, dest_rect): + if text_obj is None: + kwargs = { + "text": text, + "font_size": self.pressure_text_font_size, + "color": self.pressure_text_color, + "dest_rect": dest_rect, + } + if self.pressure_text_font_name is not None: + kwargs["font_name"] = self.pressure_text_font_name + text_obj = self._text_cls(**kwargs) + else: + text_obj.set_dest_rect(dest_rect) + if text_obj.text != text: + text_obj.set_text(text) + text_obj.draw() + return text_obj + + def _format_pressure(self, label: str, pressure: float) -> str: + prefix = f"{label}: " if label else "" + return f"{prefix}{pressure:.{self.pressure_text_decimals}f}" + + def _screen_min(self) -> float: + width = float(getattr(self.screen, "width", getattr(self.screen, "w", 1024))) + height = float(getattr(self.screen, "height", getattr(self.screen, "h", 768))) + return min(width, height) + + def _auto_text_font_size(self) -> int: + return max(12, int(round(self._screen_min() * 0.012))) + + def _auto_text_width(self) -> float: + return max(48.0, self._screen_min() * 0.055) + + def _auto_text_height(self) -> float: + return max(20.0, self.pressure_text_font_size * 1.5) + + @staticmethod + def _lerp_color(start, end, progress: float): + progress = max(0.0, min(1.0, float(progress))) + return tuple( + int(round(float(s) + (float(e) - float(s)) * progress)) + for s, e in zip(start, end) + ) + + @staticmethod + def _as_tuple(value): + if value is None: + return None + return tuple(float(item) for item in value) + + @staticmethod + def _as_rgb_color(value, name: str): + try: + color = tuple(int(round(float(channel))) for channel in value) + except TypeError as exc: + raise ValueError(f"{name} must be an RGB sequence") from exc + if len(color) != 3: + raise ValueError(f"{name} must contain exactly 3 RGB channels") + if any(channel < 0 or channel > 255 for channel in color): + raise ValueError(f"{name} RGB channels must be between 0 and 255") + return color diff --git a/src/tachypy/wooting/__init__.py b/src/tachypy/wooting/__init__.py new file mode 100644 index 0000000..acc7f3a --- /dev/null +++ b/src/tachypy/wooting/__init__.py @@ -0,0 +1,55 @@ +"""TachyPy ↔ Wooting integration (requires ``pip install tachypy[wooting]``). + +This module is the single import surface for using a Wooting analog keyboard +*inside* TachyPy experiments. It re-exports TachyWooting's public API and adds an +enriched :class:`WOOTING_ACQUISITION` that gains TachyPy visual feedback +(``wait_light_press_visual``) on top of the hardware acquisition class. + +TachyPy core never imports this module, so ``pip install tachypy`` stays usable +without a keyboard. Importing this module without TachyWooting installed raises a +clear, actionable error. +""" +from __future__ import annotations + +try: + import tachywooting as _tachywooting +except ImportError as exc: # pragma: no cover - exercised via packaging + raise ImportError( + "The Wooting integration requires the 'tachywooting' package.\n" + "Install it with:\n\n pip install 'tachypy[wooting]'\n" + ) from exc + +from tachypy.feedback import VisualPressureFeedbackMixin + +# Re-export the keyboard's public API so experiments need only one import. +from tachywooting import ( # noqa: F401 + convert_char_to_keycode, + ffi, + lib, + load_session, + load_trial, + trial_to_dataframe, +) +from tachywooting.visualize import visualize, visualize_all_keys # noqa: F401 + +# TachyPy-enriched acquisition class that combines Wooting's hardware acquisition and TachyPy's visual feedback. +class WOOTING_ACQUISITION(_tachywooting.WOOTING_ACQUISITION, VisualPressureFeedbackMixin): + """Wooting acquisition enriched with TachyPy visual feedback. + + Identical to :class:`tachywooting.WOOTING_ACQUISITION` (acquisition, logging, + readiness checks) plus :meth:`~tachypy.feedback.VisualPressureFeedbackMixin.wait_light_press_visual` + for on-screen pressure feedback. + """ + + +__all__ = [ + "WOOTING_ACQUISITION", + "convert_char_to_keycode", + "ffi", + "lib", + "load_session", + "load_trial", + "trial_to_dataframe", + "visualize", + "visualize_all_keys", +] diff --git a/src/tachypy/wooting/demos/__init__.py b/src/tachypy/wooting/demos/__init__.py new file mode 100644 index 0000000..ccb4412 --- /dev/null +++ b/src/tachypy/wooting/demos/__init__.py @@ -0,0 +1,8 @@ +"""Visual TachyPy demos for the Wooting keyboard integration. + +These demos require ``pip install tachypy[wooting]`` (they draw on screen and read +analog pressures). Console entry points: + +- ``tachypy-wooting-fixation-demo`` → :func:`visual_fixation_demo.main` +- ``tachypy-wooting-mini-bw`` → :func:`mini_bw_experiment.main` +""" diff --git a/src/tachypy/wooting/demos/mini_bw_experiment.py b/src/tachypy/wooting/demos/mini_bw_experiment.py new file mode 100644 index 0000000..78eb46f --- /dev/null +++ b/src/tachypy/wooting/demos/mini_bw_experiment.py @@ -0,0 +1,141 @@ +from __future__ import annotations + +import os +import random +import time + +import numpy as np + +try: + from tachypy import FixationCross, ResponseHandler, Screen, Text, Texture + from tachypy import WOOTING_ACQUISITION + from tachypy.wooting import convert_char_to_keycode +except ImportError as exc: # pragma: no cover + raise SystemExit( + "This demo requires the Wooting integration: pip install 'tachypy[wooting]'" + ) from exc + +N_TRIALS = 20 +YES_KEY = "z" # white response +NO_KEY = "c" # black response +WARN_STREAK = 2 # flag after this many consecutive removals +QUIT_KEYS = {"escape", "esc", "enter", "return", "space", "q"} +BG = (128, 128, 128) + + +def main() -> int: + acq = WOOTING_ACQUISITION(threshold=0.80, light_press_hold_seconds=1, min_pressure_start = 0.33, max_pressure_start = 0.66) + acq.initialize_keyboard() + + screen = None + try: + screen = Screen(fullscreen=False) + screen.hide_mouse() + rh = ResponseHandler(screen=screen) + rh.keys_to_listen = sorted(QUIT_KEYS) + + w, h = screen.width, screen.height + margin = 16 + readiness_fixation = FixationCross( + center=(w // 2, h // 2), + half_width=18, + half_height=18, + thickness=8, + color=(0, 0, 0), + ) + + white_tex = Texture(np.ones((256, 256, 3), dtype=np.uint8) * 255) + black_tex = Texture(np.zeros((256, 256, 3), dtype=np.uint8)) + + label = Text(text=".", font_size=28, color=(0, 0, 0), + dest_rect=(0, margin, w, margin + 44)) + score = Text(text="Score: —", font_size=24, color=(0, 0, 0), + dest_rect=(w - 240, margin + 50, w - margin, margin + 122)) + + yes_code, no_code = convert_char_to_keycode([YES_KEY, NO_KEY]) + + # --- instructions screen --- + instructions = Text( + text=( + "Welcome!\n \n" + "A black or white image will appear.\n" + f"Press {YES_KEY.upper()} if it is white, {NO_KEY.upper()} if it is black.\n \n" + f"Before each trial, rest your fingers lightly on {YES_KEY.upper()} and {NO_KEY.upper()}.\n" + f"The trial starts as soon as the pressure is stable for {acq.hold_seconds} second(s).\n" + "After your response, lift your fingers completely off the keys.\n \n" + f"Press {YES_KEY.upper()} to begin." + ), + font_size=26, + color=(0, 0, 0), + dest_rect=(int(w * 0.1), int(h * 0.15), int(w * 0.9), int(h * 0.85)), + ) + screen.fill(BG) + instructions.draw() + screen.flip() + acq.wait_keys_light_press(target_keys=[YES_KEY], quit_key=NO_KEY) + + correct = clean = 0 + + for trial in range(1, N_TRIALS + 1): + is_white = bool(random.getrandbits(1)) + + if not acq.wait_light_press_visual( + screen=screen, + target_keys=[YES_KEY, NO_KEY], + response_handler=rh, + exit_keys=QUIT_KEYS, + fixation_cross=readiness_fixation, + show_pressure_text=True, + show_goal_markers=True, + ): + break + + screen.fill(BG) + (white_tex if is_white else black_tex).draw( + (w // 2 - 128, h // 2 - 128, w // 2 + 128, h // 2 + 128) + ) + label.set_text(f"Trial {trial}/{N_TRIALS} | {YES_KEY.upper()} = white {NO_KEY.upper()} = black") + label.draw() + score.draw() + screen.flip() + + hier = acq.acquire_analog_values(target_keys=[YES_KEY, NO_KEY]) + response = acq.get_response_key(hier, target_keys=[YES_KEY, NO_KEY]) + + is_correct = response == (yes_code if is_white else no_code) + had_removal = acq.last_trial_had_removal + correct += int(is_correct) + clean += int(not had_removal) + score.set_text(f"Correct: {correct / trial * 100:.0f}%\nClean: {clean / trial * 100:.0f}%") + + feedback = "Correct!" if is_correct else "Incorrect" + if acq.reached_consecutive_removal_limit(WARN_STREAK): + feedback += f" [finger removal ×{acq.current_removal_streak}]" + elif had_removal: + feedback += " [finger removal]" + + screen.fill(BG) + label.set_text(feedback) + label.draw() + score.draw() + screen.flip() + time.sleep(0.25) + + if acq.wait_keys_released( + target_keys=[YES_KEY, NO_KEY], + response_handler=rh, + exit_keys=QUIT_KEYS, + ) is None: + break + + print(f"Done — {correct}/{acq.total_trials} correct, {acq.removal_trials} removal trials") + return 0 + + finally: + acq.uninitialize_keyboard() + if screen is not None and hasattr(screen, "close"): + screen.close() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/src/tachypy/wooting/demos/visual_fixation_demo.py b/src/tachypy/wooting/demos/visual_fixation_demo.py new file mode 100644 index 0000000..43d453f --- /dev/null +++ b/src/tachypy/wooting/demos/visual_fixation_demo.py @@ -0,0 +1,489 @@ +from __future__ import annotations + +import argparse +import time + + +BACKGROUND_COLOR = (128, 128, 128) +HIT_BACKGROUND_COLOR = (120, 190, 120) +INITIAL_HORIZONTAL_COLOR = (80, 80, 80) +CROSS_COLOR = (0, 0, 0) +QUIT_KEYS = {"escape", "esc", "enter", "return", "space", "q"} +RUN_KEY = "x" +RUN_DURATION = 15.0 +WINDOWED_WIDTH = 1502 +WINDOWED_HEIGHT = 750 +FIX_CROSS_LINE = 8 +FIX_CROSS_HALF = 14 # run cross arm half-length (pixels) +FIX_CROSS_IDLE_LINE = 12 +FIX_CROSS_IDLE_HALF = 24 # idle cross arm half-length (pixels) + + +def _build_parser() -> argparse.ArgumentParser: + parser = argparse.ArgumentParser( + description="TachyPy demo for the interactive Wooting fixation cross.", + formatter_class=argparse.ArgumentDefaultsHelpFormatter, + ) + parser.add_argument("--screen-number", type=int, default=0) + parser.add_argument("--refresh-rate", type=int, default=240) + parser.add_argument("--fullscreen", action="store_true", help="Use TachyPy fullscreen mode. By default the demo opens in a 1502x750 window.") + parser.add_argument("--left-key", default="z") + parser.add_argument("--right-key", default="c") + parser.add_argument("--hold-seconds", type=float, default=1.00) + parser.add_argument("--min-pressure", type=float, default=0.33) + parser.add_argument("--max-pressure", type=float, default=0.66) + parser.add_argument("--threshold", type=float, default=0.80) + + parser.add_argument("--half-width", type=float, default=FIX_CROSS_IDLE_HALF) + parser.add_argument("--half-height", type=float, default=FIX_CROSS_IDLE_HALF) + parser.add_argument("--thickness", type=float, default=FIX_CROSS_IDLE_LINE) + parser.add_argument("--run-half-width", type=float, default=FIX_CROSS_HALF) + parser.add_argument("--run-half-height", type=float, default=FIX_CROSS_HALF) + parser.add_argument("--run-thickness", type=float, default=FIX_CROSS_LINE) + return parser + + +class _ExitKeyTracker: + """Wraps a ResponseHandler to record which exit key triggered the last exit.""" + + def __init__(self, rh) -> None: + self._rh = rh + self.last_key: str | None = None + self._seen_event_count = 0 + + def get_events(self) -> None: + if hasattr(self._rh, "get_events"): + self._rh.get_events() + + def should_quit(self) -> bool: + return bool(getattr(self._rh, "should_quit", lambda: False)()) + + def get_key_presses(self) -> list: + all_events = self._rh.get_key_presses() if hasattr(self._rh, "get_key_presses") else [] + new_events = all_events[self._seen_event_count:] + self._seen_event_count = len(all_events) + for e in new_events: + if e.get("type") == "keydown": + self.last_key = str(e.get("key", "")).lower() + return all_events + + def clear_events(self) -> None: + if hasattr(self._rh, "clear_events"): + self._rh.clear_events() + self._seen_event_count = 0 + self.last_key = None + + @property + def keys_to_listen(self): + return self._rh.keys_to_listen + + @keys_to_listen.setter + def keys_to_listen(self, value) -> None: + self._rh.keys_to_listen = value + if value is not None: + if hasattr(self._rh, "_probed_keys"): + self._rh._probed_keys.update(value) + if getattr(self._rh, "backend", None) == "glfw": + screen = getattr(self._rh, "screen", None) + if screen is not None and hasattr(screen, "track_keys"): + screen.track_keys(value) + + +class GamifiedFixationWidget: + """Heads-up display wrapper for the visual fixation demo. + + Parameters + ---------- + screen : object + TachyPy ``Screen``-like object used for sizing HUD rectangles. + fixation_widget : object + Widget that draws the interactive fixation cross in idle mode (bigger). + run_fixation_widget : object + Widget that draws the interactive fixation cross during a run (smaller). + text_cls : type + TachyPy ``Text`` class or compatible replacement. + started_at : float + ``time.perf_counter()`` timestamp used as the timer origin. + hold_seconds : float + Hold duration used to compute theoretical hit rate and efficiency. + """ + + def __init__( + self, + screen, + fixation_widget, + run_fixation_widget, + text_cls, + started_at: float, + hold_seconds: float, + font_name: str = "Helvetica", + compact_layout: bool = False, + ): + self.screen = screen + self.fixation_widget = fixation_widget + self.run_fixation_widget = run_fixation_widget + self.text_cls = text_cls + self.started_at = float(started_at) + self.hold_seconds = float(hold_seconds) + self.font_name = font_name + self.compact_layout = bool(compact_layout) + self.hits = 0 + self.flash_until = 0.0 + # Run state + self._run_active = False + self._run_start_time = 0.0 + self._run_duration = RUN_DURATION + self._run_start_hits = 0 + # Best score + self._has_best = False + self.best_hits = 0 + self.best_efficiency = 0.0 + # Text objects + self._timer_text = None + self._hits_text = None + self._efficiency_text = None + self._controls_text = None + self._best_label_text = None + self._best_value_text = None + + @property + def _active_cross(self): + return self.run_fixation_widget if self._run_active else self.fixation_widget + + # ── run lifecycle ──────────────────────────────────────────────────────── + + def start_run(self, duration: float = RUN_DURATION) -> None: + self._run_active = True + self._run_start_time = time.perf_counter() + self._run_duration = float(duration) + self._run_start_hits = self.hits + + def end_run(self) -> None: + if not self._run_active: + return + self._run_active = False + run_hits = self._run_hits() + run_eff = self._run_efficiency_percent() + if not self._has_best or run_hits > self.best_hits or (run_hits == self.best_hits and run_eff > self.best_efficiency): + self.best_hits = run_hits + self.best_efficiency = run_eff + self._has_best = True + + def _run_elapsed(self) -> float: + return min(self._run_duration, time.perf_counter() - self._run_start_time) + + def _run_remaining(self) -> float: + return max(0.0, self._run_duration - (time.perf_counter() - self._run_start_time)) + + def _run_hits(self) -> int: + return self.hits - self._run_start_hits + + def _run_efficiency_percent(self) -> float: + elapsed = self._run_elapsed() + if elapsed <= 0 or self.hold_seconds <= 0: + return 0.0 + theoretical = elapsed / self.hold_seconds + return min(999.9, max(0.0, (self._run_hits() / theoretical) * 100.0)) + + # ── widget protocol ────────────────────────────────────────────────────── + + def update(self, state) -> None: + self._active_cross.update(state) + + def draw(self) -> None: + self._active_cross.draw() + self._draw_hud() + + def register_hit(self) -> None: + self.hits += 1 + self.flash_until = time.perf_counter() + 0.30 + + def background_color(self): + if time.perf_counter() < self.flash_until: + return HIT_BACKGROUND_COLOR + return BACKGROUND_COLOR + + # ── HUD ───────────────────────────────────────────────────────────────── + + def _draw_hud(self) -> None: + width, height = self._screen_size() + margin = max(16, int(min(width, height) * 0.025)) + font_size = max(16, int(min(width, height) * 0.026)) + small_font_size = max(14, int(font_size * 0.78)) + + if self.compact_layout: + font_size = max(22, int(min(width, height) * 0.032)) + small_font_size = max(16, int(font_size * 0.76)) + hud_y = margin + line_h = font_size * 2.1 + timer_w = 260 + hits_w = 220 + efficiency_w = 330 + timer_rect = self._rect_centered(width / 2, hud_y + line_h / 2, timer_w, line_h) + hits_rect = self._rect(margin, hud_y, hits_w, line_h) + efficiency_rect = self._rect(width - margin - efficiency_w, hud_y, efficiency_w, line_h) + else: + timer_rect = self._rect_centered(width / 2, margin + font_size, 220, font_size * 1.8) + hits_rect = self._rect(margin, margin, 180, font_size * 1.8) + efficiency_rect = self._rect(width - margin - 220, margin, 220, font_size * 1.8) + + if self._run_active: + remaining = self._run_remaining() + self._timer_text = self._draw_text(self._timer_text, f"RUN {remaining:04.1f}s", timer_rect, font_size) + self._hits_text = self._draw_text(self._hits_text, f"Hits {self._run_hits()}", hits_rect, font_size) + self._efficiency_text = self._draw_text(self._efficiency_text, f"Efficiency {self._run_efficiency_percent():05.1f}%", efficiency_rect, font_size) + else: + elapsed = max(0.0, time.perf_counter() - self.started_at) + if self.compact_layout: + controls_rect = self._rect_centered(width / 2, height - margin - small_font_size * 2.6, 220, small_font_size * 4.0) + else: + controls_rect = self._rect(width - margin - 180, height - margin - small_font_size * 5, 180, small_font_size * 5) + self._timer_text = self._draw_text(self._timer_text, f"Time {elapsed:05.1f}s", timer_rect, font_size) + self._hits_text = self._draw_text(self._hits_text, f"Hits {self.hits}", hits_rect, font_size) + self._efficiency_text = self._draw_text(self._efficiency_text, f"Efficiency {self._efficiency_percent(elapsed):05.1f}%", efficiency_rect, font_size) + self._controls_text = self._draw_text(self._controls_text, f"{RUN_KEY.upper()}: run\nEsc: quit", controls_rect, small_font_size) + + if self._has_best: + label_h = small_font_size * 1.6 + value_h = font_size * 2.0 + if self.compact_layout: + label_rect = self._rect(margin, height - margin - label_h - value_h, 360, label_h) + value_rect = self._rect(margin, height - margin - value_h, 360, value_h) + else: + label_rect = self._rect(margin, height - margin - label_h - value_h, 300, label_h) + value_rect = self._rect(margin, height - margin - value_h, 300, value_h) + self._best_label_text = self._draw_text(self._best_label_text, "BEST SCORE", label_rect, small_font_size, color=(70, 70, 70)) + self._best_value_text = self._draw_text(self._best_value_text, f"{self.best_hits} hits {self.best_efficiency:.1f}%", value_rect, font_size) + + def _efficiency_percent(self, elapsed: float) -> float: + if elapsed <= 0 or self.hold_seconds <= 0: + return 0.0 + theoretical_hits = elapsed / self.hold_seconds + if theoretical_hits <= 0: + return 0.0 + return min(999.9, max(0.0, (self.hits / theoretical_hits) * 100.0)) + + def _draw_text(self, text_obj, text: str, dest_rect, font_size: int, color=None): + dest_rect = self._snap_rect(self._clamp_rect(dest_rect)) + draw_color = color if color is not None else CROSS_COLOR + if text_obj is None: + text_obj = self.text_cls( + text=text, + font_size=font_size, + color=draw_color, + dest_rect=dest_rect, + font_name=self.font_name, + ) + else: + text_obj.set_dest_rect(dest_rect) + if text_obj.text != text: + text_obj.set_text(text) + text_obj.draw() + return text_obj + + def _screen_size(self): + return ( + float(getattr(self.screen, "width", getattr(self.screen, "w", 1024))), + float(getattr(self.screen, "height", getattr(self.screen, "h", 768))), + ) + + def _rect(self, x, y, width, height): + return (float(x), float(y), float(x + width), float(y + height)) + + def _rect_centered(self, center_x, center_y, width, height): + return self._rect(center_x - width / 2, center_y - height / 2, width, height) + + def _clamp_rect(self, rect): + screen_width, screen_height = self._screen_size() + x1, y1, x2, y2 = rect + width = min(max(1.0, x2 - x1), screen_width) + height = min(max(1.0, y2 - y1), screen_height) + x1 = min(max(0.0, x1), screen_width - width) + y1 = min(max(0.0, y1), screen_height - height) + return (x1, y1, x1 + width, y1 + height) + + @staticmethod + def _snap_rect(rect): + x1, y1, x2, y2 = rect + return (round(x1), round(y1), round(x2), round(y2)) + + +def _draw_countdown(screen, widget, text_cls, background_color_fn, font_name: str) -> None: + """Show a 3-2-1 countdown in the center of the screen.""" + width = float(getattr(screen, "width", getattr(screen, "w", 1024))) + height = float(getattr(screen, "height", getattr(screen, "h", 768))) + font_size = max(80, int(min(width, height) * 0.14)) + box = min(width, height) * 0.22 + center_rect = (width / 2 - box / 2, height / 2 - box / 2, width / 2 + box / 2, height / 2 + box / 2) + + countdown_text = None + for count in range(3, 0, -1): + deadline = time.perf_counter() + 1.0 + label = str(count) + while time.perf_counter() < deadline: + screen.fill(background_color_fn()) + widget.draw() + if countdown_text is None: + countdown_text = text_cls( + text=label, + font_size=font_size, + color=(220, 220, 220), + dest_rect=center_rect, + font_name=font_name, + ) + elif countdown_text.text != label: + countdown_text.set_text(label) + countdown_text.draw() + screen.flip() + + +def main() -> int: + args = _build_parser().parse_args() + + try: + from tachypy import ResponseHandler, Screen, Text + from tachypy.feedback import InteractiveFixationCross + from tachypy import WOOTING_ACQUISITION + except ImportError as exc: + raise SystemExit( + "This demo requires the Wooting integration: pip install 'tachypy[wooting]'" + ) from exc + + acq = WOOTING_ACQUISITION( + threshold=args.threshold, + min_pressure_start=args.min_pressure, + max_pressure_start=args.max_pressure, + light_press_hold_seconds=args.hold_seconds, + backend="auto", + timing_mode="hybrid", + ) + acq.initialize_keyboard(verbose=True) + + screen = None + try: + screen = Screen( + screen_number=args.screen_number, + width=None if args.fullscreen else WINDOWED_WIDTH, + height=None if args.fullscreen else WINDOWED_HEIGHT, + fullscreen=args.fullscreen, + desired_refresh_rate=args.refresh_rate, + ) + if hasattr(screen, "hide_mouse"): + screen.hide_mouse() + + response_handler = ResponseHandler(screen=screen) + key_tracker = _ExitKeyTracker(response_handler) + + center = (screen.width // 2, screen.height // 2) + shared_cross_kwargs = dict( + screen=screen, + acquisition=acq, + center=center, + background_color=BACKGROUND_COLOR, + initial_color=INITIAL_HORIZONTAL_COLOR, + target_color=CROSS_COLOR, + vertical_color=CROSS_COLOR, + show_pressure_text=True, + show_goal_markers=True, + pressure_text_font_name="Helvetica", + ) + + idle_cross = InteractiveFixationCross( + half_width=args.half_width, + half_height=args.half_height, + thickness=args.thickness, + **shared_cross_kwargs, + ) + run_cross = InteractiveFixationCross( + half_width=args.run_half_width, + half_height=args.run_half_height, + thickness=args.run_thickness, + **shared_cross_kwargs, + ) + + widget = GamifiedFixationWidget( + screen=screen, + fixation_widget=idle_cross, + run_fixation_widget=run_cross, + text_cls=Text, + started_at=time.perf_counter(), + hold_seconds=args.hold_seconds, + font_name="Helvetica", + compact_layout=not args.fullscreen, + ) + + def draw_release_frame() -> None: + screen.fill(widget.background_color()) + widget.draw() + screen.flip() + + print("Press Escape, Enter, Space, or q to quit.") + print(f"Use {args.left_key!r} for left and {args.right_key!r} for right.") + print(f"Press {RUN_KEY!r} to start a {RUN_DURATION:.0f}s run.") + + while True: + in_run = widget._run_active + current_exit_keys = QUIT_KEYS if in_run else (QUIT_KEYS | {RUN_KEY}) + timeout = max(0.01, widget._run_remaining()) if in_run else None + key_tracker.last_key = None + + try: + ready = acq.wait_light_press_visual( + screen=screen, + target_keys=[args.left_key, args.right_key], + widget=widget, + background_color=widget.background_color, + response_handler=key_tracker, + exit_keys=current_exit_keys, + timeout_seconds=timeout, + ) + except TimeoutError: + widget.end_run() + key_tracker.clear_events() + continue + + if not ready: + if in_run: + widget.end_run() + key_tracker.clear_events() + continue + if key_tracker.last_key == RUN_KEY: + _draw_countdown(screen, widget, Text, widget.background_color, "Helvetica") + widget.start_run(RUN_DURATION) + continue + print(f"Final hits: {widget.hits}") + return 0 + + widget.register_hit() + + release_timeout = max(0.01, widget._run_remaining()) if widget._run_active else None + try: + released_at = acq.wait_keys_released( + target_keys=[args.left_key, args.right_key], + hold_seconds=0.01, + response_handler=key_tracker, + exit_keys=QUIT_KEYS, + on_tick=draw_release_frame, + timeout_seconds=release_timeout, + ) + except TimeoutError: + widget.end_run() + key_tracker.clear_events() + continue + + if released_at is None: + if widget._run_active: + widget.end_run() + print(f"Final hits: {widget.hits}") + return 0 + + return 0 + + finally: + acq.uninitialize_keyboard() + if screen is not None and hasattr(screen, "close"): + screen.close() + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/tests/test_pressure_keyboard.py b/tests/test_pressure_keyboard.py new file mode 100644 index 0000000..be9032f --- /dev/null +++ b/tests/test_pressure_keyboard.py @@ -0,0 +1,553 @@ +import importlib +import sys +import types + +import pytest + +import tachypy +from tachypy.feedback import ( + InteractiveFixationCross, + PressureFeedbackConfig, + PressureFeedbackState, + VisualPressureFeedbackMixin, +) + + +# Public API + +def test_wooting_shortcut_is_lazy_and_kept_out_of_star_imports(): + assert "WOOTING_ACQUISITION" not in tachypy.__all__ + assert tachypy._EXPORT_MAP["WOOTING_ACQUISITION"] == ( + "tachypy.wooting", + "WOOTING_ACQUISITION", + ) + + +# tachywooting facade + +def test_wooting_facade_enriches_tachywooting_acquisition(monkeypatch): + class BaseAcquisition: + pass + + fake_tachywooting = types.ModuleType("tachywooting") + fake_tachywooting.WOOTING_ACQUISITION = BaseAcquisition + fake_tachywooting.convert_char_to_keycode = lambda keys: keys + fake_tachywooting.ffi = object() + fake_tachywooting.lib = object() + fake_tachywooting.load_session = lambda *args, **kwargs: None + fake_tachywooting.load_trial = lambda *args, **kwargs: None + fake_tachywooting.trial_to_dataframe = lambda *args, **kwargs: None + + fake_visualize = types.ModuleType("tachywooting.visualize") + fake_visualize.visualize = lambda *args, **kwargs: None + fake_visualize.visualize_all_keys = lambda *args, **kwargs: None + + monkeypatch.setitem(sys.modules, "tachywooting", fake_tachywooting) + monkeypatch.setitem(sys.modules, "tachywooting.visualize", fake_visualize) + original_module = sys.modules.pop("tachypy.wooting", None) + try: + module = importlib.import_module("tachypy.wooting") + + assert issubclass(module.WOOTING_ACQUISITION, BaseAcquisition) + assert issubclass(module.WOOTING_ACQUISITION, VisualPressureFeedbackMixin) + assert "WOOTING_ACQUISITION" in module.__all__ + finally: + sys.modules.pop("tachypy.wooting", None) + if original_module is not None: + sys.modules["tachypy.wooting"] = original_module + + +# Pressure model + +def test_pressure_scale_for(): + cfg = PressureFeedbackConfig(min_pressure_start=0.1, max_pressure_start=0.4, threshold=0.8) + + assert cfg.scale_for(0.0) == 0.0 + assert cfg.scale_for(0.001) > 0.25 + assert cfg.scale_for(0.1) == 1.0 + assert cfg.scale_for(0.3) == 1.0 + assert cfg.scale_for(1.0) == 2.0 + + +def test_pressure_feedback_state_hold_timer(): + state = PressureFeedbackState( + PressureFeedbackConfig( + min_pressure_start=0.1, + max_pressure_start=0.4, + threshold=0.8, + hold_seconds=0.5, + ) + ) + + state.update(0.2, 0.2, now=1.0) + assert state.hold_progress == 0.0 + assert not state.is_ready + + state.update(0.2, 0.2, now=1.25) + assert state.hold_progress == 0.5 + assert not state.is_ready + + state.update(0.2, 0.2, now=1.5) + assert state.hold_progress == 1.0 + assert state.is_ready + + +def test_pressure_feedback_state_resets_when_out_of_range(): + state = PressureFeedbackState( + PressureFeedbackConfig( + min_pressure_start=0.1, + max_pressure_start=0.4, + threshold=0.8, + hold_seconds=0.5, + ) + ) + + state.update(0.2, 0.2, now=1.0) + state.update(0.2, 0.5, now=1.25) + + assert state.right_status == "too_strong" + assert state.hold_progress == 0.0 + assert not state.is_ready + + +# Widget rendering + +def test_widget_updates_and_draws_lines(monkeypatch): + class FakeScreen: + width = 100 + height = 80 + + class FakeLine: + created = [] + + def __init__(self, start_point, end_point, thickness, color): + self.start_point = start_point + self.end_point = end_point + self.thickness = thickness + self.color = color + self.draw_count = 0 + self.created.append(self) + + def set_start_point(self, start_point): + self.start_point = start_point + + def set_end_point(self, end_point): + self.end_point = end_point + + def set_thickness(self, thickness): + self.thickness = thickness + + def set_color(self, color): + self.color = color + + def draw(self): + self.draw_count += 1 + + screen = FakeScreen() + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine)) + widget = InteractiveFixationCross( + screen=screen, + half_width=10, + half_height=5, + initial_color=(100, 100, 100), + target_color=(0, 0, 0), + background_color=(255, 255, 255), + ) + state = PressureFeedbackState( + PressureFeedbackConfig( + min_pressure_start=0.1, + max_pressure_start=0.4, + threshold=0.8, + hold_seconds=0.5, + ) + ) + state.update(0.0, 1.0, now=1.0) + + widget.update(state) + widget.draw() + + assert widget.left_scale == 0.0 + assert widget.right_scale == 2.0 + assert len(FakeLine.created) == 2 + assert all(line.draw_count == 1 for line in FakeLine.created) + + +def test_widget_rejects_invisible_initial_color(): + with pytest.raises(ValueError, match="initial_color"): + InteractiveFixationCross( + screen=object(), + initial_color=(128, 128, 128), + background_color=(128, 128, 128), + ) + + +def test_widget_can_copy_existing_fixation_cross(monkeypatch): + class FakeLine: + def __init__(self, start_point, end_point, thickness, color): + pass + + fixation = types.SimpleNamespace( + center=(12, 34), + half_width=7, + half_height=9, + thickness=3, + color=(10, 20, 30), + ) + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine)) + + widget = InteractiveFixationCross( + screen=object(), + fixation_cross=fixation, + initial_color=(100, 100, 100), + background_color=(128, 128, 128), + ) + + assert widget.center == (12.0, 34.0) + assert widget.half_width == 7.0 + assert widget.half_height == 9.0 + assert widget.thickness == 3.0 + assert widget.target_color == (10, 20, 30) + + +@pytest.mark.parametrize("thickness", [3, 4, 5, 6]) +def test_goal_markers_touch_ideal_pressure_bar_edges(monkeypatch, thickness): + class FakeLine: + created = [] + + def __init__(self, start_point, end_point, thickness, color): + self.start_point = start_point + self.end_point = end_point + self.thickness = thickness + self.color = color + self.created.append(self) + + def set_start_point(self, start_point): + self.start_point = start_point + + def set_end_point(self, end_point): + self.end_point = end_point + + def set_thickness(self, thickness): + self.thickness = thickness + + def set_color(self, color): + self.color = color + + def draw(self): + pass + + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine)) + widget = InteractiveFixationCross( + screen=object(), + center=(50, 40), + half_width=10, + half_height=5, + thickness=thickness, + show_goal_markers=True, + ) + state = PressureFeedbackState( + PressureFeedbackConfig( + min_pressure_start=0.1, + max_pressure_start=0.4, + threshold=0.8, + hold_seconds=0.5, + ) + ) + state.update(0.2, 0.2, now=1.0) + + widget.update(state) + widget.draw() + + assert widget.left_scale == 1.0 + assert widget.right_scale == 1.0 + expected_marker_width = max(1.0, widget.thickness * 0.33) + left_edge_x = widget._left_line.start_point[0] + right_edge_x = widget._right_line.end_point[0] + center_y = widget._left_line.start_point[1] + assert widget._left_marker.start_point == (left_edge_x - expected_marker_width, center_y) + assert widget._left_marker.end_point == (left_edge_x, center_y) + assert widget._right_marker.start_point == (right_edge_x, center_y) + assert widget._right_marker.end_point == (right_edge_x + expected_marker_width, center_y) + assert widget._left_marker.thickness == widget.thickness + assert widget._right_marker.thickness == widget.thickness + assert widget._left_marker.thickness == widget._left_line.thickness + assert widget._right_marker.thickness == widget._right_line.thickness + assert FakeLine.created == [ + widget._left_marker, + widget._right_marker, + widget._left_line, + widget._right_line, + widget._vertical_line, + ] + + +def test_widget_accepts_acquisition_settings(monkeypatch): + class FakeLine: + def __init__(self, start_point, end_point, thickness, color): + pass + + acquisition = types.SimpleNamespace( + min_pressure_start=0.33, + max_pressure_start=0.66, + threshold=0.8, + hold_seconds=0.3, + ) + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine)) + + widget = InteractiveFixationCross( + screen=object(), + acquisition=acquisition, + initial_color=(100, 100, 100), + background_color=(128, 128, 128), + ) + + assert widget.acquisition is acquisition + assert widget.min_pressure_start == 0.33 + assert widget.max_pressure_start == 0.66 + assert widget.threshold == 0.8 + assert widget.hold_seconds == 0.3 + + +def test_widget_lerps_any_rgb_channels(): + assert InteractiveFixationCross._lerp_color((10, 200, 30), (110, 20, 230), 0.0) == (10, 200, 30) + assert InteractiveFixationCross._lerp_color((10, 200, 30), (110, 20, 230), 0.5) == (60, 110, 130) + assert InteractiveFixationCross._lerp_color((10, 200, 30), (110, 20, 230), 1.0) == (110, 20, 230) + assert InteractiveFixationCross._lerp_color((0, 0, 255), (255, 128, 0), 2.0) == (255, 128, 0) + assert InteractiveFixationCross._lerp_color((0, 0, 255), (255, 128, 0), -1.0) == (0, 0, 255) + + +def test_widget_draws_pressure_text(monkeypatch): + class FakeScreen: + width = 100 + height = 80 + + class FakeLine: + def __init__(self, start_point, end_point, thickness, color): + pass + + def draw(self): + pass + + class FakeText: + created = [] + + def __init__(self, text, font_size, color, dest_rect): + self.text = text + self.font_size = font_size + self.color = color + self.dest_rect = dest_rect + self.draw_count = 0 + self.created.append(self) + + def set_text(self, text): + self.text = text + + def set_dest_rect(self, dest_rect): + self.dest_rect = dest_rect + + def draw(self): + self.draw_count += 1 + + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine, Text=FakeText)) + widget = InteractiveFixationCross( + screen=FakeScreen(), + center=(50, 40), + half_width=10, + half_height=5, + show_pressure_text=True, + left_pressure_label="C", + right_pressure_label="Z", + pressure_text_width=40, + pressure_text_height=20, + pressure_text_gap=6, + ) + state = PressureFeedbackState( + PressureFeedbackConfig( + min_pressure_start=0.1, + max_pressure_start=0.4, + threshold=0.8, + ) + ) + state.update(0.0, 1.0, now=1.0) + + widget.update(state) + widget.draw() + + assert [text.text for text in FakeText.created] == ["C: 0.00", "Z: 1.00"] + assert FakeText.created[0].dest_rect == (0.0, 51.0, 40.0, 71.0) + assert FakeText.created[1].dest_rect == (60.0, 51.0, 100.0, 71.0) + assert all(text.draw_count == 1 for text in FakeText.created) + + +def test_widget_hides_pressure_text_when_pressure_is_ideal(monkeypatch): + class FakeScreen: + width = 100 + height = 80 + + class FakeLine: + def __init__(self, start_point, end_point, thickness, color): + pass + + def draw(self): + pass + + class FakeText: + created = [] + + def __init__(self, text, font_size, color, dest_rect): + self.created.append(self) + + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine, Text=FakeText)) + widget = InteractiveFixationCross( + screen=FakeScreen(), + center=(50, 40), + half_width=10, + half_height=5, + show_pressure_text=True, + ) + state = PressureFeedbackState( + PressureFeedbackConfig( + min_pressure_start=0.1, + max_pressure_start=0.4, + threshold=0.8, + ) + ) + state.update(0.2, 0.3, now=1.0) + + widget.update(state) + widget.draw() + + assert FakeText.created == [] + + +# Visual wait loop + +def _make_fake_acq(reader, hold_seconds=0.001): + """Build a minimal PressureSource-like object enriched with the visual mixin.""" + + class _FakeAcq(VisualPressureFeedbackMixin): + initialized = True + min_pressure_start = 0.33 + max_pressure_start = 0.66 + threshold = 0.8 + + def read_pressures(self, keys): + return reader(keys) + + acq = _FakeAcq() + acq.hold_seconds = hold_seconds + return acq + + +def test_visual_wait_does_not_show_pressure_text_by_default(monkeypatch): + class FakeScreen: + width = 100 + height = 80 + + def fill(self, color): + pass + + def flip(self): + pass + + class FakeLine: + def __init__(self, start_point, end_point, thickness, color): + pass + + def set_start_point(self, value): + pass + + def set_end_point(self, value): + pass + + def set_thickness(self, value): + pass + + def set_color(self, value): + pass + + def draw(self): + pass + + class FakeText: + created = [] + + def __init__(self, text, font_size, color, dest_rect): + self.text = text + self.created.append(self) + + def set_dest_rect(self, dest_rect): + pass + + def set_text(self, text): + self.text = text + + def draw(self): + pass + + monkeypatch.setitem(sys.modules, "tachypy", types.SimpleNamespace(Line=FakeLine, Text=FakeText)) + + reads = {"count": 0} + + def reader(keys): + reads["count"] += 1 + if reads["count"] == 1: + return {"c": 0.0, "z": 1.0} + return {"c": 0.5, "z": 0.5} + + acq = _make_fake_acq(reader) + + assert acq.wait_light_press_visual(screen=FakeScreen(), target_keys=["c", "z"]) + assert FakeText.created == [] + + +def test_visual_wait_rejects_auto_widget_args_when_widget_is_provided(): + class FakeScreen: + def fill(self, color): + pass + + def flip(self): + pass + + class FakeWidget: + def update(self, state): + pass + + def draw(self): + pass + + acq = _make_fake_acq(lambda keys: {str(k): 0.5 for k in keys}) + + with pytest.raises(ValueError, match="fixation_cross"): + acq.wait_light_press_visual( + screen=FakeScreen(), + target_keys=["c", "z"], + widget=FakeWidget(), + fixation_cross=object(), + ) + + +def test_visual_wait_rejects_non_drawable_overlay(): + class FakeScreen: + def fill(self, color): + pass + + def flip(self): + pass + + class FakeWidget: + def update(self, state): + pass + + def draw(self): + pass + + acq = _make_fake_acq(lambda keys: {str(k): 0.5 for k in keys}) + + with pytest.raises(AttributeError, match="draw"): + acq.wait_light_press_visual( + screen=FakeScreen(), + target_keys=["c", "z"], + widget=FakeWidget(), + overlay_drawables=[object()], + )