-
Notifications
You must be signed in to change notification settings - Fork 74
feat: add some basic B01 support #429
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
Merged
Changes from 1 commit
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,29 @@ | ||
| """Thin wrapper around the MQTT channel for Roborock B01 devices.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
|
|
||
| from roborock.protocols.b01_protocol import ( | ||
| CommandType, | ||
| ParamsType, | ||
| decode_rpc_response, | ||
| encode_mqtt_payload, | ||
| ) | ||
|
|
||
| from .mqtt_channel import MqttChannel | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| async def send_decoded_command( | ||
| mqtt_channel: MqttChannel, | ||
| dps: int, | ||
| command: CommandType, | ||
| params: ParamsType, | ||
| ) -> dict: | ||
| """Send a command on the MQTT channel and get a decoded response.""" | ||
| _LOGGER.debug("Sending MQTT command: %s", params) | ||
| roborock_message = encode_mqtt_payload(dps, command, params) | ||
| response = await mqtt_channel.send_message(roborock_message) | ||
| return decode_rpc_response(response) # type: ignore[return-value] | ||
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Empty file.
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,31 @@ | ||
| from __future__ import annotations | ||
|
|
||
| import logging | ||
| from typing import Any | ||
|
|
||
| from roborock import RoborockB01Methods | ||
| from roborock.roborock_message import RoborockB01Props, RoborockDyadDataProtocol | ||
|
|
||
| from ...b01_channel import send_decoded_command | ||
| from ...mqtt_channel import MqttChannel | ||
| from ..trait import Trait | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
| __all__ = [ | ||
| "B01PropsApi", | ||
| ] | ||
|
|
||
|
|
||
| class B01PropsApi(Trait): | ||
| """API for interacting with B01 devices.""" | ||
|
|
||
| name = "B01_props" | ||
|
|
||
| def __init__(self, channel: MqttChannel) -> None: | ||
| """Initialize the B01Props API.""" | ||
| self._channel = channel | ||
|
|
||
| async def query_values(self, props: list[RoborockB01Props]) -> dict[RoborockDyadDataProtocol, Any]: | ||
| """Query the device for the values of the given Dyad protocols.""" | ||
| return await send_decoded_command(self._channel, dps=10000, command=RoborockB01Methods.GET_PROP, params=props) |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,55 @@ | ||
| """Roborock B01 Protocol encoding and decoding.""" | ||
|
|
||
| import json | ||
| import logging | ||
| from typing import Any | ||
|
|
||
| from Crypto.Cipher import AES | ||
| from Crypto.Util.Padding import pad, unpad | ||
|
|
||
| from roborock import RoborockB01Methods | ||
| from roborock.exceptions import RoborockException | ||
| from roborock.roborock_message import ( | ||
| RoborockMessage, | ||
| RoborockMessageProtocol, | ||
| ) | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
| B01_VERSION = b"B01" | ||
| CommandType = RoborockB01Methods | str | ||
| ParamsType = list | dict | int | None | ||
|
|
||
|
|
||
| def encode_mqtt_payload(dps: int, command: CommandType, params: ParamsType) -> RoborockMessage: | ||
| """Encode payload for B01 commands over MQTT.""" | ||
| dps_data = {"dps": {dps: {"method": command, "params": params or []}}} | ||
| payload = pad(json.dumps(dps_data).encode("utf-8"), AES.block_size) | ||
| return RoborockMessage( | ||
| protocol=RoborockMessageProtocol.RPC_REQUEST, | ||
| version=B01_VERSION, | ||
| payload=payload, | ||
| ) | ||
|
|
||
|
|
||
| def decode_rpc_response(message: RoborockMessage) -> dict[int, Any]: | ||
| """Decode a B01 RPC_RESPONSE message.""" | ||
| if not message.payload: | ||
| raise RoborockException("Invalid B01 message format: missing payload") | ||
| try: | ||
| unpadded = unpad(message.payload, AES.block_size) | ||
| except ValueError as err: | ||
| raise RoborockException(f"Unable to unpad B01 payload: {err}") | ||
|
|
||
| try: | ||
| payload = json.loads(unpadded.decode()) | ||
| except (json.JSONDecodeError, TypeError) as e: | ||
| raise RoborockException(f"Invalid B01 message payload: {e} for {message.payload!r}") from e | ||
|
|
||
| datapoints = payload.get("dps", {}) | ||
| if not isinstance(datapoints, dict): | ||
| raise RoborockException(f"Invalid B01 message format: 'dps' should be a dictionary for {message.payload!r}") | ||
| try: | ||
| return {int(key): value for key, value in datapoints.items()} | ||
| except ValueError: | ||
| raise RoborockException(f"Invalid B01 message format: 'dps' key should be an integer for {message.payload!r}") |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.