Skip to content

Commit 9169500

Browse files
committed
🦎 q7: tighten B01 map payload decode + use gz fixture
1 parent 50aeb88 commit 9169500

2 files changed

Lines changed: 42 additions & 17 deletions

File tree

roborock/protocols/b01_map_protocol.py

Lines changed: 39 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88
from __future__ import annotations
99

1010
import base64
11+
import binascii
1112
import hashlib
1213
import zlib
1314

@@ -36,25 +37,42 @@ def derive_map_key(serial: str, model: str) -> bytes:
3637
return md5[8:24].encode()
3738

3839

40+
_B64_CHARS = set(b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=")
41+
42+
3943
def _maybe_b64(data: bytes) -> bytes | None:
44+
"""Decode base64 *only* when the input clearly looks like base64 ASCII.
45+
46+
MAP_RESPONSE payloads sometimes arrive as base64-encoded bytes (and in some
47+
cases double-base64). Avoid blindly attempting b64decode on arbitrary binary
48+
data; that can yield garbage-but-decodable output and hide the real payload.
49+
"""
50+
51+
blob = data.strip()
52+
if len(blob) < 32:
53+
return None
54+
if any(b not in _B64_CHARS for b in blob):
55+
return None
56+
57+
# Base64 strings may omit padding.
58+
padded = blob + b"=" * (-len(blob) % 4)
4059
try:
41-
return base64.b64decode(data, validate=False)
42-
except Exception:
60+
return base64.b64decode(padded, validate=True)
61+
except binascii.Error:
4362
return None
4463

4564

4665
def decode_b01_map_payload(raw_payload: bytes, *, local_key: str, serial: str, model: str) -> bytes:
4766
"""Decode raw B01 MAP_RESPONSE payload into inflated SCMap protobuf bytes."""
4867

49-
layers: list[bytes] = []
68+
# Try the raw payload first, then progressively un-base64 layers if present.
69+
layers: list[bytes] = [raw_payload]
5070
l0 = _maybe_b64(raw_payload)
5171
if l0 is not None:
5272
layers.append(l0)
5373
l1 = _maybe_b64(l0)
5474
if l1 is not None:
5575
layers.append(l1)
56-
else:
57-
layers.append(raw_payload)
5876

5977
map_key = derive_map_key(serial, model)
6078
for layer in layers:
@@ -64,12 +82,13 @@ def decode_b01_map_payload(raw_payload: bytes, *, local_key: str, serial: str, m
6482
if len(layer) > 19 and layer[:3] == b"B01":
6583
iv_seed = int.from_bytes(layer[7:11], "big")
6684
payload_len = int.from_bytes(layer[17:19], "big")
67-
encrypted = layer[19 : 19 + payload_len]
68-
try:
69-
decrypted = AES.new(local_key.encode(), AES.MODE_CBC, _derive_b01_iv(iv_seed)).decrypt(encrypted)
70-
candidates.append(unpad(decrypted, 16))
71-
except Exception:
72-
pass
85+
if payload_len and (19 + payload_len) <= len(layer):
86+
encrypted = layer[19 : 19 + payload_len]
87+
try:
88+
decrypted = AES.new(local_key.encode(), AES.MODE_CBC, _derive_b01_iv(iv_seed)).decrypt(encrypted)
89+
candidates.append(unpad(decrypted, 16))
90+
except Exception:
91+
pass
7392

7493
# Optional map-key ECB layer seen in Q7 payloads.
7594
for candidate in list(candidates):
@@ -84,11 +103,16 @@ def decode_b01_map_payload(raw_payload: bytes, *, local_key: str, serial: str, m
84103
for candidate in candidates:
85104
variants = [candidate]
86105
try:
87-
text = candidate.decode("ascii").strip()
88-
if len(text) > 16 and all(char in "0123456789abcdefABCDEF" for char in text[:32]):
106+
text = candidate.strip().decode("ascii")
107+
except UnicodeDecodeError:
108+
text = ""
109+
if text and len(text) >= 32 and len(text) % 2 == 0 and all(
110+
char in "0123456789abcdefABCDEF" for char in text
111+
):
112+
try:
89113
variants.append(bytes.fromhex(text))
90-
except Exception:
91-
pass
114+
except ValueError:
115+
pass
92116
for variant in variants:
93117
try:
94118
return zlib.decompress(variant)

tests/protocols/test_b01_map_protocol.py

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
from __future__ import annotations
44

55
import base64
6+
import gzip
67
import zlib
78
from pathlib import Path
89

@@ -11,14 +12,14 @@
1112

1213
from roborock.protocols.b01_map_protocol import decode_b01_map_payload, derive_map_key
1314

14-
FIXTURE = Path(__file__).resolve().parents[1] / "map" / "testdata" / "raw-mqtt-map301.bin.inflated.bin"
15+
FIXTURE = Path(__file__).resolve().parents[1] / "map" / "testdata" / "raw-mqtt-map301.bin.inflated.bin.gz"
1516

1617

1718
def test_decode_b01_map_payload_round_trip() -> None:
1819
local_key = "abcdefghijklmnop"
1920
serial = "testsn012345"
2021
model = "roborock.vacuum.sc05"
21-
inflated = FIXTURE.read_bytes()
22+
inflated = gzip.decompress(FIXTURE.read_bytes())
2223

2324
compressed = zlib.compress(inflated)
2425
map_key = derive_map_key(serial, model)

0 commit comments

Comments
 (0)