|
2 | 2 |
|
3 | 3 | from __future__ import annotations |
4 | 4 |
|
5 | | -import asyncio |
6 | 5 | import logging |
7 | | -from collections.abc import Iterable |
| 6 | +from collections.abc import AsyncGenerator |
8 | 7 | from typing import Any |
9 | 8 |
|
10 | 9 | from roborock.data.b01_q10.b01_q10_code_mappings import B01_Q10_DP |
11 | 10 | from roborock.devices.transport.mqtt_channel import MqttChannel |
12 | 11 | from roborock.exceptions import RoborockException |
13 | | -from roborock.protocols.b01_q10_protocol import ( |
14 | | - ParamsType, |
15 | | - decode_rpc_response, |
16 | | - encode_mqtt_payload, |
17 | | -) |
18 | | -from roborock.roborock_message import RoborockMessage |
| 12 | +from roborock.protocols.b01_q10_protocol import ParamsType, encode_mqtt_payload |
| 13 | +from roborock.protocols.b01_q10_protocol import decode_rpc_response |
19 | 14 |
|
20 | 15 | _LOGGER = logging.getLogger(__name__) |
21 | | -_TIMEOUT = 10.0 |
22 | 16 |
|
23 | 17 |
|
24 | | -async def send_command( |
| 18 | +async def stream_decoded_responses( |
25 | 19 | mqtt_channel: MqttChannel, |
26 | | - command: B01_Q10_DP, |
27 | | - params: ParamsType, |
28 | | -) -> None: |
29 | | - """Send a command on the MQTT channel, without waiting for a response""" |
30 | | - _LOGGER.debug("Sending B01 MQTT command: cmd=%s params=%s", command, params) |
31 | | - roborock_message = encode_mqtt_payload(command, params) |
32 | | - _LOGGER.debug("Sending MQTT message: %s", roborock_message) |
33 | | - try: |
34 | | - await mqtt_channel.publish(roborock_message) |
35 | | - except RoborockException as ex: |
36 | | - _LOGGER.debug( |
37 | | - "Error sending B01 decoded command (method=%s params=%s): %s", |
38 | | - command, |
39 | | - params, |
40 | | - ex, |
41 | | - ) |
42 | | - raise |
43 | | - |
| 20 | +) -> AsyncGenerator[dict[B01_Q10_DP, Any], None]: |
| 21 | + """Stream decoded DPS messages received via MQTT.""" |
44 | 22 |
|
45 | | -async def send_decoded_command( |
46 | | - mqtt_channel: MqttChannel, |
47 | | - command: B01_Q10_DP, |
48 | | - params: ParamsType, |
49 | | - expected_dps: Iterable[B01_Q10_DP] | None = None, |
50 | | -) -> dict[B01_Q10_DP, Any]: |
51 | | - """Send a command and await the first decoded response. |
52 | | -
|
53 | | - Q10 responses are not correlated with a message id, so we filter on |
54 | | - expected datapoints when provided. |
55 | | - """ |
56 | | - roborock_message = encode_mqtt_payload(command, params) |
57 | | - future: asyncio.Future[dict[B01_Q10_DP, Any]] = asyncio.get_running_loop().create_future() |
58 | | - |
59 | | - expected_set = set(expected_dps) if expected_dps is not None else None |
60 | | - |
61 | | - def find_response(response_message: RoborockMessage) -> None: |
| 23 | + async for response_message in mqtt_channel.subscribe_stream(): |
62 | 24 | try: |
63 | 25 | decoded_dps = decode_rpc_response(response_message) |
64 | 26 | except RoborockException as ex: |
65 | 27 | _LOGGER.debug( |
66 | | - "Failed to decode B01 Q10 RPC response (expecting %s): %s: %s", |
67 | | - command, |
| 28 | + "Failed to decode B01 RPC response: %s: %s", |
68 | 29 | response_message, |
69 | 30 | ex, |
70 | 31 | ) |
71 | | - return |
72 | | - if expected_set and not any(dps in decoded_dps for dps in expected_set): |
73 | | - return |
74 | | - if not future.done(): |
75 | | - future.set_result(decoded_dps) |
| 32 | + continue |
| 33 | + yield decoded_dps |
76 | 34 |
|
77 | | - unsub = await mqtt_channel.subscribe(find_response) |
78 | 35 |
|
| 36 | +async def send_command( |
| 37 | + mqtt_channel: MqttChannel, |
| 38 | + command: B01_Q10_DP, |
| 39 | + params: ParamsType, |
| 40 | +) -> None: |
| 41 | + """Send a command on the MQTT channel, without waiting for a response.""" |
| 42 | + _LOGGER.debug("Sending B01 MQTT command: cmd=%s params=%s", command, params) |
| 43 | + roborock_message = encode_mqtt_payload(command, params) |
79 | 44 | _LOGGER.debug("Sending MQTT message: %s", roborock_message) |
80 | 45 | try: |
81 | 46 | await mqtt_channel.publish(roborock_message) |
82 | | - return await asyncio.wait_for(future, timeout=_TIMEOUT) |
83 | | - except TimeoutError as ex: |
84 | | - raise RoborockException(f"B01 Q10 command timed out after {_TIMEOUT}s ({command})") from ex |
85 | 47 | except RoborockException as ex: |
86 | | - _LOGGER.warning( |
87 | | - "Error sending B01 Q10 decoded command (%s): %s", |
88 | | - command, |
89 | | - ex, |
90 | | - ) |
91 | | - raise |
92 | | - except Exception as ex: |
93 | | - _LOGGER.exception( |
94 | | - "Error sending B01 Q10 decoded command (%s): %s", |
| 48 | + _LOGGER.debug( |
| 49 | + "Error sending B01 decoded command (method=%s params=%s): %s", |
95 | 50 | command, |
| 51 | + params, |
96 | 52 | ex, |
97 | 53 | ) |
98 | 54 | raise |
99 | | - finally: |
100 | | - unsub() |
0 commit comments