Skip to content

Commit 8d5985e

Browse files
committed
feat(cli): add CLI-GUI coexistence with server proxy and port lock
When WebServer is running, CLI now auto-detects it and forwards device operations (info/inject/unpatch/mem-read/mem-write) via HTTP API proxy instead of opening a competing serial connection. When WebServer is offline, CLI falls back to direct serial mode with file-based port locking (fcntl/msvcrt) to prevent multi-instance conflicts. New modules: - cli/server_proxy.py: HTTP proxy forwarding to WebServer API - utils/port_lock.py: cross-platform file-based serial port lock New CLI flags: - --direct: skip proxy detection, force direct serial access - --server-url: specify non-default WebServer address Tests: 57 new tests (port_lock, server_proxy, cli_coexistence) Docs: docs/cli-gui-coexistence-plan.md with full analysis and roadmap
1 parent 819d3db commit 8d5985e

7 files changed

Lines changed: 1478 additions & 0 deletions

File tree

Tools/WebServer/cli/fpb_cli.py

Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,8 @@
2828
# Import from existing WebServer modules
2929
sys.path.insert(0, str(Path(__file__).parent))
3030
from fpb_inject import FPBInject # noqa: E402
31+
from utils.port_lock import PortLock # noqa: E402
32+
from cli.server_proxy import ServerProxy, DEFAULT_SERVER_URL # noqa: E402
3133

3234
try:
3335
import serial
@@ -101,6 +103,8 @@ def __init__(
101103
tx_chunk_size: int = 0,
102104
tx_chunk_delay: float = 0.002,
103105
max_retries: int = 10,
106+
direct: bool = False,
107+
server_url: str = DEFAULT_SERVER_URL,
104108
):
105109
self.verbose = verbose
106110
self.setup_logging()
@@ -113,8 +117,33 @@ def __init__(
113117
self._device_state.transfer_max_retries = max_retries
114118
self._fpb = FPBInject(self._device_state)
115119

120+
# Proxy and lock state
121+
self._proxy = None
122+
self._port_lock = None
123+
116124
# Connect to serial if port specified
117125
if port:
126+
if not direct:
127+
# Try proxy mode first
128+
proxy = ServerProxy(base_url=server_url)
129+
if proxy.is_server_running() and proxy.is_device_connected():
130+
self._proxy = proxy
131+
self._device_state.connected = True
132+
if self.verbose:
133+
logging.info(f"Using WebServer proxy mode ({server_url})")
134+
return
135+
136+
# Direct mode: acquire port lock then connect
137+
lock = PortLock(port)
138+
if not lock.acquire():
139+
owner = lock.get_owner_pid()
140+
raise FPBCLIError(
141+
f"Serial port {port} is locked by another process "
142+
f"(PID: {owner}). "
143+
f"Stop the other process or use a different port."
144+
)
145+
self._port_lock = lock
146+
118147
self._device_state.connect(port, baudrate)
119148
if self.verbose:
120149
logging.info(f"Connected to {port}")
@@ -355,6 +384,19 @@ def inject(
355384
if not source_path.exists():
356385
raise FPBCLIError(f"Source file not found: {source_file}")
357386

387+
# Proxy mode: forward to WebServer
388+
if self._proxy:
389+
result = self._proxy.inject(
390+
target_func=target_func,
391+
source_file=source_file,
392+
elf_path=elf_path,
393+
compile_commands=compile_commands,
394+
patch_mode=patch_mode,
395+
comp=comp,
396+
)
397+
self.output_json(result)
398+
return
399+
358400
# Check if device is connected
359401
if not self._device_state.connected:
360402
# Still provide useful info even without connection
@@ -433,6 +475,11 @@ def inject(
433475
def unpatch(self, comp: int = 0, all_patches: bool = False) -> None:
434476
"""Remove patch from device"""
435477
try:
478+
if self._proxy:
479+
result = self._proxy.unpatch(comp=comp, all_patches=all_patches)
480+
self.output_json(result)
481+
return
482+
436483
if not self._device_state.connected:
437484
raise FPBCLIError(
438485
"No device connected. Use --port to specify serial port."
@@ -454,6 +501,11 @@ def unpatch(self, comp: int = 0, all_patches: bool = False) -> None:
454501
def info(self) -> None:
455502
"""Get device FPB info"""
456503
try:
504+
if self._proxy:
505+
result = self._proxy.info()
506+
self.output_json(result)
507+
return
508+
457509
if not self._device_state.connected:
458510
raise FPBCLIError(
459511
"No device connected. Use --port to specify serial port."
@@ -599,6 +651,11 @@ def file_download(self, remote_path: str, local_path: str) -> None:
599651
def mem_read(self, addr: int, length: int, fmt: str = "hex") -> None:
600652
"""Read memory from device"""
601653
try:
654+
if self._proxy:
655+
result = self._proxy.mem_read(addr, length, fmt)
656+
self.output_json(result)
657+
return
658+
602659
if not self._device_state.connected:
603660
raise FPBCLIError(
604661
"No device connected. Use --port to specify serial port."
@@ -648,6 +705,11 @@ def mem_read(self, addr: int, length: int, fmt: str = "hex") -> None:
648705
def mem_write(self, addr: int, data_hex: str) -> None:
649706
"""Write memory to device"""
650707
try:
708+
if self._proxy:
709+
result = self._proxy.mem_write(addr, data_hex)
710+
self.output_json(result)
711+
return
712+
651713
if not self._device_state.connected:
652714
raise FPBCLIError(
653715
"No device connected. Use --port to specify serial port."
@@ -721,6 +783,9 @@ def mem_dump(self, addr: int, length: int, output_file: str) -> None:
721783
def cleanup(self):
722784
"""Cleanup resources"""
723785
self._device_state.disconnect()
786+
if self._port_lock:
787+
self._port_lock.release()
788+
self._port_lock = None
724789

725790

726791
def main():
@@ -792,6 +857,17 @@ def main():
792857
default=10,
793858
help="Maximum retry attempts for file transfer operations (default: 10).",
794859
)
860+
parser.add_argument(
861+
"--direct",
862+
action="store_true",
863+
help="Force direct serial connection (skip WebServer proxy detection).",
864+
)
865+
parser.add_argument(
866+
"--server-url",
867+
type=str,
868+
default=DEFAULT_SERVER_URL,
869+
help=f"WebServer URL for proxy mode (default: {DEFAULT_SERVER_URL}).",
870+
)
795871

796872
subparsers = parser.add_subparsers(dest="command", help="Command to execute")
797873

@@ -957,6 +1033,8 @@ def main():
9571033
tx_chunk_size=args.tx_chunk_size,
9581034
tx_chunk_delay=args.tx_chunk_delay,
9591035
max_retries=args.max_retries,
1036+
direct=args.direct,
1037+
server_url=args.server_url,
9601038
)
9611039

9621040
try:
Lines changed: 150 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,150 @@
1+
#!/usr/bin/env python3
2+
3+
# MIT License
4+
# Copyright (c) 2025 - 2026 _VIFEXTech
5+
6+
"""
7+
WebServer proxy for CLI coexistence.
8+
9+
When the WebServer is running, CLI device operations are forwarded
10+
via HTTP API instead of opening a second serial connection.
11+
"""
12+
13+
import json
14+
import logging
15+
import os
16+
from typing import Any, Dict, Optional
17+
from urllib.request import Request, urlopen
18+
19+
logger = logging.getLogger(__name__)
20+
21+
# Default WebServer URL
22+
DEFAULT_SERVER_URL = "http://127.0.0.1:5500"
23+
24+
# Timeout for probe / API calls (seconds)
25+
_PROBE_TIMEOUT = 2
26+
_API_TIMEOUT = 30
27+
28+
29+
class ServerProxy:
30+
"""Proxy device operations through the running WebServer HTTP API."""
31+
32+
def __init__(
33+
self,
34+
base_url: str = DEFAULT_SERVER_URL,
35+
token: Optional[str] = None,
36+
):
37+
self.base_url = base_url.rstrip("/")
38+
self.token = token
39+
40+
# ------------------------------------------------------------------
41+
# Low-level HTTP helpers (stdlib only, no requests dependency)
42+
# ------------------------------------------------------------------
43+
44+
def _build_url(self, path: str) -> str:
45+
url = f"{self.base_url}{path}"
46+
if self.token:
47+
sep = "&" if "?" in url else "?"
48+
url += f"{sep}token={self.token}"
49+
return url
50+
51+
def _get(self, path: str, timeout: float = _API_TIMEOUT) -> dict:
52+
url = self._build_url(path)
53+
req = Request(url, method="GET")
54+
req.add_header("Accept", "application/json")
55+
with urlopen(req, timeout=timeout) as resp:
56+
return json.loads(resp.read().decode())
57+
58+
def _post(
59+
self, path: str, data: Optional[dict] = None, timeout: float = _API_TIMEOUT
60+
) -> dict:
61+
url = self._build_url(path)
62+
body = json.dumps(data or {}).encode()
63+
req = Request(url, data=body, method="POST")
64+
req.add_header("Content-Type", "application/json")
65+
req.add_header("Accept", "application/json")
66+
with urlopen(req, timeout=timeout) as resp:
67+
return json.loads(resp.read().decode())
68+
69+
# ------------------------------------------------------------------
70+
# Server detection
71+
# ------------------------------------------------------------------
72+
73+
def is_server_running(self) -> bool:
74+
"""Check if the WebServer is reachable."""
75+
try:
76+
resp = self._get("/api/status", timeout=_PROBE_TIMEOUT)
77+
return resp.get("success", False)
78+
except Exception:
79+
return False
80+
81+
def is_device_connected(self) -> bool:
82+
"""Check if the WebServer has an active device connection."""
83+
try:
84+
resp = self._get("/api/status", timeout=_PROBE_TIMEOUT)
85+
return resp.get("connected", False)
86+
except Exception:
87+
return False
88+
89+
def get_status(self) -> dict:
90+
"""Get full WebServer status."""
91+
return self._get("/api/status")
92+
93+
# ------------------------------------------------------------------
94+
# Device operations (proxied to WebServer)
95+
# ------------------------------------------------------------------
96+
97+
def info(self) -> dict:
98+
"""Get device FPB info via WebServer."""
99+
return self._get("/api/fpb/info")
100+
101+
def inject(
102+
self,
103+
target_func: str,
104+
source_file: str,
105+
elf_path: Optional[str] = None,
106+
compile_commands: Optional[str] = None,
107+
patch_mode: str = "trampoline",
108+
comp: int = -1,
109+
) -> dict:
110+
"""Inject a patch via WebServer."""
111+
# Read source content to send
112+
source_content = ""
113+
source_ext = ".c"
114+
if os.path.exists(source_file):
115+
with open(source_file, "r", encoding="utf-8") as f:
116+
source_content = f.read()
117+
source_ext = os.path.splitext(source_file)[1]
118+
119+
payload: Dict[str, Any] = {
120+
"target_func": target_func,
121+
"source_content": source_content,
122+
"patch_mode": patch_mode,
123+
"comp": comp,
124+
"source_ext": source_ext,
125+
"original_source_file": os.path.abspath(source_file),
126+
}
127+
if elf_path:
128+
payload["elf_path"] = elf_path
129+
if compile_commands:
130+
payload["compile_commands_path"] = compile_commands
131+
132+
return self._post("/api/fpb/inject", payload)
133+
134+
def unpatch(self, comp: int = 0, all_patches: bool = False) -> dict:
135+
"""Remove a patch via WebServer."""
136+
return self._post("/api/fpb/unpatch", {"comp": comp, "all": all_patches})
137+
138+
def mem_read(self, addr: int, length: int, fmt: str = "hex") -> dict:
139+
"""Read device memory via WebServer."""
140+
return self._post(
141+
"/api/fpb/mem-read",
142+
{"addr": addr, "length": length, "fmt": fmt},
143+
)
144+
145+
def mem_write(self, addr: int, data_hex: str) -> dict:
146+
"""Write device memory via WebServer."""
147+
return self._post(
148+
"/api/fpb/mem-write",
149+
{"addr": addr, "data": data_hex},
150+
)

0 commit comments

Comments
 (0)