# openlab_light_controller.py (forked from Zive-IT-2025/Solution)
"""
OpenLab Light Controller - Real MQTT-based light control
Connects to OpenLab Bridge and controls real lights via MQTT
"""
import json
import logging
import time
import paho.mqtt.client as mqtt
from typing import Optional, Dict, Any
# Module-level logger named after this module (via __name__), used by every
# method of the controller below.
logger = logging.getLogger(__name__)
class OpenLabLightController:
    """Controller for OpenLab lights via MQTT.

    The constructor connects to the broker and starts the paho network loop
    right away; call :meth:`disconnect` when the controller is no longer
    needed.

    Only the white channel of the RGBW value is driven, so "brightness" in
    this class is the white-channel level expressed as a percentage (0-100).
    """

    def __init__(self, config: dict):
        """
        Initialize OpenLab light controller

        Args:
            config: Configuration dictionary. Reads the optional keys
                ``mqtt`` (a dict with ``broker``, ``port``, ``topic``,
                ``username``, ``password``), ``min_brightness``,
                ``max_brightness`` and ``fade_duration`` (milliseconds).

        Raises:
            Exception: Whatever the MQTT client raises when the initial
                connection attempt fails (re-raised by ``_connect``).
        """
        self.config = config
        self.mqtt_config = config.get('mqtt', {})

        # MQTT settings
        self.broker = self.mqtt_config.get('broker', 'localhost')
        self.port = self.mqtt_config.get('port', 1883)
        self.topic = self.mqtt_config.get('topic', 'openlab/lights')
        self.username = self.mqtt_config.get('username')
        self.password = self.mqtt_config.get('password')

        # Light settings
        self.min_brightness = config.get('min_brightness', 0)
        self.max_brightness = config.get('max_brightness', 100)
        self.fade_duration = config.get('fade_duration', 1000)  # milliseconds
        self.epilepsy_safe_duration = 250  # minimum duration for safety

        # State
        self.current_brightness = 0
        self.is_on = False
        self.last_command_time = 0
        self.command_cooldown = 0.1  # seconds between commands

        # Initialize MQTT client.
        # paho-mqtt 2.x adds a callback API version argument to Client().
        # The callbacks below use the 1.x signatures, so request VERSION1
        # explicitly when the enum exists and fall back to the bare
        # constructor on older releases.
        callback_api = getattr(mqtt, 'CallbackAPIVersion', None)
        if callback_api is None:
            self.client = mqtt.Client()
        else:
            self.client = mqtt.Client(callback_api.VERSION1)
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.on_publish = self._on_publish

        # Set credentials if provided
        if self.username and self.password:
            self.client.username_pw_set(self.username, self.password)

        # Connect to broker
        self._connect()
        logger.info(f"OpenLab Light Controller initialized (broker: {self.broker}:{self.port})")

    def _connect(self):
        """Connect to MQTT broker and start the client's network loop.

        Raises:
            Exception: Any error raised while connecting is logged and then
                re-raised to the caller.
        """
        try:
            self.client.connect(self.broker, self.port, keepalive=60)
            self.client.loop_start()
            logger.info(f"Connecting to MQTT broker at {self.broker}:{self.port}")
        except Exception as e:
            logger.error(f"Failed to connect to MQTT broker: {e}")
            raise

    def _on_connect(self, client, userdata, flags, rc):
        """Callback when connected to MQTT broker; ``rc == 0`` means success."""
        if rc == 0:
            logger.info("✅ Connected to MQTT broker successfully")
        else:
            logger.error(f"❌ Failed to connect to MQTT broker (code: {rc})")

    def _on_disconnect(self, client, userdata, rc):
        """Callback when disconnected from MQTT broker.

        Only a non-zero ``rc`` is logged, as an unexpected disconnection.
        """
        if rc != 0:
            logger.warning(f"Unexpected disconnection from MQTT broker (code: {rc})")

    def _on_publish(self, client, userdata, mid):
        """Callback when message is published"""
        logger.debug(f"Message published (mid: {mid})")

    def _brightness_to_rgbw(self, brightness: int) -> str:
        """
        Convert brightness percentage to RGBW hex value

        Args:
            brightness: Brightness level (0-100); values outside that range
                are clamped.

        Returns:
            RGBW hex string (e.g., "000000ff" for full white)
        """
        # Clamp brightness
        brightness = max(0, min(100, brightness))

        # Convert to 0-255 range for white channel (fraction is truncated)
        white_value = int((brightness / 100) * 255)

        # Format as RGBW (RGB off, only white channel)
        return f"000000{white_value:02x}"

    def _send_mqtt_command(self, rgbw_value: str, duration: Optional[int] = None):
        """
        Send MQTT command to control lights

        Blocks (sleeps) for the remainder of ``command_cooldown`` when the
        previous successful command was sent too recently. Publish failures
        are logged, not raised.

        Args:
            rgbw_value: RGBW hex value (e.g., "000000ff")
            duration: Fade duration in milliseconds. Defaults to
                ``fade_duration`` and is never allowed below
                ``epilepsy_safe_duration``.
        """
        # Respect command cooldown
        time_since_last = time.time() - self.last_command_time
        if time_since_last < self.command_cooldown:
            time.sleep(self.command_cooldown - time_since_last)

        # Use default duration if not specified
        if duration is None:
            duration = self.fade_duration

        # Ensure epilepsy-safe duration
        duration = max(duration, self.epilepsy_safe_duration)

        # Create message
        message = {
            "all": rgbw_value,
            "duration": duration
        }

        # Publish to MQTT
        try:
            result = self.client.publish(
                self.topic,
                json.dumps(message),
                qos=1
            )
            if result.rc == mqtt.MQTT_ERR_SUCCESS:
                logger.info(f"💡 Sent light command: {rgbw_value} (duration: {duration}ms)")
                # Only a successful publish restarts the cooldown window.
                self.last_command_time = time.time()
            else:
                logger.error(f"Failed to publish MQTT message (code: {result.rc})")
        except Exception as e:
            logger.error(f"Error sending MQTT command: {e}")

    def turn_on(self, brightness: Optional[int] = None):
        """
        Turn lights on

        Note: the tracked state is updated after the command is issued even
        if publishing failed, because ``_send_mqtt_command`` logs failures
        instead of raising.

        Args:
            brightness: Optional brightness level (0-100). Defaults to
                ``max_brightness``; clamped to
                ``[min_brightness, max_brightness]``.
        """
        if brightness is None:
            brightness = self.max_brightness

        # Clamp brightness
        brightness = max(self.min_brightness, min(self.max_brightness, brightness))

        # Convert to RGBW and send command
        self._send_mqtt_command(self._brightness_to_rgbw(brightness))

        # Update state
        self.is_on = True
        self.current_brightness = brightness
        logger.info(f"🔆 Lights turned ON (brightness: {brightness}%)")

    def turn_off(self):
        """Turn lights off by sending an all-zero RGBW value."""
        self._send_mqtt_command("00000000")

        # Update state
        self.is_on = False
        self.current_brightness = 0
        logger.info("🔅 Lights turned OFF")

    def set_brightness(self, brightness: int):
        """
        Set light brightness

        Args:
            brightness: Brightness level (0-100); exactly 0 turns the
                lights off.
        """
        if brightness == 0:
            self.turn_off()
        else:
            self.turn_on(brightness)

    def adjust_brightness(self, person_count: int, max_persons: int = 10):
        """
        Automatically adjust brightness based on person count

        Brightness scales linearly from ``min_brightness`` towards
        ``max_brightness`` as ``person_count`` approaches ``max_persons``
        and is capped at ``max_brightness``.

        Args:
            person_count: Number of detected persons; zero or a negative
                value turns the lights off.
            max_persons: Maximum expected persons (for scaling). A
                non-positive value is treated as "any person means full
                brightness" instead of dividing by zero.
        """
        if person_count <= 0:
            self.turn_off()
            return

        # Guard the division: a non-positive scale means full brightness.
        ratio = person_count / max_persons if max_persons > 0 else 1.0

        # Calculate brightness (linear scaling)
        brightness_range = self.max_brightness - self.min_brightness
        brightness = self.min_brightness + ratio * brightness_range

        # Clamp to max
        brightness = min(brightness, self.max_brightness)
        self.turn_on(int(brightness))

    def update_from_detections(self, detections: list, frame_size: tuple):
        """
        Update light brightness based on current detections

        The number of detections whose ``class_name`` is ``'person'`` is
        passed to :meth:`adjust_brightness`, so a frame without any person
        (no detections at all, or only other classes) turns the lights off.

        Args:
            detections: List of Detection objects (each must expose a
                ``class_name`` attribute); may be empty or None.
            frame_size: Tuple (width, height) of frame. Currently unused.
        """
        person_count = sum(
            1 for detection in (detections or [])
            if detection.class_name == 'person'
        )
        self.adjust_brightness(person_count)

    def get_status(self) -> Dict[str, Any]:
        """Get current light status as a dict (state, brightness, MQTT info)."""
        return {
            "state": "on" if self.is_on else "off",
            "current_brightness": self.current_brightness,
            "mqtt_connected": self.client.is_connected(),
            "broker": f"{self.broker}:{self.port}",
            "topic": self.topic
        }

    def get_current_brightness(self) -> int:
        """Get current brightness level"""
        return self.current_brightness

    def disconnect(self):
        """Disconnect from MQTT broker; errors are logged, not raised."""
        try:
            self.client.loop_stop()
            self.client.disconnect()
            logger.info("Disconnected from MQTT broker")
        except Exception as e:
            logger.error(f"Error disconnecting: {e}")