Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
15 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion examples/calibrate_cad.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ def main():
parser = argparse.ArgumentParser(description="CAD Calibration Tool with Staged Workflow")
parser.add_argument(
"--radio",
choices=["waveshare", "uconsole", "meshadv-mini"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio type",
)
Expand Down
93 changes: 79 additions & 14 deletions examples/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,12 +32,16 @@
from pymc_core.node.node import MeshNode


def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0") -> LoRaRadio:
def create_radio(
radio_type: str = "waveshare",
serial_port: str = "/dev/ttyUSB0",
) -> LoRaRadio:
"""Create a radio instance with configuration for specified hardware.

Args:
radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", "kiss-tnc", or "ch341")
serial_port: Serial port for KISS TNC (only used with "kiss-tnc" type)
radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini",
"kiss-tnc", "kiss-modem", or "ch341")
serial_port: Serial port for KISS devices (only used with "kiss-tnc" or "kiss-modem")

Returns:
Radio instance configured for the specified hardware
Expand Down Expand Up @@ -72,6 +76,38 @@ def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0
)
return kiss_wrapper

# Check if this is a MeshCore KISS Modem configuration
if radio_type == "kiss-modem":
from pymc_core.hardware.kiss_modem_wrapper import KissModemWrapper

logger.debug("Using MeshCore KISS Modem Wrapper")

# MeshCore KISS Modem configuration
# Note: Sync word is configured at firmware build time
modem_config = {
"frequency": int(869.618 * 1000000), # EU: 869.618 MHz
"bandwidth": int(62.5 * 1000), # 62.5 kHz
"spreading_factor": 8, # LoRa SF8
"coding_rate": 8, # LoRa CR 4/8
"power": 22, # TX power
}

# Create KISS modem wrapper with specified port.
# To enable host-side LBT (e.g. full-duplex on half-duplex link), call
# modem_wrapper.set_lbt_enabled(True) after creation.
modem_wrapper = KissModemWrapper(
port=serial_port,
baudrate=115200,
radio_config=modem_config,
auto_configure=True,
)

logger.info("Created MeshCore KISS Modem Wrapper")
logger.info(
f"Frequency: {modem_config['frequency']/1000000:.3f}MHz, TX Power: {modem_config['power']}dBm"
)
return modem_wrapper

# Check if this is a CH341 configuration
if radio_type == "ch341":
from pymc_core.hardware.ch341.ch341_gpio_manager import CH341GPIOManager
Expand Down Expand Up @@ -181,7 +217,8 @@ def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0

if radio_type not in configs:
raise ValueError(
f"Unknown radio type: {radio_type}. Use 'waveshare', 'meshadv-mini', 'uconsole', 'kiss-tnc', or 'ch341'"
f"Unknown radio type: {radio_type}. "
"Use 'waveshare', 'meshadv-mini', 'uconsole', 'kiss-tnc', 'kiss-modem', or 'ch341'"
)

radio_kwargs = configs[radio_type]
Expand All @@ -203,27 +240,29 @@ def create_radio(radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0


def create_mesh_node(
node_name: str = "ExampleNode", radio_type: str = "waveshare", serial_port: str = "/dev/ttyUSB0"
node_name: str = "ExampleNode",
radio_type: str = "waveshare",
serial_port: str = "/dev/ttyUSB0",
use_modem_identity: bool = False,
) -> tuple[MeshNode, LocalIdentity]:
"""Create a mesh node with radio.

Args:
node_name: Name for the mesh node
radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini", "kiss-tnc", or "ch341")
serial_port: Serial port for KISS TNC (only used with "kiss-tnc" type)
radio_type: Type of radio hardware ("waveshare", "uconsole", "meshadv-mini",
"kiss-tnc", "kiss-modem", or "ch341")
serial_port: Serial port for KISS devices (only used with "kiss-tnc" or "kiss-modem")
use_modem_identity: If True and radio_type is "kiss-modem", use the modem's
cryptographic identity instead of generating a local one.
This keeps the private key secure on the modem hardware.

Returns:
Tuple of (MeshNode, LocalIdentity)
Tuple of (MeshNode, Identity) - Identity may be LocalIdentity or ModemIdentity
"""
logger.info(f"Creating mesh node with name: {node_name} using {radio_type} radio")

try:
# Create a local identity (this generates a new keypair)
logger.debug("Creating LocalIdentity...")
identity = LocalIdentity()
logger.info(f"Created identity with public key: {identity.get_public_key().hex()[:16]}...")

# Create the radio
# Create the radio first (needed for modem identity)
logger.debug("Creating radio...")
radio = create_radio(radio_type, serial_port)

Expand All @@ -241,6 +280,19 @@ def create_mesh_node(
logger.error("Failed to connect KISS radio")
print(f"Failed to connect to KISS radio on {serial_port}")
raise Exception(f"KISS radio connection failed on {serial_port}")
elif radio_type == "kiss-modem":
logger.debug("Connecting MeshCore KISS modem...")
if radio.connect():
logger.info("KISS modem connected successfully")
print(f"KISS modem connected to {serial_port}")
if hasattr(radio, "modem_version") and radio.modem_version:
print(f"Modem version: {radio.modem_version}")
if hasattr(radio, "modem_identity") and radio.modem_identity:
print(f"Modem identity: {radio.modem_identity.hex()[:16]}...")
else:
logger.error("Failed to connect KISS modem")
print(f"Failed to connect to KISS modem on {serial_port}")
raise Exception(f"KISS modem connection failed on {serial_port}")
elif radio_type == "ch341":
logger.debug("Initializing CH341 radio...")
ok = radio.begin()
Expand All @@ -255,6 +307,19 @@ def create_mesh_node(
raise RuntimeError("SX1262 radio begin() returned False")
logger.info("Radio initialized successfully")

# Create identity - use modem identity if requested and available
if use_modem_identity and radio_type == "kiss-modem":
from pymc_core.protocol.modem_identity import ModemIdentity

logger.debug("Creating ModemIdentity from KISS modem...")
identity = ModemIdentity(radio)
logger.info(f"Using modem identity: {identity.get_public_key().hex()[:16]}...")
print(f"Using modem identity (private key secured on modem)")
else:
logger.debug("Creating LocalIdentity...")
identity = LocalIdentity()
logger.info(f"Created local identity: {identity.get_public_key().hex()[:16]}...")

# Create a mesh node with the radio and identity
config = {"node": {"name": node_name}}
logger.debug(f"Creating MeshNode with config: {config}")
Expand Down
2 changes: 1 addition & 1 deletion examples/discover_nodes.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def main():
parser = argparse.ArgumentParser(description="Discover nearby mesh nodes")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/login_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,7 @@ def main():
)
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/ping_repeater_trace.py
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def main():
parser = argparse.ArgumentParser(description="Ping a repeater using trace packets")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/respond_to_discovery.py
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ def main():
parser = argparse.ArgumentParser(description="Respond to mesh node discovery requests")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/send_channel_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ def main():
parser = argparse.ArgumentParser(description="Send a channel message to the Public channel")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/send_direct_advert.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ def main():
parser = argparse.ArgumentParser(description="Send a direct advertisement packet")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/send_flood_advert.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def main():
parser = argparse.ArgumentParser(description="Send a flood advertisement packet")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
2 changes: 1 addition & 1 deletion examples/send_text_message.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ def main():
parser = argparse.ArgumentParser(description="Send a text message to the mesh network")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand Down
4 changes: 2 additions & 2 deletions examples/send_tracked_advert.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def main():
parser = argparse.ArgumentParser(description="Send a location-tracked advertisement")
parser.add_argument(
"--radio-type",
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "ch341"],
choices=["waveshare", "uconsole", "meshadv-mini", "kiss-tnc", "kiss-modem", "ch341"],
default="waveshare",
help="Radio hardware type (default: waveshare)",
)
Expand All @@ -106,7 +106,7 @@ def main():
args = parser.parse_args()

print(f"Using {args.radio_type} radio configuration")
if args.radio_type == "kiss-tnc":
if args.radio_type in ("kiss-tnc", "kiss-modem"):
print(f"Serial port: {args.serial_port}")

try:
Expand Down
Loading