-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathHR_reader.py
More file actions
203 lines (178 loc) · 7.91 KB
/
HR_reader.py
File metadata and controls
203 lines (178 loc) · 7.91 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
# Heartbeat reader synchronised to a specific device whose MAC address has been
# hardcoded in.
# This device is a polarOH heart rate reader. It broadcasts the heart rate notification
# only when notifications are switched on.
# Unlike some other devices (e.g. Hexiwear), which just broadcast all values of interest
# all the time, this one needs notifications to be switched on for each value of interest.
import pexpect
import threading
import time
DEVICE = 'A0:9E:1A:25:71:5C'  # MAC address of the BLE heart rate device that gatttool is told to connect to
class HeartBeat_BLE(threading.Thread):
    """
    Thread that reads heart rate notifications from a BLE device via gatttool.

    Typical use: create the object, call connect(), call
    switch_HR_notifications(True), then start() the thread. The most recent
    reading is available in ``heartRate``; it is set to -1 once stop() has run.
    """

    # Number used to (re)initialise the retry counter in run().
    MAX_RETRIES = 11
    # Pause between polls while notifications are switched off, so that run()
    # does not spin at full speed when there is nothing to read.
    IDLE_POLL_SECONDS = 0.1

    def __init__(self, logger=None, device=None):
        """
        Initialise the gatttool. Note we need to be on a Unix based system
        @param logger: If logging is required then a python logger needs to be passed to it
        @param device: MAC address of the device to talk to. When omitted the
                       module level DEVICE address is used.
        """
        # This implements threading so super class thread will need initialising.
        # The class is named explicitly: super(self.__class__, self) recurses
        # forever as soon as somebody subclasses this class.
        super(HeartBeat_BLE, self).__init__()
        print("Run gatttool...")
        self.device = DEVICE if device is None else device
        # The gattHandle is the pipe to the gatttool which speaks to the device
        self.gattHandle = pexpect.spawn("gatttool -I")
        self.stop_thread = False
        self.listen = False
        self.heartRate = 60  # This value is updated with new measured heart rate
        self.logger = logger

    def _log(self, message, *args):
        """
        Send an info message to the logger, if one was supplied.
        Arguments are passed through so the logger formats them lazily.
        """
        if self.logger:
            self.logger.info(message, *args)

    def hexStrToInt(self, hexstr):
        """
        Convert the second byte of a space separated hexadecimal string into
        an unsigned integer, e.g. "16 4b" gives 0x4B == 75.
        Only the two hex digits at positions 3 and 4 are read.
        @param hexstr: text (str or bytes) such as "16 4b"
        @return val: integer in the range 0..255
        @raise IndexError: if the input is shorter than five characters
        @raise ValueError: if the two characters are not hexadecimal digits
        """
        # pexpect hands back bytes under Python 3; indexing bytes yields ints
        # which int(x, 16) rejects, so normalise to text first.
        if isinstance(hexstr, bytes):
            hexstr = hexstr.decode("ascii")
        val = (int(hexstr[3], 16) << 4) + int(hexstr[4], 16)
        return val

    def stop(self):
        """
        Cleaning up and stopping thread. Safe to call more than once.
        """
        self.stop_thread = True
        if self.gattHandle is None:
            # Already stopped, nothing left to release
            return
        time.sleep(0.5)
        try:
            # switch off notifications since we will no longer be listening to them
            self.switch_HR_notifications(False)
        except (pexpect.TIMEOUT, pexpect.EOF):
            # The device or gatttool has gone away. That must not prevent us
            # from releasing the gatttool process below.
            self._log("Could not switch off heart rate notifications while stopping")
        finally:
            # Close the gatthandle
            handle = self.gattHandle
            self.gattHandle = None
            if handle is not None:
                handle.close(force=True)
            self.heartRate = -1

    def connect(self):
        """
        Attempts to connect to the BLE measurement tool via the gatttool. The device needs to be connected to
        before starting the thread
        @return index: 1 when there is a successful connection, 0 when gatttool
                       reported a connect error and -1 when the request timed out.
        """
        child = self.gattHandle
        index = -1
        # Connect to the device.
        print("Connecting to %s" % self.device)
        self._log("Trying to connect to %s", self.device)
        child.sendline("connect {0}".format(self.device))
        try:
            # Look out for success or error in connection
            index = child.expect(["connect error", "Connection successful"], timeout=10)
        except pexpect.TIMEOUT:
            print("Timed out, device not responding to connection request. Please check device")
            self._log("Timed out, device not responding to connection request. Please check device")
            return index
        if index > 0:
            print(" Connected!")
            self._log("Connected to %s", self.device)
        else:
            print("Error connecting")
            self._log("Error Connecting to %s", self.device)
        return index

    def disconnect(self):
        """
        Attempts to disconnect from the BLE measurement tool. BLE devices can only be connected
        to one other device at a time. So it must be disconnected
        @return index: 0 when the wait timed out, 1 when gatttool closed its output
        """
        child = self.gattHandle
        child.sendline("disconnect {0}".format(self.device))
        # As long as we have sent the command out we are not too fussy about what device sends back
        # it may already have disconnected
        index = child.expect([pexpect.TIMEOUT, pexpect.EOF], timeout=5)
        return index

    def switch_HR_notifications(self, switchOn=True):
        """
        Switching heart rate notification on/off
        @param switchOn: bool to indicate whether notification should be on
        @raise pexpect.TIMEOUT: if the write is not acknowledged within 10 seconds
        """
        child = self.gattHandle
        if switchOn:
            # polarOH specific heart rate notification switch on command
            child.sendline("char-write-req 0x0026 0100")
        else:
            # polarOH specific heart rate notification switch off command
            child.sendline("char-write-req 0x0026 0000")
        # As long as it was written successfully we should have heart rate broadcasting
        # switched on/off as needed
        child.expect("Characteristic value was written successfully", timeout=10)
        # Only record the new state once the write has been acknowledged
        self.listen = switchOn

    def monitor_hr_notification(self):
        """
        Monitors heart-rate if the notification was switched on. Waits for one
        notification and stores its value in heartRate. A notification whose
        value cannot be parsed is logged and ignored.
        @raise pexpect.TIMEOUT: if no notification arrives within 30 seconds
        """
        # Nothing to read unless notifications are on
        if not self.listen:
            return
        child = self.gattHandle
        # Listen for heart rate notification channel for values
        child.expect("Notification handle = 0x0025 value: ", timeout=30)
        child.expect("\r\n", timeout=30)
        # isolate the part that gives the heart rate value
        hr_string = child.before.strip()
        try:
            # heart rate values come in as hexadecimal strings so convert to int
            hr = self.hexStrToInt(hr_string)
        except (IndexError, ValueError):
            # A truncated or garbled line should not kill the monitoring thread
            self._log("Ignoring malformed heart rate notification: %r", hr_string)
            return
        if hr != self.heartRate:
            # Log and update if there is a change of heart rate
            self._log("Heart rate change: %d to %d", self.heartRate, hr)
            self.heartRate = hr
        else:
            self._log("Heart rate read: %d", hr)

    def run(self):
        """
        Thread to constantly monitor heart rate
        Sometimes the device disconnects. We will retry connecting in that case
        But only some reasonable number of times just in case the monitor has died
        i.e. signal too weak, not picking up, battery dead.
        """
        max_retry = self.MAX_RETRIES
        while not self.stop_thread:
            try:
                if not self.listen:
                    # Notifications are off: wait a little instead of spinning
                    time.sleep(self.IDLE_POLL_SECONDS)
                    continue
                # read monitor
                self.monitor_hr_notification()
                # reset retries
                max_retry = self.MAX_RETRIES
            except pexpect.TIMEOUT:
                if self.stop_thread:
                    # We are to stop so do nothing
                    break
                if max_retry <= 1:
                    print("Heart rate monitor not responding... ")
                    self._log("Heart rate monitor not responding... ")
                    self.stop()
                    break
                max_retry -= 1
                print("Heart rate notification error. Retry %d times" % max_retry)
                self._log("Heart rate notification error. Retry %d times", max_retry)
                try:
                    self.connect()
                    self.switch_HR_notifications(True)
                except pexpect.TIMEOUT:
                    # Deliberately ignored: the next read attempt times out
                    # again and uses up another retry.
                    pass
            except Exception:
                # We are getting some errors when stopping the thread which we don't care about
                # if we are stopping
                if self.stop_thread:
                    # We are to stop so do nothing
                    break
                raise
# Demo entry point: connect to the device, switch heart rate notifications on
# and let the reader thread run for two minutes before cleaning up.
if __name__ == '__main__':
    hr_listener = None
    try:
        hr_listener = HeartBeat_BLE()
        # connect() returns 1 on success; there is no point in asking for
        # notifications or waiting two minutes if we never got connected.
        if hr_listener.connect() == 1:
            hr_listener.switch_HR_notifications(True)
            hr_listener.start()
            time.sleep(120)
        else:
            print("Could not connect to the heart rate monitor")
    finally:
        print(" final stop")
        # Always release gatttool, even when the steps above failed
        if hr_listener is not None:
            hr_listener.stop()