-
Notifications
You must be signed in to change notification settings - Fork 74
Expand file tree
/
Copy pathbroadcast_protocol.py
More file actions
71 lines (56 loc) · 1.96 KB
/
broadcast_protocol.py
File metadata and controls
71 lines (56 loc) · 1.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
from __future__ import annotations
import asyncio
import json
import logging
from asyncio import BaseTransport, Lock
from construct import ( # type: ignore
Bytes,
Checksum,
Int16ub,
Int32ub,
RawCopy,
Struct,
)
from roborock.containers import BroadcastMessage
from roborock.protocol import EncryptionAdapter, Utils, _Parser
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Fixed key handed to EncryptionAdapter for the payload field of broadcast messages.
BROADCAST_TOKEN = b"qWKYcdQWrbm9hPqe"
class RoborockProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that collects broadcast messages.

    ``discover`` binds a UDP endpoint on ``0.0.0.0:58866``, listens for
    ``timeout`` seconds and returns every broadcast that was successfully
    parsed during that window.
    """

    def __init__(self, timeout: int = 5):
        """Create a protocol instance.

        Args:
            timeout: Number of seconds ``discover`` keeps listening.
        """
        self.timeout = timeout
        self.transport: BaseTransport | None = None
        self.devices_found: list[BroadcastMessage] = []
        # Serializes discover() calls so only one endpoint is bound at a time.
        self._mutex = Lock()

    def __del__(self):
        """Release the transport, if any, when the instance is collected."""
        self.close()

    def datagram_received(self, data, _):
        """Parse one incoming datagram and record it if it carries a payload.

        Datagrams that cannot be parsed or decoded are logged and dropped, so
        unrelated or corrupt traffic on the discovery port does not raise
        inside the event-loop callback.

        Args:
            data: Raw datagram bytes.
            _: Sender address (unused).
        """
        try:
            [broadcast_message], _remaining = BroadcastParser.parse(data)
            if not broadcast_message.payload:
                return
            parsed_message = BroadcastMessage.from_dict(json.loads(broadcast_message.payload))
        except Exception:
            # Network boundary: any bytes can arrive on a UDP port, and the
            # parser, decryption, JSON decoding and model construction can each
            # fail in their own way. Log with traceback and keep listening.
            _LOGGER.warning("Failed to decode broadcast datagram: %r", data, exc_info=True)
            return
        _LOGGER.debug("Received broadcast: %s", parsed_message)
        self.devices_found.append(parsed_message)

    async def discover(self):
        """Listen for broadcasts for ``self.timeout`` seconds.

        Returns:
            The list of broadcast messages received during this call.
            ``self.devices_found`` is rebound to a fresh empty list afterwards,
            so the returned list is not mutated by later calls.
        """
        async with self._mutex:
            try:
                # Inside a coroutine the running loop is the one to use.
                loop = asyncio.get_running_loop()
                self.transport, _ = await loop.create_datagram_endpoint(lambda: self, local_addr=("0.0.0.0", 58866))
                await asyncio.sleep(self.timeout)
                return self.devices_found
            finally:
                self.close()
                self.devices_found = []

    def close(self):
        """Close the transport if one is open; safe to call repeatedly."""
        # getattr: __del__ may run on an instance whose __init__ never ran.
        transport = getattr(self, "transport", None)
        if transport is None:
            return
        # Drop the reference first so a later close() (e.g. from __del__)
        # does not touch the already-closed transport again.
        self.transport = None
        transport.close()
# Fields covered by the trailing checksum, in wire order.
_broadcast_body = Struct(
    "version" / Bytes(3),
    "seq" / Int32ub,
    "protocol" / Int16ub,
    # The payload is handled by EncryptionAdapter, keyed with the fixed broadcast token.
    "payload" / EncryptionAdapter(lambda _context: BROADCAST_TOKEN),
)

# RawCopy keeps the raw bytes of the body available as ``message.data``,
# which is what the checksum field is computed over.
_BroadcastMessage = Struct(
    "message" / RawCopy(_broadcast_body),
    "checksum" / Checksum(Int32ub, Utils.crc, lambda context: context.message.data),
)

# NOTE(review): the meaning of the second positional argument (False) is defined
# by roborock.protocol._Parser, which is not visible here - confirm before changing.
BroadcastParser: _Parser = _Parser(_BroadcastMessage, False)