-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathexample_dyscom_write_csv.py
More file actions
128 lines (100 loc) · 5.35 KB
/
example_dyscom_write_csv.py
File metadata and controls
128 lines (100 loc) · 5.35 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
"""Example how to use dyscom and write values to a .csv file.
Because writing takes time, we handle it in a
background thread and synchronize measurement data vie Queue from main thread"""
import asyncio
from timeit import default_timer as timer
from science_mode_4 import DeviceI24
from science_mode_4 import Commands
from science_mode_4 import SerialPortConnection
from science_mode_4.dyscom.ads129x.ads129x_config_register_1 import Ads129xOutputDataRate, Ads129xPowerMode
from science_mode_4.dyscom.dyscom_get_operation_mode import PacketDyscomGetAckOperationMode
from science_mode_4.dyscom.dyscom_send_live_data import PacketDyscomSendLiveData
from science_mode_4.dyscom.dyscom_types import DyscomGetType, DyscomInitParams, DyscomPowerModulePowerType,\
DyscomPowerModuleType, DyscomSignalType
from science_mode_4.protocol.types import ResultAndError
from science_mode_4.utils.logger import logger
from examples.utils.example_utils import ExampleUtils
from examples.utils.csv_utils import CsvHelper
def main():
"""Main function"""
csv_helper = CsvHelper("values.csv", ["package_nr", "Channel 1", "Channel 2", "Channel 3", "Channel 4", "Channel 5", "time_delta"])
csv_helper.start()
async def device_communication() -> int:
"""Communication with science mode device"""
# disable logger to increase performance
logger().disabled = True
# get comport from command line argument
com_port = ExampleUtils.get_comport_from_commandline_argument()
# create serial port connection
connection = SerialPortConnection(com_port)
# open connection, now we can read and write data
connection.open()
# create science mode device
device = DeviceI24(connection)
# call initialize to get basic information (serial, versions) and stop any active stimulation/measurement
# to have a defined state
await device.initialize()
# get dyscom layer to call dyscom level commands
dyscom = device.get_layer_dyscom()
# call enable measurement power module for measurement
await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_ON)
# call init with 4k sample rate and enable signal types
init_params = DyscomInitParams()
init_params.signal_type = [DyscomSignalType.BI, DyscomSignalType.EMG_1,\
DyscomSignalType.EMG_2, DyscomSignalType.BREATHING]
init_params.register_map_ads129x.config_register_1.output_data_rate = Ads129xOutputDataRate.HR_MODE_4_KSPS__LP_MODE_2_KSPS
init_params.register_map_ads129x.config_register_1.power_mode = Ads129xPowerMode.HIGH_RESOLUTION
await dyscom.init(init_params)
# start dyscom measurement
await dyscom.start()
start_time = timer()
total_count = 0
# loop for some time
for x in range(1000):
# check operation mode from time to time, this function is not waiting for response
# so we have to handle it by ourself later
if x % 100 == 0:
dyscom.send_get_operation_mode()
live_data_counter = 0
while True:
# process all available packages
ack = dyscom.packet_buffer.get_packet_from_buffer(live_data_counter == 0)
if ack:
# because there are multiple get commands, we need to additionally check kind,
# which is always associated DyscomGetType
if ack.command == Commands.DL_GET_ACK and ack.kind == DyscomGetType.OPERATION_MODE:
om_ack: PacketDyscomGetAckOperationMode = ack
print(f"Operation mode {om_ack.operation_mode.name}")
# check if measurement is still active
if om_ack.result_error != ResultAndError.NO_ERROR:
break
elif ack.command == Commands.DL_SEND_LIVE_DATA:
live_data_counter += 1
total_count += 1
sld: PacketDyscomSendLiveData = ack
if sld.status_error:
print(f"SendLiveData status error {sld.samples}")
break
csv_helper.append_values(ack.number, [sld.samples[0].value, sld.samples[1].value,\
sld.samples[2].value, sld.samples[3].value,\
sld.samples[4].value], sld.time_offset)
else:
# print(f"Live data acknowledges per iteration {live_data_counter}")
break
# await asyncio.sleep(0.001)
# print stats
end_time = timer()
print(f"Samples: {total_count}, duration: {end_time - start_time}, sample rate: {total_count / (end_time - start_time)}")
# stop measurement
await dyscom.stop()
# turn power module off
await dyscom.power_module(DyscomPowerModuleType.MEASUREMENT, DyscomPowerModulePowerType.SWITCH_OFF)
# close serial port connection
connection.close()
# stop writing file
csv_helper.stop()
return 0
# start device communication
asyncio.run(device_communication())
if __name__ == "__main__":
main()