|
16 | 16 | import json |
17 | 17 | import mimetypes |
18 | 18 | from pathlib import Path |
| 19 | +from types import SimpleNamespace |
19 | 20 | from typing import TYPE_CHECKING, Any, Dict, List, Optional, Union, cast |
20 | 21 | from urllib import parse |
21 | 22 |
|
@@ -257,6 +258,49 @@ def frame(self) -> "Frame": |
257 | 258 | return self._request.frame |
258 | 259 |
|
259 | 260 |
|
| 261 | +class WebSocket(ChannelOwner): |
| 262 | + |
| 263 | + Events = SimpleNamespace( |
| 264 | + Close="close", |
| 265 | + FrameReceived="framereceived", |
| 266 | + FrameSent="framesent", |
| 267 | + Error="socketerror", |
| 268 | + ) |
| 269 | + |
| 270 | + def __init__( |
| 271 | + self, parent: ChannelOwner, type: str, guid: str, initializer: Dict |
| 272 | + ) -> None: |
| 273 | + super().__init__(parent, type, guid, initializer) |
| 274 | + self._channel.on( |
| 275 | + "frameSent", |
| 276 | + lambda params: self._on_frame_sent(params["opcode"], params["data"]), |
| 277 | + ) |
| 278 | + self._channel.on( |
| 279 | + "frameReceived", |
| 280 | + lambda params: self._on_frame_received(params["opcode"], params["data"]), |
| 281 | + ) |
| 282 | + self._channel.on( |
| 283 | + "error", lambda params: self.emit(WebSocket.Events.Error, params["error"]) |
| 284 | + ) |
| 285 | + self._channel.on("close", lambda params: self.emit(WebSocket.Events.Close)) |
| 286 | + |
| 287 | + @property |
| 288 | + def url(self) -> str: |
| 289 | + return self._initializer["url"] |
| 290 | + |
| 291 | + def _on_frame_sent(self, opcode: int, data: str) -> None: |
| 292 | + if opcode == 2: |
| 293 | + self.emit(WebSocket.Events.FrameSent, base64.b64decode(data)) |
| 294 | + else: |
| 295 | + self.emit(WebSocket.Events.FrameSent, data) |
| 296 | + |
| 297 | + def _on_frame_received(self, opcode: int, data: str) -> None: |
| 298 | + if opcode == 2: |
| 299 | + self.emit(WebSocket.Events.FrameReceived, base64.b64decode(data)) |
| 300 | + else: |
| 301 | + self.emit(WebSocket.Events.FrameReceived, data) |
| 302 | + |
| 303 | + |
260 | 304 | def serialize_headers(headers: Dict[str, str]) -> List[Header]: |
261 | 305 | return [{"name": name, "value": value} for name, value in headers.items()] |
262 | 306 |
|
|
0 commit comments