-
Notifications
You must be signed in to change notification settings - Fork 75
feat: fix a01 and b01 response handling in new api #453
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 3 commits
5d78df1
7a2b8e2
3387858
c31bf19
65bb013
9577c8b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,19 +1,24 @@ | ||
| """Thin wrapper around the MQTT channel for Roborock A01 devices.""" | ||
|
|
||
| from __future__ import annotations | ||
|
|
||
| import asyncio | ||
| import logging | ||
| from typing import Any, overload | ||
|
|
||
| from roborock.exceptions import RoborockException | ||
| from roborock.protocols.a01_protocol import ( | ||
| decode_rpc_response, | ||
| encode_mqtt_payload, | ||
| ) | ||
| from roborock.roborock_message import RoborockDyadDataProtocol, RoborockZeoProtocol | ||
| from roborock.roborock_message import ( | ||
| RoborockDyadDataProtocol, | ||
| RoborockMessage, | ||
| RoborockZeoProtocol, | ||
| ) | ||
|
|
||
| from .mqtt_channel import MqttChannel | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
| _TIMEOUT = 10.0 | ||
|
|
||
|
|
||
| @overload | ||
|
|
@@ -39,5 +44,46 @@ async def send_decoded_command( | |
| """Send a command on the MQTT channel and get a decoded response.""" | ||
| _LOGGER.debug("Sending MQTT command: %s", params) | ||
| roborock_message = encode_mqtt_payload(params) | ||
| response = await mqtt_channel.send_message(roborock_message) | ||
| return decode_rpc_response(response) # type: ignore[return-value] | ||
|
|
||
| # We only block on a response for queries | ||
| param_values = {int(k): v for k, v in params.items()} | ||
| if not ( | ||
| query_values := param_values.get(int(RoborockDyadDataProtocol.ID_QUERY)) | ||
| or param_values.get(int(RoborockZeoProtocol.ID_QUERY)) | ||
| ): | ||
| await mqtt_channel.publish(roborock_message) | ||
| return {} | ||
|
allenporter marked this conversation as resolved.
Outdated
|
||
|
|
||
| # Merge any results together than contain the requested data. This | ||
| # does not use a future since it needs to merge results across responses. | ||
| # This could be simplified if we can assume there is a single response. | ||
| finished = asyncio.Event() | ||
| result: dict[int, Any] = {} | ||
|
|
||
| def find_response(response_message: RoborockMessage) -> None: | ||
| """Handle incoming messages and resolve the future.""" | ||
| try: | ||
| decoded = decode_rpc_response(response_message) | ||
| except RoborockException: | ||
| return | ||
|
allenporter marked this conversation as resolved.
Outdated
|
||
| for key, value in decoded.items(): | ||
| if key in query_values: | ||
| result[key] = value | ||
| if len(result) != len(query_values): | ||
| return | ||
|
allenporter marked this conversation as resolved.
|
||
| _LOGGER.debug("Received query response: %s", result) | ||
| if not finished.is_set(): | ||
| finished.set() | ||
|
|
||
| unsub = await mqtt_channel.subscribe(find_response) | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Okay maybe i'm just getting confused by the asyncness and maybe that is leading to my confusion with the returns above as well. My first thought when I was reading this was that we are adding a subscription everytime that we send a message. Then when a message goes through, we are going through all of the subscriptions and if they aren't a match, we end up returning None on them. However, looking at it, it seems that subscribe holds exactly one callback, so if I had three different send_commands I wanted to send at once - wouldn't that override the subscription callback for each one? Or does the lock within roborock_session prevent that - if that's the case, doesn't that dramatically increase the amount of time that I will be waiting to get a response? Maybe it's okay, as if you are using the api correctly, all IDs that you are querying should be done with one function call - but there could be commands that we don't utilize in A01 that should have responses as well
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. You're right, the code I was using for mqtt session changed a bunch of times, but i reverted it for this PR. The intent was to use a version that allowed any number of callbacks. I think a version fo this was reverted so this no longer makes sense.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Updated this to remove the limitation of a single subscribe call for the mqtt channel. This matches the behavior of local channel. |
||
|
|
||
| try: | ||
| await mqtt_channel.publish(roborock_message) | ||
| try: | ||
| await asyncio.wait_for(finished.wait(), timeout=_TIMEOUT) | ||
| except TimeoutError as ex: | ||
| raise RoborockException(f"Command timed out after {_TIMEOUT}s") from ex | ||
| finally: | ||
| unsub() | ||
|
|
||
| return result # type: ignore[return-value] | ||
This file was deleted.
Uh oh!
There was an error while loading. Please reload this page.