@@ -80,58 +80,6 @@ class RpcStrategy:
8080 """Optional health manager for monitoring the channel."""
8181
8282
83- async def _send_rpc (strategy : RpcStrategy , request : RequestMessage ) -> ResponseData | bytes :
84- """Send a command and return a parsed response RoborockBase type.
85-
86- This provides an RPC interface over a given channel strategy. The device
87- channel only supports publish and subscribe, so this function handles
88- associating requests with their corresponding responses.
89-
90- The provided RpcStrategy defines how to encode/decode messages and which
91- channel to use for communication.
92- """
93- future : asyncio .Future [ResponseData | bytes ] = asyncio .Future ()
94- _LOGGER .debug (
95- "Sending command (%s, request_id=%s): %s, params=%s" ,
96- strategy .name ,
97- request .request_id ,
98- request .method ,
99- request .params ,
100- )
101-
102- message = strategy .encoder (request )
103-
104- def find_response (response_message : RoborockMessage ) -> None :
105- try :
106- decoded = strategy .decoder (response_message )
107- except RoborockException as ex :
108- _LOGGER .debug ("Exception while decoding message (%s): %s" , response_message , ex )
109- return
110- if decoded is None :
111- return
112- _LOGGER .debug ("Received response (%s, request_id=%s)" , strategy .name , decoded .request_id )
113- if decoded .request_id == request .request_id :
114- if isinstance (decoded , ResponseMessage ) and decoded .api_error :
115- future .set_exception (decoded .api_error )
116- else :
117- future .set_result (decoded .data )
118-
119- unsub = await strategy .channel .subscribe (find_response )
120- try :
121- await strategy .channel .publish (message )
122- result = await asyncio .wait_for (future , timeout = _TIMEOUT )
123- except TimeoutError as ex :
124- if strategy .health_manager :
125- await strategy .health_manager .on_timeout ()
126- future .cancel ()
127- raise RoborockException (f"Command timed out after { _TIMEOUT } s" ) from ex
128- finally :
129- unsub ()
130- if strategy .health_manager :
131- await strategy .health_manager .on_success ()
132- return result
133-
134-
13583class RpcChannel (V1RpcChannel ):
13684 """Wrapper to expose V1RpcChannel interface with a specific set of RpcStrategies.
13785
@@ -157,7 +105,7 @@ async def send_command(
157105 last_exception = None
158106 for strategy in self ._rpc_strategies :
159107 try :
160- decoded_response = await _send_rpc (strategy , request )
108+ decoded_response = await self . _send_rpc (strategy , request )
161109 except RoborockException as e :
162110 _LOGGER .warning ("Command %s failed on %s channel: %s" , method , strategy .name , e )
163111 last_exception = e
@@ -175,6 +123,58 @@ async def send_command(
175123
176124 raise last_exception or RoborockException ("No available connection to send command" )
177125
126+ @staticmethod
127+ async def _send_rpc (strategy : RpcStrategy , request : RequestMessage ) -> ResponseData | bytes :
128+ """Send a command and return a parsed response RoborockBase type.
129+
130+ This provides an RPC interface over a given channel strategy. The device
131+ channel only supports publish and subscribe, so this function handles
132+ associating requests with their corresponding responses.
133+
134+ The provided RpcStrategy defines how to encode/decode messages and which
135+ channel to use for communication.
136+ """
137+ future : asyncio .Future [ResponseData | bytes ] = asyncio .Future ()
138+ _LOGGER .debug (
139+ "Sending command (%s, request_id=%s): %s, params=%s" ,
140+ strategy .name ,
141+ request .request_id ,
142+ request .method ,
143+ request .params ,
144+ )
145+
146+ message = strategy .encoder (request )
147+
148+ def find_response (response_message : RoborockMessage ) -> None :
149+ try :
150+ decoded = strategy .decoder (response_message )
151+ except RoborockException as ex :
152+ _LOGGER .debug ("Exception while decoding message (%s): %s" , response_message , ex )
153+ return
154+ if decoded is None :
155+ return
156+ _LOGGER .debug ("Received response (%s, request_id=%s)" , strategy .name , decoded .request_id )
157+ if decoded .request_id == request .request_id :
158+ if isinstance (decoded , ResponseMessage ) and decoded .api_error :
159+ future .set_exception (decoded .api_error )
160+ else :
161+ future .set_result (decoded .data )
162+
163+ unsub = await strategy .channel .subscribe (find_response )
164+ try :
165+ await strategy .channel .publish (message )
166+ result = await asyncio .wait_for (future , timeout = _TIMEOUT )
167+ except TimeoutError as ex :
168+ if strategy .health_manager :
169+ await strategy .health_manager .on_timeout ()
170+ future .cancel ()
171+ raise RoborockException (f"Command timed out after { _TIMEOUT } s" ) from ex
172+ finally :
173+ unsub ()
174+ if strategy .health_manager :
175+ await strategy .health_manager .on_success ()
176+ return result
177+
178178
179179class V1Channel (Channel ):
180180 """Unified V1 protocol channel with automatic MQTT/local connection handling.
@@ -248,7 +248,7 @@ def map_rpc_channel(self) -> V1RpcChannel:
248248 decoder = create_map_response_decoder (security_data = self ._security_data )
249249 return RpcChannel ([self ._create_mqtt_rpc_strategy (decoder )])
250250
251- def _create_local_rpc_strategy (self ) -> RpcStrategy :
251+ def _create_local_rpc_strategy (self ) -> RpcStrategy | None :
252252 """Create the RPC strategy for local transport."""
253253 if self ._local_channel is None or not self .is_local_connected :
254254 return None
@@ -261,7 +261,7 @@ def _create_local_rpc_strategy(self) -> RpcStrategy:
261261
262262 def _local_encoder (self , x : RequestMessage ) -> RoborockMessage :
263263 """Encode a request message for local transport.
264-
264+
265265 This is passed to the RpcStrategy as a function so that it will
266266 read the current local channel's protocol version which changes as
267267 the protocol version is discovered.
0 commit comments