11"""Module for parsing B01/Q7 map content.
22
3- B01/Q7 map uploads arrive as encrypted/compressed bytes in MQTT MAP_RESPONSE payloads.
4- This module provides a small parser that:
5-
6- 1) Decodes the raw MAP_RESPONSE payload (base64/AES/zlib layers)
7- 2) Parses the inflated SCMap protobuf-like payload
8- 3) Renders a basic map image
9-
10- This is intentionally minimal: it aims to provide a stable `map_content` surface
11- similar to the existing v1 map content trait, without inventing additional public
12- APIs.
3+ Observed Q7 MAP_RESPONSE payloads are:
4+ - base64-encoded ASCII
5+ - AES-ECB encrypted with the derived map key
6+ - PKCS7 padded
7+ - ASCII hex for a zlib-compressed SCMap payload
8+
9+ This module keeps the decode path narrow and explicit to match the observed
10+ payload shape as closely as possible.
1311"""
1412
1513from __future__ import annotations
2220from dataclasses import dataclass
2321
2422from Crypto .Cipher import AES
25- from Crypto .Util .Padding import unpad
23+ from Crypto .Util .Padding import pad , unpad
2624from PIL import Image
2725from vacuum_map_parser_base .config .image_config import ImageConfig
28- from vacuum_map_parser_base .map_data import ImageData , MapData , Room
26+ from vacuum_map_parser_base .map_data import ImageData , MapData
2927
3028from roborock .exceptions import RoborockException
3129
3230from .map_parser import ParsedMapData
3331
34- _B01_HASH = "5wwh9ikChRjASpMU8cxg7o1d2E"
3532_B64_CHARS = set (b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=" )
3633_MAP_FILE_FORMAT = "PNG"
3734
@@ -52,8 +49,9 @@ def __init__(self, config: B01MapParserConfig | None = None) -> None:
5249
5350 def parse (self , raw_payload : bytes , * , local_key : str , serial : str , model : str ) -> ParsedMapData :
5451 """Parse a raw MAP_RESPONSE payload and return a PNG + MapData."""
55- inflated = _decode_b01_map_payload (raw_payload , local_key = local_key , serial = serial , model = model )
56- size_x , size_y , grid , rooms = _parse_scmap_payload (inflated )
52+ _ = local_key # Current observed Q7 MAP_RESPONSE payloads do not use the local key.
53+ inflated = _decode_b01_map_payload (raw_payload , serial = serial , model = model )
54+ size_x , size_y , grid , room_names = _parse_scmap_payload (inflated )
5755
5856 image = _render_occupancy_image (grid , size_x = size_x , size_y = size_y , scale = self ._config .map_scale )
5957
@@ -68,7 +66,8 @@ def parse(self, raw_payload: bytes, *, local_key: str, serial: str, model: str)
6866 data = image ,
6967 img_transformation = lambda p : p ,
7068 )
71- map_data .rooms = {rid : Room (0 , 0 , 0 , 0 , number = rid , name = name ) for rid , name in rooms .items ()} or None
69+ if room_names :
70+ map_data .additional_parameters ["room_names" ] = room_names
7271
7372 image_bytes = io .BytesIO ()
7473 image .save (image_bytes , format = _MAP_FILE_FORMAT )
@@ -79,112 +78,43 @@ def parse(self, raw_payload: bytes, *, local_key: str, serial: str, model: str)
7978 )
8079
8180
82- def _derive_b01_iv (iv_seed : int ) -> bytes :
83- random_hex = iv_seed .to_bytes (4 , "big" ).hex ().lower ()
84- md5 = hashlib .md5 ((random_hex + _B01_HASH ).encode (), usedforsecurity = False ).hexdigest ()
85- return md5 [9 :25 ].encode ()
86-
87-
8881def _derive_map_key (serial : str , model : str ) -> bytes :
89- """Derive map decrypt key for B01/Q7 map payloads ."""
82+ """Derive the B01/Q7 map decrypt key from serial + model ."""
9083 model_suffix = model .split ("." )[- 1 ]
9184 model_key = (model_suffix + "0" * 16 )[:16 ].encode ()
9285 material = f"{ serial } +{ model_suffix } +{ serial } " .encode ()
93- encrypted = AES .new (model_key , AES .MODE_ECB ).encrypt (_pad_pkcs7 (material , AES .block_size ))
86+ encrypted = AES .new (model_key , AES .MODE_ECB ).encrypt (pad (material , AES .block_size ))
9487 md5 = hashlib .md5 (base64 .b64encode (encrypted ), usedforsecurity = False ).hexdigest ()
9588 return md5 [8 :24 ].encode ()
9689
9790
98- def _pad_pkcs7 (data : bytes , block_size : int ) -> bytes :
99- pad_len = block_size - (len (data ) % block_size )
100- return data + bytes ([pad_len ]) * pad_len
101-
102-
103- def _maybe_b64 (data : bytes ) -> bytes | None :
104- """Decode base64 only when the input clearly looks like base64 ASCII."""
105- blob = data .strip ()
106- if len (blob ) < 32 :
107- return None
108- if any (b not in _B64_CHARS for b in blob ):
109- return None
91+ def _decode_base64_payload (raw_payload : bytes ) -> bytes :
92+ blob = raw_payload .strip ()
93+ if len (blob ) < 32 or any (b not in _B64_CHARS for b in blob ):
94+ raise RoborockException ("Failed to decode B01 map payload" )
11095
11196 padded = blob + b"=" * (- len (blob ) % 4 )
11297 try :
11398 return base64 .b64decode (padded , validate = True )
114- except binascii .Error :
115- return None
99+ except binascii .Error as err :
100+ raise RoborockException ( "Failed to decode B01 map payload" ) from err
116101
117102
118- def _decode_b01_map_payload (raw_payload : bytes , * , local_key : str , serial : str , model : str ) -> bytes :
103+ def _decode_b01_map_payload (raw_payload : bytes , * , serial : str , model : str ) -> bytes :
119104 """Decode raw B01 MAP_RESPONSE payload into inflated SCMap bytes."""
120-
121- # Try progressively un-base64 layers (some payloads are double-base64).
122- layers : list [bytes ] = [raw_payload ]
123- l0 = _maybe_b64 (raw_payload )
124- if l0 is not None :
125- layers .append (l0 )
126- l1 = _maybe_b64 (l0 )
127- if l1 is not None :
128- layers .append (l1 )
105+ encrypted_payload = _decode_base64_payload (raw_payload )
106+ if len (encrypted_payload ) % AES .block_size != 0 :
107+ raise RoborockException ("Unexpected encrypted B01 map payload length" )
129108
130109 map_key = _derive_map_key (serial , model )
110+ decrypted_hex = AES .new (map_key , AES .MODE_ECB ).decrypt (encrypted_payload )
131111
132- for layer in layers :
133- candidates : list [bytes ] = [layer ]
134-
135- # Optional B01 envelope for local-key CBC decryption.
136- if len (layer ) > 19 and layer [:3 ] == b"B01" :
137- iv_seed = int .from_bytes (layer [7 :11 ], "big" )
138- payload_len = int .from_bytes (layer [17 :19 ], "big" )
139- if payload_len and (19 + payload_len ) <= len (layer ):
140- encrypted = layer [19 : 19 + payload_len ]
141- try :
142- decrypted = AES .new (local_key .encode (), AES .MODE_CBC , _derive_b01_iv (iv_seed )).decrypt (encrypted )
143- candidates .append (unpad (decrypted , AES .block_size ))
144- except Exception :
145- pass
146-
147- # Optional map-key ECB layer.
148- for candidate in list (candidates ):
149- if len (candidate ) % AES .block_size != 0 :
150- continue
151- try :
152- decrypted = AES .new (map_key , AES .MODE_ECB ).decrypt (candidate )
153- except Exception :
154- continue
155-
156- candidates .append (decrypted )
157- try :
158- candidates .append (unpad (decrypted , AES .block_size ))
159- except Exception :
160- pass
161-
162- for candidate in candidates :
163- for variant in _candidate_variants (candidate ):
164- try :
165- return zlib .decompress (variant )
166- except zlib .error :
167- continue
168-
169- raise RoborockException ("Failed to decode B01 map payload" )
170-
171-
172- def _candidate_variants (candidate : bytes ) -> list [bytes ]:
173- variants = [candidate ]
174-
175- # Some payloads are ASCII hex for a compressed buffer.
176112 try :
177- text = candidate .strip ().decode ("ascii" )
178- except UnicodeDecodeError :
179- text = ""
180-
181- if text and len (text ) >= 32 and len (text ) % 2 == 0 and all (c in "0123456789abcdefABCDEF" for c in text ):
182- try :
183- variants .append (bytes .fromhex (text ))
184- except ValueError :
185- pass
186-
187- return variants
113+ compressed_hex = unpad (decrypted_hex , AES .block_size ).decode ("ascii" )
114+ compressed_payload = bytes .fromhex (compressed_hex )
115+ return zlib .decompress (compressed_payload )
116+ except (ValueError , UnicodeDecodeError , zlib .error ) as err :
117+ raise RoborockException ("Failed to decode B01 map payload" ) from err
188118
189119
190120def _read_varint (buf : bytes , idx : int ) -> tuple [int , int ]:
@@ -265,7 +195,7 @@ def _parse_scmap_payload(payload: bytes) -> tuple[int, int, bytes, dict[int, str
265195 size_x = 0
266196 size_y = 0
267197 grid = b""
268- rooms : dict [int , str ] = {}
198+ room_names : dict [int , str ] = {}
269199
270200 idx = 0
271201 while idx < len (payload ):
@@ -308,7 +238,7 @@ def _parse_scmap_payload(payload: bytes) -> tuple[int, int, bytes, dict[int, str
308238 elif field_no == 12 : # roomDataInfo (repeated)
309239 room_id , room_name = _parse_room_data_info (value )
310240 if room_id is not None :
311- rooms [room_id ] = room_name or f"Room { room_id } "
241+ room_names [room_id ] = room_name or f"Room { room_id } "
312242
313243 if not size_x or not size_y or not grid :
314244 raise RoborockException ("Failed to parse B01 map header/grid" )
@@ -317,21 +247,20 @@ def _parse_scmap_payload(payload: bytes) -> tuple[int, int, bytes, dict[int, str
317247 if len (grid ) < expected_len :
318248 raise RoborockException ("B01 map data shorter than expected dimensions" )
319249
320- return size_x , size_y , grid [:expected_len ], rooms
250+ return size_x , size_y , grid [:expected_len ], room_names
321251
322252
323253def _render_occupancy_image (grid : bytes , * , size_x : int , size_y : int , scale : int ) -> Image .Image :
324254 """Render the B01 occupancy grid into a simple image."""
325255
326- # The raw grid contains occupancy-like bytes. For now we render :
327- # - 0: outside/unknown (black)
328- # - 127: wall/obstacle (grey)
329- # - 128+ : floor/free (white)
256+ # The observed occupancy grid contains only :
257+ # - 0: outside/unknown
258+ # - 127: wall/obstacle
259+ # - 128: floor/free
330260 table = bytearray (range (256 ))
331261 table [0 ] = 0
332262 table [127 ] = 180
333- for i in range (128 , 256 ):
334- table [i ] = 255
263+ table [128 ] = 255
335264
336265 mapped = bytes (table [b ] for b in grid )
337266 img = Image .frombytes ("L" , (size_x , size_y ), mapped )
0 commit comments