From 3beade54dfc48a52419e53bb8ae57daa54b50eec Mon Sep 17 00:00:00 2001 From: Aleksandr Savin Date: Fri, 12 Sep 2025 13:36:58 +0200 Subject: [PATCH 1/3] fix(autopa_v2.py): Update to v2.8 NINA integration error detect in minutes --- AutoPA/Software/source/autopa_v2.py | 1263 ++++++++++++++------------- 1 file changed, 632 insertions(+), 631 deletions(-) diff --git a/AutoPA/Software/source/autopa_v2.py b/AutoPA/Software/source/autopa_v2.py index beb5221..a802f79 100644 --- a/AutoPA/Software/source/autopa_v2.py +++ b/AutoPA/Software/source/autopa_v2.py @@ -1,631 +1,632 @@ -# -*- coding: utf-8 -*- - -# Form implementation generated from reading ui file 'untitled.ui' -# -# Created by: PyQt5 UI code generator 5.15.4 -# -# WARNING: Any manual changes made to this file will be lost when pyuic5 is -# run again. Do not edit this file unless you know what you are doing. - -import glob -import re -from datetime import datetime, date, timedelta -import json -import math -from PyQt5 import QtCore, QtGui, QtWidgets -import sys, os -import collections -import logging -from pathlib import Path -from PyQt5.QtCore import QObject, QThread, pyqtSignal - -# Application version -VERSION = "2.7" - -class ElapsedTimeFormatter(logging.Formatter): - def __init__(self, start_time): - super().__init__('%(asctime)s - %(elapsed)s - %(levelname)s - %(message)s') - self.start_time = start_time - self.datefmt = '%Y-%m-%d %H:%M:%S.%f'[:-3] # Format: YYYY-MM-DD HH:MM:SS.mmm - - def formatTime(self, record, datefmt=None): - if datefmt: - # Format absolute timestamp - return datetime.fromtimestamp(record.created).strftime(datefmt) - else: - # Calculate elapsed time - elapsed = datetime.now() - self.start_time - # Convert to HH:mm:ss.s format - hours = int(elapsed.total_seconds() // 3600) - minutes = int((elapsed.total_seconds() % 3600) // 60) - seconds = elapsed.total_seconds() % 60 - return f"{hours:02d}:{minutes:02d}:{seconds:05.1f}" - - def format(self, record): - # Add elapsed time to the message - record.elapsed = self.formatTime(record) - # Format the absolute timestamp - record.asctime = self.formatTime(record, self.datefmt) - return super().format(record) - -class QTextEditLogger(logging.Handler): - def __init__(self, parent): - super().__init__() - self.widget = QtWidgets.QPlainTextEdit(parent) - self.widget.setReadOnly(True) - - def emit(self, record): - msg = self.format(record) - self.widget.appendPlainText(msg) - -class DuplicateFilter(object): - def __init__(self): - self.msgs = collections.deque(maxlen=3) - - def filter(self, record): - rv = record.msg not in self.msgs - self.msgs.append(record.msg) - return rv - -class CommandWorker(QObject): - commandFinished = pyqtSignal(str, str) # (label, result) - finished = pyqtSignal() - - def __init__(self, parent, commands): - super().__init__() - self.parent = parent - self.commands = commands - - def run(self): - for label, command in self.commands: - result = self.parent.sendCommand(command, self.parent.software.currentText(), self.parent.telescope, self.parent.serialport) - self.commandFinished.emit(label, str(result)) - self.finished.emit() - -class AutoPA(QtWidgets.QDialog, QtWidgets.QPlainTextEdit): - def __init__(self): - super().__init__() - self.config_file = Path.home() / 'AutoPA' / 'config.json' - self.log_dir = Path.home() / 'AutoPA' / 'logs' - self.log_dir.mkdir(parents=True, exist_ok=True) - self.load_last_software() - self.load_last_accuracy() - # Load verbose state before UI setup - self.verbose_state = self.load_last_verbose() - self.setupUi(self) - self.retranslateUi(self) - - def load_last_software(self): - try: - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - self.last_software = config.get('last_software', '') - else: - self.last_software = '' - except Exception as e: - logging.error(f"Error loading config: {e}") - self.last_software = '' - - def load_last_accuracy(self): - try: - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - self.last_accuracy = config.get('last_accuracy', '60') - else: - self.last_accuracy = '60' - except Exception as e: - logging.error(f"Error loading config: {e}") - self.last_accuracy = '60' - - def load_last_verbose(self): - try: - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - if 'last_verbose' in config: - return bool(config['last_verbose']) - return False - except Exception as e: - logging.error(f"Error loading config: {e}") - return False - - def save_last_software(self, software): - try: - config = {} - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - config['last_software'] = software - with open(self.config_file, 'w') as f: - json.dump(config, f) - except Exception as e: - logging.error(f"Error saving config: {e}") - - def save_last_accuracy(self, accuracy): - try: - config = {} - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - config['last_accuracy'] = accuracy - with open(self.config_file, 'w') as f: - json.dump(config, f) - except Exception as e: - logging.error(f"Error saving config: {e}") - - def save_last_verbose(self, verbose): - try: - config = {} - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - config['last_verbose'] = verbose - with open(self.config_file, 'w') as f: - json.dump(config, f) - except Exception as e: - logging.error(f"Error saving config: {e}") - - def setupUi(self, Dialog): - Dialog.setObjectName("AutoPA") - Dialog.resize(400, 300) - self.formLayout = QtWidgets.QFormLayout(Dialog) - self.formLayout.setObjectName("formLayout") - self.label = QtWidgets.QLabel(Dialog) - self.label.setObjectName("label") - self.formLayout.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label) - self.software = QtWidgets.QComboBox(Dialog) - self.software.setCurrentText("") - self.software.setObjectName("software") - self.formLayout.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.software) - self.label_4 = QtWidgets.QLabel(Dialog) - self.label_4.setObjectName("label_4") - self.formLayout.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_4) - self.accuracy_input = QtWidgets.QLineEdit(Dialog) - self.accuracy_input.setObjectName("accuracy") - self.formLayout.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.accuracy_input) - self.label_2 = QtWidgets.QLabel(Dialog) - self.label_2.setObjectName("label_2") - self.formLayout.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_2) - self.azimuthOffset = QtWidgets.QLineEdit(Dialog) - self.azimuthOffset.setObjectName("azimuthOffset") - self.formLayout.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.azimuthOffset) - self.label_3 = QtWidgets.QLabel(Dialog) - self.label_3.setObjectName("label_3") - self.formLayout.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_3) - self.altitudeOffset = QtWidgets.QLineEdit(Dialog) - self.altitudeOffset.setObjectName("altitudeOffset") - self.formLayout.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.altitudeOffset) - self.label_5 = QtWidgets.QLabel(Dialog) - self.label_5.setObjectName("label_5") - self.formLayout.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_5) - self.telescopeName = QtWidgets.QLineEdit(Dialog) - self.telescopeName.setObjectName("telescopeName") - self.formLayout.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.telescopeName) - self.label_6 = QtWidgets.QLabel(Dialog) - self.label_6.setObjectName("label_6") - self.formLayout.setWidget(5, QtWidgets.QFormLayout.LabelRole, self.label_6) - self.serialportInput = QtWidgets.QLineEdit(Dialog) - self.serialportInput.setObjectName("serialportInput") - self.formLayout.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.serialportInput) - self.verbose = QtWidgets.QCheckBox(Dialog) - self.verbose.setObjectName("verbose") - self.verbose.setChecked(self.verbose_state) - self.verbose.stateChanged.connect(self.on_verbose_changed) - self.formLayout.setWidget(6, QtWidgets.QFormLayout.LabelRole, self.verbose) - - self.horizontalLayout = QtWidgets.QHBoxLayout() - self.horizontalLayout.setObjectName("horizontalLayout") - self.startButton = QtWidgets.QPushButton(Dialog) - self.startButton.setObjectName("startButton") - self.startButton.clicked.connect(self.start) - self.horizontalLayout.addWidget(self.startButton) - self.stopButton = QtWidgets.QPushButton(Dialog) - self.stopButton.setObjectName("stopButton") - self.stopButton.clicked.connect(self.stop) - self.horizontalLayout.addWidget(self.stopButton) - self.openLogsButton = QtWidgets.QPushButton(Dialog) - self.openLogsButton.setObjectName("openLogsButton") - self.openLogsButton.clicked.connect(self.open_logs_folder) - self.horizontalLayout.addWidget(self.openLogsButton) - self.cancelButton = QtWidgets.QPushButton(Dialog) - self.cancelButton.setObjectName("cancelButton") - self.cancelButton.clicked.connect(self.close) - self.horizontalLayout.addWidget(self.cancelButton) - self.formLayout.setLayout(6, QtWidgets.QFormLayout.FieldRole, self.horizontalLayout) - - # Store start time for elapsed time calculation - self.start_time = datetime.now() - - # Get the root logger and remove any existing handlers - self.logger = logging.getLogger() - for handler in self.logger.handlers[:]: - self.logger.removeHandler(handler) - - logTextBox = QTextEditLogger(self) - # Simple formatter for GUI - just show the message - gui_formatter = logging.Formatter('%(message)s') - logTextBox.setFormatter(gui_formatter) - - # Add file handler with DEBUG level - log_file = self.log_dir / f'autopa_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' - file_handler = logging.FileHandler(log_file) - # Full formatter for file - show timestamps and level - file_formatter = ElapsedTimeFormatter(self.start_time) - file_handler.setFormatter(file_formatter) - file_handler.setLevel(logging.DEBUG) # Always log DEBUG level to file - self.logger.addHandler(file_handler) - - # Add GUI handler with level controlled by verbose checkbox - self.logger.addHandler(logTextBox) - self.logger.addFilter(DuplicateFilter()) - self.logger.setLevel(logging.DEBUG) # Set root logger to DEBUG to allow all levels to pass through - logTextBox.widget.setFont(QtGui.QFont("Consolas", 8)) # (or "Courier New" if Consolas is not available) - self.formLayout.setWidget(10, QtWidgets.QFormLayout.SpanningRole, logTextBox.widget) - - self.timer=QtCore.QTimer() - self.timer.timeout.connect(self.alignment) - - self.lastEntry = datetime.now() - self.aligned = True - self.stillAdjusting = False - self.adjustmentFinished = datetime.now() - self.after_id = None - self.serialport = "" - self.indiclient = None - self.ser = None - self.solveCounter = 0 - self.autorun = False - if len(sys.argv) > 1: - if sys.argv[1] == "--autorun": - self.autorun = True - self.retranslateUi(Dialog) - QtCore.QMetaObject.connectSlotsByName(Dialog) - if self.autorun: - self.startButton.click() - - self.software.currentTextChanged.connect(self.on_software_changed) - self.accuracy_input.textChanged.connect(self.on_accuracy_changed) - - # Set the last selected software if it exists - if self.last_software and self.last_software in software_options: - self.software.setCurrentText(self.last_software) - - # Set the last accuracy value if it exists - self.accuracy_input.setText(self.last_accuracy) - - # Log startup completion after a short delay to ensure GUI is ready - QtCore.QTimer.singleShot(100, lambda: logging.info(f"AutoPA v{VERSION} started at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')} and is ready")) - - def retranslateUi(self, Dialog): - _translate = QtCore.QCoreApplication.translate - Dialog.setWindowTitle(_translate("Dialog", f"AutoPA v{VERSION}")) - self.label.setText(_translate("Dialog", "Choose your AutoPA software:")) - self.software.addItems(software_options.keys()) - self.label_4.setText(_translate("Dialog", "Accuracy to align to (Default 60 arcseconds):")) - self.accuracy_input.setText(_translate("Dialog", "60")) - self.accuracy_input.setPlaceholderText(_translate("Dialog", "60")) - self.label_2.setText(_translate("Dialog", "+/- Azimuth Offset (Default 0 arcminutes):")) - self.azimuthOffset.setText(_translate("Dialog", "0")) - self.azimuthOffset.setPlaceholderText(_translate("Dialog", "0")) - self.label_3.setText(_translate("Dialog", "+/- Altitude Offset (Default 0 arcminutes):")) - self.altitudeOffset.setText(_translate("Dialog", "0")) - self.altitudeOffset.setPlaceholderText(_translate("Dialog", "0")) - self.label_5.setText(_translate("Dialog", "Telescope Name (ASCOM: \"OpenAstroTracker\", INDI: \"LX200 GPS\"")) - self.telescopeName.setPlaceholderText(_translate("Dialog", "Override default?")) - self.label_6.setText(_translate("Dialog", "Serial Port of OAT [Ekos only] (Default /dev/ttyACM0):")) - self.serialportInput.setPlaceholderText(_translate("Dialog", "Override default? (Ekos only)")) - self.verbose.setText(_translate("Dialog", "Verbose Output")) - self.startButton.setText(_translate("Dialog", "Start")) - self.stopButton.setText(_translate("Dialog", "Stop")) - self.openLogsButton.setText(_translate("Dialog", "Logs")) - self.cancelButton.setText(_translate("Dialog", "Close")) - - def getLatestLogEntry(self, logpath, expression): - if sys.platform == "win32": - try: - import win32file - list_of_files = glob.glob(logpath) - latest_file = max(list_of_files, key=os.path.getctime) - logging.debug("Opening file: " + latest_file) - f = win32file.CreateFile(latest_file, win32file.GENERIC_READ, win32file.FILE_SHARE_DELETE | win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) - bufSize = 4096 - code, data = win32file.ReadFile(f, bufSize) - buf = data - while len(data) == bufSize: - result, data = win32file.ReadFile(f, bufSize, None) - buf += data - result = re.findall(expression, buf.decode("utf-8"))[-1] - logfileModification = os.path.getmtime(latest_file) - return(result, logfileModification) - except: - raise FileNotFoundError - elif sys.platform == "linux": - try: - list_of_files = glob.glob(logpath) - latest_file = max(list_of_files, key=os.path.getctime) - logging.debug(latest_file) - FileObject = open(latest_file,"r") - contents = FileObject.readlines() - result = re.findall(expression, contents.decode("utf-8"))[-1] - logfileModification = os.path.getmtime(latest_file) - return(result, logfileModification) - except: - raise FileNotFoundError - - def altitudeError(self, error, pole): - return(self.dmsTodeg(pole)-self.dmsTodeg(error)) - - def azimuthError(self, error, pole): - return(((self.dmsTodeg(pole) + 180) % 360 - 180)-((self.dmsTodeg(error) + 180) % 360 - 180)) - - def dmsTodeg(self, input): - temp = input.split(':') - d = float(temp[0]) - m = float(temp[1]) / 60 - s = float(temp[2]) / 3600 - return (d + m + s) - - def parseNINA3deg(self, input0, input1, input2): - sgn = input0[0] - if sgn == "-": - sgn = -1 - else: - sgn = 1 - d = abs(float(input0)) - m = float(input1) / 60 - s = float(input2) / 3600 - return (sgn * (d + m + s)) - - - def degToArcmin(self, input): - return(input * 60) - - def parseError(self, software, input, azimuthOffset, altitudeOffset): - error = [] - if software == "NINA3.x": - # Log file lists Az, then Alt error - error.append(self.parseNINA3deg(input[4], input[5], input[6]) - altitudeOffset) - error.append(self.parseNINA3deg(input[1], input[2], input[3]) - azimuthOffset) - error.append(math.hypot(error[0], error[1])) - elif software.startswith("Sharpcap"): - error.append(self.degToArcmin(self.altitudeError(input[1], input[3])) - altitudeOffset) - error.append(self.degToArcmin(self.azimuthError(input[2], input[4])) - azimuthOffset) - error.append(math.hypot(error[0], error[1])) - elif software == "Ekos": - error.append((self.degToArcmin(float(input[1])) - altitudeOffset)*(-1)) - error.append((self.degToArcmin(float(input[1])) - azimuthOffset)*(-1)) - error.append(math.hypot(error[0], error[1])) - logging.debug(f"Error from log: {error}.") - return(error) - - def sendCommand(self, command, software, telescope, serialport, baudrate=19200): - if software != "Ekos": - import win32com.client - logging.debug(f"Command sent: \"{command}\"") - logging.debug(f"Telescope name: \"{telescope}\"") - tel = win32com.client.Dispatch(f"ASCOM.{telescope}.Telescope") - if tel.Connected: - logging.debug("Telescope was already connected") - else: - tel.Connected = True - if not tel.Connected: - logging.error("Unable to connect to telescope.") - return False - result = tel.Action("Serial:PassThroughCommand", command) - tel.Connected = False - else: - #Send command - logging.debug("Sending command...") - self.ser.flush() - self.ser.write(str.encode(command)) - result = self.ser.readline() - result = result[:-1].decode('utf-8') - logging.debug("Command response received") - return result - - def isAdjusting(self, software, telescope, serialport): - try: - logging.debug("Getting mount status...") - result = self.sendCommand(":GX#,#", software, telescope, serialport) - if not result: - raise Exception - logging.debug(result) - status = re.search(",(......),", result).group(1) - if status[3]=="-" and status[4]=="-": - return False - else: - return True - except: - if software == "Ekos": - logging.error("Problem determining mount status. Verify mount is connected to INDI. Stopping AutoPA.") - else: - logging.error("Problem determining mount status. Verify mount is connected to ASCOM. Stopping AutoPA.") - self.timer.stop() - raise ConnectionError - - def start(self): - if self.verbose.isChecked(): - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.DEBUG) - else: - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.INFO) - if self.aligned: - logging.info("Starting AutoPA routine") - self.aligned = False - self.accuracy = float(self.accuracy_input.text()) / 60 - self.timer.start(2500) - if self.telescopeName.text() == "": - if self.software.currentText() == "Ekos": - self.telescope = "LX200 GPS" - else: - self.telescope = "OpenAstroTracker" - if self.serialportInput.text() == "": - if self.software.currentText() == "Ekos": - self.serialport = "/dev/ttyACM0" - else: - self.serialport = self.serialportInput.text() - if self.software.currentText() == "Ekos": - import indi, serial - #Connect to indi server - self.indiclient, self.blobEvent = indi.indiserverConnect() - logging.debug("AutoPA connected to INDI server") - - #Disconnect OAT from indi to free up serial port - indi.disconnectScope(self.indiclient, self.telescope) - logging.debug("Telescope disconnected from INDI") - - print("Opening serial port on " + self.serialport + '...') - self.ser = serial.Serial(self.serialport, 19200, timeout = 0.2) - - # Refactored: Send initial mount commands in a background thread - commands = [ - ("Mount", ":GVP#,#"), - ("LST", ":XGL#,#"), - ("Latitude", ":Gt#,#"), - ("Longitude", ":Gg#,#"), - ("Hemisphere", ":XGHS#,#"), - ("Hardware", ":XGM#,#"), - ] - self.thread = QThread() - self.worker = CommandWorker(self, commands) - self.worker.moveToThread(self.thread) - self.worker.commandFinished.connect(self.handle_command_result) - self.worker.finished.connect(self.thread.quit) - self.worker.finished.connect(self.worker.deleteLater) - self.thread.finished.connect(self.thread.deleteLater) - self.thread.started.connect(self.worker.run) - self.thread.start() - - def stop(self): - self.aligned = True - self.timer.stop() - if self.software.currentText() == "Ekos": - import indi - self.ser.close() - #Reconnect OAT to indi and disconnect from server - indi.connectScope(self.indiclient, self.telescope) - logging.debug("Telescope reconnected to INDI") - indi.indiserverDisconnect(self.indiclient) - logging.debug("AutoPA disconnected from INDI server") - logging.info("Stopping AutoPA routine") - - def close(self): - sys.exit(self) - - def alignment(self): - if not self.aligned: - try: - if self.isAdjusting(self.software.currentText(), self.telescope, self.serialport): - logging.info("Mount is still adjusting position.") - self.stillAdjusting = True - else: - if self.stillAdjusting: - logging.info(f"Mount adjustment finished.") - self.stillAdjusting = False - self.adjustmentFinished = datetime.now() - self.solveCounter = 0 - logging.info(f"Getting latest log entry from {self.software.currentText()}.") - try: - log = self.getLatestLogEntry(softwareTypes[self.software.currentText()]["logpath"], softwareTypes[self.software.currentText()]["expression"]) - except FileNotFoundError: - log = None - logging.error(f"Error retrieving log from {self.software.currentText()}. Logfile may not exist or does not contain alignment info.") - if self.autorun: - sys.exit("Polar alignment values not found.") - if log is not None: - #Entry date is based on file timestamp, entry time is based on log entry data - currentEntry = datetime.strptime(datetime.fromtimestamp(log[1]).strftime("%Y-%m-%d") + " " + log[0][0][:-1], '%Y-%m-%d %H:%M:%S.%f') - if currentEntry != self.lastEntry and currentEntry > self.adjustmentFinished: - self.solveCounter += 1 #Increment counter if the latest unused entry was entered into the log after the adjustment was finished. - if (self.software.currentText() != "NINA3.x" and self.solveCounter >= 1) or (self.software.currentText() == "NINA3.x" and self.solveCounter >= 3): - #If using NINA, wait for three complete solves after adjustment is finished to prevent using old data - self.solveCounter = 0 - error = self.parseError(self.software.currentText(), log[0], float(self.azimuthOffset.text()), float(self.altitudeOffset.text())) - logging.info(f"Altitude error in arcminutes: {error[0]:.3f}\'") - logging.info(f"Azimuth error in arcminutes: {error[1]:.3f}\'") - logging.info(f"Total error in arcminutes: {error[2]:.3f}\'") - if abs(error[2]) < self.accuracy: - logging.info(f"Polar aligned to within {error[0]*60:.0f}\" altitude and {error[1]*60:.0f}\" azimuth.") - self.stop() - if self.autorun: - sys.exit(self) - return - else: - logging.info("Correction needed.") - result = self.sendCommand(f":MAL{error[0]:.4f}#", self.software.currentText(), self.telescope, self.serialport) - logging.debug(f"Adjusting altitude by {error[0]:.3f} arcminutes.") - result = self.sendCommand(f":MAZ{error[1]*(-1):.4f}#", self.software.currentText(), self.telescope, self.serialport) - logging.debug(f"Adjusting azimuth by {error[1]:.3f} arcminutes.") - self.lastEntry = currentEntry - else: - logging.info(f"Waiting for {self.software.currentText()} to re-solve since last adjustment finished.") - else: - logging.info(f"{self.software.currentText()} has not yet determined the polar alignment error.") - except ConnectionError: - self.stop() - if self.autorun: - sys.exit("AutoPA could not connect to mount.") - return - - def on_software_changed(self, software): - self.save_last_software(software) - - def on_accuracy_changed(self, accuracy): - self.save_last_accuracy(accuracy) - - def on_verbose_changed(self, state): - self.save_last_verbose(bool(state)) - if state: - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.DEBUG) - else: - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.INFO) - - def open_logs_folder(self): - if sys.platform == "win32": - os.startfile(self.log_dir) - elif sys.platform == "darwin": # macOS - os.system(f"open {self.log_dir}") - else: # Linux - os.system(f"xdg-open {self.log_dir}") - - def handle_command_result(self, label, result): - logging.info(f"{label:>10}: {result}") - QtWidgets.QApplication.processEvents() - -software_options = collections.OrderedDict([ - ('NINA3.x', ''), - ('Sharpcap4.x', ''), - ('Sharpcap3.2', ''), - ('Ekos', '') -]) - -today = date.today().strftime("%Y-%m-%d") -softwareTypes = { -"NINA3.x":{ "expression": r"[\d-]*T(.*?)\|.*PolarAlignment.cs\|.*Calculated Error: Az: (-*\d{2}).*?(\d{2})' (\d{2})\", Alt: (-*\d{2}).*?(\d{2})' (\d{2})\",.*", - "logpath": fr"{os.getenv('LOCALAPPDATA')}\NINA\Logs\*.log"}, -"Sharpcap3.2":{ "expression": "(?:Info:)\t(\d{2}:\d{2}:\d{2}.\d{7}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", - "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, -"Sharpcap4.x":{ "expression": "(?:Info)\W*(\d{2}:\d{2}:\d{2}.\d{6}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", - "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, -"Ekos":{ "expression": "(\d{2}:\d{2}:\d{2}.\d{3}).*(?:PAA Refresh).*(?:Corrected az:).*(?:\()(\s?-?\d\.\d{3}).*(?:alt:).*(\s?-?\d\.\d{3}).*(?:total:)", - "logpath": f"{Path.home()}/.local/share/kstars/logs/{today}/*.txt"} -} - -if __name__ == "__main__": - app = QtWidgets.QApplication(sys.argv) - window = QtWidgets.QDialog() - ui = AutoPA() - ui.setupUi(window) - - window.show() - sys.exit(app.exec_()) - +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'untitled.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. + +import glob +import re +from datetime import datetime, date, timedelta +import json +import math +from PyQt5 import QtCore, QtGui, QtWidgets +import sys, os +import collections +import logging +from pathlib import Path +from PyQt5.QtCore import QObject, QThread, pyqtSignal + +# Application version +VERSION = "2.8" + +class ElapsedTimeFormatter(logging.Formatter): + def __init__(self, start_time): + super().__init__('%(asctime)s - %(elapsed)s - %(levelname)s - %(message)s') + self.start_time = start_time + self.datefmt = '%Y-%m-%d %H:%M:%S.%f'[:-3] # Format: YYYY-MM-DD HH:MM:SS.mmm + + def formatTime(self, record, datefmt=None): + if datefmt: + # Format absolute timestamp + return datetime.fromtimestamp(record.created).strftime(datefmt) + else: + # Calculate elapsed time + elapsed = datetime.now() - self.start_time + # Convert to HH:mm:ss.s format + hours = int(elapsed.total_seconds() // 3600) + minutes = int((elapsed.total_seconds() % 3600) // 60) + seconds = elapsed.total_seconds() % 60 + return f"{hours:02d}:{minutes:02d}:{seconds:05.1f}" + + def format(self, record): + # Add elapsed time to the message + record.elapsed = self.formatTime(record) + # Format the absolute timestamp + record.asctime = self.formatTime(record, self.datefmt) + return super().format(record) + +class QTextEditLogger(logging.Handler): + def __init__(self, parent): + super().__init__() + self.widget = QtWidgets.QPlainTextEdit(parent) + self.widget.setReadOnly(True) + + def emit(self, record): + msg = self.format(record) + self.widget.appendPlainText(msg) + +class DuplicateFilter(object): + def __init__(self): + self.msgs = collections.deque(maxlen=3) + + def filter(self, record): + rv = record.msg not in self.msgs + self.msgs.append(record.msg) + return rv + +class CommandWorker(QObject): + commandFinished = pyqtSignal(str, str) # (label, result) + finished = pyqtSignal() + + def __init__(self, parent, commands): + super().__init__() + self.parent = parent + self.commands = commands + + def run(self): + for label, command in self.commands: + result = self.parent.sendCommand(command, self.parent.software.currentText(), self.parent.telescope, self.parent.serialport) + self.commandFinished.emit(label, str(result)) + self.finished.emit() + +class AutoPA(QtWidgets.QDialog, QtWidgets.QPlainTextEdit): + def __init__(self): + super().__init__() + self.config_file = Path.home() / 'AutoPA' / 'config.json' + self.log_dir = Path.home() / 'AutoPA' / 'logs' + self.log_dir.mkdir(parents=True, exist_ok=True) + self.load_last_software() + self.load_last_accuracy() + # Load verbose state before UI setup + self.verbose_state = self.load_last_verbose() + self.setupUi(self) + self.retranslateUi(self) + + def load_last_software(self): + try: + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + self.last_software = config.get('last_software', '') + else: + self.last_software = '' + except Exception as e: + logging.error(f"Error loading config: {e}") + self.last_software = '' + + def load_last_accuracy(self): + try: + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + self.last_accuracy = config.get('last_accuracy', '60') + else: + self.last_accuracy = '60' + except Exception as e: + logging.error(f"Error loading config: {e}") + self.last_accuracy = '60' + + def load_last_verbose(self): + try: + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + if 'last_verbose' in config: + return bool(config['last_verbose']) + return False + except Exception as e: + logging.error(f"Error loading config: {e}") + return False + + def save_last_software(self, software): + try: + config = {} + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + config['last_software'] = software + with open(self.config_file, 'w') as f: + json.dump(config, f) + except Exception as e: + logging.error(f"Error saving config: {e}") + + def save_last_accuracy(self, accuracy): + try: + config = {} + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + config['last_accuracy'] = accuracy + with open(self.config_file, 'w') as f: + json.dump(config, f) + except Exception as e: + logging.error(f"Error saving config: {e}") + + def save_last_verbose(self, verbose): + try: + config = {} + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + config['last_verbose'] = verbose + with open(self.config_file, 'w') as f: + json.dump(config, f) + except Exception as e: + logging.error(f"Error saving config: {e}") + + def setupUi(self, Dialog): + Dialog.setObjectName("AutoPA") + Dialog.resize(400, 300) + self.formLayout = QtWidgets.QFormLayout(Dialog) + self.formLayout.setObjectName("formLayout") + self.label = QtWidgets.QLabel(Dialog) + self.label.setObjectName("label") + self.formLayout.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label) + self.software = QtWidgets.QComboBox(Dialog) + self.software.setCurrentText("") + self.software.setObjectName("software") + self.formLayout.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.software) + self.label_4 = QtWidgets.QLabel(Dialog) + self.label_4.setObjectName("label_4") + self.formLayout.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_4) + self.accuracy_input = QtWidgets.QLineEdit(Dialog) + self.accuracy_input.setObjectName("accuracy") + self.formLayout.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.accuracy_input) + self.label_2 = QtWidgets.QLabel(Dialog) + self.label_2.setObjectName("label_2") + self.formLayout.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_2) + self.azimuthOffset = QtWidgets.QLineEdit(Dialog) + self.azimuthOffset.setObjectName("azimuthOffset") + self.formLayout.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.azimuthOffset) + self.label_3 = QtWidgets.QLabel(Dialog) + self.label_3.setObjectName("label_3") + self.formLayout.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_3) + self.altitudeOffset = QtWidgets.QLineEdit(Dialog) + self.altitudeOffset.setObjectName("altitudeOffset") + self.formLayout.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.altitudeOffset) + self.label_5 = QtWidgets.QLabel(Dialog) + self.label_5.setObjectName("label_5") + self.formLayout.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_5) + self.telescopeName = QtWidgets.QLineEdit(Dialog) + self.telescopeName.setObjectName("telescopeName") + self.formLayout.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.telescopeName) + self.label_6 = QtWidgets.QLabel(Dialog) + self.label_6.setObjectName("label_6") + self.formLayout.setWidget(5, QtWidgets.QFormLayout.LabelRole, self.label_6) + self.serialportInput = QtWidgets.QLineEdit(Dialog) + self.serialportInput.setObjectName("serialportInput") + self.formLayout.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.serialportInput) + self.verbose = QtWidgets.QCheckBox(Dialog) + self.verbose.setObjectName("verbose") + self.verbose.setChecked(self.verbose_state) + self.verbose.stateChanged.connect(self.on_verbose_changed) + self.formLayout.setWidget(6, QtWidgets.QFormLayout.LabelRole, self.verbose) + + self.horizontalLayout = QtWidgets.QHBoxLayout() + self.horizontalLayout.setObjectName("horizontalLayout") + self.startButton = QtWidgets.QPushButton(Dialog) + self.startButton.setObjectName("startButton") + self.startButton.clicked.connect(self.start) + self.horizontalLayout.addWidget(self.startButton) + self.stopButton = QtWidgets.QPushButton(Dialog) + self.stopButton.setObjectName("stopButton") + self.stopButton.clicked.connect(self.stop) + self.horizontalLayout.addWidget(self.stopButton) + self.openLogsButton = QtWidgets.QPushButton(Dialog) + self.openLogsButton.setObjectName("openLogsButton") + self.openLogsButton.clicked.connect(self.open_logs_folder) + self.horizontalLayout.addWidget(self.openLogsButton) + self.cancelButton = QtWidgets.QPushButton(Dialog) + self.cancelButton.setObjectName("cancelButton") + self.cancelButton.clicked.connect(self.close) + self.horizontalLayout.addWidget(self.cancelButton) + self.formLayout.setLayout(6, QtWidgets.QFormLayout.FieldRole, self.horizontalLayout) + + # Store start time for elapsed time calculation + self.start_time = datetime.now() + + # Get the root logger and remove any existing handlers + self.logger = logging.getLogger() + for handler in self.logger.handlers[:]: + self.logger.removeHandler(handler) + + logTextBox = QTextEditLogger(self) + # Simple formatter for GUI - just show the message + gui_formatter = logging.Formatter('%(message)s') + logTextBox.setFormatter(gui_formatter) + + # Add file handler with DEBUG level + log_file = self.log_dir / f'autopa_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' + file_handler = logging.FileHandler(log_file) + # Full formatter for file - show timestamps and level + file_formatter = ElapsedTimeFormatter(self.start_time) + file_handler.setFormatter(file_formatter) + file_handler.setLevel(logging.DEBUG) # Always log DEBUG level to file + self.logger.addHandler(file_handler) + + # Add GUI handler with level controlled by verbose checkbox + self.logger.addHandler(logTextBox) + self.logger.addFilter(DuplicateFilter()) + self.logger.setLevel(logging.DEBUG) # Set root logger to DEBUG to allow all levels to pass through + logTextBox.widget.setFont(QtGui.QFont("Consolas", 8)) # (or "Courier New" if Consolas is not available) + self.formLayout.setWidget(10, QtWidgets.QFormLayout.SpanningRole, logTextBox.widget) + + self.timer=QtCore.QTimer() + self.timer.timeout.connect(self.alignment) + + self.lastEntry = datetime.now() + self.aligned = True + self.stillAdjusting = False + self.adjustmentFinished = datetime.now() + self.after_id = None + self.serialport = "" + self.indiclient = None + self.ser = None + self.solveCounter = 0 + self.autorun = False + if len(sys.argv) > 1: + if sys.argv[1] == "--autorun": + self.autorun = True + self.retranslateUi(Dialog) + QtCore.QMetaObject.connectSlotsByName(Dialog) + if self.autorun: + self.startButton.click() + + self.software.currentTextChanged.connect(self.on_software_changed) + self.accuracy_input.textChanged.connect(self.on_accuracy_changed) + + # Set the last selected software if it exists + if self.last_software and self.last_software in software_options: + self.software.setCurrentText(self.last_software) + + # Set the last accuracy value if it exists + self.accuracy_input.setText(self.last_accuracy) + + # Log startup completion after a short delay to ensure GUI is ready + QtCore.QTimer.singleShot(100, lambda: logging.info(f"AutoPA v{VERSION} started at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')} and is ready")) + + def retranslateUi(self, Dialog): + _translate = QtCore.QCoreApplication.translate + Dialog.setWindowTitle(_translate("Dialog", f"AutoPA v{VERSION}")) + self.label.setText(_translate("Dialog", "Choose your AutoPA software:")) + self.software.addItems(software_options.keys()) + self.label_4.setText(_translate("Dialog", "Accuracy to align to (Default 60 arcseconds):")) + self.accuracy_input.setText(_translate("Dialog", "60")) + self.accuracy_input.setPlaceholderText(_translate("Dialog", "60")) + self.label_2.setText(_translate("Dialog", "+/- Azimuth Offset (Default 0 arcminutes):")) + self.azimuthOffset.setText(_translate("Dialog", "0")) + self.azimuthOffset.setPlaceholderText(_translate("Dialog", "0")) + self.label_3.setText(_translate("Dialog", "+/- Altitude Offset (Default 0 arcminutes):")) + self.altitudeOffset.setText(_translate("Dialog", "0")) + self.altitudeOffset.setPlaceholderText(_translate("Dialog", "0")) + self.label_5.setText(_translate("Dialog", "Telescope Name (ASCOM: \"OpenAstroTracker\", INDI: \"LX200 GPS\"")) + self.telescopeName.setPlaceholderText(_translate("Dialog", "Override default?")) + self.label_6.setText(_translate("Dialog", "Serial Port of OAT [Ekos only] (Default /dev/ttyACM0):")) + self.serialportInput.setPlaceholderText(_translate("Dialog", "Override default? (Ekos only)")) + self.verbose.setText(_translate("Dialog", "Verbose Output")) + self.startButton.setText(_translate("Dialog", "Start")) + self.stopButton.setText(_translate("Dialog", "Stop")) + self.openLogsButton.setText(_translate("Dialog", "Logs")) + self.cancelButton.setText(_translate("Dialog", "Close")) + + def getLatestLogEntry(self, logpath, expression): + if sys.platform == "win32": + try: + import win32file + list_of_files = glob.glob(logpath) + latest_file = max(list_of_files, key=os.path.getctime) + logging.debug("Opening file: " + latest_file) + f = win32file.CreateFile(latest_file, win32file.GENERIC_READ, win32file.FILE_SHARE_DELETE | win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) + bufSize = 4096 + code, data = win32file.ReadFile(f, bufSize) + buf = data + while len(data) == bufSize: + result, data = win32file.ReadFile(f, bufSize, None) + buf += data + result = re.findall(expression, buf.decode("utf-8"))[-1] + logfileModification = os.path.getmtime(latest_file) + return(result, logfileModification) + except: + raise FileNotFoundError + elif sys.platform == "linux": + try: + list_of_files = glob.glob(logpath) + latest_file = max(list_of_files, key=os.path.getctime) + logging.debug(latest_file) + FileObject = open(latest_file,"r") + contents = FileObject.readlines() + result = re.findall(expression, contents.decode("utf-8"))[-1] + logfileModification = os.path.getmtime(latest_file) + return(result, logfileModification) + except: + raise FileNotFoundError + + def altitudeError(self, error, pole): + return(self.dmsTodeg(pole)-self.dmsTodeg(error)) + + def azimuthError(self, error, pole): + return(((self.dmsTodeg(pole) + 180) % 360 - 180)-((self.dmsTodeg(error) + 180) % 360 - 180)) + + def dmsTodeg(self, input): + temp = input.split(':') + d = float(temp[0]) + m = float(temp[1]) / 60 + s = float(temp[2]) / 3600 + return (d + m + s) + + def parseNINA3deg(self, input0, input1, input2): + sgn = input0[0] + if sgn == "-": + sgn = -1 + else: + sgn = 1 + d = abs(float(input0)) + m = float(input1) / 60 + s = float(input2) / 3600 + return (sgn * (d + m + s)) + + + def degToArcmin(self, input): + return(input * 60) + + def parseError(self, software, input, azimuthOffset, altitudeOffset): + error = [] + if software == "NINA3.x": + # Log file lists Az, then Alt error + # Convert degrees to arcminutes using degToArcmin() + error.append(self.degToArcmin(self.parseNINA3deg(input[4], input[5], input[6])) - altitudeOffset) + error.append(self.degToArcmin(self.parseNINA3deg(input[1], input[2], input[3])) - azimuthOffset) + error.append(math.hypot(error[0], error[1])) + elif software.startswith("Sharpcap"): + error.append(self.degToArcmin(self.altitudeError(input[1], input[3])) - altitudeOffset) + error.append(self.degToArcmin(self.azimuthError(input[2], input[4])) - azimuthOffset) + error.append(math.hypot(error[0], error[1])) + elif software == "Ekos": + error.append((self.degToArcmin(float(input[1])) - altitudeOffset)*(-1)) + error.append((self.degToArcmin(float(input[1])) - azimuthOffset)*(-1)) + error.append(math.hypot(error[0], error[1])) + logging.debug(f"Error from log: {error}.") + return(error) + + def sendCommand(self, command, software, telescope, serialport, baudrate=19200): + if software != "Ekos": + import win32com.client + logging.debug(f"Command sent: \"{command}\"") + logging.debug(f"Telescope name: \"{telescope}\"") + tel = win32com.client.Dispatch(f"ASCOM.{telescope}.Telescope") + if tel.Connected: + logging.debug("Telescope was already connected") + else: + tel.Connected = True + if not tel.Connected: + logging.error("Unable to connect to telescope.") + return False + result = tel.Action("Serial:PassThroughCommand", command) + tel.Connected = False + else: + #Send command + logging.debug("Sending command...") + self.ser.flush() + self.ser.write(str.encode(command)) + result = self.ser.readline() + result = result[:-1].decode('utf-8') + logging.debug("Command response received") + return result + + def isAdjusting(self, software, telescope, serialport): + try: + logging.debug("Getting mount status...") + result = self.sendCommand(":GX#,#", software, telescope, serialport) + if not result: + raise Exception + logging.debug(result) + status = re.search(",(......),", result).group(1) + if status[3]=="-" and status[4]=="-": + return False + else: + return True + except: + if software == "Ekos": + logging.error("Problem determining mount status. Verify mount is connected to INDI. Stopping AutoPA.") + else: + logging.error("Problem determining mount status. Verify mount is connected to ASCOM. Stopping AutoPA.") + self.timer.stop() + raise ConnectionError + + def start(self): + if self.verbose.isChecked(): + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.DEBUG) + else: + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.INFO) + if self.aligned: + logging.info("Starting AutoPA routine") + self.aligned = False + self.accuracy = float(self.accuracy_input.text()) / 60 + self.timer.start(2500) + if self.telescopeName.text() == "": + if self.software.currentText() == "Ekos": + self.telescope = "LX200 GPS" + else: + self.telescope = "OpenAstroTracker" + if self.serialportInput.text() == "": + if self.software.currentText() == "Ekos": + self.serialport = "/dev/ttyACM0" + else: + self.serialport = self.serialportInput.text() + if self.software.currentText() == "Ekos": + import indi, serial + #Connect to indi server + self.indiclient, self.blobEvent = indi.indiserverConnect() + logging.debug("AutoPA connected to INDI server") + + #Disconnect OAT from indi to free up serial port + indi.disconnectScope(self.indiclient, self.telescope) + logging.debug("Telescope disconnected from INDI") + + print("Opening serial port on " + self.serialport + '...') + self.ser = serial.Serial(self.serialport, 19200, timeout = 0.2) + + # Refactored: Send initial mount commands in a background thread + commands = [ + ("Mount", ":GVP#,#"), + ("LST", ":XGL#,#"), + ("Latitude", ":Gt#,#"), + ("Longitude", ":Gg#,#"), + ("Hemisphere", ":XGHS#,#"), + ("Hardware", ":XGM#,#"), + ] + self.thread = QThread() + self.worker = CommandWorker(self, commands) + self.worker.moveToThread(self.thread) + self.worker.commandFinished.connect(self.handle_command_result) + self.worker.finished.connect(self.thread.quit) + self.worker.finished.connect(self.worker.deleteLater) + self.thread.finished.connect(self.thread.deleteLater) + self.thread.started.connect(self.worker.run) + self.thread.start() + + def stop(self): + self.aligned = True + self.timer.stop() + if self.software.currentText() == "Ekos": + import indi + self.ser.close() + #Reconnect OAT to indi and disconnect from server + indi.connectScope(self.indiclient, self.telescope) + logging.debug("Telescope reconnected to INDI") + indi.indiserverDisconnect(self.indiclient) + logging.debug("AutoPA disconnected from INDI server") + logging.info("Stopping AutoPA routine") + + def close(self): + sys.exit(self) + + def alignment(self): + if not self.aligned: + try: + if self.isAdjusting(self.software.currentText(), self.telescope, self.serialport): + logging.info("Mount is still adjusting position.") + self.stillAdjusting = True + else: + if self.stillAdjusting: + logging.info(f"Mount adjustment finished.") + self.stillAdjusting = False + self.adjustmentFinished = datetime.now() + self.solveCounter = 0 + logging.info(f"Getting latest log entry from {self.software.currentText()}.") + try: + log = self.getLatestLogEntry(softwareTypes[self.software.currentText()]["logpath"], softwareTypes[self.software.currentText()]["expression"]) + except FileNotFoundError: + log = None + logging.error(f"Error retrieving log from {self.software.currentText()}. Logfile may not exist or does not contain alignment info.") + if self.autorun: + sys.exit("Polar alignment values not found.") + if log is not None: + #Entry date is based on file timestamp, entry time is based on log entry data + currentEntry = datetime.strptime(datetime.fromtimestamp(log[1]).strftime("%Y-%m-%d") + " " + log[0][0][:-1], '%Y-%m-%d %H:%M:%S.%f') + if currentEntry != self.lastEntry and currentEntry > self.adjustmentFinished: + self.solveCounter += 1 #Increment counter if the latest unused entry was entered into the log after the adjustment was finished. + if (self.software.currentText() != "NINA3.x" and self.solveCounter >= 1) or (self.software.currentText() == "NINA3.x" and self.solveCounter >= 3): + #If using NINA, wait for three complete solves after adjustment is finished to prevent using old data + self.solveCounter = 0 + error = self.parseError(self.software.currentText(), log[0], float(self.azimuthOffset.text()), float(self.altitudeOffset.text())) + logging.info(f"Altitude error in arcminutes: {error[0]:.3f}\'") + logging.info(f"Azimuth error in arcminutes: {error[1]:.3f}\'") + logging.info(f"Total error in arcminutes: {error[2]:.3f}\'") + if abs(error[2]) < self.accuracy: + logging.info(f"Polar aligned to within {error[0]*60:.0f}\" altitude and {error[1]*60:.0f}\" azimuth.") + self.stop() + if self.autorun: + sys.exit(self) + return + else: + logging.info("Correction needed.") + result = self.sendCommand(f":MAL{error[0]:.4f}#", self.software.currentText(), self.telescope, self.serialport) + logging.debug(f"Adjusting altitude by {error[0]:.3f} arcminutes.") + result = self.sendCommand(f":MAZ{error[1]*(-1):.4f}#", self.software.currentText(), self.telescope, self.serialport) + logging.debug(f"Adjusting azimuth by {error[1]:.3f} arcminutes.") + self.lastEntry = currentEntry + else: + logging.info(f"Waiting for {self.software.currentText()} to re-solve since last adjustment finished.") + else: + logging.info(f"{self.software.currentText()} has not yet determined the polar alignment error.") + except ConnectionError: + self.stop() + if self.autorun: + sys.exit("AutoPA could not connect to mount.") + return + + def on_software_changed(self, software): + self.save_last_software(software) + + def on_accuracy_changed(self, accuracy): + self.save_last_accuracy(accuracy) + + def on_verbose_changed(self, state): + self.save_last_verbose(bool(state)) + if state: + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.DEBUG) + else: + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.INFO) + + def open_logs_folder(self): + if sys.platform == "win32": + os.startfile(self.log_dir) + elif sys.platform == "darwin": # macOS + os.system(f"open {self.log_dir}") + else: # Linux + os.system(f"xdg-open {self.log_dir}") + + def handle_command_result(self, label, result): + logging.info(f"{label:>10}: {result}") + QtWidgets.QApplication.processEvents() + +software_options = collections.OrderedDict([ + ('NINA3.x', ''), + ('Sharpcap4.x', ''), + ('Sharpcap3.2', ''), + ('Ekos', '') +]) + +today = date.today().strftime("%Y-%m-%d") +softwareTypes = { +"NINA3.x":{ "expression": r"[\d-]*T(.*?)\|.*PolarAlignment.cs\|.*Calculated Error: Az: (-*\d{2}).*?(\d{2})' (\d{2})\", Alt: (-*\d{2}).*?(\d{2})' (\d{2})\",.*", + "logpath": fr"{os.getenv('LOCALAPPDATA')}\NINA\Logs\*.log"}, +"Sharpcap3.2":{ "expression": "(?:Info:)\t(\d{2}:\d{2}:\d{2}.\d{7}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", + "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, +"Sharpcap4.x":{ "expression": "(?:Info)\W*(\d{2}:\d{2}:\d{2}.\d{6}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", + "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, +"Ekos":{ "expression": "(\d{2}:\d{2}:\d{2}.\d{3}).*(?:PAA Refresh).*(?:Corrected az:).*(?:\()(\s?-?\d\.\d{3}).*(?:alt:).*(\s?-?\d\.\d{3}).*(?:total:)", + "logpath": f"{Path.home()}/.local/share/kstars/logs/{today}/*.txt"} +} + +if __name__ == "__main__": + app = QtWidgets.QApplication(sys.argv) + window = QtWidgets.QDialog() + ui = AutoPA() + ui.setupUi(window) + + window.show() + sys.exit(app.exec_()) + From 42bac397684e4ebac67401481ecbf28623764042 Mon Sep 17 00:00:00 2001 From: Aleksandr Savin Date: Fri, 12 Sep 2025 13:59:43 +0200 Subject: [PATCH 2/3] chore(AutoPA_v2): Bump version to 2.8.0 --- AutoPA/Software/source/autopa_v2.py | 1264 +++++++++++++-------------- AutoPA/Software/source/setup.py | 6 +- 2 files changed, 635 insertions(+), 635 deletions(-) diff --git a/AutoPA/Software/source/autopa_v2.py b/AutoPA/Software/source/autopa_v2.py index a802f79..9c78efc 100644 --- a/AutoPA/Software/source/autopa_v2.py +++ b/AutoPA/Software/source/autopa_v2.py @@ -1,632 +1,632 @@ -# -*- coding: utf-8 -*- - -# Form implementation generated from reading ui file 'untitled.ui' -# -# Created by: PyQt5 UI code generator 5.15.4 -# -# WARNING: Any manual changes made to this file will be lost when pyuic5 is -# run again. Do not edit this file unless you know what you are doing. - -import glob -import re -from datetime import datetime, date, timedelta -import json -import math -from PyQt5 import QtCore, QtGui, QtWidgets -import sys, os -import collections -import logging -from pathlib import Path -from PyQt5.QtCore import QObject, QThread, pyqtSignal - -# Application version -VERSION = "2.8" - -class ElapsedTimeFormatter(logging.Formatter): - def __init__(self, start_time): - super().__init__('%(asctime)s - %(elapsed)s - %(levelname)s - %(message)s') - self.start_time = start_time - self.datefmt = '%Y-%m-%d %H:%M:%S.%f'[:-3] # Format: YYYY-MM-DD HH:MM:SS.mmm - - def formatTime(self, record, datefmt=None): - if datefmt: - # Format absolute timestamp - return datetime.fromtimestamp(record.created).strftime(datefmt) - else: - # Calculate elapsed time - elapsed = datetime.now() - self.start_time - # Convert to HH:mm:ss.s format - hours = int(elapsed.total_seconds() // 3600) - minutes = int((elapsed.total_seconds() % 3600) // 60) - seconds = elapsed.total_seconds() % 60 - return f"{hours:02d}:{minutes:02d}:{seconds:05.1f}" - - def format(self, record): - # Add elapsed time to the message - record.elapsed = self.formatTime(record) - # Format the absolute timestamp - record.asctime = self.formatTime(record, self.datefmt) - return super().format(record) - -class QTextEditLogger(logging.Handler): - def __init__(self, parent): - super().__init__() - self.widget = QtWidgets.QPlainTextEdit(parent) - self.widget.setReadOnly(True) - - def emit(self, record): - msg = self.format(record) - self.widget.appendPlainText(msg) - -class DuplicateFilter(object): - def __init__(self): - self.msgs = collections.deque(maxlen=3) - - def filter(self, record): - rv = record.msg not in self.msgs - self.msgs.append(record.msg) - return rv - -class CommandWorker(QObject): - commandFinished = pyqtSignal(str, str) # (label, result) - finished = pyqtSignal() - - def __init__(self, parent, commands): - super().__init__() - self.parent = parent - self.commands = commands - - def run(self): - for label, command in self.commands: - result = self.parent.sendCommand(command, self.parent.software.currentText(), self.parent.telescope, self.parent.serialport) - self.commandFinished.emit(label, str(result)) - self.finished.emit() - -class AutoPA(QtWidgets.QDialog, QtWidgets.QPlainTextEdit): - def __init__(self): - super().__init__() - self.config_file = Path.home() / 'AutoPA' / 'config.json' - self.log_dir = Path.home() / 'AutoPA' / 'logs' - self.log_dir.mkdir(parents=True, exist_ok=True) - self.load_last_software() - self.load_last_accuracy() - # Load verbose state before UI setup - self.verbose_state = self.load_last_verbose() - self.setupUi(self) - self.retranslateUi(self) - - def load_last_software(self): - try: - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - self.last_software = config.get('last_software', '') - else: - self.last_software = '' - except Exception as e: - logging.error(f"Error loading config: {e}") - self.last_software = '' - - def load_last_accuracy(self): - try: - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - self.last_accuracy = config.get('last_accuracy', '60') - else: - self.last_accuracy = '60' - except Exception as e: - logging.error(f"Error loading config: {e}") - self.last_accuracy = '60' - - def load_last_verbose(self): - try: - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - if 'last_verbose' in config: - return bool(config['last_verbose']) - return False - except Exception as e: - logging.error(f"Error loading config: {e}") - return False - - def save_last_software(self, software): - try: - config = {} - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - config['last_software'] = software - with open(self.config_file, 'w') as f: - json.dump(config, f) - except Exception as e: - logging.error(f"Error saving config: {e}") - - def save_last_accuracy(self, accuracy): - try: - config = {} - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - config['last_accuracy'] = accuracy - with open(self.config_file, 'w') as f: - json.dump(config, f) - except Exception as e: - logging.error(f"Error saving config: {e}") - - def save_last_verbose(self, verbose): - try: - config = {} - if self.config_file.exists(): - with open(self.config_file, 'r') as f: - config = json.load(f) - config['last_verbose'] = verbose - with open(self.config_file, 'w') as f: - json.dump(config, f) - except Exception as e: - logging.error(f"Error saving config: {e}") - - def setupUi(self, Dialog): - Dialog.setObjectName("AutoPA") - Dialog.resize(400, 300) - self.formLayout = QtWidgets.QFormLayout(Dialog) - self.formLayout.setObjectName("formLayout") - self.label = QtWidgets.QLabel(Dialog) - self.label.setObjectName("label") - self.formLayout.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label) - self.software = QtWidgets.QComboBox(Dialog) - self.software.setCurrentText("") - self.software.setObjectName("software") - self.formLayout.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.software) - self.label_4 = QtWidgets.QLabel(Dialog) - self.label_4.setObjectName("label_4") - self.formLayout.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_4) - self.accuracy_input = QtWidgets.QLineEdit(Dialog) - self.accuracy_input.setObjectName("accuracy") - self.formLayout.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.accuracy_input) - self.label_2 = QtWidgets.QLabel(Dialog) - self.label_2.setObjectName("label_2") - self.formLayout.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_2) - self.azimuthOffset = QtWidgets.QLineEdit(Dialog) - self.azimuthOffset.setObjectName("azimuthOffset") - self.formLayout.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.azimuthOffset) - self.label_3 = QtWidgets.QLabel(Dialog) - self.label_3.setObjectName("label_3") - self.formLayout.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_3) - self.altitudeOffset = QtWidgets.QLineEdit(Dialog) - self.altitudeOffset.setObjectName("altitudeOffset") - self.formLayout.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.altitudeOffset) - self.label_5 = QtWidgets.QLabel(Dialog) - self.label_5.setObjectName("label_5") - self.formLayout.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_5) - self.telescopeName = QtWidgets.QLineEdit(Dialog) - self.telescopeName.setObjectName("telescopeName") - self.formLayout.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.telescopeName) - self.label_6 = QtWidgets.QLabel(Dialog) - self.label_6.setObjectName("label_6") - self.formLayout.setWidget(5, QtWidgets.QFormLayout.LabelRole, self.label_6) - self.serialportInput = QtWidgets.QLineEdit(Dialog) - self.serialportInput.setObjectName("serialportInput") - self.formLayout.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.serialportInput) - self.verbose = QtWidgets.QCheckBox(Dialog) - self.verbose.setObjectName("verbose") - self.verbose.setChecked(self.verbose_state) - self.verbose.stateChanged.connect(self.on_verbose_changed) - self.formLayout.setWidget(6, QtWidgets.QFormLayout.LabelRole, self.verbose) - - self.horizontalLayout = QtWidgets.QHBoxLayout() - self.horizontalLayout.setObjectName("horizontalLayout") - self.startButton = QtWidgets.QPushButton(Dialog) - self.startButton.setObjectName("startButton") - self.startButton.clicked.connect(self.start) - self.horizontalLayout.addWidget(self.startButton) - self.stopButton = QtWidgets.QPushButton(Dialog) - self.stopButton.setObjectName("stopButton") - self.stopButton.clicked.connect(self.stop) - self.horizontalLayout.addWidget(self.stopButton) - self.openLogsButton = QtWidgets.QPushButton(Dialog) - self.openLogsButton.setObjectName("openLogsButton") - self.openLogsButton.clicked.connect(self.open_logs_folder) - self.horizontalLayout.addWidget(self.openLogsButton) - self.cancelButton = QtWidgets.QPushButton(Dialog) - self.cancelButton.setObjectName("cancelButton") - self.cancelButton.clicked.connect(self.close) - self.horizontalLayout.addWidget(self.cancelButton) - self.formLayout.setLayout(6, QtWidgets.QFormLayout.FieldRole, self.horizontalLayout) - - # Store start time for elapsed time calculation - self.start_time = datetime.now() - - # Get the root logger and remove any existing handlers - self.logger = logging.getLogger() - for handler in self.logger.handlers[:]: - self.logger.removeHandler(handler) - - logTextBox = QTextEditLogger(self) - # Simple formatter for GUI - just show the message - gui_formatter = logging.Formatter('%(message)s') - logTextBox.setFormatter(gui_formatter) - - # Add file handler with DEBUG level - log_file = self.log_dir / f'autopa_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' - file_handler = logging.FileHandler(log_file) - # Full formatter for file - show timestamps and level - file_formatter = ElapsedTimeFormatter(self.start_time) - file_handler.setFormatter(file_formatter) - file_handler.setLevel(logging.DEBUG) # Always log DEBUG level to file - self.logger.addHandler(file_handler) - - # Add GUI handler with level controlled by verbose checkbox - self.logger.addHandler(logTextBox) - self.logger.addFilter(DuplicateFilter()) - self.logger.setLevel(logging.DEBUG) # Set root logger to DEBUG to allow all levels to pass through - logTextBox.widget.setFont(QtGui.QFont("Consolas", 8)) # (or "Courier New" if Consolas is not available) - self.formLayout.setWidget(10, QtWidgets.QFormLayout.SpanningRole, logTextBox.widget) - - self.timer=QtCore.QTimer() - self.timer.timeout.connect(self.alignment) - - self.lastEntry = datetime.now() - self.aligned = True - self.stillAdjusting = False - self.adjustmentFinished = datetime.now() - self.after_id = None - self.serialport = "" - self.indiclient = None - self.ser = None - self.solveCounter = 0 - self.autorun = False - if len(sys.argv) > 1: - if sys.argv[1] == "--autorun": - self.autorun = True - self.retranslateUi(Dialog) - QtCore.QMetaObject.connectSlotsByName(Dialog) - if self.autorun: - self.startButton.click() - - self.software.currentTextChanged.connect(self.on_software_changed) - self.accuracy_input.textChanged.connect(self.on_accuracy_changed) - - # Set the last selected software if it exists - if self.last_software and self.last_software in software_options: - self.software.setCurrentText(self.last_software) - - # Set the last accuracy value if it exists - self.accuracy_input.setText(self.last_accuracy) - - # Log startup completion after a short delay to ensure GUI is ready - QtCore.QTimer.singleShot(100, lambda: logging.info(f"AutoPA v{VERSION} started at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')} and is ready")) - - def retranslateUi(self, Dialog): - _translate = QtCore.QCoreApplication.translate - Dialog.setWindowTitle(_translate("Dialog", f"AutoPA v{VERSION}")) - self.label.setText(_translate("Dialog", "Choose your AutoPA software:")) - self.software.addItems(software_options.keys()) - self.label_4.setText(_translate("Dialog", "Accuracy to align to (Default 60 arcseconds):")) - self.accuracy_input.setText(_translate("Dialog", "60")) - self.accuracy_input.setPlaceholderText(_translate("Dialog", "60")) - self.label_2.setText(_translate("Dialog", "+/- Azimuth Offset (Default 0 arcminutes):")) - self.azimuthOffset.setText(_translate("Dialog", "0")) - self.azimuthOffset.setPlaceholderText(_translate("Dialog", "0")) - self.label_3.setText(_translate("Dialog", "+/- Altitude Offset (Default 0 arcminutes):")) - self.altitudeOffset.setText(_translate("Dialog", "0")) - self.altitudeOffset.setPlaceholderText(_translate("Dialog", "0")) - self.label_5.setText(_translate("Dialog", "Telescope Name (ASCOM: \"OpenAstroTracker\", INDI: \"LX200 GPS\"")) - self.telescopeName.setPlaceholderText(_translate("Dialog", "Override default?")) - self.label_6.setText(_translate("Dialog", "Serial Port of OAT [Ekos only] (Default /dev/ttyACM0):")) - self.serialportInput.setPlaceholderText(_translate("Dialog", "Override default? (Ekos only)")) - self.verbose.setText(_translate("Dialog", "Verbose Output")) - self.startButton.setText(_translate("Dialog", "Start")) - self.stopButton.setText(_translate("Dialog", "Stop")) - self.openLogsButton.setText(_translate("Dialog", "Logs")) - self.cancelButton.setText(_translate("Dialog", "Close")) - - def getLatestLogEntry(self, logpath, expression): - if sys.platform == "win32": - try: - import win32file - list_of_files = glob.glob(logpath) - latest_file = max(list_of_files, key=os.path.getctime) - logging.debug("Opening file: " + latest_file) - f = win32file.CreateFile(latest_file, win32file.GENERIC_READ, win32file.FILE_SHARE_DELETE | win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) - bufSize = 4096 - code, data = win32file.ReadFile(f, bufSize) - buf = data - while len(data) == bufSize: - result, data = win32file.ReadFile(f, bufSize, None) - buf += data - result = re.findall(expression, buf.decode("utf-8"))[-1] - logfileModification = os.path.getmtime(latest_file) - return(result, logfileModification) - except: - raise FileNotFoundError - elif sys.platform == "linux": - try: - list_of_files = glob.glob(logpath) - latest_file = max(list_of_files, key=os.path.getctime) - logging.debug(latest_file) - FileObject = open(latest_file,"r") - contents = FileObject.readlines() - result = re.findall(expression, contents.decode("utf-8"))[-1] - logfileModification = os.path.getmtime(latest_file) - return(result, logfileModification) - except: - raise FileNotFoundError - - def altitudeError(self, error, pole): - return(self.dmsTodeg(pole)-self.dmsTodeg(error)) - - def azimuthError(self, error, pole): - return(((self.dmsTodeg(pole) + 180) % 360 - 180)-((self.dmsTodeg(error) + 180) % 360 - 180)) - - def dmsTodeg(self, input): - temp = input.split(':') - d = float(temp[0]) - m = float(temp[1]) / 60 - s = float(temp[2]) / 3600 - return (d + m + s) - - def parseNINA3deg(self, input0, input1, input2): - sgn = input0[0] - if sgn == "-": - sgn = -1 - else: - sgn = 1 - d = abs(float(input0)) - m = float(input1) / 60 - s = float(input2) / 3600 - return (sgn * (d + m + s)) - - - def degToArcmin(self, input): - return(input * 60) - - def parseError(self, software, input, azimuthOffset, altitudeOffset): - error = [] - if software == "NINA3.x": - # Log file lists Az, then Alt error - # Convert degrees to arcminutes using degToArcmin() - error.append(self.degToArcmin(self.parseNINA3deg(input[4], input[5], input[6])) - altitudeOffset) - error.append(self.degToArcmin(self.parseNINA3deg(input[1], input[2], input[3])) - azimuthOffset) - error.append(math.hypot(error[0], error[1])) - elif software.startswith("Sharpcap"): - error.append(self.degToArcmin(self.altitudeError(input[1], input[3])) - altitudeOffset) - error.append(self.degToArcmin(self.azimuthError(input[2], input[4])) - azimuthOffset) - error.append(math.hypot(error[0], error[1])) - elif software == "Ekos": - error.append((self.degToArcmin(float(input[1])) - altitudeOffset)*(-1)) - error.append((self.degToArcmin(float(input[1])) - azimuthOffset)*(-1)) - error.append(math.hypot(error[0], error[1])) - logging.debug(f"Error from log: {error}.") - return(error) - - def sendCommand(self, command, software, telescope, serialport, baudrate=19200): - if software != "Ekos": - import win32com.client - logging.debug(f"Command sent: \"{command}\"") - logging.debug(f"Telescope name: \"{telescope}\"") - tel = win32com.client.Dispatch(f"ASCOM.{telescope}.Telescope") - if tel.Connected: - logging.debug("Telescope was already connected") - else: - tel.Connected = True - if not tel.Connected: - logging.error("Unable to connect to telescope.") - return False - result = tel.Action("Serial:PassThroughCommand", command) - tel.Connected = False - else: - #Send command - logging.debug("Sending command...") - self.ser.flush() - self.ser.write(str.encode(command)) - result = self.ser.readline() - result = result[:-1].decode('utf-8') - logging.debug("Command response received") - return result - - def isAdjusting(self, software, telescope, serialport): - try: - logging.debug("Getting mount status...") - result = self.sendCommand(":GX#,#", software, telescope, serialport) - if not result: - raise Exception - logging.debug(result) - status = re.search(",(......),", result).group(1) - if status[3]=="-" and status[4]=="-": - return False - else: - return True - except: - if software == "Ekos": - logging.error("Problem determining mount status. Verify mount is connected to INDI. Stopping AutoPA.") - else: - logging.error("Problem determining mount status. Verify mount is connected to ASCOM. Stopping AutoPA.") - self.timer.stop() - raise ConnectionError - - def start(self): - if self.verbose.isChecked(): - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.DEBUG) - else: - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.INFO) - if self.aligned: - logging.info("Starting AutoPA routine") - self.aligned = False - self.accuracy = float(self.accuracy_input.text()) / 60 - self.timer.start(2500) - if self.telescopeName.text() == "": - if self.software.currentText() == "Ekos": - self.telescope = "LX200 GPS" - else: - self.telescope = "OpenAstroTracker" - if self.serialportInput.text() == "": - if self.software.currentText() == "Ekos": - self.serialport = "/dev/ttyACM0" - else: - self.serialport = self.serialportInput.text() - if self.software.currentText() == "Ekos": - import indi, serial - #Connect to indi server - self.indiclient, self.blobEvent = indi.indiserverConnect() - logging.debug("AutoPA connected to INDI server") - - #Disconnect OAT from indi to free up serial port - indi.disconnectScope(self.indiclient, self.telescope) - logging.debug("Telescope disconnected from INDI") - - print("Opening serial port on " + self.serialport + '...') - self.ser = serial.Serial(self.serialport, 19200, timeout = 0.2) - - # Refactored: Send initial mount commands in a background thread - commands = [ - ("Mount", ":GVP#,#"), - ("LST", ":XGL#,#"), - ("Latitude", ":Gt#,#"), - ("Longitude", ":Gg#,#"), - ("Hemisphere", ":XGHS#,#"), - ("Hardware", ":XGM#,#"), - ] - self.thread = QThread() - self.worker = CommandWorker(self, commands) - self.worker.moveToThread(self.thread) - self.worker.commandFinished.connect(self.handle_command_result) - self.worker.finished.connect(self.thread.quit) - self.worker.finished.connect(self.worker.deleteLater) - self.thread.finished.connect(self.thread.deleteLater) - self.thread.started.connect(self.worker.run) - self.thread.start() - - def stop(self): - self.aligned = True - self.timer.stop() - if self.software.currentText() == "Ekos": - import indi - self.ser.close() - #Reconnect OAT to indi and disconnect from server - indi.connectScope(self.indiclient, self.telescope) - logging.debug("Telescope reconnected to INDI") - indi.indiserverDisconnect(self.indiclient) - logging.debug("AutoPA disconnected from INDI server") - logging.info("Stopping AutoPA routine") - - def close(self): - sys.exit(self) - - def alignment(self): - if not self.aligned: - try: - if self.isAdjusting(self.software.currentText(), self.telescope, self.serialport): - logging.info("Mount is still adjusting position.") - self.stillAdjusting = True - else: - if self.stillAdjusting: - logging.info(f"Mount adjustment finished.") - self.stillAdjusting = False - self.adjustmentFinished = datetime.now() - self.solveCounter = 0 - logging.info(f"Getting latest log entry from {self.software.currentText()}.") - try: - log = self.getLatestLogEntry(softwareTypes[self.software.currentText()]["logpath"], softwareTypes[self.software.currentText()]["expression"]) - except FileNotFoundError: - log = None - logging.error(f"Error retrieving log from {self.software.currentText()}. Logfile may not exist or does not contain alignment info.") - if self.autorun: - sys.exit("Polar alignment values not found.") - if log is not None: - #Entry date is based on file timestamp, entry time is based on log entry data - currentEntry = datetime.strptime(datetime.fromtimestamp(log[1]).strftime("%Y-%m-%d") + " " + log[0][0][:-1], '%Y-%m-%d %H:%M:%S.%f') - if currentEntry != self.lastEntry and currentEntry > self.adjustmentFinished: - self.solveCounter += 1 #Increment counter if the latest unused entry was entered into the log after the adjustment was finished. - if (self.software.currentText() != "NINA3.x" and self.solveCounter >= 1) or (self.software.currentText() == "NINA3.x" and self.solveCounter >= 3): - #If using NINA, wait for three complete solves after adjustment is finished to prevent using old data - self.solveCounter = 0 - error = self.parseError(self.software.currentText(), log[0], float(self.azimuthOffset.text()), float(self.altitudeOffset.text())) - logging.info(f"Altitude error in arcminutes: {error[0]:.3f}\'") - logging.info(f"Azimuth error in arcminutes: {error[1]:.3f}\'") - logging.info(f"Total error in arcminutes: {error[2]:.3f}\'") - if abs(error[2]) < self.accuracy: - logging.info(f"Polar aligned to within {error[0]*60:.0f}\" altitude and {error[1]*60:.0f}\" azimuth.") - self.stop() - if self.autorun: - sys.exit(self) - return - else: - logging.info("Correction needed.") - result = self.sendCommand(f":MAL{error[0]:.4f}#", self.software.currentText(), self.telescope, self.serialport) - logging.debug(f"Adjusting altitude by {error[0]:.3f} arcminutes.") - result = self.sendCommand(f":MAZ{error[1]*(-1):.4f}#", self.software.currentText(), self.telescope, self.serialport) - logging.debug(f"Adjusting azimuth by {error[1]:.3f} arcminutes.") - self.lastEntry = currentEntry - else: - logging.info(f"Waiting for {self.software.currentText()} to re-solve since last adjustment finished.") - else: - logging.info(f"{self.software.currentText()} has not yet determined the polar alignment error.") - except ConnectionError: - self.stop() - if self.autorun: - sys.exit("AutoPA could not connect to mount.") - return - - def on_software_changed(self, software): - self.save_last_software(software) - - def on_accuracy_changed(self, accuracy): - self.save_last_accuracy(accuracy) - - def on_verbose_changed(self, state): - self.save_last_verbose(bool(state)) - if state: - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.DEBUG) - else: - for handler in self.logger.handlers: - if isinstance(handler, QTextEditLogger): - handler.setLevel(logging.INFO) - - def open_logs_folder(self): - if sys.platform == "win32": - os.startfile(self.log_dir) - elif sys.platform == "darwin": # macOS - os.system(f"open {self.log_dir}") - else: # Linux - os.system(f"xdg-open {self.log_dir}") - - def handle_command_result(self, label, result): - logging.info(f"{label:>10}: {result}") - QtWidgets.QApplication.processEvents() - -software_options = collections.OrderedDict([ - ('NINA3.x', ''), - ('Sharpcap4.x', ''), - ('Sharpcap3.2', ''), - ('Ekos', '') -]) - -today = date.today().strftime("%Y-%m-%d") -softwareTypes = { -"NINA3.x":{ "expression": r"[\d-]*T(.*?)\|.*PolarAlignment.cs\|.*Calculated Error: Az: (-*\d{2}).*?(\d{2})' (\d{2})\", Alt: (-*\d{2}).*?(\d{2})' (\d{2})\",.*", - "logpath": fr"{os.getenv('LOCALAPPDATA')}\NINA\Logs\*.log"}, -"Sharpcap3.2":{ "expression": "(?:Info:)\t(\d{2}:\d{2}:\d{2}.\d{7}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", - "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, -"Sharpcap4.x":{ "expression": "(?:Info)\W*(\d{2}:\d{2}:\d{2}.\d{6}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", - "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, -"Ekos":{ "expression": "(\d{2}:\d{2}:\d{2}.\d{3}).*(?:PAA Refresh).*(?:Corrected az:).*(?:\()(\s?-?\d\.\d{3}).*(?:alt:).*(\s?-?\d\.\d{3}).*(?:total:)", - "logpath": f"{Path.home()}/.local/share/kstars/logs/{today}/*.txt"} -} - -if __name__ == "__main__": - app = QtWidgets.QApplication(sys.argv) - window = QtWidgets.QDialog() - ui = AutoPA() - ui.setupUi(window) - - window.show() - sys.exit(app.exec_()) - +# -*- coding: utf-8 -*- + +# Form implementation generated from reading ui file 'untitled.ui' +# +# Created by: PyQt5 UI code generator 5.15.4 +# +# WARNING: Any manual changes made to this file will be lost when pyuic5 is +# run again. Do not edit this file unless you know what you are doing. + +import glob +import re +from datetime import datetime, date, timedelta +import json +import math +from PyQt5 import QtCore, QtGui, QtWidgets +import sys, os +import collections +import logging +from pathlib import Path +from PyQt5.QtCore import QObject, QThread, pyqtSignal + +# Application version +VERSION = "2.8" + +class ElapsedTimeFormatter(logging.Formatter): + def __init__(self, start_time): + super().__init__('%(asctime)s - %(elapsed)s - %(levelname)s - %(message)s') + self.start_time = start_time + self.datefmt = '%Y-%m-%d %H:%M:%S.%f'[:-3] # Format: YYYY-MM-DD HH:MM:SS.mmm + + def formatTime(self, record, datefmt=None): + if datefmt: + # Format absolute timestamp + return datetime.fromtimestamp(record.created).strftime(datefmt) + else: + # Calculate elapsed time + elapsed = datetime.now() - self.start_time + # Convert to HH:mm:ss.s format + hours = int(elapsed.total_seconds() // 3600) + minutes = int((elapsed.total_seconds() % 3600) // 60) + seconds = elapsed.total_seconds() % 60 + return f"{hours:02d}:{minutes:02d}:{seconds:05.1f}" + + def format(self, record): + # Add elapsed time to the message + record.elapsed = self.formatTime(record) + # Format the absolute timestamp + record.asctime = self.formatTime(record, self.datefmt) + return super().format(record) + +class QTextEditLogger(logging.Handler): + def __init__(self, parent): + super().__init__() + self.widget = QtWidgets.QPlainTextEdit(parent) + self.widget.setReadOnly(True) + + def emit(self, record): + msg = self.format(record) + self.widget.appendPlainText(msg) + +class DuplicateFilter(object): + def __init__(self): + self.msgs = collections.deque(maxlen=3) + + def filter(self, record): + rv = record.msg not in self.msgs + self.msgs.append(record.msg) + return rv + +class CommandWorker(QObject): + commandFinished = pyqtSignal(str, str) # (label, result) + finished = pyqtSignal() + + def __init__(self, parent, commands): + super().__init__() + self.parent = parent + self.commands = commands + + def run(self): + for label, command in self.commands: + result = self.parent.sendCommand(command, self.parent.software.currentText(), self.parent.telescope, self.parent.serialport) + self.commandFinished.emit(label, str(result)) + self.finished.emit() + +class AutoPA(QtWidgets.QDialog, QtWidgets.QPlainTextEdit): + def __init__(self): + super().__init__() + self.config_file = Path.home() / 'AutoPA' / 'config.json' + self.log_dir = Path.home() / 'AutoPA' / 'logs' + self.log_dir.mkdir(parents=True, exist_ok=True) + self.load_last_software() + self.load_last_accuracy() + # Load verbose state before UI setup + self.verbose_state = self.load_last_verbose() + self.setupUi(self) + self.retranslateUi(self) + + def load_last_software(self): + try: + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + self.last_software = config.get('last_software', '') + else: + self.last_software = '' + except Exception as e: + logging.error(f"Error loading config: {e}") + self.last_software = '' + + def load_last_accuracy(self): + try: + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + self.last_accuracy = config.get('last_accuracy', '60') + else: + self.last_accuracy = '60' + except Exception as e: + logging.error(f"Error loading config: {e}") + self.last_accuracy = '60' + + def load_last_verbose(self): + try: + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + if 'last_verbose' in config: + return bool(config['last_verbose']) + return False + except Exception as e: + logging.error(f"Error loading config: {e}") + return False + + def save_last_software(self, software): + try: + config = {} + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + config['last_software'] = software + with open(self.config_file, 'w') as f: + json.dump(config, f) + except Exception as e: + logging.error(f"Error saving config: {e}") + + def save_last_accuracy(self, accuracy): + try: + config = {} + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + config['last_accuracy'] = accuracy + with open(self.config_file, 'w') as f: + json.dump(config, f) + except Exception as e: + logging.error(f"Error saving config: {e}") + + def save_last_verbose(self, verbose): + try: + config = {} + if self.config_file.exists(): + with open(self.config_file, 'r') as f: + config = json.load(f) + config['last_verbose'] = verbose + with open(self.config_file, 'w') as f: + json.dump(config, f) + except Exception as e: + logging.error(f"Error saving config: {e}") + + def setupUi(self, Dialog): + Dialog.setObjectName("AutoPA") + Dialog.resize(400, 300) + self.formLayout = QtWidgets.QFormLayout(Dialog) + self.formLayout.setObjectName("formLayout") + self.label = QtWidgets.QLabel(Dialog) + self.label.setObjectName("label") + self.formLayout.setWidget(0, QtWidgets.QFormLayout.LabelRole, self.label) + self.software = QtWidgets.QComboBox(Dialog) + self.software.setCurrentText("") + self.software.setObjectName("software") + self.formLayout.setWidget(0, QtWidgets.QFormLayout.FieldRole, self.software) + self.label_4 = QtWidgets.QLabel(Dialog) + self.label_4.setObjectName("label_4") + self.formLayout.setWidget(1, QtWidgets.QFormLayout.LabelRole, self.label_4) + self.accuracy_input = QtWidgets.QLineEdit(Dialog) + self.accuracy_input.setObjectName("accuracy") + self.formLayout.setWidget(1, QtWidgets.QFormLayout.FieldRole, self.accuracy_input) + self.label_2 = QtWidgets.QLabel(Dialog) + self.label_2.setObjectName("label_2") + self.formLayout.setWidget(2, QtWidgets.QFormLayout.LabelRole, self.label_2) + self.azimuthOffset = QtWidgets.QLineEdit(Dialog) + self.azimuthOffset.setObjectName("azimuthOffset") + self.formLayout.setWidget(2, QtWidgets.QFormLayout.FieldRole, self.azimuthOffset) + self.label_3 = QtWidgets.QLabel(Dialog) + self.label_3.setObjectName("label_3") + self.formLayout.setWidget(3, QtWidgets.QFormLayout.LabelRole, self.label_3) + self.altitudeOffset = QtWidgets.QLineEdit(Dialog) + self.altitudeOffset.setObjectName("altitudeOffset") + self.formLayout.setWidget(3, QtWidgets.QFormLayout.FieldRole, self.altitudeOffset) + self.label_5 = QtWidgets.QLabel(Dialog) + self.label_5.setObjectName("label_5") + self.formLayout.setWidget(4, QtWidgets.QFormLayout.LabelRole, self.label_5) + self.telescopeName = QtWidgets.QLineEdit(Dialog) + self.telescopeName.setObjectName("telescopeName") + self.formLayout.setWidget(4, QtWidgets.QFormLayout.FieldRole, self.telescopeName) + self.label_6 = QtWidgets.QLabel(Dialog) + self.label_6.setObjectName("label_6") + self.formLayout.setWidget(5, QtWidgets.QFormLayout.LabelRole, self.label_6) + self.serialportInput = QtWidgets.QLineEdit(Dialog) + self.serialportInput.setObjectName("serialportInput") + self.formLayout.setWidget(5, QtWidgets.QFormLayout.FieldRole, self.serialportInput) + self.verbose = QtWidgets.QCheckBox(Dialog) + self.verbose.setObjectName("verbose") + self.verbose.setChecked(self.verbose_state) + self.verbose.stateChanged.connect(self.on_verbose_changed) + self.formLayout.setWidget(6, QtWidgets.QFormLayout.LabelRole, self.verbose) + + self.horizontalLayout = QtWidgets.QHBoxLayout() + self.horizontalLayout.setObjectName("horizontalLayout") + self.startButton = QtWidgets.QPushButton(Dialog) + self.startButton.setObjectName("startButton") + self.startButton.clicked.connect(self.start) + self.horizontalLayout.addWidget(self.startButton) + self.stopButton = QtWidgets.QPushButton(Dialog) + self.stopButton.setObjectName("stopButton") + self.stopButton.clicked.connect(self.stop) + self.horizontalLayout.addWidget(self.stopButton) + self.openLogsButton = QtWidgets.QPushButton(Dialog) + self.openLogsButton.setObjectName("openLogsButton") + self.openLogsButton.clicked.connect(self.open_logs_folder) + self.horizontalLayout.addWidget(self.openLogsButton) + self.cancelButton = QtWidgets.QPushButton(Dialog) + self.cancelButton.setObjectName("cancelButton") + self.cancelButton.clicked.connect(self.close) + self.horizontalLayout.addWidget(self.cancelButton) + self.formLayout.setLayout(6, QtWidgets.QFormLayout.FieldRole, self.horizontalLayout) + + # Store start time for elapsed time calculation + self.start_time = datetime.now() + + # Get the root logger and remove any existing handlers + self.logger = logging.getLogger() + for handler in self.logger.handlers[:]: + self.logger.removeHandler(handler) + + logTextBox = QTextEditLogger(self) + # Simple formatter for GUI - just show the message + gui_formatter = logging.Formatter('%(message)s') + logTextBox.setFormatter(gui_formatter) + + # Add file handler with DEBUG level + log_file = self.log_dir / f'autopa_{datetime.now().strftime("%Y%m%d_%H%M%S")}.log' + file_handler = logging.FileHandler(log_file) + # Full formatter for file - show timestamps and level + file_formatter = ElapsedTimeFormatter(self.start_time) + file_handler.setFormatter(file_formatter) + file_handler.setLevel(logging.DEBUG) # Always log DEBUG level to file + self.logger.addHandler(file_handler) + + # Add GUI handler with level controlled by verbose checkbox + self.logger.addHandler(logTextBox) + self.logger.addFilter(DuplicateFilter()) + self.logger.setLevel(logging.DEBUG) # Set root logger to DEBUG to allow all levels to pass through + logTextBox.widget.setFont(QtGui.QFont("Consolas", 8)) # (or "Courier New" if Consolas is not available) + self.formLayout.setWidget(10, QtWidgets.QFormLayout.SpanningRole, logTextBox.widget) + + self.timer=QtCore.QTimer() + self.timer.timeout.connect(self.alignment) + + self.lastEntry = datetime.now() + self.aligned = True + self.stillAdjusting = False + self.adjustmentFinished = datetime.now() + self.after_id = None + self.serialport = "" + self.indiclient = None + self.ser = None + self.solveCounter = 0 + self.autorun = False + if len(sys.argv) > 1: + if sys.argv[1] == "--autorun": + self.autorun = True + self.retranslateUi(Dialog) + QtCore.QMetaObject.connectSlotsByName(Dialog) + if self.autorun: + self.startButton.click() + + self.software.currentTextChanged.connect(self.on_software_changed) + self.accuracy_input.textChanged.connect(self.on_accuracy_changed) + + # Set the last selected software if it exists + if self.last_software and self.last_software in software_options: + self.software.setCurrentText(self.last_software) + + # Set the last accuracy value if it exists + self.accuracy_input.setText(self.last_accuracy) + + # Log startup completion after a short delay to ensure GUI is ready + QtCore.QTimer.singleShot(100, lambda: logging.info(f"AutoPA v{VERSION} started at {self.start_time.strftime('%Y-%m-%d %H:%M:%S')} and is ready")) + + def retranslateUi(self, Dialog): + _translate = QtCore.QCoreApplication.translate + Dialog.setWindowTitle(_translate("Dialog", f"AutoPA v{VERSION}")) + self.label.setText(_translate("Dialog", "Choose your AutoPA software:")) + self.software.addItems(software_options.keys()) + self.label_4.setText(_translate("Dialog", "Accuracy to align to (Default 60 arcseconds):")) + self.accuracy_input.setText(_translate("Dialog", "60")) + self.accuracy_input.setPlaceholderText(_translate("Dialog", "60")) + self.label_2.setText(_translate("Dialog", "+/- Azimuth Offset (Default 0 arcminutes):")) + self.azimuthOffset.setText(_translate("Dialog", "0")) + self.azimuthOffset.setPlaceholderText(_translate("Dialog", "0")) + self.label_3.setText(_translate("Dialog", "+/- Altitude Offset (Default 0 arcminutes):")) + self.altitudeOffset.setText(_translate("Dialog", "0")) + self.altitudeOffset.setPlaceholderText(_translate("Dialog", "0")) + self.label_5.setText(_translate("Dialog", "Telescope Name (ASCOM: \"OpenAstroTracker\", INDI: \"LX200 GPS\"")) + self.telescopeName.setPlaceholderText(_translate("Dialog", "Override default?")) + self.label_6.setText(_translate("Dialog", "Serial Port of OAT [Ekos only] (Default /dev/ttyACM0):")) + self.serialportInput.setPlaceholderText(_translate("Dialog", "Override default? (Ekos only)")) + self.verbose.setText(_translate("Dialog", "Verbose Output")) + self.startButton.setText(_translate("Dialog", "Start")) + self.stopButton.setText(_translate("Dialog", "Stop")) + self.openLogsButton.setText(_translate("Dialog", "Logs")) + self.cancelButton.setText(_translate("Dialog", "Close")) + + def getLatestLogEntry(self, logpath, expression): + if sys.platform == "win32": + try: + import win32file + list_of_files = glob.glob(logpath) + latest_file = max(list_of_files, key=os.path.getctime) + logging.debug("Opening file: " + latest_file) + f = win32file.CreateFile(latest_file, win32file.GENERIC_READ, win32file.FILE_SHARE_DELETE | win32file.FILE_SHARE_READ | win32file.FILE_SHARE_WRITE, None, win32file.OPEN_EXISTING, win32file.FILE_ATTRIBUTE_NORMAL, None) + bufSize = 4096 + code, data = win32file.ReadFile(f, bufSize) + buf = data + while len(data) == bufSize: + result, data = win32file.ReadFile(f, bufSize, None) + buf += data + result = re.findall(expression, buf.decode("utf-8"))[-1] + logfileModification = os.path.getmtime(latest_file) + return(result, logfileModification) + except: + raise FileNotFoundError + elif sys.platform == "linux": + try: + list_of_files = glob.glob(logpath) + latest_file = max(list_of_files, key=os.path.getctime) + logging.debug(latest_file) + FileObject = open(latest_file,"r") + contents = FileObject.readlines() + result = re.findall(expression, contents.decode("utf-8"))[-1] + logfileModification = os.path.getmtime(latest_file) + return(result, logfileModification) + except: + raise FileNotFoundError + + def altitudeError(self, error, pole): + return(self.dmsTodeg(pole)-self.dmsTodeg(error)) + + def azimuthError(self, error, pole): + return(((self.dmsTodeg(pole) + 180) % 360 - 180)-((self.dmsTodeg(error) + 180) % 360 - 180)) + + def dmsTodeg(self, input): + temp = input.split(':') + d = float(temp[0]) + m = float(temp[1]) / 60 + s = float(temp[2]) / 3600 + return (d + m + s) + + def parseNINA3deg(self, input0, input1, input2): + sgn = input0[0] + if sgn == "-": + sgn = -1 + else: + sgn = 1 + d = abs(float(input0)) + m = float(input1) / 60 + s = float(input2) / 3600 + return (sgn * (d + m + s)) + + + def degToArcmin(self, input): + return(input * 60) + + def parseError(self, software, input, azimuthOffset, altitudeOffset): + error = [] + if software == "NINA3.x": + # Log file lists Az, then Alt error + # Convert degrees to arcminutes using degToArcmin() + error.append(self.degToArcmin(self.parseNINA3deg(input[4], input[5], input[6])) - altitudeOffset) + error.append(self.degToArcmin(self.parseNINA3deg(input[1], input[2], input[3])) - azimuthOffset) + error.append(math.hypot(error[0], error[1])) + elif software.startswith("Sharpcap"): + error.append(self.degToArcmin(self.altitudeError(input[1], input[3])) - altitudeOffset) + error.append(self.degToArcmin(self.azimuthError(input[2], input[4])) - azimuthOffset) + error.append(math.hypot(error[0], error[1])) + elif software == "Ekos": + error.append((self.degToArcmin(float(input[1])) - altitudeOffset)*(-1)) + error.append((self.degToArcmin(float(input[1])) - azimuthOffset)*(-1)) + error.append(math.hypot(error[0], error[1])) + logging.debug(f"Error from log: {error}.") + return(error) + + def sendCommand(self, command, software, telescope, serialport, baudrate=19200): + if software != "Ekos": + import win32com.client + logging.debug(f"Command sent: \"{command}\"") + logging.debug(f"Telescope name: \"{telescope}\"") + tel = win32com.client.Dispatch(f"ASCOM.{telescope}.Telescope") + if tel.Connected: + logging.debug("Telescope was already connected") + else: + tel.Connected = True + if not tel.Connected: + logging.error("Unable to connect to telescope.") + return False + result = tel.Action("Serial:PassThroughCommand", command) + tel.Connected = False + else: + #Send command + logging.debug("Sending command...") + self.ser.flush() + self.ser.write(str.encode(command)) + result = self.ser.readline() + result = result[:-1].decode('utf-8') + logging.debug("Command response received") + return result + + def isAdjusting(self, software, telescope, serialport): + try: + logging.debug("Getting mount status...") + result = self.sendCommand(":GX#,#", software, telescope, serialport) + if not result: + raise Exception + logging.debug(result) + status = re.search(",(......),", result).group(1) + if status[3]=="-" and status[4]=="-": + return False + else: + return True + except: + if software == "Ekos": + logging.error("Problem determining mount status. Verify mount is connected to INDI. Stopping AutoPA.") + else: + logging.error("Problem determining mount status. Verify mount is connected to ASCOM. Stopping AutoPA.") + self.timer.stop() + raise ConnectionError + + def start(self): + if self.verbose.isChecked(): + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.DEBUG) + else: + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.INFO) + if self.aligned: + logging.info("Starting AutoPA routine") + self.aligned = False + self.accuracy = float(self.accuracy_input.text()) / 60 + self.timer.start(2500) + if self.telescopeName.text() == "": + if self.software.currentText() == "Ekos": + self.telescope = "LX200 GPS" + else: + self.telescope = "OpenAstroTracker" + if self.serialportInput.text() == "": + if self.software.currentText() == "Ekos": + self.serialport = "/dev/ttyACM0" + else: + self.serialport = self.serialportInput.text() + if self.software.currentText() == "Ekos": + import indi, serial + #Connect to indi server + self.indiclient, self.blobEvent = indi.indiserverConnect() + logging.debug("AutoPA connected to INDI server") + + #Disconnect OAT from indi to free up serial port + indi.disconnectScope(self.indiclient, self.telescope) + logging.debug("Telescope disconnected from INDI") + + print("Opening serial port on " + self.serialport + '...') + self.ser = serial.Serial(self.serialport, 19200, timeout = 0.2) + + # Refactored: Send initial mount commands in a background thread + commands = [ + ("Mount", ":GVP#,#"), + ("LST", ":XGL#,#"), + ("Latitude", ":Gt#,#"), + ("Longitude", ":Gg#,#"), + ("Hemisphere", ":XGHS#,#"), + ("Hardware", ":XGM#,#"), + ] + self.thread = QThread() + self.worker = CommandWorker(self, commands) + self.worker.moveToThread(self.thread) + self.worker.commandFinished.connect(self.handle_command_result) + self.worker.finished.connect(self.thread.quit) + self.worker.finished.connect(self.worker.deleteLater) + self.thread.finished.connect(self.thread.deleteLater) + self.thread.started.connect(self.worker.run) + self.thread.start() + + def stop(self): + self.aligned = True + self.timer.stop() + if self.software.currentText() == "Ekos": + import indi + self.ser.close() + #Reconnect OAT to indi and disconnect from server + indi.connectScope(self.indiclient, self.telescope) + logging.debug("Telescope reconnected to INDI") + indi.indiserverDisconnect(self.indiclient) + logging.debug("AutoPA disconnected from INDI server") + logging.info("Stopping AutoPA routine") + + def close(self): + sys.exit(self) + + def alignment(self): + if not self.aligned: + try: + if self.isAdjusting(self.software.currentText(), self.telescope, self.serialport): + logging.info("Mount is still adjusting position.") + self.stillAdjusting = True + else: + if self.stillAdjusting: + logging.info(f"Mount adjustment finished.") + self.stillAdjusting = False + self.adjustmentFinished = datetime.now() + self.solveCounter = 0 + logging.info(f"Getting latest log entry from {self.software.currentText()}.") + try: + log = self.getLatestLogEntry(softwareTypes[self.software.currentText()]["logpath"], softwareTypes[self.software.currentText()]["expression"]) + except FileNotFoundError: + log = None + logging.error(f"Error retrieving log from {self.software.currentText()}. Logfile may not exist or does not contain alignment info.") + if self.autorun: + sys.exit("Polar alignment values not found.") + if log is not None: + #Entry date is based on file timestamp, entry time is based on log entry data + currentEntry = datetime.strptime(datetime.fromtimestamp(log[1]).strftime("%Y-%m-%d") + " " + log[0][0][:-1], '%Y-%m-%d %H:%M:%S.%f') + if currentEntry != self.lastEntry and currentEntry > self.adjustmentFinished: + self.solveCounter += 1 #Increment counter if the latest unused entry was entered into the log after the adjustment was finished. + if (self.software.currentText() != "NINA3.x" and self.solveCounter >= 1) or (self.software.currentText() == "NINA3.x" and self.solveCounter >= 3): + #If using NINA, wait for three complete solves after adjustment is finished to prevent using old data + self.solveCounter = 0 + error = self.parseError(self.software.currentText(), log[0], float(self.azimuthOffset.text()), float(self.altitudeOffset.text())) + logging.info(f"Altitude error in arcminutes: {error[0]:.3f}\'") + logging.info(f"Azimuth error in arcminutes: {error[1]:.3f}\'") + logging.info(f"Total error in arcminutes: {error[2]:.3f}\'") + if abs(error[2]) < self.accuracy: + logging.info(f"Polar aligned to within {error[0]*60:.0f}\" altitude and {error[1]*60:.0f}\" azimuth.") + self.stop() + if self.autorun: + sys.exit(self) + return + else: + logging.info("Correction needed.") + result = self.sendCommand(f":MAL{error[0]:.4f}#", self.software.currentText(), self.telescope, self.serialport) + logging.debug(f"Adjusting altitude by {error[0]:.3f} arcminutes.") + result = self.sendCommand(f":MAZ{error[1]*(-1):.4f}#", self.software.currentText(), self.telescope, self.serialport) + logging.debug(f"Adjusting azimuth by {error[1]:.3f} arcminutes.") + self.lastEntry = currentEntry + else: + logging.info(f"Waiting for {self.software.currentText()} to re-solve since last adjustment finished.") + else: + logging.info(f"{self.software.currentText()} has not yet determined the polar alignment error.") + except ConnectionError: + self.stop() + if self.autorun: + sys.exit("AutoPA could not connect to mount.") + return + + def on_software_changed(self, software): + self.save_last_software(software) + + def on_accuracy_changed(self, accuracy): + self.save_last_accuracy(accuracy) + + def on_verbose_changed(self, state): + self.save_last_verbose(bool(state)) + if state: + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.DEBUG) + else: + for handler in self.logger.handlers: + if isinstance(handler, QTextEditLogger): + handler.setLevel(logging.INFO) + + def open_logs_folder(self): + if sys.platform == "win32": + os.startfile(self.log_dir) + elif sys.platform == "darwin": # macOS + os.system(f"open {self.log_dir}") + else: # Linux + os.system(f"xdg-open {self.log_dir}") + + def handle_command_result(self, label, result): + logging.info(f"{label:>10}: {result}") + QtWidgets.QApplication.processEvents() + +software_options = collections.OrderedDict([ + ('NINA3.x', ''), + ('Sharpcap4.x', ''), + ('Sharpcap3.2', ''), + ('Ekos', '') +]) + +today = date.today().strftime("%Y-%m-%d") +softwareTypes = { +"NINA3.x":{ "expression": r"[\d-]*T(.*?)\|.*PolarAlignment.cs\|.*Calculated Error: Az: (-*\d{2}).*?(\d{2})' (\d{2})\", Alt: (-*\d{2}).*?(\d{2})' (\d{2})\",.*", + "logpath": fr"{os.getenv('LOCALAPPDATA')}\NINA\Logs\*.log"}, +"Sharpcap3.2":{ "expression": "(?:Info:)\t(\d{2}:\d{2}:\d{2}.\d{7}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", + "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, +"Sharpcap4.x":{ "expression": "(?:Info)\W*(\d{2}:\d{2}:\d{2}.\d{6}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", + "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, +"Ekos":{ "expression": "(\d{2}:\d{2}:\d{2}.\d{3}).*(?:PAA Refresh).*(?:Corrected az:).*(?:\()(\s?-?\d\.\d{3}).*(?:alt:).*(\s?-?\d\.\d{3}).*(?:total:)", + "logpath": f"{Path.home()}/.local/share/kstars/logs/{today}/*.txt"} +} + +if __name__ == "__main__": + app = QtWidgets.QApplication(sys.argv) + window = QtWidgets.QDialog() + ui = AutoPA() + ui.setupUi(window) + + window.show() + sys.exit(app.exec_()) + diff --git a/AutoPA/Software/source/setup.py b/AutoPA/Software/source/setup.py index 779ae66..fda78fe 100644 --- a/AutoPA/Software/source/setup.py +++ b/AutoPA/Software/source/setup.py @@ -4,9 +4,9 @@ def getTargetName(): myOS = platform.system() if myOS == 'Linux': - return "AutoPA_v2.7.0" + return "AutoPA_v2.8.0" elif myOS == 'Windows': - return "AutoPA_v2.7.0.exe" + return "AutoPA_v2.8.0.exe" base = None if sys.platform == "win32": @@ -36,7 +36,7 @@ def getTargetName(): setup( name="AutoPA", - version="2.7.0", + version="2.8.0", description="AutoPA Polar Alignment Tool", executables=[ Executable( From 9d0aa4cf08a4ae7933532063db9cf5ce99da12cf Mon Sep 17 00:00:00 2001 From: Aleksandr Savin Date: Fri, 12 Sep 2025 14:21:38 +0200 Subject: [PATCH 3/3] fix(autopa_v2.py): Correct regex patterns for Sharpcap and Ekos log expressions --- AutoPA/Software/source/autopa_v2.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/AutoPA/Software/source/autopa_v2.py b/AutoPA/Software/source/autopa_v2.py index 9c78efc..87068db 100644 --- a/AutoPA/Software/source/autopa_v2.py +++ b/AutoPA/Software/source/autopa_v2.py @@ -613,11 +613,11 @@ def handle_command_result(self, label, result): softwareTypes = { "NINA3.x":{ "expression": r"[\d-]*T(.*?)\|.*PolarAlignment.cs\|.*Calculated Error: Az: (-*\d{2}).*?(\d{2})' (\d{2})\", Alt: (-*\d{2}).*?(\d{2})' (\d{2})\",.*", "logpath": fr"{os.getenv('LOCALAPPDATA')}\NINA\Logs\*.log"}, -"Sharpcap3.2":{ "expression": "(?:Info:)\t(\d{2}:\d{2}:\d{2}.\d{7}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", +"Sharpcap3.2":{ "expression": r"(?:Info:)\t(\d{2}:\d{2}:\d{2}.\d{7}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, -"Sharpcap4.x":{ "expression": "(?:Info)\W*(\d{2}:\d{2}:\d{2}.\d{6}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", +"Sharpcap4.x":{ "expression": r"(?:Info)\W*(\d{2}:\d{2}:\d{2}.\d{6}).*(?:AltAzCor=)(?:Alt=)(.*)[,](?:Az=)(.*).\s(?:AltAzPole=)(?:Alt=)(.*)[,](?:Az=)(.*).[,]\s(?:AltAzOffset=).*", "logpath": fr"{os.getenv('LOCALAPPDATA')}\SharpCap\logs\*.log"}, -"Ekos":{ "expression": "(\d{2}:\d{2}:\d{2}.\d{3}).*(?:PAA Refresh).*(?:Corrected az:).*(?:\()(\s?-?\d\.\d{3}).*(?:alt:).*(\s?-?\d\.\d{3}).*(?:total:)", +"Ekos":{ "expression": r"(\d{2}:\d{2}:\d{2}.\d{3}).*(?:PAA Refresh).*(?:Corrected az:).*(?:\()(\s?-?\d\.\d{3}).*(?:alt:).*(\s?-?\d\.\d{3}).*(?:total:)", "logpath": f"{Path.home()}/.local/share/kstars/logs/{today}/*.txt"} }