-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathapp.py
More file actions
251 lines (211 loc) · 8.63 KB
/
app.py
File metadata and controls
251 lines (211 loc) · 8.63 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
#!/usr/bin/env python3
### BEN IF YOU DELETE THE SHEBANG ONE MORE TIME I'M GONNA GET YOU (I KNOW WHERE YOU SLEEP)
## Import everything ##
from flask_socketio import SocketIO
from flask import Flask, render_template
from threading import Thread, Event
from data_streamer import DataStreamer
from firebase_communicator import FirebaseCommunicator
from data_classifier import DataClassifier
from const import *
import numpy as np
## Initialize some stuff ##
# Flask setup
__author__ = 'Ben Falkenburg'
app = Flask(__name__)
# NOTE(review): hard-coded secret key and DEBUG=True look like development settings - confirm before deploying
app.config['SECRET_KEY'] = 'secret!'
app.config['DEBUG'] = True
# Turn the flask app into a socketio app (CORS open to every origin, socketio and engineio logging switched on)
socketio = SocketIO(app, cors_allowed_origins="*", async_mode=None, logger=True, engineio_logger=True)
# Placeholder thread object; the 'connect' handler replaces it with the background task that runs eeg_processor
thread = Thread()
# Event checked by eeg_processor's loop condition; the loop exits once it is set
thread_stop_event = Event()
# Streams data from the headset. More info in data_streamer.py
streamer = DataStreamer()
# Communicates with our database to fetch and deposit data. More info in firebase_communicator.py
firebase_comm = FirebaseCommunicator()
# Classifies our data. More info in data_classifier.py
classifier = DataClassifier(firebase_comm=firebase_comm)
# Socket.IO namespace of the page currently treated as active ('/test' or '/training'); reassigned by the route handlers
namespace = '/test'
## Functions that facilitate data streaming and database interaction ##
# If we have enough data to chunk out and add to our database, let's do so
def add_data_streamed_at_current_time():
    """Store the streamer's accumulated C3/C4 data under the class being recorded.

    When ``streamer.recording_class`` is ``"LEFT"`` or ``"RIGHT"``, the data is
    written to the matching recordings collection using the current file count
    as its index, the count is incremented, and the new count is pushed to the
    database. Any other recording class is a no-op.
    """
    active_class = streamer.recording_class

    if active_class == "LEFT":
        firebase_comm.add_data_to_left_motion_recordings(
            streamer.left_motion_file_count,
            streamer.all_c3_data,
            streamer.all_c4_data,
        )
        streamer.left_motion_file_count += 1
        firebase_comm.update_left_motion_file_count(streamer.left_motion_file_count)
        return

    if active_class == "RIGHT":
        firebase_comm.add_data_to_right_motion_recordings(
            streamer.right_motion_file_count,
            streamer.all_c3_data,
            streamer.all_c4_data,
        )
        streamer.right_motion_file_count += 1
        firebase_comm.update_right_motion_file_count(streamer.right_motion_file_count)
def initialize_analytics_chart():
    """Load the stored left/right recording counts from the database into the streamer."""
    # Fetch the left count first, then the right one, storing each as soon as it arrives
    stored_left = firebase_comm.get_left_motion_file_count()
    streamer.left_motion_file_count = stored_left
    stored_right = firebase_comm.get_right_motion_file_count()
    streamer.right_motion_file_count = stored_right
# Update the count in the analytics chart
def update_analytics_chart():
    """Push the streamer's left/right recording counts to the database.

    A count is only written when it is greater than zero; zero (or negative)
    counts are skipped.
    """
    left_total = streamer.left_motion_file_count
    if left_total > 0:
        firebase_comm.update_left_motion_file_count(left_total)

    right_total = streamer.right_motion_file_count
    if right_total > 0:
        firebase_comm.update_right_motion_file_count(right_total)
# Downsample our signal for the graph so it's more efficient
def downsample_data(data, samples=10):
    """Thin out *data* by keeping every k-th element, for cheaper plotting.

    The stride is ``len(data) // samples``, clamped to at least 1, so inputs
    shorter than *samples* are returned in full instead of being dropped.
    When ``len(data)`` is not a multiple of *samples* the result can hold a
    few more than *samples* elements, because the stride is rounded down.

    Args:
        data: Indexable sequence supporting ``len()`` (list, array, ...).
        samples: Target number of points to keep; must be at least 1.

    Returns:
        A new list with the selected elements (empty if *data* is empty).

    Raises:
        ValueError: If *samples* is smaller than 1.
    """
    if samples < 1:
        raise ValueError("samples must be at least 1")

    # A stride of 0 would make range() raise, so never go below 1
    step_size = max(1, len(data) // samples)
    return [data[i] for i in range(0, len(data), step_size)]
# Send all necessary info to the main page/graphs
def send_data(c3_data=None, c4_data=None, direction_to_move=''):
    """Emit the latest samples and status to both the test and training pages.

    Two events are sent on every call:
      * ``new_test_data`` on the ``/test`` namespace, carrying the samples,
        the direction to move and the window size.
      * ``new_train_data`` on the ``/training`` namespace, carrying the
        samples, ``graph_frozen`` set to False, the streamer's left/right
        recording counts and the window size.

    Args:
        c3_data: Samples for the C3 channel; defaults to an empty list.
        c4_data: Samples for the C4 channel; defaults to an empty list.
        direction_to_move: Direction string forwarded to the test page
            ('' when there is none).
    """
    # None stands in for "no data" so that no list object is shared between calls
    c3_data = [] if c3_data is None else c3_data
    c4_data = [] if c4_data is None else c4_data

    test_payload = {
        'c3_data': c3_data,
        'c4_data': c4_data,
        'direction_to_move': direction_to_move,
        'window_size': WINDOW_SIZE,
    }
    socketio.emit('new_test_data', test_payload, namespace='/test')

    train_payload = {
        'c3_data': c3_data,
        'c4_data': c4_data,
        'graph_frozen': False,
        'left_motion_file_count': streamer.left_motion_file_count,
        'right_motion_file_count': streamer.right_motion_file_count,
        'window_size': WINDOW_SIZE,
    }
    socketio.emit('new_train_data', train_payload, namespace='/training')
# Picks the class with the highest score: index 0 is interpreted as a command to move our avatar left, any other index as right
def process_prediction(prediction):
    """Translate a classifier output into a movement direction.

    Only the first row of *prediction* is inspected. The index of its largest
    value decides the direction: index 0 means "left", any other index means
    "right". On ties the lowest index wins.

    Args:
        prediction: Sequence whose first element is the row of class scores
            (a NumPy array or nested list).

    Returns:
        "left" or "right".
    """
    # asarray lets plain nested lists work as well as NumPy arrays
    scores = np.asarray(prediction[0])
    winning_index = int(np.argmax(scores))
    return "left" if winning_index == 0 else "right"
# Decides whether to send data to be stored in our database, or to feed it into a neural network for testing. Then, we send the data to our webpage
def process_data(c3_data=None, c4_data=None):
    """Store or classify the streamed data as appropriate, then push it to the pages.

    * While training data is being recorded and ``streamer.current_time`` is
      positive and has reached ``DATA_CHUNK_SIZE``, the timer is reset to 0
      and the accumulated data is stored in the database.
    * Otherwise, while testing data is being streamed and
      ``streamer.current_time`` exceeds ``DATA_CHUNK_SIZE``, the accumulated
      data is classified, the raw prediction is printed and it is converted
      into a direction.

    In every case the samples (and the direction, '' if none) are sent to the
    web pages.

    Args:
        c3_data: Samples for the C3 channel to display; defaults to an empty list.
        c4_data: Samples for the C4 channel to display; defaults to an empty list.
    """
    # None stands in for "no data" so that no list object is shared between calls
    c3_data = [] if c3_data is None else c3_data
    c4_data = [] if c4_data is None else c4_data

    direction_to_move = ''
    if streamer.is_recording_training_data and streamer.current_time > 0 and streamer.current_time >= DATA_CHUNK_SIZE:
        # Reset the timer before storing so the next chunk starts from zero
        streamer.current_time = 0
        add_data_streamed_at_current_time()
    elif streamer.is_streaming_testing_data and streamer.current_time > DATA_CHUNK_SIZE:
        prediction = classifier.classify_input(streamer.all_c3_data, streamer.all_c4_data)
        print(prediction)
        direction_to_move = process_prediction(prediction)

    send_data(c3_data, c4_data, direction_to_move)
# Our website's bread and butter. Initializes the charts and webpage, then collects data until thread_stop_event is set
def eeg_processor():
    """Background task: poll the headset and keep the web pages updated.

    Loads the stored recording counts, sends an initial empty update, then
    loops until ``thread_stop_event`` is set:

    * while recording or streaming, fetch the current C3/C4 data, downsample
      it for the graphs and hand it to ``process_data``;
    * otherwise, when the active namespace is '/test', send an empty update.

    Once the loop ends the recording counts are pushed to the database.
    """
    initialize_analytics_chart()
    send_data()

    # is_set() is the supported spelling; isSet() is a deprecated alias
    while not thread_stop_event.is_set():
        # Collect eeg data
        if streamer.is_recording_training_data or streamer.is_streaming_testing_data:
            c3_data, c4_data = streamer.get_current_data()
            c3_data = downsample_data(c3_data)
            c4_data = downsample_data(c4_data)
            process_data(c3_data, c4_data)
        elif namespace == '/test':
            process_data()
        # NOTE(review): assumed to run on every iteration, and the two calls after the
        # loop assumed to run once on exit - confirm against the original nesting
        socketio.sleep(0.25)

    update_analytics_chart()
    print("The data stream has ended")
## Page rendering functions ##
@app.route('/')
def home():
    """Serve the home page, mark '/test' as the active namespace and send an empty update."""
    global namespace
    namespace = '/test'
    send_data()
    page = render_template('home.html')
    return page
@app.route("/training")
def training():
    """Serve the training page, mark '/training' as the active namespace and send an empty update."""
    global namespace
    namespace = '/training'
    send_data()
    page = render_template("training.html")
    return page
@app.route("/about")
def about():
    """Serve the about page."""
    page = render_template("about.html")
    return page
@app.route("/contact")
def contact():
    """Serve the contact page."""
    page = render_template("contact.html")
    return page
@app.route("/navbar")
def navbar():
    """Serve the navbar template."""
    page = render_template("navbar.html")
    return page
@app.route("/footer")
def footer():
    """Serve the footer template."""
    page = render_template("footer.html")
    return page
@app.route("/networkmodal")
def networkmodal():
    """Serve the network modal template."""
    page = render_template("networkmodal.html")
    return page
## Button-triggered functions ##
# Set all variables to begin collecting eeg data for left motion
@app.route('/start_recording_left_motion')
def start_recording_left_motion():
    """Switch the streamer to recording training data for the LEFT class.

    Also marks '/training' as the active namespace and turns test streaming
    off. Always responds with the string "200".
    """
    global namespace
    namespace = '/training'

    streamer.is_recording_training_data = True
    streamer.is_streaming_testing_data = False
    streamer.recording_class = "LEFT"

    response = "200"
    return response
# Set all variables to begin collecting eeg data for right motion
@app.route('/start_recording_right_motion')
def start_recording_right_motion():
    """Switch the streamer to recording training data for the RIGHT class.

    Also marks '/training' as the active namespace and turns test streaming
    off. Always responds with the string "200".
    """
    global namespace
    namespace = '/training'

    streamer.is_recording_training_data = True
    streamer.is_streaming_testing_data = False
    streamer.recording_class = "RIGHT"

    response = "200"
    return response
# Don't record from any recording class, and don't stream any data
@app.route('/stop_recording')
def stop_recording():
    """Stop both recording and streaming.

    The recording counts are pushed to the database first, then both streamer
    flags are cleared and the recording class is set to "OFF". Always responds
    with the string "200".
    """
    update_analytics_chart()

    streamer.is_recording_training_data = False
    streamer.is_streaming_testing_data = False
    streamer.recording_class = "OFF"

    response = "200"
    return response
# Create a neural network with the data collected
@app.route('/create_network')
def create_network():
    """Build the training set and, if one is available, train the network.

    Prints whether the network was created. Responds with the string "200"
    in both cases.
    """
    data, labels = classifier.build_data()

    # No data means there is nothing to train on
    if data is None:
        print("Unable to create network")
        return "200"

    classifier.train_network(data, labels)
    print("Network created!")
    return "200"
# Start streaming data from the headset, with no intention of recording it. Data will be fed into neural network
@app.route('/start_streaming')
def start_streaming():
    """Switch the streamer from recording to test streaming.

    Also marks '/test' as the active namespace. Always responds with the
    string "200".
    """
    global namespace
    namespace = '/test'

    streamer.is_recording_training_data = False
    streamer.is_streaming_testing_data = True

    response = "200"
    return response
## Threading functions ##
# Actions to take when the website loads (start thread)
@socketio.on('connect')
def test_connect():
    """Handle a client connecting: send an empty update and make sure the background task runs.

    The background task running ``eeg_processor`` is only started when the
    current ``thread`` object is not alive.
    """
    global thread
    print('Client connected')
    send_data()

    # A live worker is reused; only start one when none is running
    if thread.is_alive():
        return

    print("Starting Thread")
    thread = socketio.start_background_task(eeg_processor)
# Actions to take when a client disconnects
@socketio.on('disconnect')
def test_disconnect():
    """Log that a client has disconnected."""
    message = 'Client disconnected'
    print(message)
## Run the thing ##
if __name__ == '__main__':
    # Only start the server when this file is executed directly, not when it is imported
    socketio.run(app)