33# SPDX-License-Identifier: MPL-2.0
44
55import json
6- import base64
76import os
87import threading
98import queue
1413import websockets
1514import asyncio
1615from collections .abc import Callable
17- from concurrent .futures import CancelledError
1816
1917from arduino .app_utils import Logger
2018
2119from .camera import BaseCamera
2220from .errors import CameraConfigError , CameraOpenError
21+ from .websocket_codec import BinaryCodec , JsonCodec
2322
2423logger = Logger ("WebSocketCamera" )
2524
@@ -38,43 +37,64 @@ class WebSocketCamera(BaseCamera):
3837 - BMP
3938 - TIFF
4039
41- The frames can be serialized in one of the following formats:
42- - Binary image data
43- - Base64 encoded images
44- - JSON messages with image data
40+ Secure communication with the WebSocket server is supported in three security modes:
41+ - Security disabled (empty secret)
42+ - Authenticated (secret + enable_encryption=False) - HMAC-SHA256
43+ - Authenticated + Encrypted (secret + enable_encryption=True) - ChaCha20-Poly1305
44+
45+ The frames can be serialized in one of the following formats, whose structure depends
46+ on the selected security mode:
47+ - Binary format:
48+ - Security disabled: [data]
49+ - Authenticated: [sig_len:1][signature][timestamp:8][data]
50+ - Authenticated + Encrypted: [encrypted_len:4][timestamp:8][encrypted_data]
51+ - JSON format:
52+ - Security disabled: {"data": "base64_data"}
53+ - Authenticated: {"data": "base64_data", "timestamp": ..., "signature": "..."}
54+ - Authenticated + Encrypted: {"data": "encrypted_base64_data", "timestamp": ...}
4555 """
4656
4757 def __init__ (
4858 self ,
4959 port : int = 8080 ,
5060 timeout : int = 3 ,
51- frame_format : Literal ["binary" , "base64" , "json" ] = "binary" ,
61+ frame_format : Literal ["binary" , "json" ] = "binary" ,
62+ secret : str = "" ,
63+ enable_encryption : bool = False ,
5264 resolution : tuple [int , int ] = (640 , 480 ),
5365 fps : int = 10 ,
5466 adjustments : Callable [[np .ndarray ], np .ndarray ] | None = None ,
5567 auto_reconnect : bool = True ,
5668 ):
5769 """
58- Initialize WebSocket camera server.
70+ Initialize WebSocket camera server with security options .
5971
6072 Args:
61- port (int): Port to bind the server to (default: 8080)
62- timeout (int): Connection timeout in seconds (default: 10)
63- frame_format (str): Expected frame format from clients ("binary", "base64", "json") (default: "binary")
64- resolution (tuple, optional): Resolution as (width, height). None uses default resolution.
65- fps (int): Frames per second to capture from the camera.
66- adjustments (callable, optional): Function or function pipeline to adjust frames that takes
67- a numpy array and returns a numpy array. Default: None
68- auto_reconnect (bool, optional): Enable automatic reconnection on failure. Default: True.
73+ port: Port to bind the server to
74+ timeout: Connection timeout in seconds
75+ frame_format: Expected frame format ("binary", "json")
76+ secret: Secret key for authentication/encryption (empty = security disabled)
77+ enable_encryption: Enable encryption (only effective if secret is provided)
78+ resolution: Resolution as (width, height)
79+ fps: Frames per second to capture
80+ adjustments: Function to adjust frames
81+ auto_reconnect: Enable automatic reconnection on failure
6982 """
7083 super ().__init__ (resolution , fps , adjustments , auto_reconnect )
7184
7285 self .protocol = "ws"
7386 self .port = port
7487 self .timeout = timeout
75- if frame_format not in ["binary" , "base64" , "json" ]:
88+ # Select appropriate codec
89+ if frame_format == "binary" :
90+ self .codec = BinaryCodec (secret , enable_encryption )
91+ elif frame_format == "json" :
92+ self .codec = JsonCodec (secret , enable_encryption )
93+ else :
7694 raise CameraConfigError (f"Invalid frame format: { frame_format } " )
7795 self .frame_format = frame_format
96+ self .secret = secret
97+ self .enable_encryption = enable_encryption
7898 self .logger = logger
7999
80100 host_ip = os .getenv ("HOST_IP" )
@@ -94,17 +114,26 @@ def url(self) -> str:
94114 """Return the WebSocket server address."""
95115 return f"{ self .protocol } ://{ self ._external_ip } :{ self .port } "
96116
117+ @property
118+ def security_mode (self ) -> str :
119+ """Return current security mode for logging/debugging."""
120+ if not self .secret :
121+ return "none"
122+ elif self .enable_encryption :
123+ return "encrypted (ChaCha20-Poly1305)"
124+ else :
125+ return "authenticated (HMAC-SHA256)"
126+
97127 def _open_camera (self ) -> None :
98128 """Start the WebSocket server."""
99129 self ._server_thread = threading .Thread (target = self ._start_server_thread , daemon = True )
100130 self ._server_thread .start ()
101131
102132 # Wait for server to start
103133 start_time = time .time ()
104- start_timeout = self .timeout
105- while time .time () - start_time < start_timeout :
134+ while time .time () - start_time < self .timeout :
106135 if self ._server is not None :
107- logger .info (f"WebSocket camera server started on { self .url } " )
136+ self . logger .info (f"WebSocket camera server started on { self .url } , security: { self . security_mode } " )
108137 return
109138 time .sleep (0.1 )
110139
@@ -140,6 +169,7 @@ async def _start_server(self) -> None:
140169 ping_timeout = self .timeout ,
141170 close_timeout = self .timeout ,
142171 ping_interval = 20 ,
172+ max_size = 5 * 1024 * 1024 , # Limit max message size for security
143173 ),
144174 timeout = self .timeout ,
145175 )
@@ -152,7 +182,7 @@ async def _start_server(self) -> None:
152182 await self ._stop_event .wait ()
153183
154184 except TimeoutError as e :
155- self .logger .error (f"Failed to start WebSocket server in a time ( { self .timeout } s) : { e } " )
185+ self .logger .error (f"Failed to start WebSocket server within { self .timeout } s: { e } " )
156186 raise
157187 except Exception as e :
158188 self .logger .error (f"Failed to start WebSocket server: { e } " )
@@ -172,46 +202,51 @@ async def _ws_handler(self, conn: websockets.ServerConnection) -> None:
172202 # Reject the new client
173203 self .logger .warning (f"Rejecting client { client_addr } : only one client allowed at a time" )
174204 try :
175- await conn .send (json .dumps ({"error" : "Server busy" , "message" : "Only one client connection allowed at a time" , "code" : 1000 }))
176- await conn .close (code = 1000 , reason = "Server busy - only one client allowed" )
205+ rejection = json .dumps ({"error" : "Server busy" , "message" : "Only one client connection allowed at a time" , "code" : 1000 })
206+ await self ._send_to_client (rejection , client = conn )
207+ await conn .close (code = 1000 , reason = "Server busy" )
177208 except Exception as e :
178- self .logger .warning (f"Error sending rejection message to { client_addr } : { e } " )
209+ self .logger .warning (f"Failed to send rejection message to { client_addr } : { e } " )
179210 return
180211
181212 # Accept the client
182213 self ._client = conn
183214
184215 self ._set_status ("connected" , {"client_address" : client_addr })
185-
186216 self .logger .debug (f"Client connected: { client_addr } " )
187217
188218 try :
189- # Send welcome message
190219 try :
191- await self ._send_to_client ({
220+ # Send welcome message
221+ welcome = json .dumps ({
192222 "status" : "connected" ,
193- "message" : "You are now connected to the camera server" ,
223+ "message" : "Connected to camera server" ,
194224 "frame_format" : self .frame_format ,
195225 "resolution" : self .resolution ,
196226 "fps" : self .fps ,
227+ "security_mode" : self .security_mode ,
197228 })
229+ await self ._send_to_client (welcome )
198230 except Exception as e :
199- self .logger .warning (f"Could not send welcome message to { client_addr } : { e } " )
231+ self .logger .warning (f"Failed to send welcome message: { e } " )
200232
233+ # Handle incoming messages
201234 async for message in conn :
202235 frame = self ._parse_message (message )
203- if frame is not None :
204- # Drop old frames until there's room for the new one
205- while True :
236+ if frame is None :
237+ continue
238+
239+ # Drop old frames until there's room for the new one
240+ while True :
241+ try :
242+ self ._frame_queue .put_nowait (frame )
243+ break
244+ except queue .Full :
206245 try :
207- self ._frame_queue .put_nowait (frame )
208- break
209- except queue .Full :
210- try :
211- # Drop oldest frame and try again
212- self ._frame_queue .get_nowait ()
213- except queue .Empty :
214- continue
246+ # Drop oldest frame and try again
247+ self ._frame_queue .get_nowait ()
248+ except queue .Empty :
249+ continue
215250
216251 except websockets .exceptions .ConnectionClosed :
217252 self .logger .debug (f"Client disconnected: { client_addr } " )
@@ -222,78 +257,29 @@ async def _ws_handler(self, conn: websockets.ServerConnection) -> None:
222257 if self ._client == conn :
223258 self ._client = None
224259 self ._set_status ("disconnected" , {"client_address" : client_addr })
225- self .logger .debug (f"Client disconnected: { client_addr } " )
226260
227- def _parse_message (self , message : str | bytes ) -> np .ndarray | None :
228- """Parse WebSocket message to extract frame."""
229- try :
230- if self .frame_format == "binary" :
231- # Expect raw binary image data
232- if isinstance (message , str ):
233- # Use latin-1 encoding to preserve binary data
234- image_data = message .encode ("latin-1" )
235- else :
236- image_data = message
237-
238- nparr = np .frombuffer (image_data , np .uint8 )
239- frame = cv2 .imdecode (nparr , cv2 .IMREAD_UNCHANGED )
240- return frame
241-
242- elif self .frame_format == "base64" :
243- # Expect base64 encoded image
244- if isinstance (message , str ):
245- image_data = base64 .b64decode (message )
246- else :
247- image_data = base64 .b64decode (message .decode ())
248-
249- # Decode image
250- nparr = np .frombuffer (image_data , np .uint8 )
251- frame = cv2 .imdecode (nparr , cv2 .IMREAD_UNCHANGED )
252- return frame
253-
254- elif self .frame_format == "json" :
255- # Expect JSON with image data
256- if isinstance (message , bytes ):
257- message = message .decode ()
258-
259- data = json .loads (message )
260-
261- if "image" in data :
262- image_data = base64 .b64decode (data ["image" ])
263- nparr = np .frombuffer (image_data , np .uint8 )
264- frame = cv2 .imdecode (nparr , cv2 .IMREAD_UNCHANGED )
265- return frame
266-
267- elif "frame" in data :
268- # Handle different frame data formats
269- frame_data = data ["frame" ]
270- if isinstance (frame_data , str ):
271- image_data = base64 .b64decode (frame_data )
272- nparr = np .frombuffer (image_data , np .uint8 )
273- frame = cv2 .imdecode (nparr , cv2 .IMREAD_UNCHANGED )
274- return frame
275-
276- else :
277- logger .error (f"Unknown video format: { self .frame_format } " )
278- return None
261+ def _parse_message (self , message : websockets .Data ) -> np .ndarray | None :
262+ if isinstance (message , str ):
263+ message = message .encode ()
279264
280- except Exception as e :
281- logger .warning (f"Error parsing message: { e } " )
265+ decoded = self .codec .decode (message )
266+ if decoded is None :
267+ self .logger .warning ("Failed to decode/authenticate message" )
282268 return None
283269
270+ nparr = np .frombuffer (decoded , np .uint8 )
271+ frame = cv2 .imdecode (nparr , cv2 .IMREAD_UNCHANGED )
272+ return frame
273+
284274 def _close_camera (self ):
285275 """Stop the WebSocket server."""
286276 # Only attempt cleanup if the event loop is running
287277 if self ._loop and not self ._loop .is_closed () and self ._loop .is_running ():
288278 try :
289279 future = asyncio .run_coroutine_threadsafe (self ._stop_and_disconnect_client (), self ._loop )
290280 future .result (1.0 )
291- except CancelledError :
292- self .logger .debug (f"Error stopping WebSocket server: CancelledError" )
293- except TimeoutError :
294- self .logger .debug (f"Error stopping WebSocket server: TimeoutError" )
295281 except Exception as e :
296- self .logger .warning (f"Error stopping WebSocket server: { e } " )
282+ self .logger .warning (f"Failed to stop WebSocket server cleanly : { e } " )
297283
298284 # Wait for server thread to finish
299285 if self ._server_thread and self ._server_thread .is_alive ():
@@ -312,18 +298,14 @@ def _close_camera(self):
312298 self ._client = None
313299
314300 async def _stop_and_disconnect_client (self ):
315- """Set the async stop event and close the client connection."""
316- # Send goodbye message and close the client connection
301+ """Cleanly disconnect client with goodbye message."""
317302 if self ._client :
318303 try :
319304 self .logger .debug ("Disconnecting client..." )
320- # Send goodbye message before closing
321- await self ._send_to_client ({
322- "status" : "disconnecting" ,
323- "message" : "Server is shutting down. Connection will be closed." ,
324- })
305+ goodbye = json .dumps ({"status" : "disconnecting" , "message" : "Server is shutting down" })
306+ await self ._send_to_client (goodbye )
325307 except Exception as e :
326- self .logger .warning (f"Failed to send 'disconnecting' event to closing client : { e } " )
308+ self .logger .warning (f"Failed to send goodbye message : { e } " )
327309 finally :
328310 if self ._client :
329311 await self ._client .close ()
@@ -338,18 +320,23 @@ def _read_frame(self) -> np.ndarray | None:
338320 except queue .Empty :
339321 return None
340322
341- async def _send_to_client (self , message : str | bytes | dict ) -> None :
342- """Send a message to the connected client."""
343- if isinstance (message , dict ):
344- message = json . dumps ( message )
323+ async def _send_to_client (self , data : bytes | str , client : websockets . ServerConnection | None = None ) :
324+ """Send secure message to connected client."""
325+ if isinstance (data , str ):
326+ data = data . encode ( )
345327
346- async with self ._client_lock :
347- if self ._client is None :
348- raise ConnectionError ("No client connected to send message to" )
328+ encoded = self .codec .encode (data )
349329
350- try :
351- await self ._client .send (message )
352- except websockets .ConnectionClosedOK :
353- self .logger .warning ("Client has already closed the connection" )
354- except Exception :
355- raise
330+ # Keep a ref to current client to avoid locking
331+ client = client or self ._client
332+ if client is None :
333+ raise ConnectionError ("No client connected" )
334+
335+ try :
336+ await client .send (encoded )
337+ except websockets .ConnectionClosedOK :
338+ self .logger .warning ("Client has already closed the connection" )
339+ except websockets .ConnectionClosedError as e :
340+ self .logger .warning (f"Client has already closed the connection with error: { e } " )
341+ except Exception :
342+ raise
0 commit comments