-
Notifications
You must be signed in to change notification settings - Fork 21
feat: bumble BLE transport for external-HCI SMP #99
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Draft
JPHutchins
wants to merge
8
commits into
main
Choose a base branch
from
feature/bumble-transport
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Draft
Changes from all commits
Commits
Show all changes
8 commits
Select commit
Hold shift + click to select a range
f08a7e1
chore: pin smpclient to PR #107 (bumble transport) for hardware integ…
JPHutchins 25f12e7
feat: add bumble BLE transport with scan, pair, bonds, and firmware
JPHutchins c9796ab
chore: bump smpclient to disconnect log-level fix tip (2fe7080)
JPHutchins 97683e5
fix: emit bumble firmware paths as plain lines instead of a truncated…
JPHutchins 80bf543
refactor: enumerate bumble firmware variants as typed subcommands
JPHutchins a45b523
refactor: print firmware path by default; --extract opts into copy
JPHutchins d954aec
fix: address PR #99 review feedback and lint failures
JPHutchins 80977b8
build: bundle Zephyr HCI firmware data in portable PyInstaller artifact
JPHutchins File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Large diffs are not rendered by default.
Oops, something went wrong.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,282 @@ | ||
| """Bumble BLE transport subcommands: scan, pair, bonds, firmware.""" | ||
|
|
||
| import asyncio | ||
| import logging | ||
| import shutil | ||
| from pathlib import Path | ||
| from typing import Final, cast | ||
|
|
||
| import typer | ||
| from rich import print | ||
| from rich.table import Table | ||
| from smpclient.transport.bumble import SMPBumbleTransport | ||
| from smpclient.transport.bumble.pairing import ( | ||
| PairingAlreadyBonded, | ||
| PairingFailed, | ||
| PairingResult, | ||
| PairingSucceeded, | ||
| PairingTimedOut, | ||
| pair_device, | ||
| ) | ||
| from smpclient.transport.bumble.scan import ScanAll, ScanForName, ScanMode | ||
| from typing_extensions import Annotated, assert_never | ||
|
|
||
| from smpmgr.common import Options, build_pair_delegate, resolve_keystore_strategy | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| app: Final = typer.Typer( | ||
| name="bumble", | ||
| help="Bumble BLE transport: scan, pair, bonds, firmware.", | ||
| no_args_is_help=True, | ||
| ) | ||
| bonds_app: Final = typer.Typer( | ||
| name="bonds", | ||
| help="Manage bumble keystore bonds.", | ||
| no_args_is_help=True, | ||
| ) | ||
| firmware_app: Final = typer.Typer( | ||
| name="firmware", | ||
| help="Bundled Zephyr HCI controller firmware.", | ||
| no_args_is_help=True, | ||
| ) | ||
| app.add_typer(bonds_app) | ||
| app.add_typer(firmware_app) | ||
|
|
||
|
|
||
| def _scan_mode(name: str | None) -> ScanMode: | ||
| return ScanForName(name=name) if name is not None else ScanAll() | ||
|
|
||
|
|
||
| @app.command(name="scan") | ||
| def scan( | ||
| ctx: typer.Context, | ||
| timeout: Annotated[float, typer.Option(help="Scan duration in seconds.")] = 5.0, | ||
| name: Annotated[ | ||
| str | None, | ||
| typer.Option(help="Match advertised local name; returns eagerly on first hit."), | ||
| ] = None, | ||
| all_devices: Annotated[ | ||
| bool, | ||
| typer.Option("--all", help="Show every advertiser, not just SMP servers."), | ||
| ] = False, | ||
| ) -> None: | ||
| """Scan for advertising BLE devices via the bumble HCI controller.""" | ||
|
|
||
| options = cast(Options, ctx.obj) | ||
|
|
||
| async def f() -> None: | ||
| results = await SMPBumbleTransport.scan( | ||
| hci=options.hci, timeout_s=timeout, mode=_scan_mode(name) | ||
| ) | ||
| if not all_devices: | ||
| results = tuple(r for r in results if r.has_smp_service) | ||
| if not results: | ||
| print("[yellow]No devices found.[/yellow]") | ||
| raise typer.Exit(code=1) | ||
|
|
||
| table = Table(title="Advertising devices") | ||
| table.add_column("Address") | ||
| table.add_column("Name") | ||
| table.add_column("RSSI", justify="right") | ||
| table.add_column("SMP", justify="center") | ||
| for r in results: | ||
| table.add_row( | ||
| r.address, | ||
| r.name or "[dim]<unnamed>[/dim]", | ||
| str(r.rssi) if r.rssi is not None else "", | ||
| "[green]yes[/green]" if r.has_smp_service else "[dim]no[/dim]", | ||
| ) | ||
| print(table) | ||
|
|
||
| asyncio.run(f()) | ||
|
|
||
|
|
||
| def _report_pairing_result(result: PairingResult) -> int: | ||
| match result: | ||
| case PairingSucceeded(bonded): | ||
| print(f"[green]Pairing succeeded[/green] (bonded={bonded}).") | ||
| return 0 | ||
| case PairingAlreadyBonded(): | ||
| print("[yellow]Already bonded[/yellow]; nothing to do. Use --force to re-pair.") | ||
| return 0 | ||
| case PairingTimedOut(elapsed_s): | ||
| print(f"[red]Pairing timed out[/red] after {elapsed_s:.1f}s.") | ||
| return 1 | ||
| case PairingFailed(reason, detail): | ||
| print(f"[red]Pairing failed[/red]: {reason.value}: {detail}") | ||
| return 1 | ||
| case _ as unreachable: | ||
| assert_never(unreachable) | ||
|
|
||
|
|
||
| @app.command(name="pair") | ||
| def pair_cmd( | ||
| ctx: typer.Context, | ||
| address: Annotated[ | ||
| str, | ||
| typer.Argument(help="BD_ADDR or advertised local name of the peer to bond with."), | ||
| ], | ||
| force: Annotated[ | ||
| bool, | ||
| typer.Option("--force", help="Delete any existing local bond first and pair from scratch."), | ||
| ] = False, | ||
| scan_timeout: Annotated[ | ||
| float, | ||
| typer.Option(help="Scan timeout in seconds when `address` is a local name."), | ||
| ] = 10.0, | ||
| ) -> None: | ||
| """Connect, pair (PIN-entry by default), disconnect — pre-bonds a peer.""" | ||
|
|
||
| options = cast(Options, ctx.obj) | ||
| delegate = build_pair_delegate(options.pair_on_connect) | ||
| if delegate is None: | ||
| print( | ||
| "[red]--pair-on-connect=none is incompatible with [bold]bumble pair[/bold];" | ||
| " choose 'keyboard', 'display', or 'nio'.[/red]" | ||
| ) | ||
| raise typer.Exit(code=1) | ||
|
|
||
| async def f() -> int: | ||
| result = await pair_device( | ||
| address, | ||
| delegate, | ||
| hci=options.hci, | ||
| keystore=resolve_keystore_strategy(options.keystore), | ||
| scan_timeout_s=scan_timeout, | ||
| pair_timeout_s=options.pair_timeout_s, | ||
| force=force, | ||
| ) | ||
| return _report_pairing_result(result) | ||
|
|
||
| raise typer.Exit(code=asyncio.run(f())) | ||
|
|
||
|
|
||
| def _standalone_transport(options: Options) -> SMPBumbleTransport: | ||
| return SMPBumbleTransport( | ||
| hci=options.hci, | ||
| keystore=resolve_keystore_strategy(options.keystore), | ||
| ) | ||
|
|
||
|
|
||
| @bonds_app.command(name="list") | ||
| def bonds_list(ctx: typer.Context) -> None: | ||
| """List BD_ADDRs currently bonded in the active keystore.""" | ||
|
|
||
| options = cast(Options, ctx.obj) | ||
|
|
||
| async def f() -> None: | ||
| bonded = await _standalone_transport(options).bonded_devices() | ||
| if not bonded: | ||
| print("[dim]No bonds in keystore.[/dim]") | ||
| return | ||
| table = Table(title=f"Bonded devices ({options.keystore})") | ||
| table.add_column("Address") | ||
| for addr in bonded: | ||
| table.add_row(addr) | ||
| print(table) | ||
|
|
||
| asyncio.run(f()) | ||
|
|
||
|
|
||
| @bonds_app.command(name="clear") | ||
| def bonds_clear( | ||
| ctx: typer.Context, | ||
| address: Annotated[str, typer.Argument(help="BD_ADDR of the bond to delete.")], | ||
| ) -> None: | ||
| """Delete the bond for one peer from the keystore.""" | ||
|
|
||
| options = cast(Options, ctx.obj) | ||
|
|
||
| async def f() -> None: | ||
| await _standalone_transport(options).clear_bond(address) | ||
| print(f"[green]Cleared bond[/green] for {address}.") | ||
|
|
||
| asyncio.run(f()) | ||
|
|
||
|
|
||
| @bonds_app.command(name="clear-all") | ||
| def bonds_clear_all( | ||
| ctx: typer.Context, | ||
| yes: Annotated[ | ||
| bool, | ||
| typer.Option("--yes", "-y", help="Skip the confirmation prompt."), | ||
| ] = False, | ||
| ) -> None: | ||
| """Delete every bond from the keystore.""" | ||
|
|
||
| options = cast(Options, ctx.obj) | ||
|
|
||
| if not yes and not typer.confirm( | ||
| f"Delete every bond in keystore '{options.keystore}'?", default=False | ||
| ): | ||
| raise typer.Exit(code=1) | ||
|
|
||
| async def f() -> None: | ||
| await _standalone_transport(options).clear_bonds() | ||
| print("[green]Cleared all bonds.[/green]") | ||
|
|
||
| asyncio.run(f()) | ||
|
|
||
|
|
||
| def _register_firmware_command(parent: typer.Typer, name: str, mod: 'FirmwareModule') -> None: | ||
| """Register one subcommand per typed firmware variant on `parent`. | ||
|
|
||
| Default action with no flags: print the absolute .hex path to stdout — | ||
| designed for shell composition, e.g. | ||
| `west flash --hex-file=$(smpmgr bumble firmware nrf52840dk_default)`. | ||
|
|
||
| Pass `--extract <PATH>` to copy the bundled .hex out to a destination | ||
| (useful when running from the portable binary where the bundle is opaque). | ||
| """ | ||
| short_help: Final = f"Board {mod.BOARD}, build {mod.OPTIONS}, sha256={mod.HEX_SHA256[:8]}…" | ||
|
|
||
| @parent.command(name=name, help=short_help) | ||
| def _fw( | ||
| extract: Annotated[ | ||
| Path | None, | ||
| typer.Option( | ||
| "--extract", | ||
| help="Copy the .hex out to this path (parent dir auto-created).", | ||
| ), | ||
| ] = None, | ||
| verify: Annotated[ | ||
| bool, | ||
| typer.Option( | ||
| "--verify/--no-verify", | ||
| help="Verify the embedded SHA-256 before copying" | ||
| " (only meaningful with --extract).", | ||
| ), | ||
| ] = True, | ||
| ) -> None: | ||
| if extract is None: | ||
| typer.echo(str(mod.HEX_PATH)) | ||
| return | ||
| if verify: | ||
| try: | ||
| mod.read_firmware_bytes() | ||
| except ValueError as e: | ||
| typer.echo(f"SHA-256 verification failed: {e}", err=True) | ||
| raise typer.Exit(code=1) from e | ||
| extract.parent.mkdir(parents=True, exist_ok=True) | ||
| shutil.copy(mod.HEX_PATH, extract) | ||
| typer.echo(f"Wrote {extract} ({mod.HEX_PATH.stat().st_size} bytes)", err=True) | ||
|
|
||
|
|
||
| try: | ||
| from smpclient.transport.firmware.hci import Firmware, firmware | ||
| from zephyr_4_4_0_hci import FirmwareModule | ||
|
|
||
| for _name in Firmware._fields: | ||
| _register_firmware_command(firmware_app, _name, getattr(firmware, _name)) | ||
| except ImportError: | ||
|
|
||
| @firmware_app.callback(invoke_without_command=True) | ||
| def _firmware_unavailable(ctx: typer.Context) -> None: | ||
| if ctx.invoked_subcommand is not None: | ||
| return | ||
| print( | ||
| "[red]Bundled HCI firmware is not installed.[/red]" | ||
| " Reinstall smpclient with the [bold]hci_firmware[/bold] extra." | ||
| ) | ||
| raise typer.Exit(code=1) |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Acknowledged — this is the explicit un-draft blocker called out in the PR description. The pin will be reverted to a tagged PyPI
smpclientrelease once intercreate/smpclient#107 merges and a release is cut; theTODO(bumble): revert to a tagged PyPI releasecomment on the pin marks the spot. Leaving this thread open so it remains visible until that swap happens.