-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathcli.py
More file actions
187 lines (157 loc) · 6.02 KB
/
cli.py
File metadata and controls
187 lines (157 loc) · 6.02 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
#!/usr/bin/env python3
"""
CLI tool for monitoring and controlling the endless pool.
Usage:
python cli.py monitor # Listen for broadcasts and display status
python cli.py send start # Start the pool
python cli.py send stop # Stop the pool
python cli.py send speed <value> # Set speed (0-255, higher=slower)
python cli.py send timer <seconds> # Set timer in seconds
python cli.py send pace <M:SS> # Set speed by pace (e.g. 2:00 for 2:00/100m)
"""
import argparse
import socket
import sys
from protocol import constants as C
from protocol.decoder import (
build_command,
format_pace,
format_timer,
pace_to_speed_param,
parse_broadcast,
speed_param_to_pace,
)
def create_udp_listener() -> socket.socket:
    """Create a UDP socket that listens for broadcast packets on CLIENT_PORT.

    The socket is bound to all interfaces on ``C.CLIENT_PORT`` with
    ``SO_REUSEADDR`` and ``SO_BROADCAST`` enabled, and has a 2-second timeout
    on blocking operations.

    Returns:
        The bound, configured datagram socket. The caller must close it.

    Raises:
        OSError: If configuring or binding the socket fails (for example
            because the port is already in use). The socket is closed before
            the error propagates, so no descriptor is leaked.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    try:
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
        sock.bind(("", C.CLIENT_PORT))
        sock.settimeout(2.0)
    except BaseException:
        # Ownership has not been handed to the caller yet; release it here.
        sock.close()
        raise
    return sock
def create_udp_sender() -> socket.socket:
    """Return an unbound IPv4 datagram socket with broadcasting enabled.

    The caller owns the returned socket and is responsible for closing it.
    """
    sender = socket.socket(family=socket.AF_INET, type=socket.SOCK_DGRAM)
    # SO_BROADCAST permits sending to a broadcast address.
    sender.setsockopt(socket.SOL_SOCKET, socket.SO_BROADCAST, 1)
    return sender
def monitor(args):
    """Show live pool status decoded from UDP broadcasts until interrupted.

    Datagrams are read from the listener socket. A datagram is ignored when
    its length is not ``C.BC_PACKET_SIZE``, when ``parse_broadcast`` returns
    ``None`` for it, or when it is byte-identical to the last one displayed.
    Every remaining packet clears the terminal and prints a status summary.
    Ctrl+C ends the loop, and the socket is closed on the way out.

    Args:
        args: Parsed command-line namespace; not read by this function.
    """
    listener = create_udp_listener()
    print(f"Listening for pool broadcasts on UDP port {C.CLIENT_PORT}...")
    print(f"Pool expected at {C.POOL_IP}\n")

    previous_payload = None
    shown = 0
    try:
        while True:
            try:
                payload, sender = listener.recvfrom(1024)
            except socket.timeout:
                # Nothing arrived within the socket timeout; keep waiting.
                continue

            # Only datagrams of the broadcast packet size are decoded; the
            # sender address is not checked.
            if len(payload) == C.BC_PACKET_SIZE:
                status = parse_broadcast(payload)
            else:
                status = None
            if status is None:
                continue

            # Skip a repeat of the packet just shown (the pool is said to
            # send each packet twice).
            if payload == previous_payload:
                continue
            previous_payload = payload
            shown += 1

            # ANSI escape: clear the screen and move the cursor home.
            print("\033[2J\033[H", end="")
            print(f"=== Endless Pool Monitor === (packet #{shown} from {sender[0]})")
            print(f" Device: {status.device_name}")
            if status.is_running:
                state_label = "RUNNING"
            else:
                state_label = "STOPPED"
            print(f" State: {state_label}")
            print(
                f" Speed: {status.current_speed} / {status.target_speed}"
                f" (param: {status.speed_param})"
            )

            if status.current_speed > 1:
                from protocol.decoder import speed_level_to_pace

                approx_pace = speed_level_to_pace(status.current_speed)
                if approx_pace:
                    print(f" Pace: ~{format_pace(approx_pace)}/100m")

            remaining = format_timer(status.remaining_timer)
            configured = format_timer(status.set_timer)
            print(f" Timer: {remaining} / {configured}")
            print(f" Seg Distance: {status.segment_distance:.1f} m")
            print(f" Tot Distance: {status.total_distance:.1f} m")
            print(f" Timestamp: {status.timestamp}")
            print("\n Press Ctrl+C to exit")
    except KeyboardInterrupt:
        print("\nStopped.")
    finally:
        listener.close()
def _exit_with_error(message):
    """Print ``Error: <message>`` and terminate the process with status 1."""
    print(f"Error: {message}")
    sys.exit(1)


def _parse_int_or_exit(text, label):
    """Return ``int(text)``, or exit with a readable error naming *label*."""
    try:
        return int(text)
    except (TypeError, ValueError):
        # TypeError covers a missing value (None); ValueError covers junk text.
        _exit_with_error(f"{label} must be an integer, got {text!r}")


def _parse_pace_or_exit(text):
    """Convert an ``M:SS`` string to a positive number of seconds, or exit."""
    parts = text.split(":") if isinstance(text, str) else []
    if len(parts) == 2:
        try:
            minutes, seconds = int(parts[0]), int(parts[1])
        except ValueError:
            minutes, seconds = -1, -1
        total = minutes * 60 + seconds
        # Reject negative fields, seconds outside 0-59, and a zero pace.
        if minutes >= 0 and 0 <= seconds < 60 and total > 0:
            return total
    _exit_with_error("Pace must be in M:SS format (e.g. 2:00)")


def send_command(args):
    """Send a command to the pool.

    Builds the packet for ``args.action`` (validating ``args.value`` for the
    actions that take one), sends it as a single UDP datagram to
    ``(C.POOL_IP, C.POOL_PORT)`` and prints a summary plus the packet in hex.

    All validation happens before the socket is opened, so a rejected value
    never leaves a socket behind, and malformed input produces an
    ``Error: ...`` line instead of a traceback. The socket is closed even if
    sending fails.

    Args:
        args: Namespace with ``action`` (``start``, ``stop``, ``speed``,
            ``timer`` or ``pace``) and ``value`` (a string, or ``None`` for
            ``start``/``stop``).

    Raises:
        SystemExit: With status 1 when the action is unknown or its value is
            missing, malformed or out of range.
    """
    action = args.action

    if action == "start":
        cmd = build_command(C.CMD_START)
        desc = "START"
    elif action == "stop":
        cmd = build_command(C.CMD_STOP)
        desc = "STOP"
    elif action == "speed":
        value = _parse_int_or_exit(args.value, "Speed")
        if not 0 <= value <= 255:
            _exit_with_error("Speed must be 0-255")
        cmd = build_command(C.CMD_SET_SPEED, value)
        pace = speed_param_to_pace(value)
        desc = f"SET SPEED {value} (~{format_pace(pace)}/100m)"
    elif action == "timer":
        value = _parse_int_or_exit(args.value, "Timer")
        if value <= 0:
            _exit_with_error("Timer must be positive")
        cmd = build_command(C.CMD_SET_TIMER, value)
        desc = f"SET TIMER {format_timer(value)} ({value}s)"
    elif action == "pace":
        pace_str = args.value
        pace_sec = _parse_pace_or_exit(pace_str)
        # NOTE(review): the resulting param is not range-checked here, unlike
        # the "speed" action; confirm pace_to_speed_param keeps it in 0-255.
        param = pace_to_speed_param(pace_sec)
        cmd = build_command(C.CMD_SET_SPEED, param)
        desc = f"SET PACE {pace_str}/100m (param={param})"
    else:
        print(f"Unknown action: {action}")
        sys.exit(1)

    target = (C.POOL_IP, C.POOL_PORT)
    # The context manager closes the socket whether or not sendto succeeds.
    with create_udp_sender() as sock:
        sock.sendto(cmd, target)
    print(f"Sent {desc} to {target[0]}:{target[1]}")
    print(f" Packet: {cmd.hex()}")
def main():
    """Parse the command line and dispatch to ``monitor`` or ``send_command``.

    With no sub-command the help text is printed. For ``send`` with the
    ``speed``, ``timer`` or ``pace`` action, a missing value prints an error
    and exits with status 1 before anything is sent.
    """
    parser = argparse.ArgumentParser(
        epilog=__doc__,
        formatter_class=argparse.RawDescriptionHelpFormatter,
        description="Endless Pool CLI - monitor and control",
    )
    subcommands = parser.add_subparsers(dest="command")

    subcommands.add_parser(
        "monitor", help="Listen for broadcasts and display pool status"
    )

    send = subcommands.add_parser("send", help="Send a command to the pool")
    send.add_argument(
        "action",
        help="Command to send",
        choices=["start", "stop", "speed", "timer", "pace"],
    )
    # The value is optional at the parser level because start/stop take none.
    send.add_argument(
        "value",
        default=None,
        nargs="?",
        help="Value for speed (0-255), timer (seconds), or pace (M:SS)",
    )

    options = parser.parse_args()

    if options.command == "monitor":
        monitor(options)
        return

    if options.command != "send":
        parser.print_help()
        return

    value_missing = options.value is None
    if value_missing and options.action in ("speed", "timer", "pace"):
        print(f"Error: {options.action} requires a value")
        sys.exit(1)
    send_command(options)
# Run the CLI only when this file is executed as a script, not when imported.
if __name__ == "__main__":
    main()