|
17 | 17 | from roborock.devices.traits.b01.q7 import Q7PropertiesApi |
18 | 18 | from roborock.exceptions import RoborockException |
19 | 19 | from roborock.protocols.b01_q7_protocol import B01_VERSION, Q7RequestMessage |
20 | | -from roborock.roborock_message import RoborockB01Props, RoborockMessageProtocol |
| 20 | +from roborock.roborock_message import RoborockB01Props, RoborockMessage, RoborockMessageProtocol |
21 | 21 | from tests.fixtures.channel_fixtures import FakeChannel |
22 | 22 |
|
23 | 23 | from . import B01MessageBuilder |
@@ -257,3 +257,88 @@ async def test_q7_api_find_me(q7_api: Q7PropertiesApi, fake_channel: FakeChannel |
257 | 257 | payload_data = json.loads(unpad(message.payload, AES.block_size)) |
258 | 258 | assert payload_data["dps"]["10000"]["method"] == "service.find_device" |
259 | 259 | assert payload_data["dps"]["10000"]["params"] == {} |
| 260 | + |
| 261 | + |
| 262 | +async def test_q7_api_clean_segments( |
| 263 | + q7_api: Q7PropertiesApi, fake_channel: FakeChannel, message_builder: B01MessageBuilder |
| 264 | +): |
| 265 | + """Test room/segment cleaning helper for Q7.""" |
| 266 | + fake_channel.response_queue.append(message_builder.build({"result": "ok"})) |
| 267 | + await q7_api.clean_segments([10, 11]) |
| 268 | + |
| 269 | + assert len(fake_channel.published_messages) == 1 |
| 270 | + message = fake_channel.published_messages[0] |
| 271 | + payload_data = json.loads(unpad(message.payload, AES.block_size)) |
| 272 | + assert payload_data["dps"]["10000"]["method"] == "service.set_room_clean" |
| 273 | + assert payload_data["dps"]["10000"]["params"] == { |
| 274 | + "clean_type": CleanTaskTypeMapping.ROOM.code, |
| 275 | + "ctrl_value": SCDeviceCleanParam.START.code, |
| 276 | + "room_ids": [10, 11], |
| 277 | + } |
| 278 | + |
| 279 | + |
| 280 | +async def test_q7_api_get_current_map_payload( |
| 281 | + q7_api: Q7PropertiesApi, |
| 282 | + fake_channel: FakeChannel, |
| 283 | + message_builder: B01MessageBuilder, |
| 284 | +): |
| 285 | + """Fetch current map by map-list lookup, then upload_by_mapid.""" |
| 286 | + fake_channel.response_queue.append(message_builder.build({"map_list": [{"id": 1772093512, "cur": True}]})) |
| 287 | + fake_channel.response_queue.append( |
| 288 | + RoborockMessage( |
| 289 | + protocol=RoborockMessageProtocol.MAP_RESPONSE, |
| 290 | + payload=b"raw-map-payload", |
| 291 | + version=b"B01", |
| 292 | + seq=message_builder.seq + 1, |
| 293 | + ) |
| 294 | + ) |
| 295 | + |
| 296 | + raw_payload = await q7_api.get_current_map_payload() |
| 297 | + assert raw_payload == b"raw-map-payload" |
| 298 | + |
| 299 | + assert len(fake_channel.published_messages) == 2 |
| 300 | + |
| 301 | + first = fake_channel.published_messages[0] |
| 302 | + first_payload = json.loads(unpad(first.payload, AES.block_size)) |
| 303 | + assert first_payload["dps"]["10000"]["method"] == "service.get_map_list" |
| 304 | + assert first_payload["dps"]["10000"]["params"] == {} |
| 305 | + |
| 306 | + second = fake_channel.published_messages[1] |
| 307 | + second_payload = json.loads(unpad(second.payload, AES.block_size)) |
| 308 | + assert second_payload["dps"]["10000"]["method"] == "service.upload_by_mapid" |
| 309 | + assert second_payload["dps"]["10000"]["params"] == {"map_id": 1772093512} |
| 310 | + |
| 311 | + |
| 312 | +async def test_q7_api_get_current_map_payload_falls_back_to_first_map( |
| 313 | + q7_api: Q7PropertiesApi, |
| 314 | + fake_channel: FakeChannel, |
| 315 | + message_builder: B01MessageBuilder, |
| 316 | +): |
| 317 | + """If no current map marker exists, first map in list is used.""" |
| 318 | + fake_channel.response_queue.append(message_builder.build({"map_list": [{"id": 111}, {"id": 222, "cur": False}]})) |
| 319 | + fake_channel.response_queue.append( |
| 320 | + RoborockMessage( |
| 321 | + protocol=RoborockMessageProtocol.MAP_RESPONSE, |
| 322 | + payload=b"raw-map-payload", |
| 323 | + version=b"B01", |
| 324 | + seq=message_builder.seq + 1, |
| 325 | + ) |
| 326 | + ) |
| 327 | + |
| 328 | + await q7_api.get_current_map_payload() |
| 329 | + |
| 330 | + second = fake_channel.published_messages[1] |
| 331 | + second_payload = json.loads(unpad(second.payload, AES.block_size)) |
| 332 | + assert second_payload["dps"]["10000"]["params"] == {"map_id": 111} |
| 333 | + |
| 334 | + |
| 335 | +async def test_q7_api_get_current_map_payload_errors_without_map_list( |
| 336 | + q7_api: Q7PropertiesApi, |
| 337 | + fake_channel: FakeChannel, |
| 338 | + message_builder: B01MessageBuilder, |
| 339 | +): |
| 340 | + """Current-map payload fetch should fail clearly when map list is unusable.""" |
| 341 | + fake_channel.response_queue.append(message_builder.build({"map_list": []})) |
| 342 | + |
| 343 | + with pytest.raises(RoborockException, match="Unable to determine map_id"): |
| 344 | + await q7_api.get_current_map_payload() |
0 commit comments