-
Notifications
You must be signed in to change notification settings - Fork 78
Expand file tree
/
Copy pathb01_q10_protocol.py
More file actions
87 lines (71 loc) · 3.33 KB
/
b01_q10_protocol.py
File metadata and controls
87 lines (71 loc) · 3.33 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
"""Roborock B01 Protocol encoding and decoding."""
import json
import logging
from typing import Any
from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP
from roborock.exceptions import RoborockException
from roborock.roborock_message import (
RoborockMessage,
RoborockMessageProtocol,
)
# Module-level logger, named after this module.
_LOGGER = logging.getLogger(__name__)
# Version marker set on every message built by `encode_mqtt_payload`.
B01_VERSION = b"B01"
# Shapes accepted for the `params` argument of `encode_mqtt_payload`.
ParamsType = list | dict | int | None
def encode_mqtt_payload(command: B01_Q10_DP, params: ParamsType) -> RoborockMessage:
    """Build the MQTT request message for a B01 Q10 command.

    The parameters are placed into the payload untouched; callers are expected
    to supply them already shaped for the specific request.
    """
    # Compare against None explicitly: some commands take falsy values as
    # parameters, and those must not be replaced by the `{}` default.
    dp_value = {} if params is None else params
    body = json.dumps({"dps": {command.code: dp_value}})
    return RoborockMessage(
        protocol=RoborockMessageProtocol.RPC_REQUEST,
        version=B01_VERSION,
        payload=body.encode("utf-8"),
    )
def _convert_datapoints(datapoints: dict[str, Any], message: RoborockMessage) -> dict[B01_Q10_DP, Any]:
    """Re-key a raw 'dps' mapping by B01_Q10_DP enum member.

    Each key is parsed as an integer code. Entries whose code is not resolved
    by `B01_Q10_DP.from_code_optional` are left out of the result. `message`
    is only used to include the raw payload in the error text.

    Raises:
        ValueError: If a key cannot be parsed as an integer.
    """
    converted: dict[B01_Q10_DP, Any] = {}
    for raw_key, raw_value in datapoints.items():
        try:
            numeric_code = int(raw_key)
        except ValueError as e:
            raise ValueError(f"dps key is not a valid integer: {e} for {message.payload!r}") from e
        member = B01_Q10_DP.from_code_optional(numeric_code)
        if member is None:
            # No matching enum member for this code: skip the entry.
            continue
        converted[member] = raw_value
    return converted
def decode_rpc_response(message: RoborockMessage) -> dict[B01_Q10_DP, Any]:
    """Decode a B01 Q10 RPC_RESPONSE message.

    This does not perform any special decoding for the response body, but does
    convert the 'dps' keys from strings to B01_Q10_DP enums. Datapoints nested
    under the COMMON datapoint are converted the same way and merged into the
    top-level result.

    Raises:
        RoborockException: If the payload is missing, is not valid UTF-8 JSON,
            is not a JSON object, has no 'dps' dictionary, contains a
            non-integer datapoint key, or carries a COMMON value that is not
            a dictionary.
    """
    if not message.payload:
        raise RoborockException("Invalid B01 message format: missing payload")
    try:
        payload = json.loads(message.payload.decode())
    except (json.JSONDecodeError, UnicodeDecodeError) as e:
        raise RoborockException(f"Invalid B01 json payload: {e} for {message.payload!r}") from e
    # Well-formed JSON is not necessarily an object: a list, number, string or
    # null would make `.get` below fail with AttributeError instead of the
    # RoborockException that callers of this module handle.
    if not isinstance(payload, dict):
        raise RoborockException(f"Invalid B01 json payload: expected a JSON object for {message.payload!r}")
    if (datapoints := payload.get("dps")) is None:
        raise RoborockException(f"Invalid B01 json payload: missing 'dps' for {message.payload!r}")
    if not isinstance(datapoints, dict):
        raise RoborockException(f"Invalid B01 message format: 'dps' should be a dictionary for {message.payload!r}")
    try:
        result = _convert_datapoints(datapoints, message)
    except ValueError as e:
        raise RoborockException(f"Invalid B01 message format: {e}") from e
    # The COMMON response contains nested datapoints that also need conversion.
    # To simplify response handling at higher levels we flatten these into the
    # main result; nested entries overwrite top-level ones with the same key.
    if B01_Q10_DP.COMMON in result:
        common_result = result.pop(B01_Q10_DP.COMMON)
        if not isinstance(common_result, dict):
            raise RoborockException(f"Invalid dpCommon format: expected dict, got {type(common_result).__name__}")
        try:
            common_dps_result = _convert_datapoints(common_result, message)
        except ValueError as e:
            raise RoborockException(f"Invalid dpCommon format: {e}") from e
        result.update(common_dps_result)
    return result