-
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcrank_processor.py
More file actions
118 lines (81 loc) · 2.77 KB
/
crank_processor.py
File metadata and controls
118 lines (81 loc) · 2.77 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
import csv
import time
import math
import logging
from PySide6.QtCore import QThread
from queue import Queue
from comm_protocol import CrankData,TelemetryOrigin,ProcessedDataMsg
class CrankProcessor(QThread):
''' Data from the crank: cadence, power '''
''' Calculated data: calories, speed, distance'''
def __init__(self, in_queue: Queue, out_queue: Queue):
super().__init__()
self.running = True
self.in_queue = in_queue
self.out_queue = out_queue
self.cadence = 0
self.power = 0
self.joules = 0
self.calories = 0
self.speed_ms = 0
self.speed = 0
self.distance = 0
self.last_power = 0
# Refazer pra pegar da queue de verdade (as variáveis fake não são necessárias, apenas setar power e cadence)
# Retornar True se tiver dados novos, False caso contrário
def get_from_queue(self) -> bool:
try:
data = self.in_queue.get(block=True,timeout=0.2)
self.power = data.power
self.cadence = data.cadence
return True
except:
return False
def calculate_data(self):
# Delta energy
d_energy = (self.power + self.last_power) / 2
#if not self.cadence:
# return
time = 10
# Total energy
self.joules += d_energy * time
# In kcal
self.calories = self.joules * 0.23900574 / 1000
wheel_radius = 0.35 + 0.028 # 700 millimiter + tire radius
wheel_circunference = wheel_radius * 3.1415 * 2 # Full rotation
gear_ratio = 44 / 14
pedal_rotations = self.cadence / 10
distance = pedal_rotations * gear_ratio * wheel_circunference
# In meters
self.distance += distance
self.speed_ms = distance / 10
# In km/h
self.speed = self.speed_ms * 3.6
self.last_power = self.power
data = CrankData(self.power, self.cadence, self.joules, self.calories, self.speed_ms ,self.speed, self.distance)
Telemetry = ProcessedDataMsg(TelemetryOrigin.CRANK, data)
self.out_queue.put(Telemetry)
#print(Telemetry)
#print(self.calories)
#print(average)
def run(self):
while(self.running):
if self.get_from_queue():
self.calculate_data()
# time.sleep(0.4) # Precisaria andar a uns 70++kmh para passar disso
def stop(self):
self.running = False
if __name__ == "__main__":
in_queue = Queue()
out_queue = Queue()
obj = DataCalculator(in_queue, out_queue)
obj.start()
time.sleep(5)
obj.stop()
obj.wait()
'''
# Test
for i in range(2700):
obj.get_from_queue()
obj.calculate_data()
'''