diff --git a/examples/calibrate_cad.py b/examples/calibrate_cad.py index 96799a5..bcb09bc 100644 --- a/examples/calibrate_cad.py +++ b/examples/calibrate_cad.py @@ -304,7 +304,7 @@ def main(): parser = argparse.ArgumentParser(description="CAD Calibration Tool with Staged Workflow") parser.add_argument( "--radio", - choices=["waveshare", "uconsole", "meshadv-mini"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio type", ) diff --git a/examples/common.py b/examples/common.py index 5f280b2..9db8ff5 100644 --- a/examples/common.py +++ b/examples/common.py @@ -32,12 +32,16 @@ from pymc_core.node.node import MeshNode -def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0") -> LoRaRadio: +def create_radio( + radio_type: str = "waveshare", + serial_port: str = "/dev/ttyUSB0", +) -> LoRaRadio: """Create a radio instance with configuration for specified hardware. Args: - radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", "kiss-tnc", or "ch341") - serial_port: Serial port for KISS TNC (only used with "kiss-tnc" type) + radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", + "kiss-tnc", "kiss-modem", or "ch341") + serial_port: Serial port for KISS devices (only used with "kiss-tnc" or "kiss-modem") Returns: Radio instance configured for the specified hardware @@ -72,6 +76,38 @@ def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0 ) return kiss_wrapper + # Check if this is a MeshCore KISS Modem configuration + if radio_type == "kiss-modem": + from pymc_core.hardware.kiss_modem_wrapper import KissModemWrapper + + logger.debug("Using MeshCore KISS Modem Wrapper") + + # MeshCore KISS Modem configuration + # Note: Sync word is configured at firmware build time + modem_config = { + "frequency": int(869.618 * 1000000), # EU: 869.618 MHz + "bandwidth": int(62.5 * 1000), # 62.5 kHz + "spreading_factor": 8, # LoRa SF8 + "coding_rate": 8, # LoRa CR 4/8 + "power": 22, # TX power + } + + # Create KISS modem wrapper with specified port. + # To enable host-side LBT (e.g. full-duplex on half-duplex link), call + # modem_wrapper.set_lbt_enabled(True) after creation. + modem_wrapper = KissModemWrapper( + port=serial_port, + baudrate=115200, + radio_config=modem_config, + auto_configure=True, + ) + + logger.info("Created MeshCore KISS Modem Wrapper") + logger.info( + f"Frequency: {modem_config['frequency']/1000000:.3f}MHz, TX Power: {modem_config['power']}dBm" + ) + return modem_wrapper + # Check if this is a CH341 configuration if radio_type == "ch341": from pymc_core.hardware.ch341.ch341_gpio_manager import CH341GPIOManager @@ -181,7 +217,8 @@ def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0 if radio_type not in configs: raise ValueError( - f"Unknown radio type: {radio_type}. Use 'waveshare', 'meshadv-mini', 'uconsole', 'kiss-tnc', or 'ch341'" + f"Unknown radio type: {radio_type}. " + "Use 'waveshare', 'meshadv-mini', 'uconsole', 'kiss-tnc', 'kiss-modem', or 'ch341'" ) radio_kwargs = configs[radio_type] @@ -203,27 +240,29 @@ def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0 def create_mesh_node( - node_name: str = "ExampleNode", radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0" + node_name: str = "ExampleNode", + radio_type: str = "waveshare", + serial_port: str = "/dev/ttyUSB0", + use_modem_identity: bool = False, ) -> tuple[MeshNode, LocalIdentity]: """Create a mesh node with radio. Args: node_name: Name for the mesh node - radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", "kiss-tnc", or "ch341") - serial_port: Serial port for KISS TNC (only used with "kiss-tnc" type) + radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", + "kiss-tnc", "kiss-modem", or "ch341") + serial_port: Serial port for KISS devices (only used with "kiss-tnc" or "kiss-modem") + use_modem_identity: If True and radio_type is "kiss-modem", use the modem's + cryptographic identity instead of generating a local one. + This keeps the private key secure on the modem hardware. Returns: - Tuple of (MeshNode, LocalIdentity) + Tuple of (MeshNode, Identity) - Identity may be LocalIdentity or ModemIdentity """ logger.info(f"Creating mesh node with name: {node_name} using {radio_type} radio") try: - # Create a local identity (this generates a new keypair) - logger.debug("Creating LocalIdentity...") - identity = LocalIdentity() - logger.info(f"Created identity with public key: {identity.get_public_key().hex()[:16]}...") - - # Create the radio + # Create the radio first (needed for modem identity) logger.debug("Creating radio...") radio = create_radio(radio_type, serial_port) @@ -241,6 +280,19 @@ def create_mesh_node( logger.error("Failed to connect KISS radio") print(f"Failed to connect to KISS radio on {serial_port}") raise Exception(f"KISS radio connection failed on {serial_port}") + elif radio_type == "kiss-modem": + logger.debug("Connecting MeshCore KISS modem...") + if radio.connect(): + logger.info("KISS modem connected successfully") + print(f"KISS modem connected to {serial_port}") + if hasattr(radio, "modem_version") and radio.modem_version: + print(f"Modem version: {radio.modem_version}") + if hasattr(radio, "modem_identity") and radio.modem_identity: + print(f"Modem identity: {radio.modem_identity.hex()[:16]}...") + else: + logger.error("Failed to connect KISS modem") + print(f"Failed to connect to KISS modem on {serial_port}") + raise Exception(f"KISS modem connection failed on {serial_port}") elif radio_type == "ch341": logger.debug("Initializing CH341 radio...") ok = radio.begin() @@ -255,6 +307,19 @@ def create_mesh_node( raise RuntimeError("SX1262 radio begin() returned False") logger.info("Radio initialized successfully") + # Create identity - use modem identity if requested and available + if use_modem_identity and radio_type == "kiss-modem": + from pymc_core.protocol.modem_identity import ModemIdentity + + logger.debug("Creating ModemIdentity from KISS modem...") + identity = ModemIdentity(radio) + logger.info(f"Using modem identity: {identity.get_public_key().hex()[:16]}...") + print(f"Using modem identity (private key secured on modem)") + else: + logger.debug("Creating LocalIdentity...") + identity = LocalIdentity() + logger.info(f"Created local identity: {identity.get_public_key().hex()[:16]}...") + # Create a mesh node with the radio and identity config = {"node": {"name": node_name}} logger.debug(f"Creating MeshNode with config: {config}") diff --git a/examples/discover_nodes.py b/examples/discover_nodes.py index 097be79..5e69845 100644 --- a/examples/discover_nodes.py +++ b/examples/discover_nodes.py @@ -149,7 +149,7 @@ def main(): parser = argparse.ArgumentParser(description="Discover nearby mesh nodes") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/login_server.py b/examples/login_server.py index f3dbb61..368f400 100644 --- a/examples/login_server.py +++ b/examples/login_server.py @@ -381,7 +381,7 @@ def main(): ) parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/ping_repeater_trace.py b/examples/ping_repeater_trace.py index 340864f..5451bad 100644 --- a/examples/ping_repeater_trace.py +++ b/examples/ping_repeater_trace.py @@ -86,7 +86,7 @@ def main(): parser = argparse.ArgumentParser(description="Ping a repeater using trace packets") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/respond_to_discovery.py b/examples/respond_to_discovery.py index bf8fb93..f0fad58 100644 --- a/examples/respond_to_discovery.py +++ b/examples/respond_to_discovery.py @@ -121,7 +121,7 @@ def main(): parser = argparse.ArgumentParser(description="Respond to mesh node discovery requests") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/send_channel_message.py b/examples/send_channel_message.py index 0280506..8d48e2c 100644 --- a/examples/send_channel_message.py +++ b/examples/send_channel_message.py @@ -71,7 +71,7 @@ def main(): parser = argparse.ArgumentParser(description="Send a channel message to the Public channel") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/send_direct_advert.py b/examples/send_direct_advert.py index 2953fc0..9d9046e 100644 --- a/examples/send_direct_advert.py +++ b/examples/send_direct_advert.py @@ -51,7 +51,7 @@ def main(): parser = argparse.ArgumentParser(description="Send a direct advertisement packet") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/send_flood_advert.py b/examples/send_flood_advert.py index d6288ef..ecf51e2 100644 --- a/examples/send_flood_advert.py +++ b/examples/send_flood_advert.py @@ -58,7 +58,7 @@ def main(): parser = argparse.ArgumentParser(description="Send a flood advertisement packet") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/send_text_message.py b/examples/send_text_message.py index 01bfcf1..fa77f2a 100644 --- a/examples/send_text_message.py +++ b/examples/send_text_message.py @@ -78,7 +78,7 @@ def main(): parser = argparse.ArgumentParser(description="Send a text message to the mesh network") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"], default="waveshare", help="Radio hardware type (default: waveshare)", ) diff --git a/examples/send_tracked_advert.py b/examples/send_tracked_advert.py index b182767..c20fcc5 100644 --- a/examples/send_tracked_advert.py +++ b/examples/send_tracked_advert.py @@ -93,7 +93,7 @@ def main(): parser = argparse.ArgumentParser(description="Send a location-tracked advertisement") parser.add_argument( "--radio-type", - choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "ch341"], + choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem", "ch341"], default="waveshare", help="Radio hardware type (default: waveshare)", ) @@ -106,7 +106,7 @@ def main(): args = parser.parse_args() print(f"Using {args.radio_type} radio configuration") - if args.radio_type == "kiss-tnc": + if args.radio_type in ("kiss-tnc", "kiss-modem"): print(f"Serial port: {args.serial_port}") try: diff --git a/scripts/test_modem_crypto.py b/scripts/test_modem_crypto.py new file mode 100644 index 0000000..cb4bbf4 --- /dev/null +++ b/scripts/test_modem_crypto.py @@ -0,0 +1,239 @@ +#!/usr/bin/env python3 +""" +Test script to verify modem cryptographic operations match Python implementation. + +Compares: +- Key exchange / shared secret computation +- Signing and verification +- Hashing +- Encryption/decryption +""" + +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "src")) + +from pymc_core.hardware.kiss_modem_wrapper import KissModemWrapper +from pymc_core.protocol.identity import Identity, LocalIdentity +from pymc_core.protocol.crypto import CryptoUtils + + +def test_modem_crypto(port: str = "/dev/cu.usbmodem1101"): + """Run cryptographic comparison tests between modem and Python.""" + + print(f"Connecting to modem on {port}...") + modem = KissModemWrapper(port=port, auto_configure=False) + + if not modem.connect(): + print("ERROR: Failed to connect to modem") + return False + + print(f"Connected! Modem version: {modem.modem_version}") + print(f"Modem identity: {modem.modem_identity.hex()}") + print() + + all_passed = True + + # ========================================================================== + # Test 1: Hash comparison + # ========================================================================== + print("=" * 60) + print("Test 1: SHA-256 Hash") + print("=" * 60) + + test_data = b"Hello, MeshCore!" + + modem_hash = modem.hash_data(test_data) + python_hash = CryptoUtils.sha256(test_data) + + print(f"Test data: {test_data}") + print(f"Modem hash: {modem_hash.hex() if modem_hash else 'FAILED'}") + print(f"Python hash: {python_hash.hex()}") + + if modem_hash == python_hash: + print("PASS: Hashes match!") + else: + print("FAIL: Hashes do not match!") + all_passed = False + print() + + # ========================================================================== + # Test 2: Signature verification + # ========================================================================== + print("=" * 60) + print("Test 2: Sign and Verify") + print("=" * 60) + + message = b"Test message for signing" + + # Get modem to sign + modem_signature = modem.sign_data(message) + print(f"Message: {message}") + print(f"Modem signature: {modem_signature.hex() if modem_signature else 'FAILED'}") + + if modem_signature: + # Verify with modem + modem_verify = modem.verify_signature(modem.modem_identity, modem_signature, message) + print(f"Modem self-verify: {modem_verify}") + + # Verify with Python using modem's public key + modem_identity_obj = Identity(modem.modem_identity) + python_verify = modem_identity_obj.verify(message, modem_signature) + print(f"Python verify of modem signature: {python_verify}") + + if modem_verify and python_verify: + print("PASS: Signature verified by both!") + else: + print("FAIL: Signature verification mismatch!") + all_passed = False + else: + print("FAIL: Modem signing failed!") + all_passed = False + print() + + # ========================================================================== + # Test 3: Key Exchange / Shared Secret + # ========================================================================== + print("=" * 60) + print("Test 3: Key Exchange / Shared Secret") + print("=" * 60) + + # Create a local identity to exchange keys with + local_identity = LocalIdentity() + print(f"Local identity pubkey: {local_identity.get_public_key().hex()}") + print(f"Modem identity pubkey: {modem.modem_identity.hex()}") + + # Modem computes shared secret with local's Ed25519 pubkey + modem_shared = modem.key_exchange(local_identity.get_public_key()) + print(f"Modem shared secret: {modem_shared.hex() if modem_shared else 'FAILED'}") + + if modem_shared: + # Python computes shared secret: create Identity from modem's pubkey, + # then call calc_shared_secret with local's private key + modem_as_peer = Identity(modem.modem_identity) + python_shared = modem_as_peer.calc_shared_secret(local_identity.get_private_key()) + print(f"Python shared secret: {python_shared.hex()}") + + if modem_shared == python_shared: + print("PASS: Shared secrets match!") + else: + print("FAIL: Shared secrets do not match!") + all_passed = False + else: + print("FAIL: Modem key exchange failed!") + all_passed = False + print() + + # ========================================================================== + # Test 4: Encryption/Decryption round-trip + # ========================================================================== + print("=" * 60) + print("Test 4: Encryption/Decryption Round-trip") + print("=" * 60) + + if modem_shared: + plaintext = b"Secret message for encryption test!" + key = modem_shared # Use the shared secret as encryption key + + print(f"Plaintext: {plaintext}") + print(f"Key: {key.hex()[:32]}...") + + # Encrypt with modem + encrypt_result = modem.encrypt_data(key, plaintext) + if encrypt_result: + mac, ciphertext = encrypt_result + print(f"Modem encrypted - MAC: {mac.hex()}, Ciphertext: {ciphertext.hex()}") + + # Decrypt with modem + modem_decrypted = modem.decrypt_data(key, mac, ciphertext) + if modem_decrypted: + # Trim padding (modem pads to block size) + modem_decrypted = modem_decrypted[:len(plaintext)] + print(f"Modem decrypted: {modem_decrypted}") + + if modem_decrypted == plaintext: + print("PASS: Modem encrypt/decrypt round-trip works!") + else: + print("FAIL: Decrypted data doesn't match!") + all_passed = False + else: + print("FAIL: Modem decryption failed!") + all_passed = False + else: + print("FAIL: Modem encryption failed!") + all_passed = False + + # Now test Python encrypt -> Modem decrypt + print() + print("Cross-implementation test (Python encrypt -> Modem decrypt):") + + # Python encryption uses AES key (first 16 bytes) and shared secret for MAC + aes_key = key[:16] + python_encrypted = CryptoUtils.encrypt_then_mac(aes_key, key, plaintext) + python_mac = python_encrypted[:2] + python_ciphertext = python_encrypted[2:] + print(f"Python encrypted - MAC: {python_mac.hex()}, Ciphertext: {python_ciphertext.hex()}") + + # Decrypt with modem + modem_decrypted2 = modem.decrypt_data(key, python_mac, python_ciphertext) + if modem_decrypted2: + modem_decrypted2 = modem_decrypted2[:len(plaintext)] + print(f"Modem decrypted Python ciphertext: {modem_decrypted2}") + + if modem_decrypted2 == plaintext: + print("PASS: Cross-implementation encryption works!") + else: + print("FAIL: Cross-implementation decryption mismatch!") + all_passed = False + else: + print("FAIL: Modem failed to decrypt Python ciphertext!") + all_passed = False + print() + + # ========================================================================== + # Test 5: Random number generation + # ========================================================================== + print("=" * 60) + print("Test 5: Random Number Generation") + print("=" * 60) + + random1 = modem.get_random(32) + random2 = modem.get_random(32) + + print(f"Random 1: {random1.hex() if random1 else 'FAILED'}") + print(f"Random 2: {random2.hex() if random2 else 'FAILED'}") + + if random1 and random2: + if random1 != random2: + print("PASS: Random values are different (as expected)") + else: + print("FAIL: Random values are identical (suspicious!)") + all_passed = False + else: + print("FAIL: Random generation failed!") + all_passed = False + print() + + # ========================================================================== + # Summary + # ========================================================================== + print("=" * 60) + print("SUMMARY") + print("=" * 60) + + if all_passed: + print("ALL TESTS PASSED!") + else: + print("SOME TESTS FAILED!") + + modem.disconnect() + print("Modem disconnected.") + + return all_passed + + +if __name__ == "__main__": + port = sys.argv[1] if len(sys.argv) > 1 else "/dev/cu.usbmodem1101" + success = test_modem_crypto(port) + sys.exit(0 if success else 1) diff --git a/src/pymc_core/hardware/__init__.py b/src/pymc_core/hardware/__init__.py index 2802c28..c865506 100644 --- a/src/pymc_core/hardware/__init__.py +++ b/src/pymc_core/hardware/__init__.py @@ -22,6 +22,24 @@ _SX1262_AVAILABLE = False SX1262Radio = None +# Conditional import for KissSerialWrapper (requires pyserial) +try: + from .kiss_serial_wrapper import KissSerialWrapper + + _KISS_SERIAL_AVAILABLE = True +except ImportError: + _KISS_SERIAL_AVAILABLE = False + KissSerialWrapper = None + +# Conditional import for KissModemWrapper (requires pyserial) +try: + from .kiss_modem_wrapper import KissModemWrapper + + _KISS_MODEM_AVAILABLE = True +except ImportError: + _KISS_MODEM_AVAILABLE = False + KissModemWrapper = None + __all__ = ["LoRaRadio"] # Add WsRadio to exports if available @@ -31,4 +49,11 @@ # Add SX1262Radio to exports if available if _SX1262_AVAILABLE: __all__.append("SX1262Radio") - __all__.append("SX1262Radio") + +# Add KissSerialWrapper to exports if available +if _KISS_SERIAL_AVAILABLE: + __all__.append("KissSerialWrapper") + +# Add KissModemWrapper to exports if available +if _KISS_MODEM_AVAILABLE: + __all__.append("KissModemWrapper") diff --git a/src/pymc_core/hardware/kiss_modem_wrapper.py b/src/pymc_core/hardware/kiss_modem_wrapper.py new file mode 100644 index 0000000..d93e187 --- /dev/null +++ b/src/pymc_core/hardware/kiss_modem_wrapper.py @@ -0,0 +1,1429 @@ +""" +MeshCore KISS Modem Protocol Wrapper + +Implements the MeshCore KISS modem protocol for sending/receiving +MeshCore packets over LoRa and cryptographic operations. + +Protocol spec (frame format, SetHardware sub-commands, Data + RxMeta ordering): + https://github.com/meshcore-dev/MeshCore/blob/dev/docs/kiss_modem_protocol.md +""" + +import asyncio +import inspect +import logging +import random +import struct +import threading +from collections import deque +from concurrent.futures import ThreadPoolExecutor +from typing import Any, Callable, Dict, Optional, Union + +# RX callback: (data) for backward compat, or (data, rssi, snr) for per-packet metrics +RxCallback = Union[ + Callable[[bytes], None], + Callable[[bytes, Optional[int], Optional[float]], None], +] + + +def _invoke_rx_callback( + callback: RxCallback, + data: bytes, + rssi: int, + snr: float, +) -> None: + """Invoke RX callback with 1 or 3 args depending on what it accepts.""" + try: + sig = inspect.signature(callback) + nparams = len([p for p in sig.parameters if p != "self"]) + except (ValueError, TypeError): + nparams = 1 + if nparams >= 3: + callback(data, rssi, snr) + else: + callback(data) + +import serial + +from .base import LoRaRadio + +# KISS Protocol Constants (shared with standard KISS) +KISS_FEND = 0xC0 # Frame End +KISS_FESC = 0xDB # Frame Escape +KISS_TFEND = 0xDC # Transposed Frame End +KISS_TFESC = 0xDD # Transposed Frame Escape + +# Standard KISS type bytes (port in bits 7-4, command in bits 3-0) +CMD_DATA = 0x00 # Data frame (raw packet) +KISS_CMD_TXDELAY = 0x01 # Transmitter keyup delay in 10ms units (firmware default 50 = 500ms) +KISS_CMD_PERSISTENCE = 0x02 # CSMA persistence 0-255 (firmware default 63) +KISS_CMD_SLOTTIME = 0x03 # CSMA slot interval in 10ms units (firmware default 10 = 100ms) +KISS_CMD_TXTAIL = 0x04 # Post-TX hold time in 10ms units (default: 0) +KISS_CMD_FULLDUPLEX = 0x05 # 0 = half duplex, nonzero = full duplex (default: 0) +KISS_CMD_SETHARDWARE = 0x06 # SetHardware: first payload byte is sub-command +KISS_CMD_RETURN = 0xFF # Exit KISS mode (no-op) + +# SetHardware request sub-commands (Host -> TNC, first data byte inside 0x06) +HW_CMD_GET_IDENTITY = 0x01 +HW_CMD_GET_RANDOM = 0x02 +HW_CMD_VERIFY_SIGNATURE = 0x03 +HW_CMD_SIGN_DATA = 0x04 +HW_CMD_ENCRYPT_DATA = 0x05 +HW_CMD_DECRYPT_DATA = 0x06 +HW_CMD_KEY_EXCHANGE = 0x07 +HW_CMD_HASH = 0x08 +HW_CMD_SET_RADIO = 0x09 +HW_CMD_SET_TX_POWER = 0x0A +HW_CMD_GET_RADIO = 0x0B +HW_CMD_GET_TX_POWER = 0x0C +HW_CMD_GET_CURRENT_RSSI = 0x0D +HW_CMD_IS_CHANNEL_BUSY = 0x0E +HW_CMD_GET_AIRTIME = 0x0F +HW_CMD_GET_NOISE_FLOOR = 0x10 +HW_CMD_GET_VERSION = 0x11 +HW_CMD_GET_STATS = 0x12 +HW_CMD_GET_BATTERY = 0x13 +HW_CMD_GET_MCU_TEMP = 0x14 +HW_CMD_GET_SENSORS = 0x15 +HW_CMD_GET_DEVICE_NAME = 0x16 +HW_CMD_PING = 0x17 +HW_CMD_REBOOT = 0x18 +HW_CMD_SET_SIGNAL_REPORT = 0x19 +HW_CMD_GET_SIGNAL_REPORT = 0x1A + +# SetHardware response sub-commands (TNC -> Host) +# Spec: response = command | 0x80 for command responses; 0xF0+ for generic/unsolicited +HW_RESP_IDENTITY = 0x81 # HW_CMD_GET_IDENTITY | 0x80 +HW_RESP_RANDOM = 0x82 +HW_RESP_VERIFY = 0x83 +HW_RESP_SIGNATURE = 0x84 +HW_RESP_ENCRYPTED = 0x85 +HW_RESP_DECRYPTED = 0x86 +HW_RESP_SHARED_SECRET = 0x87 +HW_RESP_HASH = 0x88 +HW_RESP_RADIO = 0x8B # HW_CMD_GET_RADIO | 0x80 +HW_RESP_TX_POWER = 0x8C +HW_RESP_CURRENT_RSSI = 0x8D +HW_RESP_CHANNEL_BUSY = 0x8E +HW_RESP_AIRTIME = 0x8F +HW_RESP_NOISE_FLOOR = 0x90 +HW_RESP_VERSION = 0x91 +HW_RESP_STATS = 0x92 +HW_RESP_BATTERY = 0x93 +HW_RESP_MCU_TEMP = 0x94 +HW_RESP_SENSORS = 0x95 +HW_RESP_DEVICE_NAME = 0x96 +HW_RESP_PONG = 0x97 # HW_CMD_PING | 0x80 +HW_RESP_OK = 0xF0 +HW_RESP_ERROR = 0xF1 +HW_RESP_TX_DONE = 0xF8 # Unsolicited +HW_RESP_RX_META = 0xF9 # Unsolicited +HW_RESP_SIGNAL_REPORT = 0x9A # HW_CMD_GET_SIGNAL_REPORT | 0x80 + +# Backward-compatible aliases (same values as HW_*) +CMD_GET_IDENTITY = HW_CMD_GET_IDENTITY +CMD_GET_RANDOM = HW_CMD_GET_RANDOM +CMD_VERIFY_SIGNATURE = HW_CMD_VERIFY_SIGNATURE +CMD_SIGN_DATA = HW_CMD_SIGN_DATA +CMD_ENCRYPT_DATA = HW_CMD_ENCRYPT_DATA +CMD_DECRYPT_DATA = HW_CMD_DECRYPT_DATA +CMD_KEY_EXCHANGE = HW_CMD_KEY_EXCHANGE +CMD_HASH = HW_CMD_HASH +CMD_SET_RADIO = HW_CMD_SET_RADIO +CMD_SET_TX_POWER = HW_CMD_SET_TX_POWER +CMD_GET_RADIO = HW_CMD_GET_RADIO +CMD_GET_TX_POWER = HW_CMD_GET_TX_POWER +CMD_GET_CURRENT_RSSI = HW_CMD_GET_CURRENT_RSSI +CMD_IS_CHANNEL_BUSY = HW_CMD_IS_CHANNEL_BUSY +CMD_GET_AIRTIME = HW_CMD_GET_AIRTIME +CMD_GET_NOISE_FLOOR = HW_CMD_GET_NOISE_FLOOR +CMD_GET_VERSION = HW_CMD_GET_VERSION +CMD_GET_STATS = HW_CMD_GET_STATS +CMD_GET_BATTERY = HW_CMD_GET_BATTERY +CMD_GET_SENSORS = HW_CMD_GET_SENSORS +CMD_PING = HW_CMD_PING + +RESP_IDENTITY = HW_RESP_IDENTITY +RESP_RANDOM = HW_RESP_RANDOM +RESP_VERIFY = HW_RESP_VERIFY +RESP_SIGNATURE = HW_RESP_SIGNATURE +RESP_ENCRYPTED = HW_RESP_ENCRYPTED +RESP_DECRYPTED = HW_RESP_DECRYPTED +RESP_SHARED_SECRET = HW_RESP_SHARED_SECRET +RESP_HASH = HW_RESP_HASH +RESP_OK = HW_RESP_OK +RESP_RADIO = HW_RESP_RADIO +RESP_TX_POWER = HW_RESP_TX_POWER +RESP_VERSION = HW_RESP_VERSION +RESP_ERROR = HW_RESP_ERROR +RESP_TX_DONE = HW_RESP_TX_DONE +RESP_CURRENT_RSSI = HW_RESP_CURRENT_RSSI +RESP_CHANNEL_BUSY = HW_RESP_CHANNEL_BUSY +RESP_AIRTIME = HW_RESP_AIRTIME +RESP_NOISE_FLOOR = HW_RESP_NOISE_FLOOR +RESP_STATS = HW_RESP_STATS +RESP_BATTERY = HW_RESP_BATTERY +RESP_PONG = HW_RESP_PONG +RESP_SENSORS = HW_RESP_SENSORS + +# Error codes (SetHardware Error response payload) +HW_ERR_INVALID_LENGTH = 0x01 +HW_ERR_INVALID_PARAM = 0x02 +HW_ERR_NO_CALLBACK = 0x03 +HW_ERR_MAC_FAILED = 0x04 +HW_ERR_UNKNOWN_CMD = 0x05 +HW_ERR_ENCRYPT_FAILED = 0x06 + +ERR_INVALID_LENGTH = HW_ERR_INVALID_LENGTH +ERR_INVALID_PARAM = HW_ERR_INVALID_PARAM +ERR_NO_CALLBACK = HW_ERR_NO_CALLBACK +ERR_MAC_FAILED = HW_ERR_MAC_FAILED +ERR_UNKNOWN_CMD = HW_ERR_UNKNOWN_CMD +ERR_ENCRYPT_FAILED = HW_ERR_ENCRYPT_FAILED + +# Buffer and timing constants +MAX_FRAME_SIZE = 512 +# Data payload ≤255 bytes (MeshCore MAX_TRANS_UNIT); queue bounds unpaired Data frames +KISS_MAX_PACKET_SIZE = 255 +MAX_PENDING_RX_FRAMES = 64 # max Data frames queued awaiting RxMeta; each payload ≤255 bytes +RX_BUFFER_SIZE = 1024 +TX_BUFFER_SIZE = 1024 +DEFAULT_BAUDRATE = 115200 +DEFAULT_TIMEOUT = 1.0 +RESPONSE_TIMEOUT = 5.0 # Timeout for command responses + +logger = logging.getLogger("KissModemWrapper") + + +class KissModemWrapper(LoRaRadio): + """ + MeshCore KISS Modem Protocol Interface + + Provides full-duplex KISS protocol communication with MeshCore modem firmware. + Supports packet transmission/reception, radio configuration, and cryptographic + operations via the modem's identity. + + Implements the LoRaRadio interface for PyMC Core compatibility. + + Threading Model: + This wrapper uses background threads for serial RX/TX. The RX callback + (on_frame_received) is invoked from the RX thread by default. For async + applications, call set_event_loop() to have callbacks scheduled onto + the event loop via call_soon_threadsafe(). + + RX Callback Signature: + The callback may accept either: + - (data: bytes) - backward compatible, single argument + - (data: bytes, rssi: int, snr: float) - per-packet signal metrics + + When using the 3-argument form, rssi and snr are the values for that + specific packet, avoiding race conditions with get_last_rssi/get_last_snr. + """ + + def __init__( + self, + port: str, + baudrate: int = DEFAULT_BAUDRATE, + timeout: float = DEFAULT_TIMEOUT, + on_frame_received: Optional[RxCallback] = None, + radio_config: Optional[Dict[str, Any]] = None, + auto_configure: bool = True, + lbt_enabled: bool = False, + ): + """ + Initialize MeshCore KISS Modem Wrapper + + Args: + port: Serial port device path (e.g., '/dev/ttyUSB0', '/dev/ttyACM0') + baudrate: Serial communication baud rate (default: 115200) + timeout: Serial read timeout in seconds (default: 1.0) + on_frame_received: Callback for received data packets. May be invoked + from a background thread unless set_event_loop() is used. + radio_config: Optional radio configuration dict with keys: + frequency, bandwidth, spreading_factor, coding_rate, + power (or tx_power), tx_delay_ms (KISS key-up delay in ms; + default 50), kiss_persistence (0-255), kiss_slottime_ms, + kiss_txtail_ms (post-TX hold), kiss_full_duplex (bool), + and SetHardware options as needed + auto_configure: If True, automatically configure radio on connect + lbt_enabled: If True, run Listen-Before-Talk before each send (default False). + For standard half-duplex the modem firmware performs p-persistent + CSMA; host-side LBT is redundant. Only enable for the marginal case + of full-duplex modem on a physically half-duplex link, where a + host "is channel busy?" check can delay submitting the next frame + to avoid collisions. + """ + self.port = port + self.baudrate = baudrate + self.timeout = timeout + self.auto_configure = auto_configure + self.lbt_enabled = lbt_enabled + + self.radio_config = radio_config or {} + self.is_configured = False + + self.serial_conn: Optional[serial.Serial] = None + self.is_connected = False + + self.rx_buffer = deque(maxlen=RX_BUFFER_SIZE) + self.tx_buffer = deque(maxlen=TX_BUFFER_SIZE) + + self.rx_frame_buffer = bytearray() + self.in_frame = False + self.escaped = False + + self.rx_thread: Optional[threading.Thread] = None + self.tx_thread: Optional[threading.Thread] = None + self.stop_event = threading.Event() + + # Callbacks + self.on_frame_received = on_frame_received + + # Event loop for thread-safe async callback invocation + self._event_loop: Optional[asyncio.AbstractEventLoop] = None + # When no event loop is set, run callback in a worker so RX thread never blocks + self._callback_executor: Optional[ThreadPoolExecutor] = None + + # Response handling + self._response_event = threading.Event() + self._pending_response: Optional[tuple[int, bytes]] = None + self._response_lock = threading.Lock() + + # TX completion tracking + self._tx_done_event = threading.Event() + self._tx_done_result: Optional[bool] = None + + # Pending RX data payloads (Data frame) waiting for RxMeta frame + self._pending_rx_queue: deque = deque() + + self.stats = { + "frames_sent": 0, + "frames_received": 0, + "bytes_sent": 0, + "bytes_received": 0, + "frame_errors": 0, + "buffer_overruns": 0, + "rx_packets": 0, + "tx_packets": 0, + "errors": 0, + "last_rssi": -999, + "last_snr": -999.0, + "noise_floor": None, + } + + # Modem info + self.modem_version: Optional[int] = None + self.modem_identity: Optional[bytes] = None + + def set_event_loop(self, loop: asyncio.AbstractEventLoop) -> None: + """ + Set the event loop for thread-safe async callback invocation. + + When set, RX callbacks are scheduled onto the event loop via + call_soon_threadsafe() instead of being invoked directly from + the RX thread. This is required for proper async integration. + + Args: + loop: The asyncio event loop to use for callbacks + """ + self._event_loop = loop + logger.debug("Event loop set for thread-safe callbacks") + + def set_lbt_enabled(self, enabled: bool) -> None: + """ + Enable or disable host-side Listen-Before-Talk before each send. + + When enabled, send() checks is_channel_busy() and backs off (120/240/360 ms) + until clear or 4 s. For standard half-duplex the modem already does CSMA; + enable only for full-duplex modem on a physically half-duplex link. + """ + self.lbt_enabled = enabled + logger.debug("Software LBT %s", "enabled" if enabled else "disabled") + + def get_lbt_enabled(self) -> bool: + """Return whether host-side Listen-Before-Talk is enabled.""" + return self.lbt_enabled + + def connect(self) -> bool: + """ + Connect to serial port and start communication threads + + Returns: + True if connection successful, False otherwise + """ + try: + self.serial_conn = serial.Serial( + port=self.port, + baudrate=self.baudrate, + timeout=self.timeout, + bytesize=serial.EIGHTBITS, + parity=serial.PARITY_NONE, + stopbits=serial.STOPBITS_ONE, + ) + + self.is_connected = True + self.stop_event.clear() + + # Start communication threads + self.rx_thread = threading.Thread(target=self._rx_worker, daemon=True) + self.tx_thread = threading.Thread(target=self._tx_worker, daemon=True) + + self.rx_thread.start() + self.tx_thread.start() + + logger.info(f"KISS modem connected to {self.port} at {self.baudrate} baud") + + # Auto-configure if requested + if self.auto_configure and self.radio_config: + if not self.configure_radio(): + logger.warning("Auto-configuration failed") + return False + + # Query modem info + self._query_modem_info() + + # Set KISS TXDELAY so key-up delay is not the firmware default 500ms (reduces + # round-trip latency for repeaters). Value in 10ms units; default 50ms. + tx_delay_ms = self.radio_config.get("tx_delay_ms", 50) + self._set_kiss_tx_delay(tx_delay_ms) + if "kiss_persistence" in self.radio_config: + self.set_kiss_persistence(self.radio_config["kiss_persistence"]) + if "kiss_slottime_ms" in self.radio_config: + self.set_kiss_slottime(self.radio_config["kiss_slottime_ms"]) + if "kiss_txtail_ms" in self.radio_config: + self.set_kiss_txtail(self.radio_config["kiss_txtail_ms"]) + if "kiss_full_duplex" in self.radio_config: + self.set_kiss_full_duplex(bool(self.radio_config["kiss_full_duplex"])) + + return True + + except Exception as e: + logger.error(f"Failed to connect to {self.port}: {e}") + self.is_connected = False + return False + + def disconnect(self): + """Disconnect from serial port and stop threads""" + self.is_connected = False + self.stop_event.set() + + # Wait for threads to finish + if self.rx_thread and self.rx_thread.is_alive(): + self.rx_thread.join(timeout=2.0) + if self.tx_thread and self.tx_thread.is_alive(): + self.tx_thread.join(timeout=2.0) + + if self._callback_executor is not None: + self._callback_executor.shutdown(wait=False) + self._callback_executor = None + + # Close serial connection + if self.serial_conn and self.serial_conn.is_open: + self.serial_conn.close() + + logger.info(f"KISS modem disconnected from {self.port}") + + def _write_frame(self, frame: bytes) -> bool: + """ + Write a complete KISS frame to the serial port. + + Ensures the entire frame (including trailing FEND) is written; retries + on partial write so we never send a truncated frame. + + Returns: + True if all bytes written, False on error or incomplete write. + """ + if not self.serial_conn or not self.serial_conn.is_open: + return False + offset = 0 + while offset < len(frame): + try: + n = self.serial_conn.write(frame[offset:]) + if n is None or n <= 0: + logger.error("Serial write returned %s", n) + return False + offset += n + except Exception as e: + logger.error("Serial write error: %s", e) + return False + try: + self.serial_conn.flush() + except Exception as e: + logger.error("Serial flush error: %s", e) + return False + return True + + def _set_kiss_tx_delay(self, delay_ms: int) -> None: + """ + Send KISS TXDELAY command so modem key-up delay is not the default 500ms. + Value is in 10ms units; firmware default is 50 (= 500ms). Typical for + repeaters: 50ms (value 5). + """ + value = max(1, min(255, delay_ms // 10)) + frame = self._encode_kiss_frame(KISS_CMD_TXDELAY, bytes([value])) + if self._write_frame(frame): + logger.debug("KISS TXDELAY set to %dms (value %d)", value * 10, value) + else: + logger.warning("Failed to set KISS TXDELAY") + + def set_kiss_persistence(self, value: int) -> bool: + """ + Set KISS CSMA persistence parameter (0-255). Lower values defer longer + when channel is busy; firmware default is 63. + + Returns: + True if the command was written successfully. + """ + val = max(0, min(255, value)) + frame = self._encode_kiss_frame(KISS_CMD_PERSISTENCE, bytes([val])) + ok = self._write_frame(frame) + if ok: + logger.debug("KISS PERSISTENCE set to %d", val) + return ok + + def set_kiss_slottime(self, slottime_ms: int) -> bool: + """ + Set KISS CSMA slot time in milliseconds (sent as 10ms units to modem). + Firmware default is 100ms (value 10). Lower values reduce backoff delay + when channel is busy at the cost of more collisions under load. + + Returns: + True if the command was written successfully. + """ + value = max(0, min(255, slottime_ms // 10)) + frame = self._encode_kiss_frame(KISS_CMD_SLOTTIME, bytes([value])) + ok = self._write_frame(frame) + if ok: + logger.debug("KISS SLOTTIME set to %dms (value %d)", value * 10, value) + return ok + + def set_kiss_txtail(self, txtail_ms: int) -> bool: + """ + Set KISS post-TX hold time (TXtail) in milliseconds (sent as 10ms units). + Firmware default is 0. Some radios need a short hold after TX. + + Returns: + True if the command was written successfully. + """ + value = max(0, min(255, txtail_ms // 10)) + frame = self._encode_kiss_frame(KISS_CMD_TXTAIL, bytes([value])) + ok = self._write_frame(frame) + if ok: + logger.debug("KISS TXTAIL set to %dms (value %d)", value * 10, value) + return ok + + def set_kiss_full_duplex(self, full_duplex: bool) -> bool: + """ + Set KISS full-duplex mode. When False (default), modem uses p-persistent + CSMA. When True, CSMA is bypassed and packets transmit after TXDELAY only. + + Returns: + True if the command was written successfully. + """ + value = 0x01 if full_duplex else 0x00 + frame = self._encode_kiss_frame(KISS_CMD_FULLDUPLEX, bytes([value])) + ok = self._write_frame(frame) + if ok: + logger.debug("KISS FullDuplex set to %s", full_duplex) + return ok + + def set_signal_report(self, enabled: bool) -> bool: + """ + Enable or disable RxMeta frames (SNR + RSSI after each Data frame). + Enabled by default. When disabled, the modem does not send SetHardware + RxMeta (0xF9) after received packets. + + Returns: + True if the command was sent and a valid response was received. + """ + payload = bytes([0x01 if enabled else 0x00]) + resp = self._send_command(HW_CMD_SET_SIGNAL_REPORT, payload) + if resp and resp[0] in (HW_RESP_SIGNAL_REPORT, HW_RESP_OK): + return True + return False + + def get_signal_report(self) -> Optional[bool]: + """ + Query whether RxMeta (signal report) is enabled. When enabled, the modem + sends an RxMeta frame after each received Data frame. + + Returns: + True if enabled, False if disabled, None on error. + """ + resp = self._send_command(HW_CMD_GET_SIGNAL_REPORT) + if resp and resp[0] == HW_RESP_SIGNAL_REPORT and len(resp[1]) >= 1: + return resp[1][0] != 0x00 + return None + + def _query_modem_info(self): + """Query modem version and identity""" + try: + # Get version + version_resp = self._send_command(CMD_GET_VERSION) + if version_resp and version_resp[0] == RESP_VERSION and len(version_resp[1]) >= 1: + self.modem_version = version_resp[1][0] + logger.info(f"Modem version: {self.modem_version}") + + # Get identity (public key) + identity_resp = self._send_command(CMD_GET_IDENTITY) + if identity_resp and identity_resp[0] == RESP_IDENTITY and len(identity_resp[1]) == 32: + self.modem_identity = identity_resp[1] + logger.info(f"Modem identity: {self.modem_identity.hex()[:16]}...") + + except Exception as e: + logger.warning(f"Failed to query modem info: {e}") + + def configure_radio(self) -> bool: + """ + Configure radio parameters + + Returns: + True if configuration successful, False otherwise + """ + if not self.is_connected: + logger.error("Cannot configure radio: not connected") + return False + + try: + # Extract configuration parameters with defaults + # Support both "power" and "tx_power" for compatibility with different config styles + frequency_hz = self.radio_config.get("frequency", int(869.618 * 1000000)) + bandwidth_hz = self.radio_config.get("bandwidth", int(62500)) + sf = self.radio_config.get("spreading_factor", 8) + cr = self.radio_config.get("coding_rate", 8) + power = self.radio_config.get("power", self.radio_config.get("tx_power", 22)) + + # Set radio parameters (frequency, bandwidth, SF, CR) + # Format: Freq (4) + BW (4) + SF (1) + CR (1) - all little-endian + radio_data = struct.pack(" bool: + """ + Send a data frame via KISS modem + + Args: + data: Raw packet data to send (2-255 bytes) + + Returns: + True if frame queued successfully, False otherwise + """ + if not self.is_connected: + logger.warning("Cannot send frame: not connected") + return False + + if len(data) < 2 or len(data) > KISS_MAX_PACKET_SIZE: + logger.warning( + f"Invalid frame size: {len(data)} (must be 2-{KISS_MAX_PACKET_SIZE} bytes)" + ) + return False + + try: + # Create KISS frame with CMD_DATA command + kiss_frame = self._encode_kiss_frame(CMD_DATA, data) + + # Add to TX buffer + if len(self.tx_buffer) < TX_BUFFER_SIZE: + self.tx_buffer.append(kiss_frame) + return True + else: + self.stats["buffer_overruns"] += 1 + logger.warning("TX buffer overrun") + return False + + except Exception as e: + logger.error(f"Failed to send frame: {e}") + return False + + def send_frame_and_wait(self, data: bytes, timeout: float = RESPONSE_TIMEOUT) -> bool: + """ + Send a data frame and wait for TX_DONE response + + Args: + data: Raw packet data to send + timeout: Timeout in seconds to wait for TX_DONE + + Returns: + True if transmission successful, False otherwise + """ + self._tx_done_event.clear() + self._tx_done_result = None + + if not self.send_frame(data): + return False + + # Wait for TX_DONE response + if self._tx_done_event.wait(timeout): + return self._tx_done_result or False + else: + logger.warning("TX_DONE timeout") + return False + + def _send_command( + self, sub_cmd: int, data: bytes = b"", timeout: float = RESPONSE_TIMEOUT + ) -> Optional[tuple[int, bytes]]: + """ + Send a SetHardware command and wait for response. + + Encodes as KISS frame: FEND + 0x06 (SetHardware) + sub_cmd + data + FEND. + + Args: + sub_cmd: SetHardware sub-command byte (e.g. HW_CMD_GET_IDENTITY) + data: Sub-command payload + timeout: Response timeout in seconds + + Returns: + Tuple of (response_sub_cmd, response_data) or None on timeout + """ + with self._response_lock: + self._response_event.clear() + self._pending_response = None + + # SetHardware frame: type 0x06, payload = sub_cmd (1 byte) + data + kiss_frame = self._encode_kiss_frame( + KISS_CMD_SETHARDWARE, bytes([sub_cmd]) + data + ) + + if not self._write_frame(kiss_frame): + logger.warning("SetHardware frame write failed") + return None + + # Wait for response + if self._response_event.wait(timeout): + with self._response_lock: + return self._pending_response + else: + logger.warning(f"SetHardware sub_cmd 0x{sub_cmd:02X} timeout") + return None + + def get_radio_config(self) -> Optional[Dict[str, Any]]: + """ + Get current radio configuration from modem + + Returns: + Dict with frequency, bandwidth, sf, cr, or None on error + """ + resp = self._send_command(CMD_GET_RADIO) + if resp and resp[0] == RESP_RADIO and len(resp[1]) >= 10: + freq, bw, sf, cr = struct.unpack(" Optional[int]: + """Get current TX power in dBm""" + resp = self._send_command(CMD_GET_TX_POWER) + if resp and resp[0] == RESP_TX_POWER and len(resp[1]) >= 1: + return resp[1][0] + return None + + def get_current_rssi(self) -> int: + """Get current RSSI from modem""" + resp = self._send_command(CMD_GET_CURRENT_RSSI) + if resp and resp[0] == RESP_CURRENT_RSSI and len(resp[1]) >= 1: + # RSSI is signed byte + rssi = resp[1][0] + if rssi > 127: + rssi -= 256 + return rssi + return -999 + + def is_channel_busy(self) -> bool: + """Check if channel is busy""" + resp = self._send_command(CMD_IS_CHANNEL_BUSY) + if resp and resp[0] == RESP_CHANNEL_BUSY and len(resp[1]) >= 1: + return resp[1][0] == 0x01 + return False + + def get_airtime(self, packet_length: int) -> Optional[int]: + """ + Get estimated airtime for a packet + + Args: + packet_length: Length of packet in bytes + + Returns: + Airtime in milliseconds or None on error + """ + resp = self._send_command(CMD_GET_AIRTIME, bytes([packet_length])) + if resp and resp[0] == RESP_AIRTIME and len(resp[1]) >= 4: + return struct.unpack(" Optional[int]: + """Get noise floor in dBm""" + resp = self._send_command(CMD_GET_NOISE_FLOOR) + if resp and resp[0] == RESP_NOISE_FLOOR and len(resp[1]) >= 2: + # Noise floor is signed 16-bit + noise = struct.unpack(" Optional[Dict[str, int]]: + """ + Get modem statistics + + Returns: + Dict with rx, tx, errors counts or None on error + """ + resp = self._send_command(CMD_GET_STATS) + if resp and resp[0] == RESP_STATS and len(resp[1]) >= 12: + rx, tx, errors = struct.unpack(" Optional[int]: + """Get battery voltage in millivolts""" + resp = self._send_command(CMD_GET_BATTERY) + if resp and resp[0] == RESP_BATTERY and len(resp[1]) >= 2: + return struct.unpack(" bool: + """Ping the modem to check connectivity""" + resp = self._send_command(CMD_PING) + return resp is not None and resp[0] == RESP_PONG + + def get_sensors(self, permissions: int = 0x07) -> Optional[bytes]: + """ + Get sensor data in CayenneLPP format + + Args: + permissions: Bitmask of sensors to query + 0x01 = battery, 0x02 = GPS, 0x04 = environment + + Returns: + CayenneLPP encoded sensor data or None + """ + resp = self._send_command(CMD_GET_SENSORS, bytes([permissions])) + if resp and resp[0] == RESP_SENSORS: + return resp[1] + return None + + def get_mcu_temp(self) -> Optional[float]: + """ + Get MCU temperature in degrees Celsius. + + Returns: + Temperature in °C, or None if unsupported or error. + """ + resp = self._send_command(HW_CMD_GET_MCU_TEMP) + if resp and resp[0] == HW_RESP_MCU_TEMP and len(resp[1]) >= 2: + temp_tenths = struct.unpack("= 1: + if resp[1][0] == HW_ERR_NO_CALLBACK: + return None + return None + + def get_device_name(self) -> Optional[str]: + """ + Get device/manufacturer name (UTF-8 string). + + Returns: + Device name string or None on error. + """ + resp = self._send_command(HW_CMD_GET_DEVICE_NAME) + if resp and resp[0] == HW_RESP_DEVICE_NAME: + try: + return resp[1].decode("utf-8") + except UnicodeDecodeError: + return None + return None + + def reboot(self) -> None: + """ + Request modem reboot. Sends Reboot (0x18), expects OK then connection drop. + Does not wait for disconnect. + """ + self._send_command(HW_CMD_REBOOT, timeout=1.0) + + # Cryptographic operations using modem's identity + + def get_identity(self) -> Optional[bytes]: + """Get modem's public key (32 bytes)""" + resp = self._send_command(CMD_GET_IDENTITY) + if resp and resp[0] == RESP_IDENTITY and len(resp[1]) == 32: + self.modem_identity = resp[1] + return resp[1] + return None + + def get_random(self, length: int) -> Optional[bytes]: + """ + Get random bytes from modem + + Args: + length: Number of random bytes (1-64) + + Returns: + Random bytes or None on error + """ + if length < 1 or length > 64: + logger.error("Random length must be 1-64") + return None + resp = self._send_command(CMD_GET_RANDOM, bytes([length])) + if resp and resp[0] == RESP_RANDOM: + return resp[1] + return None + + def sign_data(self, data: bytes) -> Optional[bytes]: + """ + Sign data with modem's private key + + Args: + data: Data to sign + + Returns: + 64-byte signature or None on error + """ + resp = self._send_command(CMD_SIGN_DATA, data) + if resp and resp[0] == RESP_SIGNATURE and len(resp[1]) == 64: + return resp[1] + return None + + def verify_signature(self, pubkey: bytes, signature: bytes, data: bytes) -> Optional[bool]: + """ + Verify a signature + + Args: + pubkey: 32-byte public key + signature: 64-byte signature + data: Original data + + Returns: + True if valid, False if invalid, None on error + """ + if len(pubkey) != 32 or len(signature) != 64: + logger.error("Invalid pubkey or signature length") + return None + payload = pubkey + signature + data + resp = self._send_command(CMD_VERIFY_SIGNATURE, payload) + if resp and resp[0] == RESP_VERIFY and len(resp[1]) >= 1: + return resp[1][0] == 0x01 + return None + + def encrypt_data(self, key: bytes, plaintext: bytes) -> Optional[tuple[bytes, bytes]]: + """ + Encrypt data using a shared key + + Args: + key: 32-byte encryption key + plaintext: Data to encrypt + + Returns: + Tuple of (mac, ciphertext) or None on error + """ + if len(key) != 32: + logger.error("Key must be 32 bytes") + return None + payload = key + plaintext + resp = self._send_command(CMD_ENCRYPT_DATA, payload) + if resp and resp[0] == RESP_ENCRYPTED and len(resp[1]) >= 2: + mac = resp[1][:2] + ciphertext = resp[1][2:] + return (mac, ciphertext) + return None + + def decrypt_data(self, key: bytes, mac: bytes, ciphertext: bytes) -> Optional[bytes]: + """ + Decrypt data using a shared key + + Args: + key: 32-byte decryption key + mac: 2-byte MAC + ciphertext: Encrypted data + + Returns: + Plaintext or None on error (includes MAC failure) + """ + if len(key) != 32 or len(mac) != 2: + logger.error("Invalid key or MAC length") + return None + payload = key + mac + ciphertext + resp = self._send_command(CMD_DECRYPT_DATA, payload) + if resp and resp[0] == RESP_DECRYPTED: + return resp[1] + return None + + def key_exchange(self, remote_pubkey: bytes) -> Optional[bytes]: + """ + Perform key exchange with remote public key + + Args: + remote_pubkey: 32-byte remote public key + + Returns: + 32-byte shared secret or None on error + """ + if len(remote_pubkey) != 32: + logger.error("Remote public key must be 32 bytes") + return None + resp = self._send_command(CMD_KEY_EXCHANGE, remote_pubkey) + if resp and resp[0] == RESP_SHARED_SECRET and len(resp[1]) == 32: + return resp[1] + return None + + def hash_data(self, data: bytes) -> Optional[bytes]: + """ + Compute SHA-256 hash of data + + Args: + data: Data to hash + + Returns: + 32-byte hash or None on error + """ + resp = self._send_command(CMD_HASH, data) + if resp and resp[0] == RESP_HASH and len(resp[1]) == 32: + return resp[1] + return None + + # LoRaRadio interface implementation + + def set_rx_callback(self, callback: RxCallback): + """ + Set the RX callback function. + + The callback may be (data: bytes) or (data, rssi, snr). When invoked + by this wrapper it is always called with (data, rssi, snr) so each + packet gets correct per-packet metrics without race conditions. + """ + self.on_frame_received = callback + logger.debug("RX callback set") + + def begin(self): + """Initialize the modem (LoRaRadio interface)""" + success = self.connect() + if not success: + raise Exception("Failed to initialize KISS modem") + + def check_radio_health(self) -> bool: + """Check modem connectivity. Returns True if connected and modem responds to ping.""" + if not self.is_connected: + return False + try: + return self.ping() + except Exception as e: + logger.debug(f"KISS modem health check failed: {e}") + return False + + # Optional host-side LBT (only when lbt_enabled, e.g. full-duplex on half-duplex link) + LBT_RETRY_DELAYS_MS = (120, 240, 360) + LBT_MAX_WAIT_MS = 4000 + + async def _prepare_for_tx_lbt(self) -> tuple[bool, list[float]]: + """ + Listen-Before-Talk: query modem channel busy until clear or max wait. + Used only when lbt_enabled (marginal case: full-duplex modem on physically + half-duplex link). Returns (success, lbt_backoff_delays_ms). + """ + lbt_backoff_delays: list[float] = [] + total_wait_ms = 0.0 + + while total_wait_ms < self.LBT_MAX_WAIT_MS: + try: + channel_busy = await asyncio.to_thread(self.is_channel_busy) + if not channel_busy: + logger.debug( + "Channel busy check clear - channel available after " + f"{len(lbt_backoff_delays) + 1} check(s)" + ) + break + + logger.debug("Channel busy check still busy - activity detected") + remaining_ms = self.LBT_MAX_WAIT_MS - total_wait_ms + retry_delay_ms = random.choice(self.LBT_RETRY_DELAYS_MS) + backoff_ms = min(retry_delay_ms, remaining_ms) + lbt_backoff_delays.append(float(backoff_ms)) + total_wait_ms += backoff_ms + + logger.debug( + f"LBT backoff - waiting {backoff_ms}ms before retry " + f"(total wait {total_wait_ms:.0f}ms / {self.LBT_MAX_WAIT_MS}ms)" + ) + await asyncio.sleep(backoff_ms / 1000.0) + + if total_wait_ms >= self.LBT_MAX_WAIT_MS: + logger.warning( + f"LBT max duration reached ({self.LBT_MAX_WAIT_MS}ms) - " + "channel still busy, transmitting anyway" + ) + except Exception as e: + logger.warning(f"Channel busy check failed: {e}, proceeding with transmission") + break + + return True, lbt_backoff_delays + + async def send(self, data: bytes) -> Optional[Dict[str, Any]]: + """ + Send data via KISS modem (LoRaRadio interface) + + For standard half-duplex, relies on the modem's p-persistent CSMA; no + host-side LBT. When lbt_enabled is True (full-duplex on half-duplex link), + runs a channel-busy check before submitting the frame. + + Args: + data: Data to send + + Returns: + Transmission metadata dict (airtime_ms, lbt_attempts, + lbt_backoff_delays_ms, lbt_channel_busy) + + Raises: + Exception: If send fails + """ + lbt_backoff_delays: list[float] = [] + if self.lbt_enabled: + _, lbt_backoff_delays = await self._prepare_for_tx_lbt() + + success = self.send_frame(data) + if not success: + raise Exception("Failed to send frame via KISS modem") + + airtime = self.get_airtime(len(data)) + return { + "airtime_ms": airtime if airtime is not None else 0, + "lbt_attempts": len(lbt_backoff_delays), + "lbt_backoff_delays_ms": lbt_backoff_delays, + "lbt_channel_busy": len(lbt_backoff_delays) > 0, + } + + async def wait_for_rx(self) -> bytes: + """ + Wait for a packet to be received asynchronously (LoRaRadio interface) + + Returns: + Received packet data + """ + future = asyncio.Future() + + original_callback = self.on_frame_received + + def temp_callback(data: bytes, rssi: Optional[int] = None, snr: Optional[float] = None): + if not future.done(): + future.set_result(data) + if original_callback: + try: + rssi_val = rssi if rssi is not None else -999 + snr_val = snr if snr is not None else -999.0 + _invoke_rx_callback(original_callback, data, rssi_val, snr_val) + except Exception as e: + logger.error(f"Error in original callback: {e}") + + self.on_frame_received = temp_callback + + try: + data = await future + return data + finally: + self.on_frame_received = original_callback + + def sleep(self): + """Put the modem into low-power mode (LoRaRadio interface)""" + logger.debug("Sleep mode not directly supported for KISS modem") + pass + + def get_last_rssi(self) -> int: + """Return last received RSSI in dBm (LoRaRadio interface)""" + return self.stats.get("last_rssi", -999) + + def get_last_snr(self) -> float: + """Return last received SNR in dB (LoRaRadio interface)""" + return self.stats.get("last_snr", -999.0) + + def get_stats(self) -> Dict[str, Any]: + """Get interface statistics""" + return self.stats.copy() + + def get_status(self) -> Dict[str, Any]: + """Get radio status. Uses cached config/stats where possible.""" + cfg = self.get_radio_config() + tx_power = self.get_tx_power() + status: Dict[str, Any] = { + "initialized": self.is_connected, + "frequency": cfg["frequency"] if cfg else self.radio_config.get("frequency", 0), + "tx_power": tx_power if tx_power is not None else self.radio_config.get("tx_power", self.radio_config.get("power", 0)), + "spreading_factor": cfg["spreading_factor"] if cfg else self.radio_config.get("spreading_factor", 0), + "bandwidth": cfg["bandwidth"] if cfg else self.radio_config.get("bandwidth", 0), + "coding_rate": cfg["coding_rate"] if cfg else self.radio_config.get("coding_rate", 0), + "last_rssi": self.stats.get("last_rssi", -999), + "last_snr": self.stats.get("last_snr", -999.0), + "last_signal_rssi": self.stats.get("last_rssi", -999), + "hardware_ready": self.is_connected, + } + return status + + # KISS frame encoding/decoding + + def _encode_kiss_frame(self, cmd: int, data: bytes) -> bytes: + """ + Encode data into KISS frame format + + Args: + cmd: Command byte + data: Raw data to encode + + Returns: + Encoded KISS frame + """ + # Start with FEND and command + frame = bytearray([KISS_FEND, cmd]) + + # Escape and add data + for byte in data: + if byte == KISS_FEND: + frame.extend([KISS_FESC, KISS_TFEND]) + elif byte == KISS_FESC: + frame.extend([KISS_FESC, KISS_TFESC]) + else: + frame.append(byte) + + # End with FEND + frame.append(KISS_FEND) + + return bytes(frame) + + def _decode_kiss_byte(self, byte: int): + """ + Process received byte for KISS frame decoding + + Args: + byte: Received byte + """ + if byte == KISS_FEND: + if self.in_frame and len(self.rx_frame_buffer) > 0: + # Complete frame received + self._process_received_frame() + # Start new frame + self.rx_frame_buffer.clear() + self.in_frame = True + self.escaped = False + + elif byte == KISS_FESC: + if self.in_frame: + self.escaped = True + + elif self.escaped: + if byte == KISS_TFEND: + self.rx_frame_buffer.append(KISS_FEND) + elif byte == KISS_TFESC: + self.rx_frame_buffer.append(KISS_FESC) + else: + # Invalid escape sequence; reset so we resync at next FEND + self.stats["frame_errors"] += 1 + logger.warning(f"Invalid KISS escape sequence: 0x{byte:02X}") + self.rx_frame_buffer.clear() + self.in_frame = False + self.escaped = False + + else: + if self.in_frame: + if len(self.rx_frame_buffer) >= MAX_FRAME_SIZE: + # Frame too long (e.g. lost FEND); reset and resync at next FEND + self.stats["frame_errors"] += 1 + logger.warning( + "KISS frame exceeded max size (%d), resyncing", MAX_FRAME_SIZE + ) + self.rx_frame_buffer.clear() + self.in_frame = False + else: + self.rx_frame_buffer.append(byte) + + def _dispatch_rx_callback(self, data: bytes, rssi: int, snr: float) -> None: + """ + Dispatch RX callback without blocking the RX thread. + + If an event loop is set via set_event_loop(), the callback is scheduled + onto that loop. Otherwise, the callback is run in a single-worker thread + pool so the RX thread can keep reading serial data (avoids dropped + packets when the callback does I/O or heavy work). + + Args: + data: Received packet data + rssi: RSSI in dBm + snr: SNR in dB + """ + if self.on_frame_received is None: + return + + if self._event_loop is not None: + try: + self._event_loop.call_soon_threadsafe( + lambda: _invoke_rx_callback(self.on_frame_received, data, rssi, snr) + ) + except RuntimeError as e: + logger.warning(f"Failed to schedule RX callback on event loop: {e}") + elif self.rx_thread is not None and threading.current_thread() is self.rx_thread: + # We're in the RX thread; run callback in executor so we don't block reading + if self._callback_executor is None: + self._callback_executor = ThreadPoolExecutor(max_workers=1) + self._callback_executor.submit( + _invoke_rx_callback, self.on_frame_received, data, rssi, snr + ) + else: + # Called from main thread (e.g. unit test); invoke directly + _invoke_rx_callback(self.on_frame_received, data, rssi, snr) + + def _process_received_frame(self): + """Process a complete received KISS frame (spec: type byte = port | cmd).""" + if len(self.rx_frame_buffer) < 1: + return + + type_byte = self.rx_frame_buffer[0] + port = (type_byte >> 4) & 0x0F + cmd = type_byte & 0x0F + + # Only process port 0 (single-port TNC) + if port != 0: + return + + self.stats["frames_received"] += 1 + self.stats["bytes_received"] += len(self.rx_frame_buffer) - 1 + + if cmd == CMD_DATA: + # Data frame: raw packet only (≤255 bytes per spec); queue until RxMeta arrives + payload = bytes(self.rx_frame_buffer[1:]) + if len(self._pending_rx_queue) >= MAX_PENDING_RX_FRAMES: + self.stats["frame_errors"] += 1 + logger.warning( + "Pending RX queue full (max %d), dropping Data frame", + MAX_PENDING_RX_FRAMES, + ) + else: + self._pending_rx_queue.append(payload) + + elif cmd == KISS_CMD_SETHARDWARE: + # SetHardware: first byte is sub_cmd, rest is payload + if len(self.rx_frame_buffer) < 2: + return + sub_cmd = self.rx_frame_buffer[1] + payload = bytes(self.rx_frame_buffer[2:]) + + if sub_cmd == HW_RESP_RX_META: + # RxMeta follows a Data frame: SNR (1), RSSI (1); deliver queued data + rssi_raw = -999 + snr_db = -999.0 + if len(payload) >= 2: + snr_raw = payload[0] + rssi_raw = payload[1] + if snr_raw > 127: + snr_raw -= 256 + if rssi_raw > 127: + rssi_raw -= 256 + snr_db = snr_raw / 4.0 # 0.25 dB steps + self.stats["last_snr"] = snr_db + self.stats["last_rssi"] = rssi_raw + self.stats["rx_packets"] += 1 + if self._pending_rx_queue: + packet_data = self._pending_rx_queue.popleft() + if self.on_frame_received: + try: + self._dispatch_rx_callback(packet_data, rssi_raw, snr_db) + except Exception as e: + logger.error(f"Error in frame received callback: {e}") + else: + logger.warning("RxMeta received with no pending Data frame") + + elif sub_cmd == HW_RESP_TX_DONE: + if len(payload) >= 1: + self._tx_done_result = payload[0] == 0x01 + self.stats["tx_packets"] += 1 + self._tx_done_event.set() + + elif sub_cmd == HW_RESP_ERROR: + if len(payload) >= 1: + self.stats["errors"] += 1 + logger.warning(f"Modem error: 0x{payload[0]:02X}") + with self._response_lock: + self._pending_response = (sub_cmd, payload) + self._response_event.set() + + else: + # Other response sub-commands (Identity, Radio, OK, etc.) + with self._response_lock: + self._pending_response = (sub_cmd, payload) + self._response_event.set() + # cmd 0xFF (Return) has port=15 so is already discarded above + + def _rx_worker(self): + """Background thread for receiving data""" + while not self.stop_event.is_set() and self.is_connected: + try: + if self.serial_conn and self.serial_conn.in_waiting > 0: + data = self.serial_conn.read(self.serial_conn.in_waiting) + + for byte in data: + self._decode_kiss_byte(byte) + + else: + threading.Event().wait(0.01) + + except Exception as e: + if self.is_connected: + logger.error(f"RX worker error: {e}") + break + + def _tx_worker(self): + """Background thread for sending data""" + while not self.stop_event.is_set() and self.is_connected: + try: + if self.tx_buffer: + frame = self.tx_buffer.popleft() + + if self.serial_conn and self.serial_conn.is_open: + if self._write_frame(frame): + self.stats["frames_sent"] += 1 + self.stats["bytes_sent"] += len(frame) + else: + logger.warning("TX frame write failed, dropping frame") + else: + logger.warning("Serial connection not open") + else: + threading.Event().wait(0.01) + + except Exception as e: + if self.is_connected: + logger.error(f"TX worker error: {e}") + break + + def __enter__(self): + """Context manager entry""" + self.connect() + return self + + def __exit__(self, exc_type, exc_val, exc_tb): + """Context manager exit""" + self.disconnect() + + def __del__(self): + """Destructor to ensure cleanup""" + try: + self.disconnect() + except Exception: + pass diff --git a/src/pymc_core/node/dispatcher.py b/src/pymc_core/node/dispatcher.py index 5a0a2ea..9cd5a3e 100644 --- a/src/pymc_core/node/dispatcher.py +++ b/src/pymc_core/node/dispatcher.py @@ -57,6 +57,10 @@ def __init__( log_fn: Optional[Callable[[str], None]] = None, packet_filter: Optional[Any] = None, ) -> None: + # tx_delay: seconds to wait after TX before starting ACK wait (only when wait_for_ack). + # Round-trip latency can also be increased by: modem CSMA (TXDELAY/SlotTime in + # firmware), handler response delays (e.g. login_server 300 ms), and serial/ + # event-loop scheduling. KISS wrapper relies on modem CSMA by default (no host LBT). self.radio = radio self.tx_delay = tx_delay self.state: DispatcherState = DispatcherState.IDLE @@ -297,18 +301,26 @@ def set_raw_packet_callback( """Set callback for raw packet data (includes both parsed packet and raw bytes).""" self.raw_packet_callback = callback - def _on_packet_received(self, data: bytes) -> None: - """Called by the radio when a packet comes in.""" - # Schedule the packet processing in the event loop + def _on_packet_received( + self, + data: bytes, + rssi: Optional[int] = None, + snr: Optional[float] = None, + ) -> None: + """Called by the radio when a packet comes in. rssi/snr are per-packet when provided.""" try: loop = asyncio.get_running_loop() - loop.create_task(self._process_received_packet(data)) + loop.create_task(self._process_received_packet(data, rssi, snr)) except RuntimeError: - # No event loop running, can't process packet self._log("No event loop running, cannot process received packet") - async def _process_received_packet(self, data: bytes) -> None: - """Process a received packet from the radio callback.""" + async def _process_received_packet( + self, + data: bytes, + rssi: Optional[int] = None, + snr: Optional[float] = None, + ) -> None: + """Process a received packet from the radio callback. rssi/snr are per-packet when provided.""" self._log(f"[RX DEBUG] Processing packet: {len(data)} bytes, data: {data.hex()[:32]}...") # Generate packet hash for deduplication and blacklist checking @@ -342,9 +354,9 @@ async def _process_received_packet(self, data: bytes) -> None: self._log(f"[RX DEBUG] Packet type: {ptype:02X}") - # Add signal strength information to packet from radio - pkt._rssi = self.radio.get_last_rssi() - pkt._snr = self.radio.get_last_snr() + # Use per-packet rssi/snr when provided (avoids race); else fall back to radio last values + pkt._rssi = rssi if rssi is not None else self.radio.get_last_rssi() + pkt._snr = snr if snr is not None else self.radio.get_last_snr() # Let the node know about this packet for analysis (statistics, caching, etc.) if self.packet_analysis_callback: @@ -528,7 +540,7 @@ async def _dispatch(self, pkt: Packet) -> None: def _register_ack_received(self, crc: int) -> None: """Record that an ACK with the given CRC was received.""" - ts = asyncio.get_event_loop().time() + ts = asyncio.get_running_loop().time() self._recent_acks[crc] = ts # Notify waiting sender if this CRC matches @@ -541,7 +553,7 @@ async def run_forever(self) -> None: health_check_counter = 0 while True: # Clean out old ACK CRCs (older than 5 seconds) - now = asyncio.get_event_loop().time() + now = asyncio.get_running_loop().time() self._recent_acks = {crc: ts for crc, ts in self._recent_acks.items() if now - ts < 5} # Clean old packet hashes for deduplication diff --git a/src/pymc_core/protocol/__init__.py b/src/pymc_core/protocol/__init__.py index 62ffd46..75cdddf 100644 --- a/src/pymc_core/protocol/__init__.py +++ b/src/pymc_core/protocol/__init__.py @@ -58,6 +58,7 @@ # Import identity classes after other imports to avoid circular dependencies from .identity import Identity, LocalIdentity +from .modem_identity import ModemIdentity from .packet import Packet # PacketBuilder imports from other protocol modules so import it last @@ -81,6 +82,7 @@ "PacketFilter", "CryptoUtils", "LocalIdentity", + "ModemIdentity", "Identity", # Utility functions "parse_advert_payload", diff --git a/src/pymc_core/protocol/modem_identity.py b/src/pymc_core/protocol/modem_identity.py new file mode 100644 index 0000000..e8551ee --- /dev/null +++ b/src/pymc_core/protocol/modem_identity.py @@ -0,0 +1,276 @@ +""" +Modem-based Identity for MeshCore KISS Modem + +Provides an Identity implementation that delegates cryptographic operations +to the KISS modem hardware, keeping the private key secure on the device. +""" + +from typing import TYPE_CHECKING + +from nacl.public import PublicKey +from nacl.signing import VerifyKey + +from . import CryptoUtils + +if TYPE_CHECKING: + from pymc_core.hardware.kiss_modem_wrapper import KissModemWrapper + + +class ModemIdentity: + """ + Identity implementation using the KISS modem's cryptographic capabilities. + + Delegates signing, verification, and key exchange to the modem hardware, + ensuring the private key never leaves the secure modem environment. + + Implements the same interface as LocalIdentity for compatibility with + the rest of the pyMC_core stack. + """ + + def __init__(self, modem: "KissModemWrapper"): + """ + Initialize ModemIdentity with a connected KISS modem. + + Args: + modem: A connected KissModemWrapper instance + + Raises: + ValueError: If modem is not connected or identity cannot be retrieved + """ + if not modem.is_connected: + raise ValueError("Modem must be connected before creating ModemIdentity") + + self._modem = modem + + # Get the modem's public key + pubkey = modem.get_identity() + if pubkey is None or len(pubkey) != 32: + raise ValueError("Failed to retrieve modem identity") + + self._ed25519_pubkey = pubkey + self.verify_key = VerifyKey(pubkey) + + # Derive X25519 public key for ECDH + x25519_pubkey = CryptoUtils.ed25519_pk_to_x25519(pubkey) + self.x25519_pubkey = PublicKey(x25519_pubkey) + + # Cache the X25519 public key bytes + self._x25519_public = x25519_pubkey + + def get_public_key(self) -> bytes: + """ + Get the Ed25519 public key for this identity. + + Returns: + The 32-byte Ed25519 public key. + """ + return self._ed25519_pubkey + + def get_address_bytes(self) -> bytes: + """ + Get the address bytes derived from the public key. + + Returns: + The first byte of SHA256 hash of the public key, used as address. + """ + return CryptoUtils.sha256(self._ed25519_pubkey)[:1] + + def get_shared_public_key(self) -> bytes: + """ + Get the X25519 public key for ECDH operations. + + Returns: + The 32-byte X25519 public key. + """ + return self._x25519_public + + def sign(self, message: bytes) -> bytes: + """ + Sign a message using the modem's private key. + + Args: + message: The message to sign. + + Returns: + The 64-byte Ed25519 signature. + + Raises: + RuntimeError: If signing fails + """ + signature = self._modem.sign_data(message) + if signature is None: + raise RuntimeError("Modem signing failed") + return signature + + def verify(self, message: bytes, signature: bytes) -> bool: + """ + Verify a signature against a message. + + Note: This uses PyNaCl locally since verification only needs + the public key and is not security-sensitive. + + Args: + message: The original message bytes. + signature: The signature to verify. + + Returns: + True if the signature is valid, False otherwise. + """ + result = self._modem.verify_signature(self._ed25519_pubkey, signature, message) + if result is None: + # Fall back to local verification if modem fails + try: + self.verify_key.verify(message, signature) + return True + except Exception: + return False + return result + + def calc_shared_secret(self, remote_ed25519_pubkey: bytes) -> bytes: + """ + Compute the ECDH shared secret with a remote party's public key. + + Uses the modem's key_exchange command for secure computation. + The modem internally converts the Ed25519 public key to X25519 + and performs the ECDH computation. + + Note: This method signature differs from Identity.calc_shared_secret() + which takes a local private key. ModemIdentity.calc_shared_secret() + takes the remote's Ed25519 public key because the modem holds the + local private key internally. + + For use in pyMC_core handlers, which call calc_shared_secret on the + *peer's* Identity object (not on LocalIdentity/ModemIdentity), this + method is provided for cases where you want to compute a shared + secret directly using the modem's identity. + + Args: + remote_ed25519_pubkey: The remote party's 32-byte Ed25519 public key. + The modem converts this to X25519 internally. + + Returns: + The 32-byte shared secret for encryption. + + Raises: + RuntimeError: If key exchange fails + """ + if len(remote_ed25519_pubkey) != 32: + raise ValueError("Remote public key must be 32 bytes (Ed25519)") + + shared_secret = self._modem.key_exchange(remote_ed25519_pubkey) + if shared_secret is None: + raise RuntimeError("Modem key exchange failed") + return shared_secret + + def get_private_key(self) -> bytes: + """ + Get the X25519 private key for ECDH operations. + + Note: ModemIdentity does NOT expose the private key since it + remains secure on the modem. This method raises an error. + + Raises: + RuntimeError: Always, as private key is not accessible + """ + raise RuntimeError( + "ModemIdentity does not expose private keys. " + "Use calc_shared_secret() for ECDH operations." + ) + + def get_signing_key_bytes(self) -> bytes: + """ + Get the signing key bytes for this identity. + + Note: ModemIdentity does NOT expose the signing key since it + remains secure on the modem. This method raises an error. + + Raises: + RuntimeError: Always, as signing key is not accessible + """ + raise RuntimeError( + "ModemIdentity does not expose signing keys. " + "Use sign() for signing operations." + ) + + # Additional modem-specific methods + + def hash_data(self, data: bytes) -> bytes: + """ + Compute SHA-256 hash using the modem. + + Args: + data: Data to hash. + + Returns: + The 32-byte SHA-256 hash. + + Raises: + RuntimeError: If hashing fails + """ + result = self._modem.hash_data(data) + if result is None: + # Fall back to local hashing + return CryptoUtils.sha256(data) + return result + + def get_random(self, length: int) -> bytes: + """ + Get random bytes from the modem's hardware RNG. + + Args: + length: Number of random bytes (1-64). + + Returns: + Random bytes from the modem. + + Raises: + RuntimeError: If random generation fails + """ + result = self._modem.get_random(length) + if result is None: + raise RuntimeError("Modem random generation failed") + return result + + def encrypt(self, key: bytes, plaintext: bytes) -> tuple[bytes, bytes]: + """ + Encrypt data using the modem. + + Args: + key: 32-byte encryption key. + plaintext: Data to encrypt. + + Returns: + Tuple of (2-byte MAC, ciphertext). + + Raises: + RuntimeError: If encryption fails + """ + result = self._modem.encrypt_data(key, plaintext) + if result is None: + raise RuntimeError("Modem encryption failed") + return result + + def decrypt(self, key: bytes, mac: bytes, ciphertext: bytes) -> bytes: + """ + Decrypt data using the modem. + + Args: + key: 32-byte decryption key. + mac: 2-byte MAC. + ciphertext: Encrypted data. + + Returns: + Decrypted plaintext. + + Raises: + RuntimeError: If decryption fails (includes MAC verification failure) + """ + result = self._modem.decrypt_data(key, mac, ciphertext) + if result is None: + raise RuntimeError("Modem decryption failed (MAC verification may have failed)") + return result + + @property + def modem(self) -> "KissModemWrapper": + """Get the underlying modem instance.""" + return self._modem diff --git a/tests/test_kiss_modem_wrapper.py b/tests/test_kiss_modem_wrapper.py new file mode 100644 index 0000000..77c8e72 --- /dev/null +++ b/tests/test_kiss_modem_wrapper.py @@ -0,0 +1,1013 @@ +""" +Tests for MeshCore KISS Modem Wrapper + +Tests the KISS frame encoding/decoding, command/response handling, +and LoRaRadio interface implementation. +""" + +import struct +import threading +from unittest.mock import MagicMock, patch + +import pytest + +from pymc_core.hardware.kiss_modem_wrapper import ( + CMD_DATA, + CMD_ENCRYPT_DATA, + CMD_GET_AIRTIME, + CMD_GET_BATTERY, + CMD_GET_IDENTITY, + CMD_GET_NOISE_FLOOR, + CMD_GET_RADIO, + CMD_GET_RANDOM, + CMD_GET_STATS, + CMD_GET_TX_POWER, + CMD_GET_VERSION, + CMD_HASH, + CMD_KEY_EXCHANGE, + CMD_PING, + CMD_SET_RADIO, + CMD_SET_TX_POWER, + CMD_SIGN_DATA, + CMD_VERIFY_SIGNATURE, + HW_CMD_GET_DEVICE_NAME, + HW_CMD_GET_MCU_TEMP, + HW_CMD_GET_SIGNAL_REPORT, + HW_CMD_GET_VERSION, + HW_CMD_REBOOT, + HW_CMD_SET_SIGNAL_REPORT, + HW_RESP_DEVICE_NAME, + HW_RESP_MCU_TEMP, + HW_RESP_OK, + HW_RESP_SIGNAL_REPORT, + KISS_CMD_FULLDUPLEX, + KISS_CMD_PERSISTENCE, + KISS_CMD_SETHARDWARE, + KISS_CMD_SLOTTIME, + KISS_CMD_TXTAIL, + KISS_FEND, + KISS_FESC, + KISS_TFEND, + KISS_TFESC, + RESP_AIRTIME, + RESP_BATTERY, + RESP_ENCRYPTED, + RESP_ERROR, + RESP_HASH, + RESP_IDENTITY, + RESP_NOISE_FLOOR, + RESP_OK, + RESP_PONG, + RESP_RADIO, + RESP_RANDOM, + RESP_SHARED_SECRET, + RESP_SIGNATURE, + RESP_STATS, + RESP_TX_DONE, + RESP_TX_POWER, + RESP_VERIFY, + RESP_VERSION, + HW_RESP_RX_META, + KissModemWrapper, +) + + +class TestKissFrameEncoding: + """Test KISS frame encoding/decoding""" + + def test_encode_simple_frame(self): + """Test encoding a simple frame without special characters""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + frame = modem._encode_kiss_frame(CMD_DATA, b"\x01\x02\x03") + + # Should be: FEND + CMD + data + FEND + assert frame[0] == KISS_FEND + assert frame[1] == CMD_DATA + assert frame[2:5] == b"\x01\x02\x03" + assert frame[5] == KISS_FEND + + def test_encode_frame_with_fend_escape(self): + """Test encoding a frame containing FEND byte""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + frame = modem._encode_kiss_frame(CMD_DATA, bytes([0xC0])) # FEND + + # FEND in data should be escaped as FESC + TFEND + assert frame[0] == KISS_FEND + assert frame[1] == CMD_DATA + assert frame[2] == KISS_FESC + assert frame[3] == KISS_TFEND + assert frame[4] == KISS_FEND + + def test_encode_frame_with_fesc_escape(self): + """Test encoding a frame containing FESC byte""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + frame = modem._encode_kiss_frame(CMD_DATA, bytes([0xDB])) # FESC + + # FESC in data should be escaped as FESC + TFESC + assert frame[0] == KISS_FEND + assert frame[1] == CMD_DATA + assert frame[2] == KISS_FESC + assert frame[3] == KISS_TFESC + assert frame[4] == KISS_FEND + + def test_encode_frame_with_multiple_escapes(self): + """Test encoding a frame with multiple special characters""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + frame = modem._encode_kiss_frame(CMD_DATA, bytes([0xC0, 0xDB, 0xC0])) + + expected = bytes( + [ + KISS_FEND, + CMD_DATA, + KISS_FESC, + KISS_TFEND, # escaped 0xC0 + KISS_FESC, + KISS_TFESC, # escaped 0xDB + KISS_FESC, + KISS_TFEND, # escaped 0xC0 + KISS_FEND, + ] + ) + assert frame == expected + + def test_decode_simple_frame(self): + """Test decoding Data frame then RxMeta (spec: data and metadata separate)""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + received_frames = [] + modem.on_frame_received = lambda data: received_frames.append(data) + + # Data frame: FEND + 0x00 + raw_packet + FEND (no in-frame metadata) + data_frame = bytes([KISS_FEND, CMD_DATA, 0x01, 0x02, 0x03, KISS_FEND]) + # RxMeta: FEND + 0x06 + 0xF9 + SNR + RSSI + FEND (sent immediately after Data) + rx_meta_frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_RX_META, 0x10, 0xB0, KISS_FEND]) + + for byte in data_frame: + modem._decode_kiss_byte(byte) + for byte in rx_meta_frame: + modem._decode_kiss_byte(byte) + + assert len(received_frames) == 1 + assert received_frames[0] == b"\x01\x02\x03" + + def test_decode_frame_with_escapes(self): + """Test decoding Data frame with escaped FEND, then RxMeta""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + received_frames = [] + modem.on_frame_received = lambda data: received_frames.append(data) + + # Data frame: payload is escaped 0xC0 (FESC + TFEND) + data_frame = bytes( + [KISS_FEND, CMD_DATA, KISS_FESC, KISS_TFEND, KISS_FEND] + ) + rx_meta_frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_RX_META, 0x10, 0xB0, KISS_FEND]) + + for byte in data_frame: + modem._decode_kiss_byte(byte) + for byte in rx_meta_frame: + modem._decode_kiss_byte(byte) + + assert len(received_frames) == 1 + assert received_frames[0] == bytes([0xC0]) + + def test_decode_extracts_rssi_snr(self): + """Test that RSSI and SNR are extracted from RxMeta frame""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + data_frame = bytes([KISS_FEND, CMD_DATA, 0xAA, 0xBB, KISS_FEND]) + # RxMeta: SNR=0x10 (4.0 dB), RSSI=0xB0 (-80) + rx_meta_frame = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_RX_META, 0x10, 0xB0, KISS_FEND]) + + for byte in data_frame: + modem._decode_kiss_byte(byte) + for byte in rx_meta_frame: + modem._decode_kiss_byte(byte) + + assert modem.stats["last_snr"] == pytest.approx(4.0) + assert modem.stats["last_rssi"] == -80 + + def test_rx_callback_receives_per_packet_rssi_snr(self): + """Test that a 3-arg callback receives (data, rssi, snr) per Data+RxMeta pair""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + received = [] + def capture(data, rssi, snr): + received.append((data, rssi, snr)) + + modem.on_frame_received = capture + + # First packet: Data then RxMeta (SNR=4.0 dB, RSSI=-80) + data1 = bytes([KISS_FEND, CMD_DATA, 0x01, 0x02, KISS_FEND]) + meta1 = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_RX_META, 0x10, 0xB0, KISS_FEND]) + for byte in data1: + modem._decode_kiss_byte(byte) + for byte in meta1: + modem._decode_kiss_byte(byte) + + # Second packet: Data then RxMeta (SNR=2.0 dB, RSSI=-100) + data2 = bytes([KISS_FEND, CMD_DATA, 0x03, 0x04, KISS_FEND]) + meta2 = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, HW_RESP_RX_META, 0x08, 0x9C, KISS_FEND]) + for byte in data2: + modem._decode_kiss_byte(byte) + for byte in meta2: + modem._decode_kiss_byte(byte) + + assert len(received) == 2 + assert received[0] == (b"\x01\x02", -80, 4.0) + assert received[1] == (b"\x03\x04", -100, 2.0) + + def test_data_frame_without_rx_meta_does_not_call_callback(self): + """Spec: Data frame queues payload; callback only on following RxMeta""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + received = [] + modem.on_frame_received = lambda data: received.append(data) + + # Only Data frame, no RxMeta + data_frame = bytes([KISS_FEND, CMD_DATA, 0x01, 0x02, 0x03, KISS_FEND]) + for byte in data_frame: + modem._decode_kiss_byte(byte) + + assert len(received) == 0 + assert len(modem._pending_rx_queue) == 1 + assert modem._pending_rx_queue[0] == b"\x01\x02\x03" + + def test_port_non_zero_discarded(self): + """Frames with port != 0 are ignored (type byte 0x10 = port 1, cmd 0)""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + received = [] + modem.on_frame_received = lambda data: received.append(data) + + # Type 0x10: port=1, cmd=0 (Data on port 1) - should be discarded + frame = bytes([KISS_FEND, 0x10, 0x01, 0x02, 0x03, KISS_FEND]) + for byte in frame: + modem._decode_kiss_byte(byte) + + assert len(received) == 0 + assert len(modem._pending_rx_queue) == 0 + + +class TestCommandResponses: + """Test command sending and response parsing""" + + def test_send_command_encodes_correctly(self): + """Test that _send_command sends SetHardware frame (FEND + 0x06 + sub_cmd + data + FEND)""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + + mock_serial = MagicMock() + mock_serial.is_open = True + modem.serial_conn = mock_serial + modem.is_connected = True + + modem._send_command(CMD_GET_VERSION, timeout=0.1) + + assert mock_serial.write.called + written_frame = mock_serial.write.call_args[0][0] + + assert written_frame[0] == KISS_FEND + assert written_frame[1] == KISS_CMD_SETHARDWARE # type SetHardware + assert written_frame[2] == HW_CMD_GET_VERSION # sub_cmd GetVersion + assert written_frame[-1] == KISS_FEND + + def test_response_parsing_identity(self): + """Test parsing SetHardware Identity response (FEND + 0x06 + 0x21 + pubkey + FEND)""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + pubkey = bytes(range(32)) + raw_bytes = ( + bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_IDENTITY]) + pubkey + bytes([KISS_FEND]) + ) + + modem._response_event = threading.Event() + modem._pending_response = None + + for byte in raw_bytes: + modem._decode_kiss_byte(byte) + + assert modem._pending_response is not None + assert modem._pending_response[0] == RESP_IDENTITY + assert modem._pending_response[1] == pubkey + + def test_response_parsing_error(self): + """Test parsing SetHardware Error response (FEND + 0x06 + 0x2A + code + FEND)""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + raw_bytes = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_ERROR, 0x05, KISS_FEND]) + + modem._response_event = threading.Event() + modem._pending_response = None + + for byte in raw_bytes: + modem._decode_kiss_byte(byte) + + assert modem._pending_response is not None + assert modem._pending_response[0] == RESP_ERROR + assert modem._pending_response[1][0] == 0x05 + + def test_tx_done_response(self): + """Test SetHardware TxDone (0xF8) response sets event""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + modem._tx_done_event = threading.Event() + + raw_bytes = bytes([KISS_FEND, KISS_CMD_SETHARDWARE, RESP_TX_DONE, 0x01, KISS_FEND]) + + for byte in raw_bytes: + modem._decode_kiss_byte(byte) + + assert modem._tx_done_event.is_set() + assert modem._tx_done_result is True + + +class TestRadioConfiguration: + """Test radio configuration encoding""" + + def test_radio_config_struct_format(self): + """Test that radio config is packed correctly""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + + freq_hz = 869618000 + bw_hz = 62500 + sf = 8 + cr = 8 + + # This is what configure_radio should pack + expected = struct.pack(" 255 bytes) + assert modem.send_frame(bytes(256)) is False + + def test_send_frame_requires_connection(self): + """Test send_frame requires connection""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = False + + assert modem.send_frame(b"\x00\x01") is False + + def test_send_frame_queues_to_buffer(self): + """Test send_frame adds to TX buffer""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + modem.is_connected = True + + assert len(modem.tx_buffer) == 0 + + result = modem.send_frame(b"\x01\x02\x03") + + assert result is True + assert len(modem.tx_buffer) == 1 + + # Verify frame is properly encoded + frame = modem.tx_buffer[0] + assert frame[0] == KISS_FEND + assert frame[1] == CMD_DATA + assert frame[-1] == KISS_FEND + + +class TestQueryMethods: + """Test modem query methods""" + + def test_get_radio_config_parses_response(self): + """Test get_radio_config parses response correctly""" + modem = KissModemWrapper(port="/dev/null", auto_configure=False) + + freq = 869618000 + bw = 62500 + sf = 8 + cr = 8 + response_data = struct.pack("