diff --git a/packetraven/connections/serial.py b/packetraven/connections/serial.py index 567e8b67..c748ec82 100644 --- a/packetraven/connections/serial.py +++ b/packetraven/connections/serial.py @@ -1,5 +1,7 @@ +from abc import ABC from datetime import datetime +from kiss import KISS from serial import Serial from packetraven.connections.base import ( @@ -9,17 +11,22 @@ TimeIntervalError, ) from packetraven.packets import APRSPacket +from packetraven.packets.parsing import InvalidPacketError -class SerialTNC(APRSPacketSource): - def __init__(self, serial_port: str = None, callsigns: [str] = None): +class SerialAPRSConnection(APRSPacketSource, ABC): + def __init__(self, serial_port: str = None, callsigns: [str] = None, baudrate: int = None): """ - Connect to TNC over given serial port. + Connect to a given serial port. :param serial_port: port name :param callsigns: list of callsigns to return from source + :param baudrate: baud rate of serial communication """ + if baudrate is None: + baudrate = 9600 + if serial_port is None or serial_port == '' or serial_port == 'auto': try: serial_port = next_open_serial_port() @@ -28,8 +35,19 @@ def __init__(self, serial_port: str = None, callsigns: [str] = None): else: serial_port = serial_port.strip('"') - self.serial_connection = Serial(serial_port, baudrate=9600, timeout=1) - super().__init__(self.serial_connection.port, callsigns) + super().__init__(serial_port, callsigns) + + self.__baudrate = baudrate + + @property + def baudrate(self) -> int: + return self.__baudrate + + +class SerialTNC(SerialAPRSConnection): + def __init__(self, serial_port: str = None, callsigns: [str] = None, baudrate: int = None): + super().__init__(serial_port, callsigns, baudrate) + self.serial_connection = Serial(port=self.location, baudrate=self.baudrate, timeout=1) self.__last_access_time = None @property @@ -57,3 +75,43 @@ def close(self): def __repr__(self): return f'{self.__class__.__name__}({repr(self.location)}, {repr(self.callsigns)})' + + +class SerialKISS(APRSPacketSource): + def __init__(self, serial_port: str = None, callsigns: [str] = None, baudrate: int = None): + super().__init__(serial_port, callsigns, baudrate) + self.serial_connection = KISS(port=self.location, speed=self.baudrate, timeout=1) + self.__last_access_time = None + + @property + def packets(self) -> [APRSPacket]: + if self.__last_access_time is not None and self.interval is not None: + interval = datetime.now() - self.__last_access_time + if interval < self.interval: + raise TimeIntervalError( + f'interval {interval} less than minimum interval {self.interval}' + ) + + packets = [] + + def add_frames(frame: str): + try: + packet = APRSPacket.from_frame(frame) + if self.callsigns is not None: + if packet.from_callsign in self.callsigns: + if packet not in packets: + packets.append(packet) + except InvalidPacketError: + pass + + self.serial_connection.start() + self.serial_connection.read(callback=add_frames, readmode=False) + + self.__last_access_time = datetime.now() + return packets + + def close(self): + self.serial_connection.stop() + + def __repr__(self): + return f'{self.__class__.__name__}({repr(self.location)}, {repr(self.callsigns)})'