Skip to content

Commit 825591a

Browse files
committed
Add scripts and license updates
Add new utility scripts under common/scripts: udp_send.py (UDP send/recv module and CLI), do-tftp.py (Python port to drive TFTP transfers using udp_send and system tftp), and gd32/flash.py (GD32 ROM bootloader flasher and UART monitor). Update .gitignore to ignore the GD32 script virtualenv directory. Also apply multiple source maintenance changes: bump copyright years to include 2026, add/restore license headers in a couple of JSON param files, tidy includes/whitespace and small formatting/refactor changes (notably in dmxsendparams.cpp and several display/udf/global/pixeldmx files). These are cosmetic and licensing/formatting updates only.
1 parent a08c574 commit 825591a

12 files changed

Lines changed: 686 additions & 36 deletions

File tree

.gitignore

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -80,3 +80,4 @@ udp_send
8080
/lib-pixel/jbc
8181
/.cache
8282
compile_commands.json
83+
/common/scripts/gd32/venv

common/scripts/do-tftp.py

Lines changed: 194 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,194 @@
1+
#!/usr/bin/env python3
2+
"""
3+
do-tftp.py
4+
5+
Python port of the provided bash script, using udp_send.py for the UDP control
6+
messages and invoking the system 'tftp' client (no external dependencies).
7+
8+
Usage:
9+
python3 do-tftp.py <ip_address> <file_to_put>
10+
11+
Behavior matches the bash script:
12+
- toggles tftp on, waits until device responds and reports not Off
13+
- checks '?list#' for 'TFTP Server' and reboots if missing, then repeats enable
14+
- runs: tftp <ip> (binary, put <file>, quit)
15+
- toggles tftp off, waits until device responds and reports not On
16+
- reboots and waits until '?list#' responds, then prints list and version
17+
"""
18+
19+
from __future__ import annotations
20+
21+
import os
22+
import subprocess
23+
import sys
24+
sys.dont_write_bytecode = True
25+
import time
26+
from typing import Optional
27+
28+
import udp_send # expects udp_send.py to be importable (same dir or PYTHONPATH)
29+
30+
PORT = 10501
31+
BUFLEN = 512
32+
TIMEOUT_SEC = 1.0
33+
34+
35+
def _udp_cmd(ip: str, cmd: str) -> str:
36+
"""
37+
Send cmd via udp_send and return reply as text if any, else "".
38+
Mirrors: echo '...cmd...' | udp_send <ip>
39+
"""
40+
data = cmd.encode("utf-8", errors="strict")
41+
_sent, reply = udp_send.send_and_maybe_recv(
42+
ip, data, port=PORT, local_port=PORT, timeout_sec=TIMEOUT_SEC, buf_len=BUFLEN
43+
)
44+
if not reply:
45+
return ""
46+
return reply.decode("utf-8", errors="replace").rstrip("\r\n")
47+
48+
49+
def _sleep1() -> None:
50+
time.sleep(1.0)
51+
52+
53+
def _wait_nonempty_reply(ip: str, query_cmd: str, tick_cmd: Optional[str] = None) -> str:
54+
"""
55+
Repeatedly:
56+
optional send tick_cmd (e.g. '!tftp#1' or '!tftp#0')
57+
query query_cmd (e.g. '?tftp#' or '?list#')
58+
until reply != "".
59+
"""
60+
reply = _udp_cmd(ip, query_cmd)
61+
while reply == "":
62+
_sleep1()
63+
if tick_cmd is not None:
64+
_udp_cmd(ip, tick_cmd)
65+
reply = _udp_cmd(ip, query_cmd)
66+
return reply
67+
68+
69+
def _wait_until_not_equal(ip: str, query_cmd: str, bad_value: str, tick_cmd: Optional[str] = None) -> str:
70+
"""
71+
Wait until query reply is non-empty and != bad_value.
72+
"""
73+
reply = _wait_nonempty_reply(ip, query_cmd, tick_cmd=tick_cmd)
74+
while reply == bad_value:
75+
_sleep1()
76+
if tick_cmd is not None:
77+
_udp_cmd(ip, tick_cmd)
78+
reply = _udp_cmd(ip, query_cmd)
79+
if reply == "":
80+
# keep consistent with bash (it loops separately on empty)
81+
reply = _wait_nonempty_reply(ip, query_cmd, tick_cmd=tick_cmd)
82+
return reply
83+
84+
85+
def _reboot_and_wait_list(ip: str) -> str:
86+
print("Rebooting...")
87+
_udp_cmd(ip, "?reboot##")
88+
89+
reply = _udp_cmd(ip, "?list#")
90+
while reply == "":
91+
time.sleep(0.2)
92+
reply = _udp_cmd(ip, "?list#")
93+
return reply
94+
95+
96+
def _ensure_tftp_on(ip: str) -> str:
97+
# initial kick
98+
_udp_cmd(ip, "!tftp#1")
99+
on_line = _udp_cmd(ip, "?tftp#")
100+
print(f"[{on_line}]")
101+
102+
# while empty: sleep 1, kick + query
103+
on_line = _wait_nonempty_reply(ip, "?tftp#", tick_cmd="!tftp#1")
104+
print(f"[{on_line}]")
105+
106+
# while "tftp:Off": sleep 1, kick + query
107+
on_line = _wait_until_not_equal(ip, "?tftp#", "tftp:Off", tick_cmd="!tftp#1")
108+
print(f"[{on_line}]")
109+
return on_line
110+
111+
112+
def _ensure_tftp_off(ip: str) -> str:
113+
_udp_cmd(ip, "!tftp#0")
114+
on_line = _udp_cmd(ip, "?tftp#")
115+
print(f"[{on_line}]")
116+
117+
on_line = _wait_nonempty_reply(ip, "?tftp#", tick_cmd="!tftp#0")
118+
print(f"[{on_line}]")
119+
120+
on_line = _wait_until_not_equal(ip, "?tftp#", "tftp:On", tick_cmd="!tftp#0")
121+
print(f"[{on_line}]")
122+
return on_line
123+
124+
125+
def _run_tftp_put(ip: str, filename: str) -> None:
126+
print("tftp...")
127+
# Equivalent of:
128+
# tftp $1 << -EOF
129+
# binary
130+
# put $2
131+
# quit
132+
# -EOF
133+
script = f"binary\nput {filename}\nquit\n"
134+
try:
135+
subprocess.run(
136+
["tftp", ip],
137+
input=script.encode("utf-8"),
138+
check=True,
139+
stdout=sys.stdout,
140+
stderr=sys.stderr,
141+
)
142+
except FileNotFoundError:
143+
raise RuntimeError("System 'tftp' client not found in PATH.")
144+
except subprocess.CalledProcessError as e:
145+
raise RuntimeError(f"tftp failed with exit code {e.returncode}.")
146+
147+
148+
def main(argv: list[str]) -> int:
149+
if len(argv) < 3:
150+
print(f"Usage: {argv[0]} ip_address file", file=sys.stderr)
151+
return 2
152+
153+
ip = argv[1]
154+
filepath = argv[2]
155+
156+
if not os.path.isfile(filepath):
157+
# matches bash: echo $2 else exit
158+
return 1
159+
160+
print(filepath)
161+
162+
# Enable TFTP, wait until device reports it isn't Off
163+
_ensure_tftp_on(ip)
164+
165+
on_line = _udp_cmd(ip, "?list#")
166+
print(f"[{on_line}]")
167+
168+
sub = "Bootloader TFTP"
169+
if sub not in on_line:
170+
time.sleep(1.0)
171+
on_line = _reboot_and_wait_list(ip)
172+
173+
# After reboot: enable again (with the same wait loops)
174+
_ensure_tftp_on(ip)
175+
176+
# Do the transfer
177+
_run_tftp_put(ip, filepath)
178+
179+
# Disable TFTP, wait until device reports it isn't On
180+
_ensure_tftp_off(ip)
181+
182+
# Reboot again and wait for list to respond
183+
on_line = _reboot_and_wait_list(ip)
184+
print(f"[{on_line}]")
185+
186+
version = _udp_cmd(ip, "?version#")
187+
print(version, end="" if version.endswith("\n") else "\n")
188+
189+
return 0
190+
191+
192+
if __name__ == "__main__":
193+
raise SystemExit(main(sys.argv))
194+

0 commit comments

Comments
 (0)