diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..6e44dde --- /dev/null +++ b/.gitattributes @@ -0,0 +1,13 @@ +.gitignore export-ignore +.gitattributes export-ignore +download.bat export-ignore +download.sh export-ignore +pyproject.toml export-ignore +README.md export-ignore +.github/ export-ignore +.vscode/ export-ignore +dev/ export-ignore +tests/ export-ignore +typings/ export-ignore +*.py export-ignore +*.mpy binary -diff diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index 7ca787a..c1584aa 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -8,10 +8,12 @@ jobs: runs-on: ubuntu-latest steps: - uses: actions/checkout@v4 + with: + submodules: recursive - name: Set up Python - uses: actions/setup-python@v3 + uses: actions/setup-python@v5 with: - python-version: 3.10 + python-version: "3.10" - name: Run release setup script run: | python3 ./dev/build_release.py -f diff --git a/.gitignore b/.gitignore index d8ccc59..d1e0dba 100644 --- a/.gitignore +++ b/.gitignore @@ -1,9 +1,7 @@ *.pyc *.pyo -*.mpy HexManager.code-workspace .deploy_state/test_device_download_state.json .editorconfig .venv/ .venv-wsl*/ - diff --git a/.gitmodules b/.gitmodules new file mode 100644 index 0000000..a6b30ab --- /dev/null +++ b/.gitmodules @@ -0,0 +1,6 @@ +[submodule "vendor/HexDrive2"] + path = vendor/HexDrive2 + url = https://github.com/TeamRobotmad/HexDrive2.git +[submodule "vendor/HexCurrent"] + path = vendor/HexCurrent + url = https://github.com/TeamRobotmad/HexCurrent.git diff --git a/EEPROM/caffeine.mpy b/EEPROM/caffeine.mpy new file mode 100644 index 0000000..fd6945f Binary files /dev/null and b/EEPROM/caffeine.mpy differ diff --git a/EEPROM/gps.mpy b/EEPROM/gps.mpy new file mode 100644 index 0000000..5f3d9dd Binary files /dev/null and b/EEPROM/gps.mpy differ diff --git a/EEPROM/hexcurrent.mpy b/EEPROM/hexcurrent.mpy new file mode 100644 index 0000000..21a06f2 Binary files /dev/null and b/EEPROM/hexcurrent.mpy differ diff --git a/EEPROM/hexdrive.mpy b/EEPROM/hexdrive.mpy new file mode 100644 index 0000000..f71fa0f Binary files /dev/null and b/EEPROM/hexdrive.mpy differ diff --git a/EEPROM/hexdrive.py b/EEPROM/hexdrive.py index 5405f29..47dff2d 100644 --- a/EEPROM/hexdrive.py +++ b/EEPROM/hexdrive.py @@ -12,6 +12,9 @@ import app +# Define the minimum BadgeOS version required to run this app (e.g. if we need features that are only available in a certain version of BadgeOS) +_MIN_BADGEOS_VERSION = [1, 9, 0] # v1.9.0 is required to be able to read the EEPROM with 16-bit addressing + # HexDrive Hexpansion constants # Hardware defintions: _ENABLE_PIN = 0 # First LS pin used to enable the SMPSU @@ -92,7 +95,7 @@ def __init__(self, config: HexpansionConfig | None = None): ver = self._parse_version(ota.get_version()) #print(f"D:S/W {ver}") # e.g. v1.9.0-beta.1 - if ver >= [1, 9, 0]: + if ver >= _MIN_BADGEOS_VERSION: # we need v1.9.0+ to be able to read the EEPROM with 16-bit addressing, so if we are running on an older version then we cannot continue pass else: diff --git a/EEPROM/hexdrive2.mpy b/EEPROM/hexdrive2.mpy new file mode 100644 index 0000000..24d2aac Binary files /dev/null and b/EEPROM/hexdrive2.mpy differ diff --git a/README.md b/README.md index 4346b14..ea6217e 100644 --- a/README.md +++ b/README.md @@ -55,7 +55,8 @@ HexManager reads hexpansion type definitions from a JSON file named **`hexpansio | Field | Required | Default | Description | |---|---|---|---| | `pid` | ✅ | – | Product ID (0–65535). Must be unique within the same VID. | - | `name` | ✅ | – | Display name shown on screen ≤ 9 chars. | + | `name` | ✅ | – | Display name shown in HexManager. | + | `friendly_name` | No | `name` | Short name written into the EEPROM header and used by BadgeOS insertion notifications. Must fit within 9 chars. | | `vid` | No | "0xCAFE" | Vendor ID. | | `eeprom_total_size` | No | 8192 | EEPROM size in **bytes** (e.g. 2048, 8192, 32768, 65536). | | `eeprom_page_size` | No | 32 | Write page size in **bytes** – check your EEPROM datasheet (e.g. 16, 32, 64, 128). | diff --git a/app.mpy b/app.mpy new file mode 100644 index 0000000..a372c75 Binary files /dev/null and b/app.mpy differ diff --git a/app.py b/app.py index 97f5a5b..6e8bdc7 100644 --- a/app.py +++ b/app.py @@ -15,11 +15,14 @@ RequestStopAppEvent) import app -APP_VERSION = "0.2" # HexManager App Version Number +APP_VERSION = "0.4" # HexManager App Version Number _HEXPANSIONS_JSON = "hexpansions.json" # Name of the hexpansion type definitions file _SETTINGS_NAME_PREFIX = "hexmanager." # Prefix for settings keys in EEPROM +_MESSAGE_MAX_LINES = 5 +_MESSAGE_WRAP_COLUMNS = 18 +_IMPORT_ERRORS: dict[str, str] = {} # Screen positioning constant for scroll mode display H_START = -63 @@ -68,10 +71,13 @@ def _try_import(module_name, *attr_names): try: __import__(full_name) mod = sys.modules[full_name] + _IMPORT_ERRORS.pop(module_name, None) return tuple(getattr(mod, n) for n in attr_names) except ImportError as e: + _IMPORT_ERRORS[module_name] = str(e) print(f"Warning: {module_name} module not found ({e})") except Exception as e: # pylint: disable=broad-except + _IMPORT_ERRORS[module_name] = f"{type(e).__name__}: {e}" print(f"Error importing {module_name} module ({e})") return nones @@ -80,6 +86,71 @@ def _try_import(module_name, *attr_names): SettingsMgr, MySetting = _try_import('settings_mgr', 'SettingsMgr', 'MySetting') +def _wrap_message_line(line, max_columns=_MESSAGE_WRAP_COLUMNS): + text = str(line) + if not text: + return [""] + + def _split_long_word(word): + return [word[i:i + max_columns] for i in range(0, len(word), max_columns)] or [""] + + wrapped = [] + current = "" + for word in text.split(): + chunks = _split_long_word(word) + for idx, chunk in enumerate(chunks): + if idx > 0 and current: + wrapped.append(current) + current = "" + if not current: + current = chunk + elif len(current) + 1 + len(chunk) <= max_columns: + current = f"{current} {chunk}" + else: + wrapped.append(current) + current = chunk + if current or not wrapped: + wrapped.append(current) + return wrapped + + +def _paginate_message(message, colours, max_lines=_MESSAGE_MAX_LINES): + pages = [] + page_lines = [] + page_colours = [] + source_lines = list(message) if message else [""] + source_colours = list(colours) if colours else [] + + for idx, line in enumerate(source_lines): + colour = source_colours[idx] if idx < len(source_colours) else (1, 1, 1) + for wrapped_line in _wrap_message_line(line): + if len(page_lines) >= max_lines: + pages.append((page_lines, page_colours)) + page_lines = [] + page_colours = [] + page_lines.append(wrapped_line) + page_colours.append(colour) + + if page_lines or not pages: + pages.append((page_lines, page_colours)) + + return pages + + +def _startup_warning_message(warning): + title = _HEXPANSIONS_JSON + body = warning + if warning.startswith("hexpansion_mgr import failed:"): + detail = warning.split(":", 1)[1].strip() + title = "hexpansion_mgr" + body = "import failed: " + detail + elif warning.startswith(_HEXPANSIONS_JSON): + body = warning[len(_HEXPANSIONS_JSON):].strip(": ") + else: + title = "HexManager" + return [title, "warning:", body], [(1, 0.6, 0)] * 3 + + def _load_hexpansion_types(app_file_path, json_path=None): """Load hexpansion type definitions from hexpansions.json next to this app file. @@ -100,8 +171,9 @@ def _load_hexpansion_types(app_file_path, json_path=None): types_list = [] warnings = [] if HexpansionType is None: - warnings.append("hexpansion type support unavailable") - print("H:Warning: hexpansion type support unavailable (HexpansionType import failed)") + reason = _IMPORT_ERRORS.get("hexpansion_mgr", "HexpansionType import failed") + warnings.append(f"hexpansion_mgr import failed: {reason}") + print(f"H:Warning: hexpansion_mgr import failed ({reason})") return types_list, warnings if json_path is None: last_slash = max(app_file_path.rfind("/"), app_file_path.rfind("\\")) @@ -139,6 +211,7 @@ def _load_hexpansion_types(app_file_path, json_path=None): types_list.append(HexpansionType( pid=h["pid"], name=str(h["name"]), + friendly_name=str(h["friendly_name"]) if h.get("friendly_name") is not None else str(h["name"]), vid=h.get("vid", 0xCAFE), eeprom_total_size=h.get("eeprom_total_size", 8192), eeprom_page_size=h.get("eeprom_page_size", 32), @@ -175,6 +248,8 @@ def __init__(self): self.message: list = [] self.message_colours: list = [] self.message_type: str | None = None + self._message_pages: list[tuple[list[str], list[tuple[float, float, float]]]] = [] + self._message_page_index: int = 0 self.current_menu: str | None = None self.menu: Menu | None = None self.scroll_mode_enabled: bool = False # Whether pressing the "C" button can toggle scroll mode on/off, which allows the user to scroll through lines on the display. @@ -382,10 +457,9 @@ def _update_main_application(self, delta: int): # Show any startup warnings once (e.g. hexpansions.json not found or parse error) if self._startup_warnings and self.current_state != STATE_MESSAGE: w = self._startup_warnings[0] - if w.startswith(_HEXPANSIONS_JSON): - w = w[len(_HEXPANSIONS_JSON):].strip(": ") self._startup_warning_active = True - self.show_message([_HEXPANSIONS_JSON, "warning:", w], [(1, 0.6, 0)] * 3, msg_type="warning") + msg_content, msg_colours = _startup_warning_message(w) + self.show_message(msg_content, msg_colours, msg_type="warning") return if self.current_state == STATE_MENU: if self.current_menu is None: @@ -430,6 +504,18 @@ def _update_main_application(self, delta: int): def _update_state_message(self, delta: int): # pylint: disable=unused-argument + if len(self._message_pages) > 1 and self.button_states.get(BUTTON_TYPES["UP"]): + self.button_states.clear() + if self._message_page_index > 0: + self._set_message_page(self._message_page_index - 1) + self.refresh = True + return + if len(self._message_pages) > 1 and self.button_states.get(BUTTON_TYPES["DOWN"]): + self.button_states.clear() + if self._message_page_index < len(self._message_pages) - 1: + self._set_message_page(self._message_page_index + 1) + self.refresh = True + return if self.button_states.get(BUTTON_TYPES["CONFIRM"]): if self.message_type == "reboop": self.button_states.clear() @@ -456,6 +542,8 @@ def _update_state_message(self, delta: int): # pylint: disable=unused-argum self.message = [] self.message_colours = [] self.message_type = None + self._message_pages = [] + self._message_page_index = 0 def scroll_mode_enable(self, enable: bool): @@ -514,7 +602,9 @@ def draw(self, ctx): self.message_colours = [(1,0,0)]*len(self.message) self.draw_message(ctx, self.message, self.message_colours, label_font_size) if self.message_type is None or self.message_type == "warning" or self.message_type == "hexpansion" or self.message_type == "serialise": - button_labels(ctx, confirm_label="OK", cancel_label="Exit") + up_label = "Prev" if self._message_page_index > 0 else "" + down_label = "Next" if self._message_page_index < len(self._message_pages) - 1 else "" + button_labels(ctx, confirm_label="OK", cancel_label="Exit", up_label=up_label, down_label=down_label) else: # Delegate to functional area managers via dispatch table if self.current_state in self._state_draw_dispatch: @@ -561,12 +651,21 @@ def return_to_menu(self, menu_name: str | None = None): self.refresh = True + def _set_message_page(self, index: int): + self._message_page_index = index + if not self._message_pages: + self.message = [] + self.message_colours = [] + return + self.message, self.message_colours = self._message_pages[index] + + def show_message(self, msg_content, msg_colours, msg_type = None): """Utility function to set the current state to the message display, and populate the message content and colours. The message_type can be used to indicate whether this is an 'error' (red) or 'warning' (green) message, which can affect both the display and the behaviour when the user acknowledges the message.""" if self.logging: print(f"Showing message: '{msg_content}' with type {msg_type}") - self.message = msg_content - self.message_colours = msg_colours + self._message_pages = _paginate_message(msg_content, msg_colours) + self._set_message_page(0) self.message_type = msg_type self.current_state = STATE_MESSAGE self.refresh = True diff --git a/dev/build_release.py b/dev/build_release.py index e3e22ff..8ab2f41 100644 --- a/dev/build_release.py +++ b/dev/build_release.py @@ -2,19 +2,32 @@ import os import subprocess import sys +from dataclasses import dataclass from pathlib import Path import mpy_cross + +@dataclass(frozen=True) +class ModuleSpec: + source: Path + artifact: Path + RUNTIME_MODULES = { "app", "EEPROM/hexdrive", "EEPROM/gps", "EEPROM/caffeine", "settings_mgr", + "serialise_mgr", "hexpansion_mgr", } +EXTERNAL_MODULES = ( + ModuleSpec(Path("vendor/HexDrive2/hexdrive2.py"), Path("EEPROM/hexdrive2.mpy")), + ModuleSpec(Path("vendor/HexCurrent/hexcurrent.py"), Path("EEPROM/hexcurrent.mpy")), +) + files_to_mpy = {Path(f"{module}.py") for module in RUNTIME_MODULES} files_to_keep = { @@ -24,10 +37,26 @@ Path("hexpansions.json"), } files_to_keep.update({Path(f"{module}.mpy") for module in RUNTIME_MODULES}) +files_to_keep.update({spec.artifact for spec in EXTERNAL_MODULES}) + +IGNORED_SOURCE_DIRS = (Path("vendor/HexDrive2"), Path("vendor/HexCurrent")) def _construct_filepaths(dirname, filenames): return [Path(dirname, filename) for filename in filenames] +def _normalise_parts(path: Path) -> tuple[str, ...]: + return tuple(part for part in path.parts if part not in (".", "")) + +def _is_ignored_dir(dirname: str) -> bool: + parts = _normalise_parts(Path(dirname)) + if ".git" in parts: + return True + for ignored_dir in IGNORED_SOURCE_DIRS: + ignored_parts = _normalise_parts(ignored_dir) + if parts[: len(ignored_parts)] == ignored_parts: + return True + return False + def find_files(top_level_dir): walkerator = iter(os.walk(top_level_dir)) dirname, _, filenames = next(walkerator) @@ -35,8 +64,7 @@ def find_files(top_level_dir): all_files = _construct_filepaths(dirname, filenames) for dirname, _, filenames in walkerator: - # if dirname not in dirs_to_keep: - if dirname != "./.git" and ".git/" not in dirname: + if not _is_ignored_dir(dirname): all_files.extend(_construct_filepaths(dirname, filenames)) return all_files @@ -49,12 +77,16 @@ def find_files(top_level_dir): parser.add_argument("-f", "--force", action="store_true", help="Skip confirmation prompt before file removal.") options = parser.parse_args() force_mode = options.force - found_files = set(find_files(".")) - for file in files_to_mpy: print(f"Mpy-ing file: {file}") mpy_cross.run(file, "-v") + for spec in EXTERNAL_MODULES: + print(f"Mpy-ing file: {spec.source} -> {spec.artifact}") + spec.artifact.parent.mkdir(parents=True, exist_ok=True) + mpy_cross.run(str(spec.source), "-v", "-o", str(spec.artifact)) + + found_files = set(find_files(".")) if not files_to_keep.issubset(found_files): raise FileNotFoundError(f"Some of {files_to_keep} are not found so assuming wrong directory. " "Please run this script from HexManager dir.") diff --git a/dev/download_to_device.py b/dev/download_to_device.py index e2ec229..e219f7f 100644 --- a/dev/download_to_device.py +++ b/dev/download_to_device.py @@ -42,6 +42,8 @@ class ModuleSpec: ModuleSpec(Path("hexpansion_mgr.py"), Path("hexpansion_mgr.mpy")), ModuleSpec(Path("serialise_mgr.py"), Path("serialise_mgr.mpy")), ModuleSpec(Path("EEPROM/hexdrive.py"), Path("EEPROM/hexdrive.mpy")), + ModuleSpec(Path("vendor/HexDrive2/hexdrive2.py"), Path("EEPROM/hexdrive2.mpy")), + ModuleSpec(Path("vendor/HexCurrent/hexcurrent.py"), Path("EEPROM/hexcurrent.mpy")), ModuleSpec(Path("EEPROM/gps.py"), Path("EEPROM/gps.mpy")), ModuleSpec(Path("EEPROM/caffeine.py"), Path("EEPROM/caffeine.mpy")) ) diff --git a/hexpansion_mgr.mpy b/hexpansion_mgr.mpy new file mode 100644 index 0000000..81dd2e5 Binary files /dev/null and b/hexpansion_mgr.mpy differ diff --git a/hexpansion_mgr.py b/hexpansion_mgr.py index c89109d..e107e14 100644 --- a/hexpansion_mgr.py +++ b/hexpansion_mgr.py @@ -1,4 +1,5 @@ """ Hexpansion & EEPROM Management Module for HexManager""" +# pyright: reportAttributeAccessIssue=false # # Handles detection, initialisation, programming, upgrading and erasure of hexpansion EEPROMs. # @@ -16,12 +17,13 @@ import vfs from app_components.notification import Notification from app_components.tokens import label_font_size, button_labels +from eeprom_i2c import EEPROM +from eeprom_partition import EEPROMPartition from events.input import BUTTON_TYPES from machine import I2C from system.eventbus import eventbus from system.hexpansion.events import HexpansionInsertionEvent from system.hexpansion.header import HexpansionHeader, write_header -from system.hexpansion.util import get_hexpansion_block_devices, detect_eeprom_addr from system.scheduler import scheduler _NUM_HEXPANSION_SLOTS = 6 @@ -29,6 +31,8 @@ # EEPROM Constants _DEFAULT_EEPROM_PAGE_SIZE = 32 _DEFAULT_EEPROM_TOTAL_SIZE = 64 * 1024 // 8 +_EEPROM_PAGE_SIZE_CANDIDATES = (16, 32, 64, 128, 256) +_EEPROM_TOTAL_SIZE_CANDIDATES = (256, 512, 1024, 2048, 4096, 8192, 16384, 32768, 65536) _IS_SIMULATOR = sys.platform != "esp32" @@ -41,8 +45,9 @@ _SUB_UPGRADE_CONFIRM = 5 # Hexpansion ready for App upgrade _SUB_PROGRAMMING = 6 # Hexpansion EEPROM programming (Initialsation or Upgrade) in progress _SUB_PORT_SELECT = 7 # User selecting which hexpansion to erase (if multiple) in order to free up a slot for initialisation or upgrade -_SUB_DONE = 8 # Final state after successful initialisation or upgrade, before returning to menu -_SUB_EXIT = 9 # State for exiting from interactive mode back to menu) +_SUB_SCANNING = 8 # Manual blank-EEPROM scan in progress after showing a wait screen +_SUB_DONE = 9 # Final state after successful initialisation or upgrade, before returning to menu +_SUB_EXIT = 10 # State for exiting from interactive mode back to menu) # EEPROM app programming outcomes @@ -66,6 +71,50 @@ def init_settings(s, MySetting): # pylint: disable=unused-argument, inval return +def detect_eeprom_addr(i2c): + devices = i2c.scan() + if 0x57 in devices and 0x50 not in devices: + return (0x57, 2) + if ( + 0x57 in devices + and 0x56 in devices + and 0x55 in devices + and 0x54 in devices + and 0x53 in devices + and 0x52 in devices + and 0x51 in devices + and 0x50 in devices + ): + return (0x50, 1) + if 0x50 in devices: + return (0x50, 2) + return (None, None) + + +def get_hexpansion_block_devices(i2c, header, addr=0x50, addr_len=2): + if header.eeprom_total_size > 2 ** (8 * addr_len): + chip_size = 2 ** (8 * addr_len) + else: + chip_size = header.eeprom_total_size + if header.eeprom_total_size >= 8192: + block_size = 9 + else: + block_size = 6 + eep = EEPROM( + i2c=i2c, + chip_size=chip_size, + page_size=header.eeprom_page_size, + block_size=block_size, + addrsize=addr_len * 8, + ) + partition = EEPROMPartition( + eep=eep, + offset=header.fs_offset, + length=header.eeprom_total_size - header.fs_offset, + ) + return eep, partition + + # ---- Hexpansion management ------------------------------------------------- class HexpansionMgr: @@ -101,7 +150,7 @@ class HexpansionMgr: "App OK" ] - _LFS_META = 2 # Number of free blocks to reserve for LittleFS metadata when calculating how much space we have to write an app to the EEPROM (keep 1 block in hand for file create/sync, and 1 block in hand to ensure we don't fill the EEPROM completely and cause weird LittleFS behaviour) + _LFS_META = 2 # Number of free blocks to reserve for LittleFS metadata when calculating how much space we have to write an app to the EEPROM # Sub-states are defined at module level (_SUB_*); app-level state # routing is handled by the dispatch tables in app.py. @@ -121,12 +170,16 @@ def __init__(self, app, logging: bool = False): self._hexpansion_state_by_slot: list[int] = [self.HEXPANSION_STATE_UNKNOWN]*_NUM_HEXPANSION_SLOTS self._hexpansion_eeprom_addr_len: list[int | None] = [None]*_NUM_HEXPANSION_SLOTS self._hexpansion_eeprom_addr: list[int | None] = [None]*_NUM_HEXPANSION_SLOTS + self._hexpansion_eeprom_total_size: list[int | None] = [None]*_NUM_HEXPANSION_SLOTS + self._hexpansion_eeprom_page_size: list[int | None] = [None]*_NUM_HEXPANSION_SLOTS self._hexpansion_init_type: int = 0 self._detected_port: int | None = None self._waiting_app_port: int | None = None self._erase_port: int | None = None self._upgrade_port: int | None = None + self._scan_port: int | None = None self._ports_to_initialise: set[int] = set() # ports with blank EEPROM which could be initialised + self._ports_initialise_declined: set[int] = set() # blank EEPROMs the user already declined to initialise self._ports_to_check_app: set[int] = set() # ports with recognised hexpansion type which should be checked for the correct app before being used self._reboop_required: bool = False self._hexpansion_serial_number: int | None = None @@ -165,6 +218,128 @@ def logging(self, value: bool): """Set the logging state.""" self._logging = value + @staticmethod + def _mem_addr_bytes(mem_addr: int, addr_len: int) -> bytes: + return bytes((mem_addr >> shift) & 0xFF for shift in range((addr_len - 1) * 8, -1, -8)) + + @staticmethod + def _mem_target(addr: int, addr_len: int, mem_addr: int) -> tuple[int, int]: + mem_addr_mask = (1 << (addr_len * 8)) - 1 + return addr | (mem_addr >> (8 * addr_len)), mem_addr & mem_addr_mask + + @classmethod + def _write_eeprom_bytes(cls, i2c, addr: int, addr_len: int, mem_addr: int, data: bytes): + device_addr, masked_mem_addr = cls._mem_target(addr, addr_len, mem_addr) + i2c.writeto_mem(device_addr, masked_mem_addr, data, addrsize=8 * addr_len) + while True: + try: + if i2c.writeto(device_addr, cls._mem_addr_bytes(masked_mem_addr, addr_len)): + return + except OSError: + pass + time.sleep_ms(1) + + @classmethod + def _read_eeprom_bytes(cls, i2c, addr: int, addr_len: int, mem_addr: int, size: int) -> bytes: + device_addr, masked_mem_addr = cls._mem_target(addr, addr_len, mem_addr) + return i2c.readfrom_mem(device_addr, masked_mem_addr, size, addrsize=8 * addr_len) + + def _clear_eeprom_geometry(self, port: int): + self._hexpansion_eeprom_total_size[port - 1] = None + self._hexpansion_eeprom_page_size[port - 1] = None + + def _cache_eeprom_geometry(self, port: int, total_size: int, page_size: int): + self._hexpansion_eeprom_total_size[port - 1] = total_size + self._hexpansion_eeprom_page_size[port - 1] = page_size + + def _get_eeprom_geometry(self, port: int) -> tuple[int | None, int | None]: + if port not in range(1, _NUM_HEXPANSION_SLOTS + 1): + return None, None + return self._hexpansion_eeprom_total_size[port - 1], self._hexpansion_eeprom_page_size[port - 1] + + def _has_eeprom_geometry(self, port: int) -> bool: + total_size, page_size = self._get_eeprom_geometry(port) + return total_size is not None and page_size is not None + + def _geometry_for_type(self, type_index: int | None) -> tuple[int | None, int | None]: + if type_index is None or type_index < 0 or type_index >= len(self._app.HEXPANSION_TYPES): + return None, None + hexpansion_type = self._app.HEXPANSION_TYPES[type_index] + return hexpansion_type.eeprom_total_size, hexpansion_type.eeprom_page_size + + def _geometry_for_port(self, port: int, fallback_type_index: int | None = None) -> tuple[int | None, int | None]: + total_size, page_size = self._get_eeprom_geometry(port) + if total_size is not None and page_size is not None: + return total_size, page_size + return self._geometry_for_type(fallback_type_index) + + def _probe_eeprom_page_size(self, i2c, addr: int, addr_len: int) -> int | None: + for page_size in _EEPROM_PAGE_SIZE_CANDIDATES: + pattern = bytes(((page_size + index + 1) & 0xFF) for index in range(page_size + 1)) + try: + self._write_eeprom_bytes(i2c, addr, addr_len, 0, pattern) + readback = self._read_eeprom_bytes(i2c, addr, addr_len, 0, len(pattern)) + except OSError as e: + print(f"H:Error probing EEPROM page size: {e}") + return None + if readback != pattern: + return page_size + return _EEPROM_PAGE_SIZE_CANDIDATES[-1] + + def _probe_eeprom_total_size(self, i2c, addr: int, addr_len: int) -> int | None: + baseline = b"\xA5" + probe = b"\x5A" + for total_size in _EEPROM_TOTAL_SIZE_CANDIDATES: + try: + self._write_eeprom_bytes(i2c, addr, addr_len, 0, baseline) + self._write_eeprom_bytes(i2c, addr, addr_len, total_size, probe) + except OSError: + return total_size + try: + if self._read_eeprom_bytes(i2c, addr, addr_len, 0, 1) == probe: + return total_size + except OSError as e: + print(f"H:Error probing EEPROM total size: {e}") + return None + return None + + def _detect_eeprom_geometry(self, port: int, force: bool = False) -> tuple[int | None, int | None]: + if not force and self._has_eeprom_geometry(port): + return self._get_eeprom_geometry(port) + try: + i2c = I2C(port) + except Exception as e: # pylint: disable=broad-except + print(f"H:Error opening I2C port {port}: {e}") + return None, None + try: + header = self._read_header(port, i2c=i2c) + except RuntimeError: + header = None + except OSError as e: + print(f"H:Error probing EEPROM geometry on port {port}: {e}") + return None, None + except Exception as e: # pylint: disable=broad-except + print(f"H:Error reading EEPROM geometry cache on port {port}: {e}") + return None, None + if header is not None: + return header.eeprom_total_size, header.eeprom_page_size + + addr_len = self._hexpansion_eeprom_addr_len[port - 1] + addr = self._hexpansion_eeprom_addr[port - 1] + if addr_len is None or addr is None: + return None, None + + page_size = self._probe_eeprom_page_size(i2c, addr, addr_len) + if page_size is None: + return None, None + total_size = self._probe_eeprom_total_size(i2c, addr, addr_len) + if total_size is None: + return None, None + if not self._erase_eeprom(port, addr, addr_len, total_size, page_size): + return None, None + self._cache_eeprom_geometry(port, total_size, page_size) + return total_size, page_size + def probe_eeprom(self, port: int) -> tuple[int, HexpansionHeader | None]: """Probe a port and report whether its EEPROM is blank, missing, or already written.""" try: @@ -198,12 +373,14 @@ def erase_eeprom_for_type(self, port: int, type_index: int) -> bool: erase_addr = self._hexpansion_eeprom_addr[port - 1] if erase_addr_len is None or erase_addr is None: return False - hexpansion_type = self._app.HEXPANSION_TYPES[type_index] + eeprom_total_size, eeprom_page_size = self._geometry_for_port(port, type_index) + if eeprom_total_size is None or eeprom_page_size is None: + return False return self._erase_eeprom(port, erase_addr, erase_addr_len, - hexpansion_type.eeprom_total_size, - hexpansion_type.eeprom_page_size) + eeprom_total_size, + eeprom_page_size) def prepare_eeprom_for_type(self, port: int, type_index: int, unique_id: int | None) -> bool: @@ -247,11 +424,14 @@ def refresh_slot_records(self): self._waiting_app_port = None self._erase_port = None self._upgrade_port = None + self._scan_port = None self._hexpansion_app_startup_timer = 0 self._hexpansion_type_by_slot = [None] * _NUM_HEXPANSION_SLOTS self._hexpansion_state_by_slot = [self.HEXPANSION_STATE_UNKNOWN] * _NUM_HEXPANSION_SLOTS self._hexpansion_eeprom_addr_len = [None] * _NUM_HEXPANSION_SLOTS self._hexpansion_eeprom_addr = [None] * _NUM_HEXPANSION_SLOTS + self._hexpansion_eeprom_total_size = [None] * _NUM_HEXPANSION_SLOTS + self._hexpansion_eeprom_page_size = [None] * _NUM_HEXPANSION_SLOTS self._port_selected_header = None self._hexpansion_serial_number = None @@ -273,18 +453,24 @@ async def _handle_removal(self, event): port = event.port self._hexpansion_type_by_slot[port - 1] = None self._hexpansion_state_by_slot[port - 1] = self.HEXPANSION_STATE_EMPTY + self._hexpansion_eeprom_addr_len[port - 1] = None + self._hexpansion_eeprom_addr[port - 1] = None + self._clear_eeprom_geometry(port) + self._ports_initialise_declined.discard(port) + scanning_port_removed = self._scan_port == port if port in self._ports_to_initialise: self._ports_to_initialise.remove(port) self._ports_to_check_app.discard(port) + if scanning_port_removed: + self._scan_port = None if (self._detected_port is not None and port == self._detected_port) or \ (self._upgrade_port is not None and port == self._upgrade_port) or \ (self._waiting_app_port is not None and port == self._waiting_app_port) or \ (self._erase_port is not None and port == self._erase_port) or \ + scanning_port_removed or \ (self._port_selected != 0 and port == self._port_selected): # The port from which a hexpansion has been removed is significant - self._hexpansion_eeprom_addr_len[port - 1] = None - self._hexpansion_eeprom_addr[port - 1] = None app.hexpansion_update_required = True if self._logging: print(f"H:Hexpansion removed from port {port}") @@ -293,6 +479,10 @@ async def _handle_removal(self, event): async def _handle_insertion(self, event): if self._app.serialise_active: return + self._ports_initialise_declined.discard(event.port) + self._hexpansion_eeprom_addr_len[event.port - 1] = None + self._hexpansion_eeprom_addr[event.port - 1] = None + self._clear_eeprom_geometry(event.port) if self._check_port_for_known_hexpansions(event.port) or event.port == self._port_selected: # A known hexpansion type has been detected on the inserted port, so trigger an update of # the hexpansion management state machine to handle it. Or the inserted port is the one @@ -337,7 +527,9 @@ def _read_port_header(self, port: int): """Read the EEPROM header for the given port and set the default detail page.""" try: self._port_selected_header = self._read_header(port) - except (OSError, RuntimeError, Exception) as e: + except RuntimeError: + self._port_selected_header = None + except (OSError, Exception) as e: # pylint: disable=broad-except print(f"H:Error reading header for port {port}: {e}") self._port_selected_header = None self._update_detail_page_count() @@ -353,6 +545,9 @@ def _update_detail_page_count(self): # Unrecognised type - show vid/pid page and EEPROM page but not details page self._port_detail_page_count = 2 self._port_detail_page = self._PAGE_VID_PID + elif state_idx == self.HEXPANSION_STATE_BLANK: + self._port_detail_page_count = 1 + self._port_detail_page = self._PAGE_EEPROM elif state_idx >= self.HEXPANSION_STATE_RECOGNISED: # Recognised type - show vid/pid page and details page self._port_detail_page_count = 3 @@ -412,6 +607,8 @@ def update(self, delta) -> bool: self._update_state_upgrade(delta) elif self._sub_state == _SUB_PROGRAMMING: self._update_state_programming(delta) + elif self._sub_state == _SUB_SCANNING: + self._update_state_scanning(delta) elif self._sub_state == _SUB_PORT_SELECT: self._update_state_port_select(delta) elif self._sub_state == _SUB_CHECK: @@ -520,6 +717,9 @@ def _update_state_detected(self, delta): # pylint: disable=unused-argumen app.button_states.clear() if self._logging: print("H:Initialise Cancelled") + if self._detected_port is not None: + self._ports_initialise_declined.add(self._detected_port) + self._ports_to_initialise.discard(self._detected_port) self._detected_port = None self._sub_state = _SUB_INIT if self._mode == _MODE_INIT else _SUB_CHECK elif app.button_states.get(BUTTON_TYPES["UP"]): @@ -568,10 +768,14 @@ def _update_state_erase(self, delta): # pylint: disable=unused-argument return if self._logging: print(f"H:Erasing EEPROM on port {erase_port}") - if self._hexpansion_type_by_slot[erase_port - 1] is not None: - self._hexpansion_init_type = self._hexpansion_type_by_slot[erase_port - 1] - eeprom_page_size=app.HEXPANSION_TYPES[self._hexpansion_init_type].eeprom_page_size if self._hexpansion_init_type > 0 else _DEFAULT_EEPROM_PAGE_SIZE - eeprom_total_size=app.HEXPANSION_TYPES[self._hexpansion_init_type].eeprom_total_size if self._hexpansion_init_type > 0 else _DEFAULT_EEPROM_TOTAL_SIZE + erase_type = self._hexpansion_type_by_slot[erase_port - 1] + if erase_type is not None: + self._hexpansion_init_type = erase_type + eeprom_total_size, eeprom_page_size = self._geometry_for_port(erase_port, self._hexpansion_init_type) + if eeprom_total_size is None: + eeprom_total_size = _DEFAULT_EEPROM_TOTAL_SIZE + if eeprom_page_size is None: + eeprom_page_size = _DEFAULT_EEPROM_PAGE_SIZE erase_addr_len = self._hexpansion_eeprom_addr_len[erase_port - 1] erase_addr = self._hexpansion_eeprom_addr[erase_port - 1] if erase_addr_len is None or erase_addr is None: @@ -690,12 +894,15 @@ def _update_state_port_select(self, delta: int): # pylint: disable=unused-argu if self._hexpansion_state_by_slot[self._port_selected - 1] == self.HEXPANSION_STATE_BLANK: # The selected port has a blank EEPROM, so we can initialise it without erasing first. self._detected_port = self._port_selected + self._ports_initialise_declined.discard(self._detected_port) app.notification = Notification("Init?", port=self._detected_port) self._sub_state = _SUB_DETECTED elif self._hexpansion_state_by_slot[self._port_selected - 1] == self.HEXPANSION_STATE_RECOGNISED_OLD_APP: # The selected port has an old app, so we can upgrade it. self._upgrade_port = self._port_selected - self._hexpansion_init_type = self._hexpansion_type_by_slot[self._upgrade_port - 1] if self._hexpansion_type_by_slot[self._upgrade_port - 1] is not None else 0 + upgrade_type = self._hexpansion_type_by_slot[self._upgrade_port - 1] + if upgrade_type is not None: + self._hexpansion_init_type = upgrade_type app.notification = Notification("Upgrade?", port=self._upgrade_port) self._sub_state = _SUB_UPGRADE_CONFIRM elif self._hexpansion_state_by_slot[self._port_selected - 1] >= self.HEXPANSION_STATE_FAULTY: @@ -710,7 +917,10 @@ def _update_state_port_select(self, delta: int): # pylint: disable=unused-argu app.refresh = True elif app.button_states.get(BUTTON_TYPES["DOWN"]): app.button_states.clear() - if self._port_detail_page_count > 1: + if self._hexpansion_state_by_slot[self._port_selected - 1] == self.HEXPANSION_STATE_BLANK and not self._has_eeprom_geometry(self._port_selected): + self._scan_port = self._port_selected + self._sub_state = _SUB_SCANNING + elif self._port_detail_page_count > 1: self._port_detail_page = (self._port_detail_page + 1) % self._port_detail_page_count app.refresh = True elif app.button_states.get(BUTTON_TYPES["CANCEL"]): @@ -718,6 +928,23 @@ def _update_state_port_select(self, delta: int): # pylint: disable=unused-argu self._sub_state = _SUB_EXIT + def _update_state_scanning(self, delta: int): # pylint: disable=unused-argument + app = self._app + scan_port = self._scan_port + if scan_port is None: + self._sub_state = _SUB_PORT_SELECT if self._mode == _MODE_INTERACTIVE else _SUB_CHECK + return + total_size, page_size = self._detect_eeprom_geometry(scan_port) + if total_size is not None and page_size is not None: + app.notification = Notification("Scanned", port=scan_port) + else: + app.notification = Notification("Failed", port=scan_port) + self._read_port_header(scan_port) + self._scan_port = None + self._sub_state = _SUB_PORT_SELECT if self._mode == _MODE_INTERACTIVE else _SUB_CHECK + app.refresh = True + + # ------------------------------------------------------------------ # Draw hexpansion-related states # ------------------------------------------------------------------ @@ -748,6 +975,10 @@ def draw(self, ctx) -> bool: elif self._sub_state == _SUB_PORT_SELECT: self._draw_port_select(ctx) return True + elif self._sub_state == _SUB_SCANNING: + scan_port = self._scan_port if self._scan_port is not None else self._port_selected + app.draw_message(ctx, [f"Slot {scan_port}", "Blank EEPROM", "Scanning...", "Please wait"], [(1, 1, 0), (1, 0, 1), (0, 1, 1), (1, 1, 1)], label_font_size) + return True elif self._sub_state == _SUB_ERASE_CONFIRM: if self._erase_port is None: return False @@ -791,6 +1022,7 @@ def _draw_port_select(self, ctx): """Draw the port-select screen with paged details.""" app = self._app hdr = self._port_selected_header + total_size, page_size = self._get_eeprom_geometry(self._port_selected) hexpansion_state = self.HEXPANSION_STATE_NAMES[self._hexpansion_state_by_slot[self._port_selected - 1]] if self._hexpansion_state_by_slot[self._port_selected - 1] > self.HEXPANSION_STATE_BLANK: hexpansion_name = self._type_name_for_port(self._port_selected, None) @@ -803,7 +1035,13 @@ def _draw_port_select(self, ctx): else: # Common header lines for all pages page = self._port_detail_page - lines = [f"Slot {self._port_selected}-{self._PAGE_NAMES[page]}", hdr.friendly_name if hdr is not None else hexpansion_name] + if self._hexpansion_state_by_slot[self._port_selected - 1] > self.HEXPANSION_STATE_BLANK and hexpansion_name: + page_title = hexpansion_name + elif hdr is not None and hdr.friendly_name: + page_title = hdr.friendly_name + else: + page_title = "Blank EEPROM" if self._hexpansion_state_by_slot[self._port_selected - 1] == self.HEXPANSION_STATE_BLANK else hexpansion_name + lines = [f"Slot {self._port_selected}-{self._PAGE_NAMES[page]}", page_title] colours = [(1, 1, 0), (1, 0, 1)] if page == self._PAGE_VID_PID: # VID / PID page @@ -818,6 +1056,10 @@ def _draw_port_select(self, ctx): if hdr is not None: lines += [f"Size: {hdr.eeprom_total_size} Bytes", f"Page: {hdr.eeprom_page_size} Bytes"] colours += [(0, 1, 1), (0, 1, 1)] + else: + lines += [f"Size: {total_size} Bytes" if total_size is not None else "Size: Unknown", + f"Page: {page_size} Bytes" if page_size is not None else "Page: Unknown"] + colours += [(0, 1, 1), (0, 1, 1)] else: # page == self._PAGE_DETAILS: # Details page (only when page_count == 3) type_idx = self._hexpansion_type_by_slot[self._port_selected - 1] @@ -859,8 +1101,11 @@ def _draw_port_select(self, ctx): confirm_label = "Init" if self._hexpansion_state_by_slot[self._port_selected - 1] == self.HEXPANSION_STATE_BLANK else \ "Upgrade" if self._hexpansion_state_by_slot[self._port_selected - 1] == self.HEXPANSION_STATE_RECOGNISED_OLD_APP else \ "Erase" if self._hexpansion_state_by_slot[self._port_selected - 1] >= self.HEXPANSION_STATE_FAULTY else "" - button_labels(ctx, confirm_label=confirm_label, left_label=" HexpansionHeader | No # pass the exception up to the caller raise OSError(e) from e hexpansion_header = HexpansionHeader.from_bytes(header_bytes) + self._cache_eeprom_geometry(port, hexpansion_header.eeprom_total_size, hexpansion_header.eeprom_page_size) print(f"H:Header on port {port}: {hexpansion_header}") return hexpansion_header @@ -925,6 +1171,7 @@ def _check_port_for_known_hexpansions(self, port) -> bool: hexpansion_header = self._read_header(port) except OSError: # OSError just means there is no hexpansion EEPROM on this port + self._ports_initialise_declined.discard(port) self._hexpansion_type_by_slot[port - 1] = None self._hexpansion_state_by_slot[port - 1] = self.HEXPANSION_STATE_EMPTY return False @@ -933,16 +1180,19 @@ def _check_port_for_known_hexpansions(self, port) -> bool: if self._logging: print(f"H:Found EEPROM on port {port}") self._hexpansion_state_by_slot[port - 1] = self.HEXPANSION_STATE_BLANK - self._ports_to_initialise.add(port) + if port not in self._ports_initialise_declined: + self._ports_to_initialise.add(port) return True except Exception as e: # pylint: disable=broad-except print(f"H:Error reading header on port {port}: {e}") return False if hexpansion_header is None: print(f"H:Error reading header on port {port}") + self._ports_initialise_declined.discard(port) self._hexpansion_type_by_slot[port - 1] = None self._hexpansion_state_by_slot[port - 1] = self.HEXPANSION_STATE_EMPTY return False + self._ports_initialise_declined.discard(port) for index, hexpansion_type in enumerate(app.HEXPANSION_TYPES): if hexpansion_header.vid == hexpansion_type.vid and hexpansion_header.pid == hexpansion_type.pid: self._hexpansion_type_by_slot[port - 1] = index @@ -1068,7 +1318,7 @@ def _update_app_in_eeprom(self, port, type_index: int | None = None) -> int: # Check how much free space we have after deleting the existing app, to try to give a more informative error if the new app doesn't fit. try: - statvfs = os.statvfs(mountpoint) + statvfs = os.statvfs(mountpoint) # pylint: disable=no-member fs_block_size = statvfs[1] if len(statvfs) > 1 and statvfs[1] else statvfs[0] free_blocks = statvfs[4] if len(statvfs) > 4 else statvfs[3] except Exception as e: # pylint: disable=broad-except @@ -1085,9 +1335,7 @@ def _update_app_in_eeprom(self, port, type_index: int | None = None) -> int: max_payload = self._lfs_max_payload(free_blocks, fs_block_size) if app_mpy_size > max_payload: print( - f"H:Not enough free space to write app.mpy for {app.HEXPANSION_TYPES[selected_type].name}" - f" on port {port}: largest writable file is {max_payload}bytes and the app needs {app_mpy_size}bytes" - ) + f"H:app.mpy for {app.HEXPANSION_TYPES[selected_type].name} on port {port} needs {app_mpy_size}bytes: largest writable file is {max_payload}bytes") return _APP_EEPROM_RESULT_FAILURE if self._logging: @@ -1139,17 +1387,22 @@ def _prepare_eeprom(self, port, type_index: int | None = None, unique_id: int | app = self._app selected_type = self._hexpansion_init_type if type_index is None else type_index serial_number = self._hexpansion_serial_number if unique_id is None else unique_id + eeprom_total_size, eeprom_page_size = self._get_eeprom_geometry(port) + if eeprom_total_size is None or eeprom_page_size is None: + eeprom_total_size, eeprom_page_size = self._detect_eeprom_geometry(port) + if eeprom_total_size is None or eeprom_page_size is None: + return False if self._logging: print(f"H:Initialising EEPROM on port {port} as {selected_type}:{app.HEXPANSION_TYPES[selected_type].name}...") hexpansion_header_to_write = HexpansionHeader( manifest_version="2024", - fs_offset=max(32, app.HEXPANSION_TYPES[selected_type].eeprom_page_size), - eeprom_page_size=app.HEXPANSION_TYPES[selected_type].eeprom_page_size, - eeprom_total_size=app.HEXPANSION_TYPES[selected_type].eeprom_total_size, + fs_offset=max(32, eeprom_page_size), + eeprom_page_size=eeprom_page_size, + eeprom_total_size=eeprom_total_size, vid=app.HEXPANSION_TYPES[selected_type].vid, pid=app.HEXPANSION_TYPES[selected_type].pid, unique_id=serial_number if serial_number is not None else 0, - friendly_name=app.HEXPANSION_TYPES[selected_type].name, + friendly_name=app.HEXPANSION_TYPES[selected_type].friendly_name, ) try: i2c = I2C(port) @@ -1260,8 +1513,11 @@ def _check_ports_to_initialise(self, delta) -> bool: # pylint: disable=unu """Check for hexpansion presence in any ports, and if a new hexpansion with a blank EEPROM is detected, prompt the user to initialise it. Returns True if we are now in the initialise confirmation state, False otherwise.""" app = self._app - if 0 < len(self._ports_to_initialise) and self._detected_port is None: - self._detected_port = self._ports_to_initialise.pop() + while self._ports_to_initialise and self._detected_port is None: + port = self._ports_to_initialise.pop() + if port in self._ports_initialise_declined: + continue + self._detected_port = port app.notification = Notification("Initialise?", port=self._detected_port) self._sub_state = _SUB_DETECTED return True @@ -1360,7 +1616,9 @@ class HexpansionType: pid: the PID value to identify the hexpansion type from its EEPROM header. May be supplied as an int (``0xCBCA``) or a hex/decimal string (``"0xCBCA"``). - name: human-friendly name of the hexpansion type (e.g. "HexDrive") + name: human-friendly name of the hexpansion type shown in HexManager (e.g. "HexDrive") + friendly_name: short name written to the EEPROM header and used by BadgeOS notifications. + Defaults to ``name`` when omitted. vid: the VID value to identify the hexpansion type from its EEPROM header (default 0xCAFE). Accepts int or hex/decimal string. eeprom_page_size: EEPROM page size in bytes. Accepts int or hex/decimal string. @@ -1374,11 +1632,13 @@ class HexpansionType: def __init__(self, pid: int | str, name: str, vid: int | str = 0xCAFE, eeprom_page_size: int | str = _DEFAULT_EEPROM_PAGE_SIZE, eeprom_total_size: int | str = _DEFAULT_EEPROM_TOTAL_SIZE, + friendly_name: str | None = None, sub_type: str | None = None, app_mpy_name: str | None = None, app_mpy_version: str | None = None, app_name: str | None = None): self.vid: int = _parse_int(vid) self.pid: int = _parse_int(pid) self.name: str = name + self.friendly_name: str = name if friendly_name is None else friendly_name self.eeprom_page_size: int = _parse_int(eeprom_page_size) self.eeprom_total_size: int = _parse_int(eeprom_total_size) self.sub_type: str | None = sub_type diff --git a/hexpansions.json b/hexpansions.json index 0203a09..4534de3 100644 --- a/hexpansions.json +++ b/hexpansions.json @@ -10,7 +10,8 @@ ], "fields": { "pid": "REQUIRED. Product ID (16-bit integer), e.g. \"0xD15C\".", - "name": "REQUIRED. Human-friendly display name shown on screen, e.g. 'HexDrive'. MAXIMUM 9 chars.", + "name": "REQUIRED. Human-friendly display name shown in HexManager, e.g. 'HexDrive'.", + "friendly_name": "OPTIONAL. Short name written into the EEPROM header and shown by BadgeOS insertion notifications. Maximum 9 chars. Defaults to 'name'.", "sub_type": "OPTIONAL. Short label for this specific variant, e.g. '2 Motor'. Shown to the user alongside the name.", "vid": "OPTIONAL. Vendor ID (16-bit integer), e.g. \"0xCAFE\". Default 0xCAFE which is the UHB-IF uncontrolled VID.", "eeprom_total_size": "OPTIONAL. Total EEPROM size in BYTES. Default 8192 (8 KB).", @@ -49,22 +50,27 @@ { "pid": "0x10CB", "vid": "0xCBCB", "name": "HexDriveV2", "sub_type": "Uncommitted", "eeprom_total_size": 32768, "eeprom_page_size": 64, - "app_mpy_name": "hexdrive", "app_mpy_version": 6, "app_name": "HexDriveApp" + "app_mpy_name": "hexdrive2", "app_mpy_version": 7, "app_name": "HexDriveApp" }, { "pid": "0x10CA", "vid": "0xCBCB", "name": "HexDriveV2", "sub_type": "2 Motor", "eeprom_total_size": 32768, "eeprom_page_size": 64, - "app_mpy_name": "hexdrive", "app_mpy_version": 6, "app_name": "HexDriveApp" + "app_mpy_name": "hexdrive2", "app_mpy_version": 7, "app_name": "HexDriveApp" }, { "pid": "0x10CC", "vid": "0xCBCB", "name": "HexDriveV2", "sub_type": "2 Servo", "eeprom_total_size": 32768, "eeprom_page_size": 64, - "app_mpy_name": "hexdrive", "app_mpy_version": 6, "app_name": "HexDriveApp" + "app_mpy_name": "hexdrive2", "app_mpy_version": 7, "app_name": "HexDriveApp" }, { "pid": "0x3000", "vid": "0xCBCB", "name": "HexTest", "eeprom_total_size": 65536, "eeprom_page_size": 128 }, + { + "pid": "0x5000", "vid": "0xCBCB", "name": "HexCurrent", "friendly_name": "HexCurent", + "eeprom_total_size": 65536, "eeprom_page_size": 128, + "app_mpy_name": "hexcurrent", "app_mpy_version": 1, "app_name": "HexCurrentApp" + }, { "pid": "0x4000", "vid": "0xCBCB", "name": "HexDiag", "eeprom_total_size": 65536, "eeprom_page_size": 128 diff --git a/serialise_mgr.mpy b/serialise_mgr.mpy new file mode 100644 index 0000000..42a3cf2 Binary files /dev/null and b/serialise_mgr.mpy differ diff --git a/settings_mgr.mpy b/settings_mgr.mpy new file mode 100644 index 0000000..8c14810 Binary files /dev/null and b/settings_mgr.mpy differ diff --git a/tests/test_serialise.py b/tests/test_serialise.py index 381c53d..37adc2f 100644 --- a/tests/test_serialise.py +++ b/tests/test_serialise.py @@ -19,6 +19,54 @@ def clear(self): self._pressed.clear() +class FakeProbeI2C: + def __init__(self, total_size: int, page_size: int, addr_len: int, base_addr: int = 0x50): + self.total_size = total_size + self.page_size = page_size + self.addr_len = addr_len + self.base_addr = base_addr + self.memory = bytearray([0xFF] * total_size) + + def _device_count(self) -> int: + if self.addr_len == 1: + return max(1, self.total_size // 256) + return 1 + + def _valid_addr(self, addr: int) -> bool: + if self.addr_len == 1: + return self.base_addr <= addr < self.base_addr + self._device_count() + return addr == self.base_addr + + def _absolute_addr(self, addr: int, mem_addr: int) -> int: + if not self._valid_addr(addr): + raise OSError("no EEPROM") + if self.addr_len == 1: + return ((addr - self.base_addr) << 8) | mem_addr + return mem_addr % self.total_size + + def scan(self): + return list(range(self.base_addr, self.base_addr + self._device_count())) + + def writeto(self, addr, data): + if not self._valid_addr(addr): + raise OSError("no EEPROM") + return len(data) + + def writeto_mem(self, addr, mem_addr, data, addrsize=16): # pylint: disable=unused-argument + absolute_addr = self._absolute_addr(addr, mem_addr) + page_base = absolute_addr - (absolute_addr % self.page_size) + page_offset = absolute_addr % self.page_size + for index, value in enumerate(data): + target = page_base + ((page_offset + index) % self.page_size) + if target < self.total_size: + self.memory[target] = value + return len(data) + + def readfrom_mem(self, addr, mem_addr, length, addrsize=16): # pylint: disable=unused-argument + absolute_addr = self._absolute_addr(addr, mem_addr) + return bytes(self.memory[(absolute_addr + index) % self.total_size] for index in range(length)) + + @pytest.fixture def serialise_app(hexmanager_app): from events.input import BUTTON_TYPES @@ -200,6 +248,8 @@ def test_refresh_slot_records_rescans_all_slots(hexmanager_app, monkeypatch): helper._hexpansion_state_by_slot = [1] * _NUM_HEXPANSION_SLOTS helper._hexpansion_eeprom_addr_len = [1] * _NUM_HEXPANSION_SLOTS helper._hexpansion_eeprom_addr = [0x50] * _NUM_HEXPANSION_SLOTS + helper._hexpansion_eeprom_total_size = [2048] * _NUM_HEXPANSION_SLOTS + helper._hexpansion_eeprom_page_size = [16] * _NUM_HEXPANSION_SLOTS monkeypatch.setattr(helper, '_check_port_for_known_hexpansions', lambda port: checked_ports.append(port) or False) monkeypatch.setattr(helper, '_read_port_header', lambda port: header_ports.append(port)) @@ -218,6 +268,222 @@ def test_refresh_slot_records_rescans_all_slots(hexmanager_app, monkeypatch): assert helper._hexpansion_state_by_slot == [HexpansionMgr.HEXPANSION_STATE_UNKNOWN] * _NUM_HEXPANSION_SLOTS assert helper._hexpansion_eeprom_addr_len == [None] * _NUM_HEXPANSION_SLOTS assert helper._hexpansion_eeprom_addr == [None] * _NUM_HEXPANSION_SLOTS + assert helper._hexpansion_eeprom_total_size == [None] * _NUM_HEXPANSION_SLOTS + assert helper._hexpansion_eeprom_page_size == [None] * _NUM_HEXPANSION_SLOTS + + +@pytest.mark.parametrize(("total_size", "page_size", "addr_len"), [(2048, 16, 1), (32768, 64, 2)]) +def test_detect_eeprom_geometry_restores_blank_chip(hexmanager_app, monkeypatch, total_size, page_size, addr_len): + from sim.apps.HexManager import hexpansion_mgr as hexpansion_module + + helper = hexmanager_app._hexpansion_mgr + fake_i2c = FakeProbeI2C(total_size=total_size, page_size=page_size, addr_len=addr_len) + helper._hexpansion_eeprom_addr_len[0] = addr_len + helper._hexpansion_eeprom_addr[0] = 0x50 + + monkeypatch.setattr(hexpansion_module, 'I2C', lambda port: fake_i2c) + + assert helper._detect_eeprom_geometry(1) == (total_size, page_size) + assert helper._hexpansion_eeprom_total_size[0] == total_size + assert helper._hexpansion_eeprom_page_size[0] == page_size + assert fake_i2c.memory == bytearray([0xFF] * total_size) + + +def test_prepare_eeprom_uses_detected_geometry_for_header(hexmanager_app, monkeypatch): + from types import SimpleNamespace + + from sim.apps.HexManager import hexpansion_mgr as hexpansion_module + + helper = hexmanager_app._hexpansion_mgr + helper._hexpansion_eeprom_addr_len[0] = 2 + helper._hexpansion_eeprom_addr[0] = 0x50 + captured = {} + + monkeypatch.setattr(hexpansion_module, 'I2C', lambda port: object()) + monkeypatch.setattr(helper, '_detect_eeprom_geometry', lambda port, force=False: (32768, 64)) + monkeypatch.setattr(hexpansion_module, 'write_header', lambda port, header, addr=None, addr_len=None, page_size=None: captured.update({'header': header, 'addr': addr, 'addr_len': addr_len, 'page_size': page_size})) + monkeypatch.setattr(helper, '_read_header', lambda port, i2c=None: captured['header']) + monkeypatch.setattr(hexpansion_module, 'get_hexpansion_block_devices', lambda i2c, header, addr, addr_len=None: (None, object())) + monkeypatch.setattr(hexpansion_module.vfs, 'VfsLfs2', SimpleNamespace(mkfs=lambda partition: None), raising=False) + monkeypatch.setattr(hexpansion_module.vfs, 'mount', lambda partition, mountpoint, readonly=False: None, raising=False) + + assert helper._prepare_eeprom(1, type_index=0, unique_id=123) + assert captured['header'].eeprom_total_size == 32768 + assert captured['header'].eeprom_page_size == 64 + assert captured['header'].fs_offset == 64 + assert captured['header'].friendly_name == helper._app.HEXPANSION_TYPES[0].friendly_name + assert captured['page_size'] == 64 + + +def test_prepare_eeprom_uses_header_friendly_name_override(hexmanager_app, monkeypatch): + from types import SimpleNamespace + + from sim.apps.HexManager import hexpansion_mgr as hexpansion_module + + helper = hexmanager_app._hexpansion_mgr + helper._hexpansion_eeprom_addr_len[0] = 2 + helper._hexpansion_eeprom_addr[0] = 0x50 + helper._app.HEXPANSION_TYPES[0].name = 'HexCurrent' + helper._app.HEXPANSION_TYPES[0].friendly_name = 'HexCurent' + captured = {} + + monkeypatch.setattr(hexpansion_module, 'I2C', lambda port: object()) + monkeypatch.setattr(helper, '_detect_eeprom_geometry', lambda port, force=False: (32768, 64)) + monkeypatch.setattr(hexpansion_module, 'write_header', lambda port, header, addr=None, addr_len=None, page_size=None: captured.update({'header': header, 'addr': addr, 'addr_len': addr_len, 'page_size': page_size})) + monkeypatch.setattr(helper, '_read_header', lambda port, i2c=None: captured['header']) + monkeypatch.setattr(hexpansion_module, 'get_hexpansion_block_devices', lambda i2c, header, addr, addr_len=None: (None, object())) + monkeypatch.setattr(hexpansion_module.vfs, 'VfsLfs2', SimpleNamespace(mkfs=lambda partition: None), raising=False) + monkeypatch.setattr(hexpansion_module.vfs, 'mount', lambda partition, mountpoint, readonly=False: None, raising=False) + + assert helper._prepare_eeprom(1, type_index=0, unique_id=123) + assert captured['header'].friendly_name == 'HexCurent' + + +def test_port_detail_prefers_known_type_name_over_header_friendly_name(hexmanager_app, monkeypatch): + from sim.apps.HexManager import hexpansion_mgr as hexpansion_module + + app = hexmanager_app + helper = app._hexpansion_mgr + helper._port_selected = 1 + helper._hexpansion_type_by_slot[0] = 0 + helper._hexpansion_state_by_slot[0] = helper.HEXPANSION_STATE_RECOGNISED + helper._port_selected_header = SimpleNamespace( + friendly_name='HexCurent', + vid=0xCBCB, + pid=0x5000, + unique_id=123, + eeprom_total_size=65536, + eeprom_page_size=128, + ) + helper._port_detail_page = helper._PAGE_VID_PID + helper._port_detail_page_count = 2 + app.HEXPANSION_TYPES[0].name = 'HexCurrent' + + rendered = {} + monkeypatch.setattr(app, 'draw_message', lambda ctx, lines, colours, font: rendered.update({'lines': list(lines)})) + monkeypatch.setattr(hexpansion_module, 'button_labels', lambda ctx, **kwargs: None) + + helper._draw_port_select(None) + + assert rendered['lines'][1] == 'HexCurrent' + + +def test_blank_port_scan_button_and_geometry_details(hexmanager_app, monkeypatch): + from events.input import BUTTON_TYPES + from sim.apps.HexManager import hexpansion_mgr as hexpansion_module + from sim.apps.HexManager.hexpansion_mgr import _SUB_PORT_SELECT, _SUB_SCANNING + + app = hexmanager_app + helper = app._hexpansion_mgr + app.button_states = FakeButtons(BUTTON_TYPES) + helper._sub_state = _SUB_PORT_SELECT + helper._port_selected = 1 + helper._hexpansion_state_by_slot[0] = helper.HEXPANSION_STATE_BLANK + helper._update_detail_page_count() + + rendered = {} + labels = {} + + monkeypatch.setattr(app, 'draw_message', lambda ctx, lines, colours, font: rendered.update({'lines': list(lines)})) + monkeypatch.setattr(hexpansion_module, 'button_labels', lambda ctx, **kwargs: labels.update(kwargs)) + + helper._draw_port_select(None) + assert 'Size: Unknown' in rendered['lines'] + assert 'Page: Unknown' in rendered['lines'] + assert labels['down_label'] == 'Scan' + assert labels['right_label'] == 'Slot>' + + scan_calls = [] + + def fake_detect(port, force=False): + scan_calls.append((port, force)) + helper._hexpansion_eeprom_total_size[port - 1] = 8192 + helper._hexpansion_eeprom_page_size[port - 1] = 32 + return 8192, 32 + + monkeypatch.setattr(helper, '_detect_eeprom_geometry', fake_detect) + monkeypatch.setattr(helper, '_read_port_header', lambda port: None) + + app.button_states.press('DOWN') + helper._update_state_port_select(0) + + assert helper._sub_state == _SUB_SCANNING + assert helper._scan_port == 1 + assert helper._port_selected == 1 + + rendered.clear() + helper.draw(None) + assert 'Scanning...' in rendered['lines'] + + helper._update_state_scanning(0) + + assert scan_calls == [(1, False)] + assert helper._sub_state == _SUB_PORT_SELECT + assert helper._scan_port is None + assert helper._port_selected == 1 + + rendered.clear() + labels.clear() + helper._draw_port_select(None) + assert 'Size: 8192 Bytes' in rendered['lines'] + assert 'Page: 32 Bytes' in rendered['lines'] + assert labels['right_label'] == 'Slot>' + + +def test_right_button_keeps_slot_navigation_when_blank_port_can_scan(hexmanager_app): + from events.input import BUTTON_TYPES + from sim.apps.HexManager.hexpansion_mgr import _SUB_PORT_SELECT + + app = hexmanager_app + helper = app._hexpansion_mgr + app.button_states = FakeButtons(BUTTON_TYPES) + helper._sub_state = _SUB_PORT_SELECT + helper._port_selected = 1 + helper._hexpansion_state_by_slot[0] = helper.HEXPANSION_STATE_BLANK + helper._update_detail_page_count() + + app.button_states.press('RIGHT') + helper._update_state_port_select(0) + + assert helper._port_selected == 2 + assert helper._sub_state == _SUB_PORT_SELECT + + +def test_declined_initialise_is_not_reprompted_during_init_rescan(hexmanager_app, monkeypatch): + from events.input import BUTTON_TYPES + from sim.apps.HexManager.hexpansion_mgr import _MODE_INIT, _SUB_DETECTED, _SUB_PORT_SELECT + + app = hexmanager_app + helper = app._hexpansion_mgr + app.button_states = FakeButtons(BUTTON_TYPES) + helper._mode = _MODE_INIT + helper._sub_state = _SUB_DETECTED + helper._detected_port = 3 + helper._hexpansion_state_by_slot[2] = helper.HEXPANSION_STATE_BLANK + + app.button_states.press('CANCEL') + helper._update_state_detected(0) + + assert helper._detected_port is None + assert 3 in helper._ports_initialise_declined + + def raise_blank(port, i2c=None): + raise RuntimeError('blank eeprom') + + monkeypatch.setattr(helper, '_read_header', raise_blank) + helper._ports_to_initialise.clear() + helper._check_port_for_known_hexpansions(3) + + assert 3 not in helper._ports_to_initialise + + helper._mode = 3 + helper._sub_state = _SUB_PORT_SELECT + helper._port_selected = 3 + app.button_states.press('CONFIRM') + helper._update_state_port_select(0) + + assert helper._detected_port == 3 + assert helper._sub_state == _SUB_DETECTED def test_hexpansion_events_are_suppressed_while_serialise_active(hexmanager_app, monkeypatch): diff --git a/tests/test_smoke.py b/tests/test_smoke.py index 761f340..bbd6752 100644 --- a/tests/test_smoke.py +++ b/tests/test_smoke.py @@ -1,6 +1,8 @@ import json +import re import sys import tempfile +from pathlib import Path import pytest @@ -51,34 +53,157 @@ def test_hexmanager_runtime_blocks_type_dependent_flows_when_type_load_fails(mon assert "parse error" in app.message[-1] assert app._startup_warnings == ["hexpansions.json parse error"] + +def test_load_hexpansion_types_reports_real_import_failure_reason(monkeypatch): + import sim.apps.HexManager.app as hexmanager_module + + monkeypatch.setattr(hexmanager_module, "HexpansionType", None) + monkeypatch.setitem( + hexmanager_module._IMPORT_ERRORS, + "hexpansion_mgr", + "can't import name read_hexpansion_header", + ) + + types, warnings = hexmanager_module._load_hexpansion_types("dummy/app.py") + + assert types == [] + assert warnings == [ + "hexpansion_mgr import failed: can't import name read_hexpansion_header" + ] + + +def test_startup_warning_lines_paginate_within_visible_five_lines(): + import sim.apps.HexManager.app as hexmanager_module + + candidate_warnings = [ + "hexpansions.json not found", + "hexpansions.json parse error", + "hexpansions.json load error", + "hexpansions.json: 'hexpansions' must be a list", + "hexpansions.json: no valid entries found", + "hexpansion_mgr import failed: can't import name read_hexpansion_header", + ] + + for warning in candidate_warnings: + msg_content, msg_colours = hexmanager_module._startup_warning_message(warning) + pages = hexmanager_module._paginate_message(msg_content, msg_colours) + assert pages, warning + for page_lines, page_colours in pages: + assert len(page_lines) <= hexmanager_module._MESSAGE_MAX_LINES, warning + assert len(page_lines) == len(page_colours), warning + + +def test_long_startup_warning_uses_pages_and_navigation(monkeypatch): + import sim.apps.HexManager.app as hexmanager_module + from events.input import BUTTON_TYPES + + class FakeButtons: + def __init__(self): + self._pressed = set() + + def press(self, *names): + self._pressed = {BUTTON_TYPES[name] for name in names} + + def get(self, button): + return button in self._pressed + + def clear(self): + self._pressed.clear() + + long_warning = ( + "hexpansion_mgr import failed: can't import name " + "read_hexpansion_header while importing system hexpansion support" + ) + + monkeypatch.setattr( + hexmanager_module, + "_load_hexpansion_types", + lambda app_file_path, json_path=None: ([], [long_warning]), + ) + + app = hexmanager_module.HexManagerApp() + app.button_states = FakeButtons() + app.current_state = hexmanager_module.STATE_SERIALISE + + app.update(0) + + assert app.message_type == "warning" + assert app.message[0] == "hexpansion_mgr" + assert len(app.message) <= hexmanager_module._MESSAGE_MAX_LINES + assert len(app._message_pages) > 1 + + first_page = list(app.message) + app.button_states.press("DOWN") + app.update(0) + + assert app.message != first_page + assert app._message_page_index == 1 + + app.button_states.press("UP") + app.update(0) + + assert app.message == first_page + assert app._message_page_index == 0 + def test_hexdrive_app_init(port): from sim.apps.HexManager.EEPROM.hexdrive import HexDriveApp config = HexpansionConfig(port) HexDriveApp(config) def test_app_versions_match(): - """Verify that the HexDrive app_mpy_version recorded in hexpansions.json matches - the HexDriveApp.VERSION constant in EEPROM/hexdrive.py. + """Verify that hexpansions.json records the correct source version per vendored artifact. hexpansions.json is the authoritative record of which .mpy version should be - programmed onto the EEPROM. If someone bumps hexdrive.py VERSION without - updating hexpansions.json (or vice-versa) this test will catch the mismatch. + programmed onto the EEPROM. If someone bumps a vendored app source file + without updating hexpansions.json (or vice-versa) this test will catch it. """ - import json import os from sim.apps.HexManager.EEPROM.hexdrive import HexDriveApp + def extract_version(path: Path) -> int: + content = path.read_text(encoding="utf-8") + match = re.search(r"^\s*VERSION\s*=\s*(\d+)", content, re.MULTILINE) + assert match is not None, f"Could not find VERSION in {path}" + return int(match.group(1)) + json_path = os.path.join(os.path.dirname(__file__), "..", "hexpansions.json") with open(json_path) as f: data = json.load(f) - hexdrive_entries = [h for h in data["hexpansions"] - if h.get("app_name") == "HexDriveApp" and h.get("app_mpy_version") is not None] - assert hexdrive_entries, "No HexDriveApp entries with app_mpy_version found in hexpansions.json" - for entry in hexdrive_entries: - assert entry["app_mpy_version"] == HexDriveApp.VERSION, ( + repo_root = Path(__file__).resolve().parents[1] + hexdrive2_path = repo_root / "vendor" / "HexDrive2" / "hexdrive2.py" + hexcurrent_path = repo_root / "vendor" / "HexCurrent" / "hexcurrent.py" + + missing_vendored_sources = [ + str(path.relative_to(repo_root)) + for path in (hexdrive2_path, hexcurrent_path) + if not path.exists() + ] + if missing_vendored_sources: + pytest.skip( + "Vendored app sources are unavailable; initialize submodules to run this " + f"test: {', '.join(missing_vendored_sources)}" + ) + + expected_versions = { + "hexdrive": HexDriveApp.VERSION, + "hexdrive2": extract_version(hexdrive2_path), + "hexcurrent": extract_version(hexcurrent_path), + } + + versioned_entries = [ + entry for entry in data["hexpansions"] + if entry.get("app_mpy_name") in expected_versions and entry.get("app_mpy_version") is not None + ] + assert versioned_entries, "No recognised versioned app entries found in hexpansions.json" + for entry in versioned_entries: + app_mpy_name = entry.get("app_mpy_name") + assert app_mpy_name in expected_versions, ( + f"Unexpected app_mpy_name for versioned entry pid={entry['pid']}: {app_mpy_name}" + ) + assert entry["app_mpy_version"] == expected_versions[app_mpy_name], ( f"hexpansions.json entry pid={entry['pid']} has app_mpy_version=" - f"{entry['app_mpy_version']} but EEPROM/hexdrive.py VERSION={HexDriveApp.VERSION}" + f"{entry['app_mpy_version']} but {app_mpy_name}.py VERSION={expected_versions[app_mpy_name]}" ) def test_hexdrive_type_pids_consistent(): @@ -249,6 +374,25 @@ def test_default_vid_used_when_omitted(self): types, warnings = self._load_from([{"pid": 1, "name": "NoVid"}]) assert not warnings assert types[0].vid == 0xCAFE + assert types[0].friendly_name == "NoVid" + + def test_friendly_name_defaults_to_name_when_omitted(self): + """friendly_name defaults to the display name when JSON omits it.""" + types, warnings = self._load_from([ + {"pid": 1, "name": "HexCurrent"} + ]) + assert not warnings + assert types[0].name == "HexCurrent" + assert types[0].friendly_name == "HexCurrent" + + def test_friendly_name_override_is_loaded_separately(self): + """friendly_name may differ from the HexManager display name.""" + types, warnings = self._load_from([ + {"pid": 1, "name": "HexCurrent", "friendly_name": "HexCurent"} + ]) + assert not warnings + assert types[0].name == "HexCurrent" + assert types[0].friendly_name == "HexCurent" # ------------------------------------------------------------------ # Quoted hex strings for PID diff --git a/tildagon.toml b/tildagon.toml index 5c80cb2..169222a 100644 --- a/tildagon.toml +++ b/tildagon.toml @@ -37,4 +37,4 @@ description = "tildagon app for managing hexpansion EEPROMs" # increased, we interpret this as a new version being released. # # Version number must be a string in major.minor format (e.g. "1.3"). -version = "0.2" +version = "0.4" diff --git a/vendor/HexCurrent b/vendor/HexCurrent new file mode 160000 index 0000000..8cfe77b --- /dev/null +++ b/vendor/HexCurrent @@ -0,0 +1 @@ +Subproject commit 8cfe77b662d570cc851a5bfab17bc917a5f76588 diff --git a/vendor/HexDrive2 b/vendor/HexDrive2 new file mode 160000 index 0000000..080b31a --- /dev/null +++ b/vendor/HexDrive2 @@ -0,0 +1 @@ +Subproject commit 080b31afa10eae7551300fecf19bbc6fafdadd45