-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathgaze_source.py
More file actions
95 lines (71 loc) · 2.63 KB
/
gaze_source.py
File metadata and controls
95 lines (71 loc) · 2.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
import threading
import time
from typing import Optional
from gaze_event import GazeEvent
from blackboard import Blackboard
from eyetrax import GazeEstimator, run_9_point_calibration
import cv2
class GazeSource(threading.Thread):
    """
    Background thread that polls the camera, runs the gaze estimator on each
    frame, and publishes the result to the Blackboard as GazeEvent objects.

    Usage order: construct, call calibrate(), then start(). Call stop() to
    ask the loop to finish; the camera is released when the loop exits.
    """

    def __init__(
        self,
        blackboard: "Blackboard",
        poll_interval: float = 0.03,
        model_path: str = "gaze_model.pkl",
        camera_index: int = 0,
    ) -> None:
        """
        Args:
            blackboard: object that receives events via set_current_gaze()
                and supplies get_screen_width() / get_screen_height().
            poll_interval: seconds to sleep between samples.
            model_path: file the calibrated model is saved to / loaded from.
            camera_index: device index handed to cv2.VideoCapture.
        """
        # daemon=True means the thread exits when the main thread exits
        super().__init__(daemon=True)
        self._poll_interval = poll_interval
        self._blackboard = blackboard
        self._model_path = model_path
        self._camera_index = camera_index
        # loop flag: set by start(), cleared by stop() so run() can exit cleanly
        self._running = False
        # created by calibrate(); run() refuses to work without it
        self._estimator = None
        # cv2.VideoCapture handle; opened and released inside run()
        self._cap = None

    def start(self) -> None:
        """
        Start the gaze source thread.

        Overridden only to set the _running flag before the thread begins,
        so the loop in run() sees it as True on its first check.
        """
        self._running = True
        super().start()

    def stop(self) -> None:
        """Signal the thread to stop on its next loop iteration."""
        self._running = False

    def calibrate(self) -> None:
        """Create a fresh GazeEstimator and run the 9-point calibration on it."""
        self._estimator = GazeEstimator()
        run_9_point_calibration(self._estimator)

    def run(self) -> None:
        """
        Thread loop:
        - read a gaze sample from the gaze library
        - wrap it in a GazeEvent
        - push it to the Blackboard
        - sleep for poll_interval

        Raises:
            RuntimeError: if calibrate() was not called first, or if the
                camera could not be opened.
        """
        try:
            if self._estimator is None:
                raise RuntimeError(
                    "GazeSource.calibrate() must be called before start()"
                )

            # Persist the calibrated model, then load it back.
            # NOTE(review): reloading right after saving looks redundant;
            # kept to preserve the original behavior - confirm and simplify.
            self._estimator.save_model(self._model_path)
            self._estimator.load_model(self._model_path)

            self._cap = cv2.VideoCapture(self._camera_index)
            if not self._cap.isOpened():
                raise RuntimeError(
                    f"could not open camera at index {self._camera_index}"
                )

            while self._running:
                event = self._read_gaze_event()
                if event is not None:
                    self._blackboard.set_current_gaze(event)
                # avoid busy-waiting; control sampling rate
                time.sleep(self._poll_interval)
        finally:
            # Whatever ended the loop, leave the object in a stopped state
            # and give the camera back to the OS.
            self._running = False
            if self._cap is not None:
                self._cap.release()
                self._cap = None

    def _read_gaze_event(self) -> "Optional[GazeEvent]":
        """
        Read a single frame from the camera and run the estimator on it.

        Returns:
            A GazeEvent whose x / y are the predicted coordinates divided by
            the Blackboard's screen width / height, or None when the frame
            could not be read, no features were found, or a blink was
            detected.
        """
        ret, frame = self._cap.read()
        if not ret or frame is None:
            # Camera hiccup: skip this sample instead of feeding an empty
            # frame to the estimator.
            return None

        features, blink = self._estimator.extract_features(frame)
        if features is None or blink:
            return None

        # predict screen coordinates, then normalize by the screen size
        x, y = self._estimator.predict([features])[0]
        norm_x = float(x) / float(self._blackboard.get_screen_width())
        norm_y = float(y) / float(self._blackboard.get_screen_height())
        return GazeEvent(x=norm_x, y=norm_y, timestamp=time.time())