-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathmoondeck.py
More file actions
1936 lines (1723 loc) · 85.7 KB
/
Copy pathmoondeck.py
File metadata and controls
1936 lines (1723 loc) · 85.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/usr/bin/env python3
"""MoonDeck — browser-based developer console for projectMM."""
import http.server
import json
import os
import signal
import subprocess
import sys
import tempfile
import threading
from contextlib import suppress
from pathlib import Path
PORT = 8420
ROOT = Path(__file__).resolve().parent.parent
SCRIPTS_DIR = Path(__file__).resolve().parent
UI_DIR = SCRIPTS_DIR / "moondeck_ui"
ASSETS_DIR = ROOT / "docs" / "assets"
STATE_FILE = SCRIPTS_DIR / "moondeck.json"
# Shared test-metadata parsers live next to the doc generator. Both this server
# and scripts/docs/generate_test_docs.py import from there so the two views of
# the same source files (HTML in MoonDeck, markdown in docs/tests/) can't drift.
sys.path.insert(0, str(SCRIPTS_DIR / "docs"))
import _test_metadata as test_meta # noqa: E402
# Re-use the doc generator's perf-table formatter so the MoonDeck step view
# and the generated scenario-tests.md show the same shape per step (single
# source of truth — adding/changing a metric updates both surfaces at once).
import generate_test_docs as test_doc_gen # noqa: E402
def _app_version():
"""Read the project version from library.json. '?' if unavailable."""
try:
return json.loads((ROOT / "library.json").read_text(encoding="utf-8")).get("version", "?")
except Exception:
return "?"
APP_VERSION = _app_version()
# ---------------------------------------------------------------------------
# Boards catalog (single source of truth, shared with the web installer)
# ---------------------------------------------------------------------------
BOARDS_FILE = ROOT / "docs" / "install" / "deviceModels.json"
def _load_boards():
"""Load docs/install/deviceModels.json. Returns [] on missing/malformed file —
`_deduce_board` then always returns "" (no firmware uniquely identifies
a board), MoonDeck JS shows only the empty default. The web installer
Step 2 picker will share this file.
"""
try:
return json.loads(BOARDS_FILE.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return []
BOARDS = _load_boards()
FIRMWARES_FILE = ROOT / "docs" / "install" / "firmwares.json"
def _load_firmwares():
"""Shipping firmware-variant names from docs/install/firmwares.json — the
generated projection of build_esp32's FIRMWARES dict (the single source of
truth, shared with the CI release matrix). Returns [] on missing/malformed
file, so the MoonDeck UI just shows no firmware entries. Filtering on
`ships` keeps held-out variants (e.g. esp32p4-eth-wifi) out of the picker.
"""
try:
doc = json.loads(FIRMWARES_FILE.read_text(encoding="utf-8"))
return [f["name"] for f in doc["firmwares"] if f.get("ships")]
except (OSError, json.JSONDecodeError, KeyError):
return []
# ---------------------------------------------------------------------------
# Script definitions (loaded from scripts.json)
# ---------------------------------------------------------------------------
SCRIPTS_FILE = SCRIPTS_DIR / "moondeck_config.json"
def load_scripts():
with open(SCRIPTS_FILE) as f:
return json.load(f)
_scripts_data = load_scripts()
SCRIPTS = _scripts_data["scripts"]
FIRMWARES = _load_firmwares()
# ---------------------------------------------------------------------------
# Device discovery
# ---------------------------------------------------------------------------
def _lan_ip():
"""This machine's LAN IP. '' if it can't be determined (offline).
connect() on a UDP socket sends no packet — it just picks the outbound
interface, whose address is the LAN IP.
"""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return ""
finally:
s.close()
def _get_local_subnet():
"""The local /24 subnet prefix (e.g. '192.168.1'). Falls back to a default."""
ip = _lan_ip()
return ".".join(ip.split(".")[:3]) if ip else "192.168.1"
def _walk_modules(modules):
"""Yield every module in the tree (depth-first), including nested children."""
for m in modules or []:
yield m
yield from _walk_modules(m.get("children", []))
def _probe_device(ip, port=8080, timeout=0.4):
"""Probe a single IP for /api/state. Returns device info or None.
Short timeout: on a LAN a live device answers in a few ms and a dead IP
refuses the connection almost instantly; 0.4s only matters for IPs that
silently drop packets (firewalled hosts), and a subnet scan should not
stall seconds on those.
Returns: { ip, deviceName, firmware, board }
- `firmware` is the variant flashed (value of the `firmware` control on
SystemModule, set from kFirmwareName in build_info.h). Used to deduce
`board` when the device hasn't been told its board yet. See
docs/architecture.md § Firmware vs board.
- `board` is the physical hardware key. Preferred source: the device's
own `deviceModel` control on SystemModule (the value MoonDeck pushed earlier
and the device persisted). Fall back to firmware-based deduction
(catalog lookup) when the device hasn't been told yet — then MoonDeck
pushes the deduced value on next discover, the device persists it,
and subsequent probes read it back from the device.
"""
import urllib.request
import urllib.error
url = f"http://{ip}:{port}/api/state"
try:
req = urllib.request.Request(url)
with urllib.request.urlopen(req, timeout=timeout) as resp:
data = json.loads(resp.read())
modules = data.get("modules", [])
device_name = ""
firmware = ""
device_board = ""
for m in _walk_modules(modules):
# deviceName, firmware AND deviceModel all live on SystemModule now
# (the deviceModel identity was folded in from the former BoardModule).
if m.get("type") == "SystemModule":
for c in m.get("controls", []):
if c.get("name") == "deviceName":
device_name = c.get("value", "") or ""
elif c.get("name") == "firmware":
firmware = c.get("value", "") or ""
elif c.get("name") == "deviceModel":
device_board = c.get("value", "") or ""
return {
"ip": f"{ip}:{port}",
"deviceName": device_name,
"firmware": firmware,
"board": device_board or _deduce_board(firmware),
}
except Exception:
return None
def _deduce_board(firmware: str) -> str:
"""Firmware → board name when exactly one catalog entry claims this
firmware. Returns "" when zero (unknown firmware) or multiple boards
claim it (ambiguous — user picks). Catalog lives at
docs/install/deviceModels.json; see docs/architecture.md § Firmware vs board.
"""
if not firmware:
return ""
matches = [b["name"] for b in BOARDS if firmware in b.get("firmwares", [])]
return matches[0] if len(matches) == 1 else ""
def _push_board_to_device(ip: str, board: str) -> bool:
"""POST /api/control on the device for every per-board control in deviceModels.json.
For boards that have a catalog entry in docs/install/deviceModels.json: fans
out the full `controls.<Module>.<control>` block (matching the web
installer's and the device-side `?deviceModel=` Inject path — same generic
iteration, so adding a new field to a board entry Just Works without
code changes here). For boards without a catalog entry (custom names,
unknown firmware): still pushes `System.deviceModel` so the bare name lands —
keeps the legacy single-field behaviour as the fallback.
Returns True iff EVERY POST returned 200. False on any failure (timeout,
non-2xx, network error) — partial state may have been applied; the next
refresh re-attempts. Same best-effort semantics as the prior single-
field shape.
`ip` is the "host:port" string from the device record (already includes
the port discovery picked). `board` is the catalog key MoonDeck wants
the device to remember (empty string means "clear" — no push).
"""
if not board:
return True # nothing to push; not a failure
import urllib.request
import urllib.error
# Look up the catalog entry. BOARDS is loaded at module init; we don't
# re-read deviceModels.json per push so a tight discover-refresh cycle
# doesn't hammer the disk. If the user edits deviceModels.json, restart
# MoonDeck (same as every other catalog change).
entry = next((b for b in BOARDS if b.get("name") == board), None)
if entry is not None:
modules = entry.get("modules") or []
else:
# Custom / unknown deviceModel: push the bare name onto System's `deviceModel`
# control (the identity lives on SystemModule now — no parent_id, it's the
# boot-wired top-level module, so _apply just sets the control, no add).
modules = [{"type": "System", "id": "System",
"controls": {"deviceModel": board}}]
return _apply_modules_to_device(ip, modules)
def _apply_modules_to_device(ip: str, modules: list) -> bool:
"""Add-then-configure a list of module-with-controls units on a device.
Each unit is `{type, id, parent_id?, controls?}` — the SAME shape deviceModels.json
catalog entries use and a saved device-profile stores, so both the board push
(_push_board_to_device) and a profile restore share this one fan-out. Per
module: add it first when it has a parent_id (a fresh flash has no user-added
modules like AudioModule, so a control write would 404), then set its controls.
A module without parent_id is boot-wired/top-level (Board under System,
Network) that already exists — skip the add, just set controls. The add is
idempotent (an existing id returns 200). Returns True iff EVERY POST returned
200; best-effort (partial state may apply, the next refresh re-attempts).
"""
import urllib.request
import urllib.error
def _post(path: str, body_obj: dict) -> bool:
body = json.dumps(body_obj).encode()
try:
req = urllib.request.Request(
f"http://{ip}{path}",
data=body, method="POST",
headers={"Content-Type": "application/json"})
with urllib.request.urlopen(req, timeout=0.6) as resp:
return resp.status == 200
except (urllib.error.URLError, OSError):
return False
for m in modules:
if not isinstance(m, dict):
continue
if m.get("parent_id") and m.get("type"):
if not _post("/api/modules", {
"type": m.get("type"),
"id": m.get("id"),
"parent_id": m.get("parent_id"),
}):
return False
ctrls = m.get("controls") or {}
for control_name, value in ctrls.items():
if not _post("/api/control", {
"module": m.get("id"),
"control": control_name,
"value": value,
}):
return False
return True
# Module types whose controls a device-profile captures: the drivers + the
# system/network/audio config a user sets by hand. Effects/layouts/layers are
# animation state, not the device's physical pin wiring, so they're left out —
# a profile is "the GPIO/peripheral setup", re-applied after a reflash wipes it.
# SystemModule carries the device identity (deviceName + deviceModel, the latter
# folded in from the former BoardModule); its read-only telemetry controls are
# dropped by the _NON_REPLAYABLE_CONTROL_TYPES filter, leaving just the Text config.
_PROFILE_MODULE_TYPES = {
"SystemModule", "NetworkModule", "AudioModule",
"RmtLedDriver", "LcdLedDriver", "ParlioLedDriver", "NetworkSendDriver",
}
# Control types that must NOT go into a captured profile: password values are
# XOR-obfuscated in /api/state (re-pushing double-encodes them), and display /
# display-int / progress are device-derived read-outs. These mirror the device's
# own isPersistable() exclusions (src/core/Control.cpp); strings are the JSON
# `type` values from controlTypeName().
_NON_REPLAYABLE_CONTROL_TYPES = {"password", "display", "display-int", "progress"}
def _capture_device_profile(ip: str) -> "list | None":
"""Read /api/state and flatten the module tree into profile units.
Returns a list of `{type, id, parent_id?, controls}` units (the same shape
_apply_modules_to_device + deviceModels.json use), or None if the device is
unreachable. Only the config-bearing module types in _PROFILE_MODULE_TYPES are
captured (the physical pin/peripheral setup), and each module's controls list
`[{name, value}]` is collapsed to a `{name: value}` dict. parent_id comes from
the tree position so restore re-creates user-added modules under the right
container. The catalog `type` is the short id the device reports as the module
type; we keep it verbatim (e.g. "RmtLedDriver"), matching deviceModels.json.
"""
import urllib.request
import urllib.error
host = ip.split(":")[0]
port = ip.split(":")[1] if ":" in ip else "8080"
try:
with urllib.request.urlopen(f"http://{host}:{port}/api/state", timeout=1.0) as resp:
state = json.loads(resp.read())
except (urllib.error.URLError, OSError, json.JSONDecodeError):
return None
units: list = []
def _collect(modules, parent_id) -> None:
for m in modules or []:
mtype = m.get("type")
mid = m.get("name") # /api/state reports the instance id under "name"
if mtype in _PROFILE_MODULE_TYPES and mid:
controls = {}
for c in m.get("controls", []):
cn = c.get("name")
# Skip non-replayable control types: password values are
# XOR-obfuscated in /api/state (replaying them would double-
# encode), and display / display-int / progress are device-
# derived read-outs the device overwrites every tick. These are
# exactly the types the device's own isPersistable() (Control.cpp)
# refuses to save — mirror that here so a profile only carries
# writable config. Type strings come from controlTypeName().
if c.get("type") in _NON_REPLAYABLE_CONTROL_TYPES:
continue
if cn is not None and "value" in c:
controls[cn] = c["value"]
unit = {"type": mtype, "id": mid}
if parent_id:
unit["parent_id"] = parent_id
if controls:
unit["controls"] = controls
units.append(unit)
# recurse with THIS module's id as the parent for its children
_collect(m.get("children", []), m.get("name"))
_collect(state.get("modules", []), None)
return units
def _push_boards_in_parallel(pushes):
"""Fire _push_board_to_device for each (ip, board) tuple in parallel.
Discovery + refresh probe a /24, so the push count is bounded by the
device count (single digits in practice). A small thread pool keeps
total latency near the slowest single push instead of summing. Result
is fire-and-forget: callers don't act on the bool returned by each
push — failures are recoverable on the next refresh cycle.
"""
if not pushes:
return
import concurrent.futures
with concurrent.futures.ThreadPoolExecutor(max_workers=8) as pool:
# list() forces all futures to start; the with-block waits for them.
list(pool.map(lambda p: _push_board_to_device(*p), pushes))
def discover_devices(subnet=""):
"""Scan subnet for devices responding to /api/state."""
if not subnet:
subnet = _get_local_subnet()
# .1-.254 on port 80 (ESP32) and 8080 (desktop), plus localhost.
targets = [(f"{subnet}.{i}", port)
for i in range(1, 255) for port in (80, 8080)]
targets.append(("localhost", 8080))
# Wide thread pool — the probes are I/O-bound (almost always blocked on the
# socket, not the CPU), so running all ~509 in one wave means the whole /24
# scan finishes in about one probe-timeout window (~0.4s) instead of
# batch-serializing. The pool still caps thread churn vs. raw thread spawns.
from concurrent.futures import ThreadPoolExecutor
devices = []
with ThreadPoolExecutor(max_workers=len(targets)) as pool:
for result in pool.map(lambda t: _probe_device(*t), targets):
if result:
devices.append(result)
# The local app answers on both localhost and this machine's LAN IP — the
# subnet scan finds the LAN-IP entry, the explicit localhost probe finds the
# other. Keep the LAN IP (usable from any device) and drop the redundant
# localhost entry so the discovered list shows real network addresses.
localIp = _lan_ip()
hasLanEntry = localIp and any(d["ip"].startswith(localIp + ":") for d in devices)
if hasLanEntry:
devices = [d for d in devices if not d["ip"].startswith("localhost:")]
# Sort by IP
devices.sort(key=lambda d: d["ip"])
return devices, subnet
_LAST_FLASH_FILE = SCRIPTS_DIR / ".last_flash.json"
_LAST_FLASH_TTL_S = 5 * 60 # ignore markers older than 5 minutes
def _consume_last_flash() -> dict | None:
"""Read the breadcrumb scripts/.last_flash.json that flash_esp32.py drops
after a successful flash. Returns {port, firmware} when the marker is
recent (< TTL); deletes the file so the link only happens once. Returns
None when there's no recent marker."""
if not _LAST_FLASH_FILE.exists():
return None
try:
data = json.loads(_LAST_FLASH_FILE.read_text(encoding="utf-8"))
except (OSError, json.JSONDecodeError):
return None
import time
if time.time() - float(data.get("ts", 0)) > _LAST_FLASH_TTL_S:
with suppress(OSError):
_LAST_FLASH_FILE.unlink()
return None
return {"port": data.get("port", ""), "firmware": data.get("firmware", "")}
def refresh_devices(known_devices):
"""Probe known devices to check online/offline status.
Preserves user-set fields (`board`, `last_port`) across refreshes: the
probe result carries fresh `firmware`/`deviceName` and a deduced `board`
(set only when firmware unambiguously identifies hardware), but a user-set
`board` for a firmware that can run on multiple boards (e.g. `esp32` on
LOLIN D32 vs generic DevKit) must survive a refresh. Same for `last_port`,
which is set by the flash-event breadcrumb (see _consume_last_flash).
"""
def probe(device):
ip = device.get("ip", "")
if ":" in ip:
host, port = ip.rsplit(":", 1)
fresh = _probe_device(host, int(port))
else:
fresh = _probe_device(ip)
if not fresh:
return None
# Merge: probe wins for live-readable fields; user-set / flash-tracked
# fields must survive. Without this, `online` (the device responded
# → True), `selected` (user checkbox), `board` (user-set when not
# deducible), and `last_port` (set by the flash breadcrumb) would
# disappear from the persisted record after every refresh.
fresh["online"] = True
if "selected" in device:
fresh["selected"] = device["selected"]
if not fresh.get("board") and device.get("board"):
fresh["board"] = device["board"]
if device.get("last_port"):
fresh["last_port"] = device["last_port"]
return fresh
if not known_devices:
return []
from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor(max_workers=16) as pool:
refreshed = [r for r in pool.map(probe, known_devices) if r]
# Link a recent flash event to the device whose firmware matches. The
# flash flow writes scripts/.last_flash.json after a successful flash;
# here we attribute it to a refreshed device with the same firmware
# variant (newest "online with matching firmware" wins — usually the
# only candidate). After linking we consume the marker so the same
# event doesn't keep applying on every refresh.
last_flash = _consume_last_flash()
if last_flash and last_flash["firmware"]:
matches = [d for d in refreshed if d.get("firmware") == last_flash["firmware"]]
if len(matches) == 1:
matches[0]["last_port"] = last_flash["port"]
with suppress(OSError):
_LAST_FLASH_FILE.unlink()
# If 0 matches (device hasn't booted yet) or 2+ matches (ambiguous),
# leave the marker for the next refresh to retry / re-evaluate.
return refreshed
# ---------------------------------------------------------------------------
# State management
# ---------------------------------------------------------------------------
def load_state():
"""Load MoonDeck state. Migrates the old flat-list shape (top-level
`devices` + `port`) to the new networks-grouped shape on first load
after this commit ships; new-shape files load as-is. See _migrate_to_networks."""
if STATE_FILE.exists():
with open(STATE_FILE) as f:
state = json.load(f)
if "networks" not in state and ("devices" in state or "port" in state):
state = _migrate_to_networks(state)
return state
return {"networks": [], "active_network": "", "tab": "pc"}
def _migrate_to_networks(old_state: dict) -> dict:
"""One-shot migration of the pre-networks moondeck.json shape:
{env, port, devices: [...], tab, firmware, scenario, module, flag_*}
into the networks-grouped shape:
{networks: [{name, subnet, wifi, port, devices: [...]}, ...],
active_network, tab, firmware, scenario, module, flag_*}
Buckets existing devices by `/24` subnet derived from each device's `ip`.
Names the largest bucket "Home", subsequent buckets "Network 2", "Network 3",
... User can rename via the dropdown. The old top-level `port` migrates
into the bucket that holds the largest device count (heuristic — usually
that's where the user was working). Drops the legacy `env` field
(already migrated to `firmware` in app.js).
"""
import sys
devices = old_state.get("devices") or []
by_subnet: dict[str, list] = {}
for d in devices:
ip_port = d.get("ip", "")
host = ip_port.split(":", 1)[0]
parts = host.split(".")
subnet = ".".join(parts[:3]) + ".0/24" if len(parts) == 4 else "unknown"
by_subnet.setdefault(subnet, []).append(d)
# Largest bucket first → named "Home"; rest "Network 2", "Network 3", ...
ordered = sorted(by_subnet.items(), key=lambda kv: -len(kv[1]))
networks = []
for i, (subnet, bucket) in enumerate(ordered):
name = "Home" if i == 0 else f"Network {i + 1}"
networks.append({
"name": name,
"subnet": subnet,
"wifi": {"ssid": "", "password": ""},
"port": "",
"devices": bucket,
})
# Old top-level port → largest bucket (which is networks[0] if any).
if networks and old_state.get("port"):
networks[0]["port"] = old_state["port"]
new_state = {k: v for k, v in old_state.items()
if k not in ("devices", "port", "env")}
new_state["networks"] = networks
new_state["active_network"] = networks[0]["name"] if networks else ""
new_state.setdefault("tab", "pc")
print(f"moondeck: migrated {len(devices)} device(s) into {len(networks)} "
f"network(s): {', '.join(n['name'] for n in networks)}", file=sys.stderr)
return new_state
# `modules` is the full module tree from /api/state — kilobytes per device, no
# UI consumer between probes; strip on save to keep moondeck.json small.
# `deviceName` and `firmware` ARE displayed in the device row label, so keep
# them persisted: stripping made the row show only an IP after every server
# restart until the user clicked Discover. Both fields are correctly
# overwritten by the next probe (no staleness drift problem).
_VOLATILE_DEVICE_FIELDS = ("modules",)
# Serializes the full load → mutate → save transaction across the threaded
# HTTP handlers (ThreadingHTTPServer dispatches each request on its own
# thread). Without this, two concurrent /api/discover requests could both
# load_state(), mutate their own copies, and each save_state() — last write
# wins, half the work is lost. RLock (not Lock) so save_state can also be
# called standalone for the no-mutator path (POST /api/state body merge)
# without deadlocking when nested inside mutate_state.
_state_write_lock = threading.RLock()
def mutate_state(mutator):
"""Run a full load → mutator(state) → save cycle under the state lock.
Returns the post-mutation state so the handler can echo it to the
client. `mutator` receives the loaded state dict, mutates in place
(or returns a new dict, which becomes the value to save), and may
return None to mean "keep the in-place mutation."
Slow work (subnet scans, device probes) should happen BEFORE calling
mutate_state — pass already-gathered data in by closure. Holding the
lock across network I/O would serialise everything behind the slowest
scan."""
with _state_write_lock:
state = load_state()
result = mutator(state)
if result is not None:
state = result
save_state(state)
return state
def save_state(state):
"""Persist MoonDeck state. Strips per-device fields that the device itself
is the source of truth for (`deviceName`, `firmware`) — caching them
invites stale values when the device is reflashed/renamed via another
host. They are re-read from `/api/state` on each refresh and live only
in the in-memory device lists until the next save. User-set fields
(`board`, `last_port`, `selected`, `online`) persist. Iterates per network.
Write is atomic + serialized: a temp file in the same dir → fsync → rename.
The rename is atomic on POSIX (same filesystem); fsync makes the bytes
durable before the swap so a crash mid-write never leaves a half-written
moondeck.json (the previous version stays intact). The lock ensures two
handler threads don't race on the temp file or the rename."""
persisted = dict(state)
networks = persisted.get("networks") or []
if networks:
persisted["networks"] = [_strip_network_volatiles(n) for n in networks]
data = json.dumps(persisted, indent=2)
with _state_write_lock:
# NamedTemporaryFile in the same dir so os.replace stays on one
# filesystem (cross-FS rename is not atomic). delete=False because
# we hand the path to os.replace ourselves.
tmp = tempfile.NamedTemporaryFile(
mode="w", encoding="utf-8",
dir=str(STATE_FILE.parent),
prefix=STATE_FILE.name + ".",
suffix=".tmp",
delete=False,
)
try:
tmp.write(data)
tmp.flush()
os.fsync(tmp.fileno())
tmp.close()
os.replace(tmp.name, STATE_FILE)
except Exception:
# On failure, drop the stray temp file so we don't accumulate
# .tmp leftovers across crashes. Re-raise so the caller sees it.
with suppress(OSError):
os.unlink(tmp.name)
raise
def _strip_network_volatiles(network: dict) -> dict:
"""Return a copy of a network with volatile per-device fields stripped.
`board` is conditionally volatile: when it equals the value `_deduce_board`
produces from the device's current firmware, the next probe will re-derive
it for free — no need to persist. When the user picked a board manually
(firmware doesn't deduce to anything, e.g. `esp32` could be LOLIN D32 or
generic), the picker's choice is the only source so we must keep it.
"""
out = dict(network)
devs = out.get("devices") or []
cleaned = []
for d in devs:
c = {k: v for k, v in d.items() if k not in _VOLATILE_DEVICE_FIELDS}
# `board` strip: drop it when empty (noise) or when it matches the
# firmware-deduced value (recomputable). Keep when the user picked
# it (firmware deduces "" so the picker's value is the only source).
firmware = d.get("firmware") or ""
if "board" in c and (not c["board"] or c["board"] == _deduce_board(firmware)):
del c["board"]
cleaned.append(c)
out["devices"] = cleaned
return out
def _active_network(state: dict) -> dict | None:
"""Return the dict for state['active_network'] from state['networks'],
or None when the name doesn't match or there's no active selection.
Every consumer that previously read state['devices'] or state['port']
routes through this helper."""
name = state.get("active_network") or ""
for n in state.get("networks") or []:
if n.get("name") == name:
return n
return None
def _subnet_from_host_subnet(host_subnet: str) -> str:
"""Normalise `_get_local_subnet()` output (e.g. "192.168.1") to the
network record's `subnet` field shape ("192.168.1.0/24")."""
if not host_subnet:
return ""
return f"{host_subnet}.0/24"
def _auto_select_network(state: dict, host_subnet: str) -> None:
"""In-place: set state['active_network'] to whichever known network's
subnet matches the host's current subnet — but only if the user hasn't
pinned a different network. Pinning happens when the user changes the
dropdown; cleared when the pinned network's subnet stops matching the
host (next time we land on its LAN, auto-select takes over again)."""
if not state.get("networks"):
return
target_subnet = _subnet_from_host_subnet(host_subnet)
if not target_subnet:
return
pinned = state.get("active_network_user_pinned")
if pinned:
active = _active_network(state)
if active and active.get("subnet") == target_subnet:
return # pinned network still matches host — leave as is
# Pinned network no longer matches host — release the pin so the
# next auto-select picks the right network for where we are now.
state["active_network_user_pinned"] = False
for n in state["networks"]:
if n.get("subnet") == target_subnet:
state["active_network"] = n["name"]
return
# ---------------------------------------------------------------------------
# Process management
# ---------------------------------------------------------------------------
_running: dict[str, subprocess.Popen] = {}
_lock = threading.Lock()
_IS_WIN = sys.platform == "win32"
def _kill_process_by_name(name: str):
"""Kill processes matching name. Cross-platform."""
if _IS_WIN:
subprocess.run(["taskkill", "/F", "/IM", name + ".exe"],
capture_output=True)
else:
subprocess.run(["pkill", "-f", name], capture_output=True)
def kill_script(script_id: str):
with _lock:
proc = _running.pop(script_id, None)
if proc and proc.poll() is None:
try:
if _IS_WIN:
proc.terminate()
else:
os.killpg(os.getpgid(proc.pid), signal.SIGTERM)
except (OSError, ProcessLookupError):
pass
# Clean up any orphaned processes (e.g. projectMM after os.execv)
script_def = next((s for s in SCRIPTS if s["id"] == script_id), None)
pname = script_def.get("process_name") if script_def else None
if pname:
_kill_process_by_name(pname)
def is_process_running(name: str) -> bool:
"""Check if a process matching name is running. Cross-platform."""
if _IS_WIN:
r = subprocess.run(["tasklist", "/FI", f"IMAGENAME eq {name}.exe"],
capture_output=True, text=True)
return name in r.stdout
else:
r = subprocess.run(["pgrep", "-f", name], capture_output=True)
return r.returncode == 0
# ---------------------------------------------------------------------------
# Serial port discovery
# ---------------------------------------------------------------------------
def list_serial_ports() -> list[str]:
"""List available serial ports.
POSIX hosts: glob the conventional /dev/tty* device files (no deps).
Windows: read HKLM\\HARDWARE\\DEVICEMAP\\SERIALCOMM via winreg (stdlib).
SERIALCOMM is the authoritative table the OS itself maintains for
present COM ports — what pyserial reads under the hood — so a registry
walk is both correct and dependency-free. The previous brute-force
COM0..COM255 open-and-close loop required pyserial, which MoonDeck did
not declare, so on Windows the list silently came back empty.
"""
ports: list[str] = []
import glob
ports.extend(glob.glob("/dev/tty.usb*"))
ports.extend(glob.glob("/dev/ttyUSB*"))
ports.extend(glob.glob("/dev/ttyACM*"))
if sys.platform == "win32":
import winreg
try:
with winreg.OpenKey(winreg.HKEY_LOCAL_MACHINE,
r"HARDWARE\DEVICEMAP\SERIALCOMM") as key:
i = 0
while True:
try:
_, port, _ = winreg.EnumValue(key, i)
ports.append(port)
i += 1
except OSError:
break
except FileNotFoundError:
pass # no SERIALCOMM key — no ports present
return sorted(ports)
# ---------------------------------------------------------------------------
# Perf-table HTML (shared shape with docs/tests/scenario-tests.md)
# ---------------------------------------------------------------------------
def _render_perf_table_html(step: dict) -> str:
"""Render a scenario step's contract+observed data as an HTML table that
matches the markdown table emitted by test_doc_gen._format_perf_table.
The doc generator owns the cell formatters; we just translate its pipe-
delimited markdown to <table><tr><td>. Returns "" when the step has no
contract/observed data."""
import html as html_mod
md_lines = test_doc_gen._format_perf_table(step)
if not md_lines:
return ""
# Lines come in groups:
# header text (`**Performance** ...`)
# blank
# table header row (`| Board | FPS | ... |`)
# separator row (`|---|---|...`)
# N body rows (`| `target` | ... |`)
# blank
# optional footer lines (`- \`target\`: contract set ... · observed ...`)
out: list[str] = []
in_table = False
rendered_header = False
for line in md_lines:
if not line.strip():
if in_table:
out.append("</tbody></table>")
in_table = False
continue
if line.startswith("**"):
# `**Performance** (contract / observed) — tick stored, FPS shown:`
# → strip both the leading `**...**` bold marker (just the markup,
# keep the bolded text inside) and the trailing `:` colon.
import re as _re
txt = _re.sub(r"\*\*(.+?)\*\*", r"\1", line.strip()).rstrip(":").strip()
out.append(f'<div class="perf-head"><strong>{html_mod.escape(txt)}</strong></div>')
continue
if line.startswith("|") and "---" in line:
continue # markdown separator row
if line.startswith("|"):
cells = [c.strip() for c in line.strip().strip("|").split("|")]
if not in_table:
out.append('<table class="perf-table"><tbody>')
in_table = True
tag = "th" if not rendered_header else "td"
rendered_header = True
row = "".join(
f"<{tag}>{_inline_code_html(html_mod.escape(c))}</{tag}>"
for c in cells
)
out.append(f"<tr>{row}</tr>")
continue
if line.lstrip().startswith("-"):
# Audit footer line.
txt = line.lstrip().lstrip("-").strip()
out.append(f'<div class="perf-audit">{_inline_code_html(html_mod.escape(txt))}</div>')
continue
if in_table:
out.append("</tbody></table>")
return '<div class="perf">' + "".join(out) + '</div>'
def _inline_code_html(s: str) -> str:
"""Tiny markdown-inline-code → <code> translator. The perf table cells
contain `target` and `field` names wrapped in backticks; render as <code>.
Doesn't try to be a full markdown parser — just the patterns the perf
formatter produces."""
import re as _re
return _re.sub(r"`([^`]+)`", r"<code>\1</code>", s)
# ---------------------------------------------------------------------------
# HTTP handler
# ---------------------------------------------------------------------------
class MoonDeckHandler(http.server.BaseHTTPRequestHandler):
def log_message(self, format, *args):
# Suppress default request logging
pass
def handle(self):
# Browser closing the connection is harmless; suppress the noise.
with suppress(ConnectionResetError, BrokenPipeError):
super().handle()
def _send_json(self, data, status=200):
body = json.dumps(data).encode()
self.send_response(status)
self.send_header("Content-Type", "application/json")
self.send_header("Content-Length", str(len(body)))
self.end_headers()
self.wfile.write(body)
def _read_body(self) -> bytes:
length = int(self.headers.get("Content-Length", 0))
return self.rfile.read(length) if length else b""
def do_GET(self):
if self.path == "/api/scripts":
self._send_json({"scripts": SCRIPTS, "firmwares": FIRMWARES})
elif self.path == "/api/ports":
self._send_json({"ports": list_serial_ports()})
elif self.path == "/api/scenarios":
self._send_json({"scenarios": self._list_scenarios()})
elif self.path.startswith("/api/scenarios/"):
self._serve_scenario_steps()
elif self.path == "/api/test-modules":
self._send_json({"modules": test_meta.list_test_modules()})
elif self.path == "/api/boards":
# Serves docs/install/deviceModels.json (loaded at startup). The web
# installer (Step 2) will fetch the same file directly from
# Pages; MoonDeck reads it locally and exposes it here so the
# JS UI shares one source of truth with the Python deduce path.
self._send_json({"boards": BOARDS})
elif self.path.startswith("/api/unit-tests/"):
self._serve_unit_tests_for_module()
elif self.path == "/api/state":
# Auto-select the network matching the host's current subnet
# (unless the user has pinned a different one — see
# _auto_select_network). Persist the selection back so the next
# load is stable when the host's subnet hasn't changed.
state = load_state()
before = state.get("active_network")
_auto_select_network(state, _get_local_subnet())
if state.get("active_network") != before:
save_state(state)
self._send_json(state)
elif self.path == "/api/running":
running = {}
for s in SCRIPTS:
pname = s.get("process_name")
if pname:
running[s["id"]] = is_process_running(pname)
self._send_json(running)
elif self.path.startswith("/api/stream/"):
script_id = self.path.split("/")[-1]
self._handle_stream(script_id)
elif self.path.startswith("/api/help"):
self._serve_help()
elif self.path.startswith("/api/docs/"):
self._serve_doc()
elif self.path == "/api/history-report":
self._serve_history_report()
elif self.path.startswith("/api/doc-asset/"):
self._serve_doc_asset()
else:
self._serve_static()
def do_POST(self):
if self.path.startswith("/api/run/"):
script_id = self.path.split("/")[-1]
body = self._read_body()
params = json.loads(body) if body else {}
self._handle_run(script_id, params)
elif self.path.startswith("/api/kill/"):
script_id = self.path.split("/")[-1]
kill_script(script_id)
self._send_json({"status": "killed"})
elif self.path == "/api/state":
body = self._read_body()
patch = json.loads(body) if body else {}
# mutate_state holds the lock across load + merge + save so two
# concurrent POSTs can't each load the same snapshot, apply
# different patches, and clobber each other on save.
def _merge(s):
s.update(patch)
result = mutate_state(_merge)
self._send_json(result)
elif self.path == "/api/push-board":
# Push a single (ip, board) to a device. Called by the JS when the
# user picks a board from the per-device dropdown — saveState
# alone persists the value in moondeck.json but the device also
# needs to hear about it (the device persists its `deviceModel` control,
# now on SystemModule, to /.config/SystemModule.json). The bulk push from discover /
# refresh covers the multi-device case; this covers the
# one-device-at-a-time UI mutation.
body = self._read_body()
params = json.loads(body) if body else {}
ip = params.get("ip", "")
board = params.get("board", "")
if not ip:
self._send_json({"error": "ip required"}, 400)
return
ok = _push_board_to_device(ip, board)
self._send_json({"ok": ok})