Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/copilot-instructions.md
Original file line number Diff line number Diff line change
Expand Up @@ -366,7 +366,7 @@ Whenever `hexdrive.py` is modified, you **must** perform all three steps:
2. **Bump `HEXDRIVE_APP_VERSION`** in `app.py` (and `linefollower.py` if present) to the **same** integer. This is how the app detects that the EEPROM firmware is out-of-date and prompts the user to reprogram.
3. **Rebuild the `.mpy`** by running from the BadgeBot directory:
```bash
mpy-cross -v hexdrive.py
mpy-cross -v EEPROM/hexdrive.py
```
This produces `hexdrive.mpy`, which is what actually gets written to the HexDrive EEPROM.

Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,5 @@ BadgeBot.code-workspace
.deploy_state/test_device_download_state.json
.editorconfig
.venv/
.venv-wsl*/

190 changes: 190 additions & 0 deletions EEPROM/gps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
""" GPS App for Hexpansion """
import app

from app_components.tokens import label_font_size, button_labels
from events.input import Buttons, BUTTON_TYPES, ButtonDownEvent
from system.eventbus import eventbus
from system.hexpansion.config import HexpansionConfig
from system.patterndisplay.events import PatternDisable, PatternEnable
from system.scheduler.events import RequestForegroundPopEvent, RequestForegroundPushEvent, RequestStopAppEvent
from tildagonos import tildagonos
from machine import UART, Pin

# Minimal length method names to make the mpy file as small as possible so it might fit in the 2k hexpansion EEPROM.
# Minimal functionality to get a GPS fix
# This version is NOT for the App Store

VERSION = 1

# Hardware defintions:
TX_PIN = 1 # HS_G for TX
RX_PIN = 0 # HS_F for RX
RESET_PIN = 2 # HS_H for reset
PPS_PIN = 3 # HS_I for PPS

###JUST FOR USE WITH MY PROTOTYPE BOARD
ENABLE_PIN = 0 # First LS pin used to enable the SMPSU
###JUST FOR USE WITH MY PROTOTYPE BOARD

class GPSApp(app.App): # pylint: disable=no-member
""" App to get GPS data from a GPS module connected to the hexpansion and display it on the badge. """
def __init__(self, config: HexpansionConfig | None = None):
super().__init__()
# If run from EEPROM on the hexpansion, the config will be passed in with the correct pin objects
self.config: HexpansionConfig | None = config
if config is None:
return
self.tx_pin = config.pin[TX_PIN]
self.rx_pin = config.pin[RX_PIN]
self.reset = config.pin[RESET_PIN]
self.pps = config.pin[PPS_PIN]

###JUST FOR USE WITH MY PROTOTYPE BOARD
self.power_control = config.ls_pin[ENABLE_PIN]
self.power_control.init(mode=Pin.OUT)
self.power_control.value(1)
###JUST FOR USE WITH MY PROTOTYPE BOARD

self.foreground = False
self.button_states = Buttons(self)
self.last_fix = None

# Event handlers for gaining and losing focus and for stopping the app
eventbus.on_async(RequestStopAppEvent, self.s, self)
eventbus.on_async(RequestForegroundPushEvent, self.r, self)
eventbus.on_async(RequestForegroundPopEvent, self.p, self)

self.uart = UART(1, baudrate=9600, tx=self.tx_pin, rx=self.rx_pin)
self.reset.init(mode=Pin.OUT)
self.pps.init(mode=Pin.IN)
self.reset.value(1) # set reset high here and release when 100ms has passed in foreground update.
self.ticks_since_start = 0
self.ticks_since_last_fix = 0


def deinit(self):
""" Deinitialise the app, releasing any resources (e.g. UART) """
self.uart.deinit()
self.power_control.value(0) # Cut power to the GPS to save power when not in use


def get_version(self) -> int:
""" Get the version of the app - this is used to determine if an upgrade is required. """
return VERSION


async def s(self, event: RequestStopAppEvent):
""" Handle the RequestStopAppEvent so that we can release resources """
if event.app == self:
self.deinit()


async def r(self, event: RequestForegroundPushEvent):
""" Handle the RequestForegroundPushEvent to know when we gain focus """
if event.app == self:
eventbus.emit(PatternDisable())
eventbus.on(ButtonDownEvent, self.d, self)
self.foreground = True


async def p(self, event: RequestForegroundPopEvent):
""" Handle the RequestForegroundPopEvent to know when we lose focus """
if event.app == self:
eventbus.emit(PatternEnable())
eventbus.remove(ButtonDownEvent, self.d, self)


def d(self, event: ButtonDownEvent):
""" Handle button down events """
if event.button == BUTTON_TYPES["CANCEL"]:
self.button_states.clear()
self.minimise()


def update(self, delta):
""" Update the app state - expire last_fix if it is too old """
if self.reset.value():
self.ticks_since_start += delta
if self.ticks_since_start > 100:
Comment thread
Robotmad marked this conversation as resolved.
# Release reset after 100ms to allow the GPS to start up
self.reset.value(0)
if not self.foreground:
# This triggers the automatic foreground display
eventbus.emit(RequestForegroundPushEvent(self))
self.foreground = True
if self.last_fix:
self.ticks_since_last_fix += delta
if self.ticks_since_last_fix > 10000:
# If it's been more than 10 seconds since the last fix, disccard it
self.last_fix = None


def background_update(self, _delta):
""" Update in the background - read from the UART and parse any GPS data """
line = self.uart.readline()
if line:
try:
line = line.decode().strip()
result = n(line)
if result:
self.last_fix = result
self.ticks_since_last_fix = 0
except (UnicodeError, ValueError, AttributeError):
pass


def draw(self, ctx):
""" Draw the app - display the last GPS fix or a searching message if no fix is available """
ctx.rgb(0, 0.2, 0).rectangle(-120, -120, 240, 240).fill()
ctx.rgb(0, 1, 0)
ctx.font_size = label_font_size
ctx.text_align = ctx.LEFT
ctx.text_baseline = ctx.BOTTOM
if self.last_fix:
ctx.move_to(-100, -10).text("Lat: " + str(round(self.last_fix["lat"], 5)))
ctx.move_to(-100, 20).text("Lon: " + str(round(self.last_fix["lon"], 5)))
for i in range(1, 13):
tildagonos.leds[i] = (0,1,0)
tildagonos.leds.write()
else:
ctx.move_to(-100, 0).text("Searching...")
for i in range(1,13):
tildagonos.leds[i] = (0,0,0)
tildagonos.leds.write()

# show labels for buttons
button_labels(ctx, cancel_label="Exit")

def n(line: str) -> dict[str, float] | None:
""" Parse an NMEA RMC sentence and return a dictionary with the latitude and longitude if valid, or None if invalid. """
parts = line.split(',')

if parts[0] not in ("$GNRMC", "$GPRMC"):
return None
elif parts[2] != "A": # A = valid, V = invalid
return None
else:
lat_raw = parts[3]
lat_dir = parts[4]
lon_raw = parts[5]
lon_dir = parts[6]

if not lat_raw or not lon_raw:
return None

# Convert to decimal degrees
lat = float(lat_raw[:2]) + float(lat_raw[2:]) / 60
lon = float(lon_raw[:3]) + float(lon_raw[3:]) / 60

if lat_dir == "S":
lat = -lat
if lon_dir == "W":
lon = -lon

return {
"lat": lat,
"lon": lon
}


__app_export__ = GPSApp #pylint: disable=invalid-name
44 changes: 22 additions & 22 deletions hexdrive.py → EEPROM/hexdrive.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,10 +150,9 @@ def deinitialise(self) -> bool:
""" De-initialise the app - return True if successful, False if failed."""
# Turn off all PWM outputs & release resources
self.set_power(False)
self._pwm_deinit()
self._pwm_deinit()
for hs_pin in self.config.pin:
hs_pin.init(mode=Pin.OUT)
hs_pin.value(0)
hs_pin.init(mode=Pin.IN)
return True


Expand Down Expand Up @@ -224,9 +223,9 @@ def set_power(self, state: bool) -> bool:
self._power_control.value(state)
except Exception as e: # pylint: disable=broad-except
print(f"D:{self.config.port}:power control failed {e}")
return False
return False
self._power_state = state
return self._power_state
return self._power_state


def get_power(self) -> bool:
Expand Down Expand Up @@ -272,7 +271,7 @@ def set_servoposition(self, channel: int | None = None, position: int | None = N
The pulse width for a specific servo output is position + the centre offset (in us)
Based on standard RC servos with centre at 1500us and range of 1000-2000us.
The position is a signed value from -1000 to 1000 which is scaled to 500-2500us.
This is a very wide range and may not be suitable for all servos, some will
This is a very wide range and may not be suitable for all servos, some will
only be happy with 1000-2000us (i.e. position in the range -500 to 500). """
if not self._pwm_setup:
return False
Expand Down Expand Up @@ -301,8 +300,8 @@ def set_servoposition(self, channel: int | None = None, position: int | None = N
print(self._pwm_log_string(channel) + f"Off failed {e}")
return False
# check if all channels are now off and set outputs_energised accordingly
self._check_outputs_energised()
elif channel is not None:
self._check_outputs_energised()
elif channel is not None:
if channel < 0 or channel >= self._hexdrive_type.servos:
return False
if abs(position) > _MAX_SERVO_RANGE:
Expand All @@ -328,7 +327,7 @@ def set_servoposition(self, channel: int | None = None, position: int | None = N
self._freq[channel] = _DEFAULT_SERVO_FREQ
self.PWMOutput[channel].freq(_DEFAULT_SERVO_FREQ)
if self._logging:
print(self._pwm_log_string(channel) + f"{_DEFAULT_SERVO_FREQ}Hz for Servo")
print(self._pwm_log_string(channel) + f"{_DEFAULT_SERVO_FREQ}Hz for Servo")
except Exception as e: # pylint: disable=broad-except
print(self._pwm_log_string(channel) + f"set freq failed {e}")
return False
Expand All @@ -339,7 +338,7 @@ def set_servoposition(self, channel: int | None = None, position: int | None = N
print(self._pwm_log_string(channel) + f"{pulse_width_in_ns}ns")
self.PWMOutput[channel].duty_ns(pulse_width_in_ns)
if self._logging:
print(self._pwm_log_string(channel) + f"{self.PWMOutput[channel]} duty")
print(self._pwm_log_string(channel) + f"{self.PWMOutput[channel]} duty")
except Exception as e: # pylint: disable=broad-except
print(self._pwm_log_string(channel) + f"set duty failed {e}")
return False
Expand All @@ -359,7 +358,7 @@ def set_servocentre(self, centre: int, channel: int | None = None) -> bool:
return False
if channel is not None and (channel < 0 or channel >= self._hexdrive_type.servos):
return False
if centre < (_SERVO_CENTRE - _SERVO_MAX_TRIM ) or centre > (_SERVO_CENTRE + _SERVO_MAX_TRIM):
if centre < (_SERVO_CENTRE - _SERVO_MAX_TRIM ) or centre > (_SERVO_CENTRE + _SERVO_MAX_TRIM):
return False
if channel is None:
self._servo_centre = [centre] * 4
Expand All @@ -380,6 +379,7 @@ def set_motors(self, outputs: tuple[int, ...]) -> bool:
if abs(output) > 65535:
return False
if output == self._motor_output[motor]:
# no change in output for this motor so skip to the next one
continue
try:
# if the output is changing direction then we need to switch which signal is being driven as the PWM output
Expand All @@ -393,7 +393,7 @@ def set_motors(self, outputs: tuple[int, ...]) -> bool:
self.PWMOutput[output_to_disable].deinit()
self.PWMOutput[output_to_disable] = None
self.config.pin[output_to_disable].value(0)
self._set_pwmoutput(output_to_enable, abs(output))
self._set_pwmoutput(output_to_enable, abs(output))
except Exception as e: # pylint: disable=broad-except
print(f"D:{self.config.port}:Motor{motor}:{output} set failed {e}")
self._motor_output[motor] = output
Expand All @@ -409,7 +409,7 @@ def set_pwm(self, duty_cycles: tuple[int, ...]) -> bool:
if not self._pwm_setup:
return False
self._outputs_energised = any(duty_cycles)
for channel, duty_cycle in enumerate(duty_cycles):
for channel, duty_cycle in enumerate(duty_cycles):
if not self._set_pwmoutput(channel, duty_cycle):
return False
self._time_since_last_update = 0
Expand All @@ -424,13 +424,13 @@ def set_pwm(self, duty_cycles: tuple[int, ...]) -> bool:

def motor_step(self, phase: int) -> int | None:
""" Step the motor to a specific phase in the stepping sequence. Returns None if failed (e.g. invalid phase or not configured for stepper),
otherwise returns the phase that was set. The phase is a value from 0 to _STEPPER_NUM_PHASES-1 which corresponds to the
otherwise returns the phase that was set. The phase is a value from 0 to _STEPPER_NUM_PHASES-1 which corresponds to the
stepping sequence defined in _STEPPER_SEQUENCE."""
if phase >= _STEPPER_NUM_PHASES or self._hexdrive_type.steppers == 0:
return None
if not self._stepper:
# not currently configured for stepper motor - configure
self._pwm_deinit()
self._pwm_deinit()
self._stepper = True
for channel, value in enumerate(_STEPPER_SEQUENCE[phase]):
self.config.pin[channel].value(value)
Expand All @@ -454,10 +454,10 @@ def motor_release(self):
def _pwm_init(self) -> bool:
self._pwm_setup = False
# HS Pins
if self.config.pin is not None and len(self.config.pin) == 4:
if self.config.pin is not None and len(self.config.pin) == 4:
# Allocate PWM generation to pins
for channel, _ in enumerate(self.config.pin):
self._freq[channel] = 0
self._freq[channel] = 0
if self._hexdrive_type is not None:
if channel < (2 * self._hexdrive_type.motors):
# First channels are for motors (can be 0, 1 or 2 motors)
Expand All @@ -480,10 +480,10 @@ def _pwm_init(self) -> bool:
if self._set_pwmoutput(channel, 0):
self._stepper = False
else:
return False
return False
self._pwm_setup = True
return self._pwm_setup


# De-initialise all PWM outputs
def _pwm_deinit(self):
Expand Down Expand Up @@ -520,13 +520,13 @@ def _check_outputs_energised(self):
# if the channel has not been setup yet then we initialise it from scratch, otherwise we just change the duty cycle
def _set_pwmoutput(self, channel: int, duty_cycle: int) -> bool:
if duty_cycle < 0 or duty_cycle > 65535:
return False
return False
try:
if self.PWMOutput[channel] is None:
# Channel hasn't been setup yet so we need to initialise it from scratch
self.PWMOutput[channel] = PWM(self.config.pin[channel], freq = self._freq[channel], duty_u16 = duty_cycle)
if self._logging:
print(self._pwm_log_string(channel) + f"{self.PWMOutput[channel]} init")
print(self._pwm_log_string(channel) + f"{self.PWMOutput[channel]} init")
elif duty_cycle != self.PWMOutput[channel].duty_u16():
self.PWMOutput[channel].duty_u16(duty_cycle)
if self._logging:
Expand Down Expand Up @@ -556,7 +556,7 @@ def _check_port_for_hexdrive(self, port: int) -> HexDriveType | None:
return hexpansion_type
# we are not interested in this type of hexpansion
return None


def _parse_version(self, version):
""" Parse a version string, e.g. that of BadgeOS, into a list of components for comparison. Handles versions in the format v1.9.0-beta.1+build.123
Expand Down
Loading
Loading