|
| 1 | +#!/usr/bin/env python3 |
| 2 | + |
| 3 | +import argparse |
| 4 | +import serial |
| 5 | +import time |
| 6 | +import re |
| 7 | +import sys |
| 8 | +from pyproj import Transformer |
| 9 | + |
| 10 | +# Terminal colour codes |
| 11 | +class Colour: |
| 12 | + HEADER = '\033[95m' |
| 13 | + OKBLUE = '\033[94m' |
| 14 | + OKGREEN = '\033[92m' |
| 15 | + WARNING = '\033[93m' |
| 16 | + FAIL = '\033[91m' |
| 17 | + ENDC = '\033[0m' |
| 18 | + BOLD = '\033[1m' |
| 19 | + UNDERLINE = '\033[4m' |
| 20 | + RED = '\033[91m' |
| 21 | + YELLOW = '\033[93m' |
| 22 | + GREEN = '\033[92m' |
| 23 | + |
| 24 | +# Accuracy thresholds (constants for easier adjustment) |
| 25 | +ACCURACY_THRESHOLD_1M = 1.0 |
| 26 | +ACCURACY_THRESHOLD_10CM = 0.1 |
| 27 | + |
| 28 | +# Common baud rates to try for auto-detection |
| 29 | +BAUD_RATES = [9600, 19200, 38400, 57600, 115200, 921600] |
| 30 | + |
| 31 | +def calculate_nmea_checksum(nmea_sentence: str) -> str: |
| 32 | + checksum = 0 |
| 33 | + for char in nmea_sentence[1:]: |
| 34 | + checksum ^= ord(char) |
| 35 | + return f"{nmea_sentence}*{checksum:02X}" |
| 36 | + |
| 37 | +def append_checksum_if_missing(nmea_sentence: str) -> str: |
| 38 | + if '*' not in nmea_sentence: |
| 39 | + return calculate_nmea_checksum(nmea_sentence) |
| 40 | + return nmea_sentence |
| 41 | + |
| 42 | +def read_gps_messages(port: str, speed: int, timeout: int, verbose: bool = False): |
| 43 | + try: |
| 44 | + with serial.Serial(port, baudrate=speed, timeout=timeout) as ser: |
| 45 | + while True: |
| 46 | + response = ser.readline().decode('ascii', errors='ignore').strip() |
| 47 | + if response and response.startswith('$'): |
| 48 | + if verbose: |
| 49 | + print(f"{Colour.OKBLUE}Received message: {response}{Colour.ENDC}") |
| 50 | + yield response |
| 51 | + except serial.SerialException as e: |
| 52 | + print(f"Error opening serial port: {e}") |
| 53 | + |
| 54 | +def ecef_to_geodetic(x: float, y: float, z: float) -> tuple: |
| 55 | + transformer = Transformer.from_crs("EPSG:4978", "EPSG:4326", always_xy=True) |
| 56 | + lon, lat, alt = transformer.transform(x, y, z) |
| 57 | + return lat, lon, alt |
| 58 | + |
| 59 | +def colour_diff(current: str, previous: str) -> str: |
| 60 | + """ |
| 61 | + Compare digits in the current and previous strings. Mark changed digits in red. |
| 62 | + """ |
| 63 | + result = [] |
| 64 | + change_detected = False |
| 65 | + for c, p in zip(current, previous): |
| 66 | + if c.isdigit() and p.isdigit(): |
| 67 | + if change_detected or c != p: |
| 68 | + result.append(f"{Colour.RED}{c}{Colour.ENDC}") |
| 69 | + change_detected = True |
| 70 | + else: |
| 71 | + result.append(f"{Colour.GREEN}{c}{Colour.ENDC}") |
| 72 | + else: |
| 73 | + result.append(c) |
| 74 | + change_detected = False |
| 75 | + if len(current) > len(previous): |
| 76 | + for c in current[len(previous):]: |
| 77 | + result.append(f"{Colour.RED}{c}{Colour.ENDC}" if c.isdigit() else c) |
| 78 | + return ''.join(result) |
| 79 | + |
| 80 | +def redraw_terminal(lines: list, reset_cursor: bool): |
| 81 | + """ |
| 82 | + Redraw lines in the terminal. If reset_cursor is True, reset cursor position for updates. |
| 83 | + If False, simply print lines without resetting the cursor. |
| 84 | + """ |
| 85 | + if sys.stdout.isatty(): |
| 86 | + if reset_cursor: |
| 87 | + sys.stdout.write('\033[F' * len(lines)) # Reset the cursor for updates |
| 88 | + for line in lines: |
| 89 | + sys.stdout.write('\033[K' + line + '\n') # Print each line |
| 90 | + |
| 91 | +def colourise_accuracy(accuracy: float) -> str: |
| 92 | + """Colour accuracy based on its value.""" |
| 93 | + if accuracy > ACCURACY_THRESHOLD_1M: |
| 94 | + return f"{Colour.RED}{accuracy:.2f}{Colour.ENDC} metres" |
| 95 | + elif accuracy >= ACCURACY_THRESHOLD_10CM: |
| 96 | + return f"{Colour.YELLOW}{accuracy:.2f}{Colour.ENDC} metres" |
| 97 | + else: |
| 98 | + return f"{Colour.GREEN}{accuracy:.2f}{Colour.ENDC} metres" |
| 99 | + |
| 100 | +def start_survey_in(port: str, speed: int, timeout: int, min_dur: int, acc_limit: float, verbose: bool): |
| 101 | + command = f"$PQTMCFGSVIN,W,1,{min_dur},{acc_limit},0.0,0.0,0.0" |
| 102 | + send_nmea_command(port, speed, command, timeout, verbose) |
| 103 | + |
| 104 | + print(Colour.HEADER + Colour.BOLD + "Starting survey-in..." + Colour.ENDC) |
| 105 | + |
| 106 | + start_time = time.time() |
| 107 | + prev_ecef = None |
| 108 | + prev_geo = None |
| 109 | + prev_latlon = None # Cache to avoid unnecessary recalculations |
| 110 | + reset_cursor = False # Track whether to reset the cursor |
| 111 | + |
| 112 | + for response in read_gps_messages(port, speed, timeout, verbose): |
| 113 | + match = re.match(r"\$PQTMSVINSTATUS,\d+,\d+,(\d+),,\d+,\d+,\d+,(-?\d+\.\d+),(-?\d+\.\d+),(-?\d+\.\d+),(\d+\.\d+)\*\w+", response) |
| 114 | + if match: |
| 115 | + valid_flag = int(match.group(1)) |
| 116 | + mean_x, mean_y, mean_z = map(float, match.group(2, 3, 4)) |
| 117 | + mean_acc = float(match.group(5)) |
| 118 | + obs_count = int(response.split(',')[6]) |
| 119 | + elapsed_time = int(time.time() - start_time) |
| 120 | + remaining_time = max(0, int(min_dur - elapsed_time)) |
| 121 | + |
| 122 | + # Convert ECEF to geodetic only if ECEF coordinates have changed |
| 123 | + if prev_ecef != (mean_x, mean_y, mean_z): |
| 124 | + lat, lon, alt = ecef_to_geodetic(mean_x, mean_y, mean_z) |
| 125 | + prev_latlon = f"Lat={lat:.7f}, Lon={lon:.7f}, Alt={alt:.3f}" |
| 126 | + |
| 127 | + current_ecef = f"X={mean_x:.4f}, Y={mean_y:.4f}, Z={mean_z:.4f}" |
| 128 | + |
| 129 | + if prev_ecef: |
| 130 | + colourised_ecef = colour_diff(current_ecef, f"X={prev_ecef[0]:.4f}, Y={prev_ecef[1]:.4f}, Z={prev_ecef[2]:.4f}") |
| 131 | + colourised_geo = colour_diff(prev_latlon, prev_geo) |
| 132 | + else: |
| 133 | + colourised_ecef = current_ecef |
| 134 | + colourised_geo = prev_latlon |
| 135 | + |
| 136 | + prev_ecef = (mean_x, mean_y, mean_z) |
| 137 | + prev_geo = prev_latlon |
| 138 | + |
| 139 | + coloured_accuracy = colourise_accuracy(mean_acc) |
| 140 | + |
| 141 | + if valid_flag == 2: |
| 142 | + print(Colour.OKGREEN + "Survey-in complete." + Colour.ENDC) |
| 143 | + print(f"Final {Colour.BOLD}Accuracy{Colour.ENDC}: {coloured_accuracy}") |
| 144 | + print(f"Final {Colour.BOLD}ECEF{Colour.ENDC}: {current_ecef}") |
| 145 | + print(f"Final {Colour.BOLD}Geodetic{Colour.ENDC}: {prev_latlon}") |
| 146 | + break |
| 147 | + elif valid_flag == 1: |
| 148 | + lines_to_display = [ |
| 149 | + f"{Colour.WARNING}Survey-in in progress{Colour.ENDC}: {Colour.BOLD}Elapsed{Colour.ENDC}: {elapsed_time} seconds, {Colour.BOLD}Remaining{Colour.ENDC}: {remaining_time} seconds, {Colour.BOLD}Accuracy{Colour.ENDC}: {coloured_accuracy}, {Colour.BOLD}Observations{Colour.ENDC}: {obs_count}", |
| 150 | + f"{Colour.HEADER}{Colour.BOLD}ECEF{Colour.ENDC}: {colourised_ecef}", |
| 151 | + f"{Colour.HEADER}{Colour.BOLD}Geodetic{Colour.ENDC}: {colourised_geo}" |
| 152 | + ] |
| 153 | + redraw_terminal(lines_to_display, reset_cursor) |
| 154 | + reset_cursor = True # Reset cursor for future updates |
| 155 | + time.sleep(1) |
| 156 | + |
| 157 | +def send_nmea_command(port: str, speed: int, nmea_command: str, timeout: int, verbose: bool = False) -> str: |
| 158 | + try: |
| 159 | + with serial.Serial(port, baudrate=speed, timeout=timeout) as ser: |
| 160 | + nmea_command_with_checksum = append_checksum_if_missing(nmea_command) |
| 161 | + ser.write((nmea_command_with_checksum + '\r\n').encode('ascii')) |
| 162 | + if verbose: |
| 163 | + print(f"Sent command: {Colour.OKBLUE}{nmea_command_with_checksum}{Colour.ENDC}") |
| 164 | + |
| 165 | + response = ser.readline().decode('ascii', errors='ignore').strip() |
| 166 | + if response: |
| 167 | + if verbose: |
| 168 | + print(f"Received response: {Colour.OKBLUE}{response}{Colour.ENDC}") |
| 169 | + return response |
| 170 | + except serial.SerialException as e: |
| 171 | + print(f"Error opening serial port: {e}") |
| 172 | + |
| 173 | +def detect_speed(port: str, timeout: int, verbose: bool = False) -> int: |
| 174 | + command = "$PQTMVERNO" |
| 175 | + command_with_checksum = append_checksum_if_missing(command) |
| 176 | + for speed in BAUD_RATES: |
| 177 | + if verbose: |
| 178 | + print(f"Trying baud rate {speed}...") |
| 179 | + try: |
| 180 | + with serial.Serial(port, baudrate=speed, timeout=timeout) as ser: |
| 181 | + ser.write((command_with_checksum + '\r\n').encode('ascii')) |
| 182 | + if verbose: |
| 183 | + print(f"Sent command: {Colour.OKBLUE}{command_with_checksum}{Colour.ENDC}") |
| 184 | + response = ser.readline().decode('ascii', errors='ignore').strip() |
| 185 | + if response.startswith("$PQTMVERNO"): |
| 186 | + if verbose: |
| 187 | + print(f"{Colour.OKGREEN}Received response at {speed} baud: {response}{Colour.ENDC}") |
| 188 | + return speed |
| 189 | + except serial.SerialException: |
| 190 | + if verbose: |
| 191 | + print(f"{Colour.FAIL}Failed to open serial port at {speed} baud.{Colour.ENDC}") |
| 192 | + raise Exception("Failed to detect baud rate. No valid response for PQTMVERNO command.") |
| 193 | + |
| 194 | +def disable_survey_in(port: str, speed: int, timeout: int, verbose: bool = False): |
| 195 | + command = "$PQTMCFGSVIN,W,0,0,0.0,0.0,0.0,0.0" |
| 196 | + send_nmea_command(port, speed, command, timeout, verbose) |
| 197 | + print(Colour.OKGREEN + "Survey-in disabled." + Colour.ENDC) |
| 198 | + |
| 199 | +def set_fixed_mode(port: str, speed: int, ecef_x: float, ecef_y: float, ecef_z: float, timeout: int, verbose: bool = False): |
| 200 | + command = f"$PQTMCFGSVIN,W,2,0,0.0,{ecef_x},{ecef_y},{ecef_z}" |
| 201 | + response = send_nmea_command(port, speed, command, timeout, verbose) |
| 202 | + if response and "OK" in response: |
| 203 | + print(Colour.OKGREEN + "Fixed mode set successfully." + Colour.ENDC) |
| 204 | + else: |
| 205 | + print(Colour.FAIL + "Failed to set fixed mode." + Colour.ENDC) |
| 206 | + |
| 207 | +if __name__ == "__main__": |
| 208 | + parser = argparse.ArgumentParser(description="Survey-in and Fixed mode tool for Quectel LC29H-BS GPS module.") |
| 209 | + parser.add_argument('port', type=str, help='Serial port to use (e.g., /dev/ttyUSB0 or COM3)') |
| 210 | + parser.add_argument('--timeout', type=int, default=3, help='Timeout in seconds for GPS response (default: 3 seconds)') |
| 211 | + parser.add_argument('--speed', type=int, help='Baud rate (e.g., 9600). If not provided, the script will attempt to detect the speed.') |
| 212 | + parser.add_argument('--mode', type=str, choices=['survey', 'fixed', 'disable'], required=True, help="Select mode: 'survey', 'fixed', or 'disable'") |
| 213 | + parser.add_argument('--ecef', nargs=3, type=float, help="ECEF coordinates (X Y Z) for fixed mode") |
| 214 | + parser.add_argument('--min-dur', type=int, default=86400, help="Minimum duration for survey-in mode (default: 86400 seconds / 1 day)") |
| 215 | + parser.add_argument('--acc-limit', type=float, default=15.0, help="Accuracy limit for survey-in mode in metres (default: 15 metres)") |
| 216 | + parser.add_argument('--verbose', action='store_true', help='Enable verbose output') |
| 217 | + |
| 218 | + args = parser.parse_args() |
| 219 | + |
| 220 | + if args.speed: |
| 221 | + speed = args.speed |
| 222 | + else: |
| 223 | + speed = detect_speed(args.port, args.timeout, args.verbose) |
| 224 | + print(f"Detected speed: {speed} baud") |
| 225 | + |
| 226 | + if args.mode == 'disable': |
| 227 | + disable_survey_in(args.port, speed, args.timeout, args.verbose) |
| 228 | + elif args.mode == 'survey': |
| 229 | + start_survey_in(args.port, speed, args.timeout, args.min_dur, args.acc_limit, args.verbose) |
| 230 | + elif args.mode == 'fixed': |
| 231 | + if not args.ecef: |
| 232 | + print(Colour.FAIL + "Error: You must provide ECEF coordinates for fixed mode." + Colour.ENDC) |
| 233 | + exit(1) |
| 234 | + ecef_x, ecef_y, ecef_z = args.ecef |
| 235 | + set_fixed_mode(args.port, speed, ecef_x, ecef_y, ecef_z, args.timeout, args.verbose) |
| 236 | + |
0 commit comments