-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrx_logger.py
More file actions
82 lines (68 loc) · 2.78 KB
/
rx_logger.py
File metadata and controls
82 lines (68 loc) · 2.78 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
import serial
import csv
import time
import base64
import argparse
# Command-line arguments: the serial port to read from and the CSV file to
# append decoded packets to. Both are mandatory.
parser = argparse.ArgumentParser(
    prog='RxLogger',
    description='Logs data packets to CSV file')
parser.add_argument('-p', '--port', type=str, required=True,
                    help='serial port to read packets from')
parser.add_argument('-f', '--file', type=str, required=True,
                    help='CSV file to append to (must end with .csv)')
args = parser.parse_args()
if not args.file.endswith(".csv"):
    # parser.error() prints the usage line plus this message to stderr and
    # exits with status 2, so shells and scripts see the failure. The
    # previous bare exit() reported success (status 0) on a usage error.
    parser.error(
        f"invalid file extension: {args.file!r} (expected a .csv file)")

# CONFIGURATION
SERIAL_PORT = args.port
BAUD_RATE = 115200
OUTPUT_FILE = args.file
DELIMITER = ';'  # Must match the C code

# Column names written as the first row of a new CSV file. A DATA_PKT line is
# only accepted when it carries exactly this many fields; the values are
# written positionally under these headers.
CSV_HEADERS = [
    "Millis",
    "AHT_Temp", "AHT_Hum",
    "BMP_Temp", "BMP_Press",
    "Accel_X", "Accel_Y", "Accel_Z",
    "Rot_X", "Rot_Y", "Rot_Z",
    "SCD_CO2", "SCD_Temp", "SCD_Hum",
    "GPS_Lat", "GPS_Lon", "GPS_Time",
]
def _split_data_packet(line):
    """Return the fields of a data line with its leading ``DATA_PKT;`` tag removed.

    Only a tag at the very start of the line is stripped; an occurrence of
    the tag text further into the payload is left untouched.
    """
    tag = "DATA_PKT;"
    payload = line[len(tag):] if line.startswith(tag) else line
    return payload.split(DELIMITER)


def _save_jpeg(jpeg_data):
    """Write ``jpeg_data`` to ``image_<unix-seconds>.jpg`` and return the path.

    The file is created exclusively (mode ``'xb'``); if a file for the same
    second already exists, ``_1``, ``_2``, ... is appended to the stem so an
    earlier image is never overwritten.
    """
    stem = f'image_{int(time.time())}'
    attempt = 0
    while True:
        path = f'{stem}.jpg' if attempt == 0 else f'{stem}_{attempt}.jpg'
        try:
            with open(path, 'xb') as img_f:
                img_f.write(jpeg_data)
        except FileExistsError:
            attempt += 1
        else:
            return path


def _handle_line(line, writer, csv_file):
    """Dispatch one decoded serial line.

    ``DATA_PKT`` lines are appended to the CSV (and flushed) when their field
    count matches ``CSV_HEADERS``; ``JPEG_PKT;<size>;<base64>`` lines are
    decoded and saved as an image file; anything else is echoed to stdout.
    Parsing and I/O errors propagate to the caller.
    """
    if line.startswith("DATA_PKT"):
        data_values = _split_data_packet(line)
        if len(data_values) == len(CSV_HEADERS):
            writer.writerow(data_values)
            # Flush per row so data survives an abrupt stop of the logger.
            csv_file.flush()
            print(f"[DATA] Saved: {data_values[0]} | GPS: {data_values[-1]}")
        else:
            print(f"[WARN] Column mismatch. Got {len(data_values)}, expected {len(CSV_HEADERS)}")
    elif line.startswith('JPEG_PKT;'):
        parts = line.split(';', 2)
        # The size field is parsed only to reject malformed headers (a
        # non-integer raises ValueError); its value is not otherwise used.
        int(parts[1])
        _save_jpeg(base64.b64decode(parts[2]))
    else:
        print(f"[ESP32] {line}")


def run_logger():
    """Read lines from the serial port and log them until interrupted.

    Opens ``SERIAL_PORT`` at ``BAUD_RATE`` and appends to ``OUTPUT_FILE``,
    writing the ``CSV_HEADERS`` row first when the file is empty. Each
    received line is handled by ``_handle_line``. A failure while handling a
    single line is reported and logging continues; a failure while reading
    from the port ends the loop. The serial port and the CSV file are closed
    on every exit path, including Ctrl-C.

    Returns:
        None. If the port cannot be opened, the error is printed and the
        function returns without touching the output file.
    """
    try:
        ser = serial.Serial(SERIAL_PORT, BAUD_RATE, timeout=1)
    except Exception as e:
        print(f"Error: {e}")
        return
    print(f"Connected to {SERIAL_PORT}...")

    try:
        # Open CSV with the specified delimiter
        with open(OUTPUT_FILE, 'a', newline='') as f:
            # csv.writer needs to know we are using semicolons
            writer = csv.writer(f, delimiter=DELIMITER)
            if f.tell() == 0:
                writer.writerow(CSV_HEADERS)

            while True:
                try:
                    # Blocks for at most the 1 s port timeout, so an idle
                    # link no longer spins the CPU polling in_waiting.
                    raw = ser.readline()
                except serial.SerialException as e:
                    # A read failure on the port is not recoverable without
                    # reopening it; stop instead of repeating the error.
                    print(f"Error: {e}")
                    break
                if not raw:
                    continue  # timeout with no data

                try:
                    line = raw.decode('utf-8', errors='ignore').strip()
                    _handle_line(line, writer, f)
                except Exception as e:
                    # Best-effort per line: report a bad packet and keep going.
                    print(f"Error: {e}")
    finally:
        ser.close()
# Script entry point: start logging only when this file is executed directly,
# not when it is imported as a module.
if __name__ == "__main__":
    run_logger()