Skip to content

Commit a8811fd

Browse files
committed
🦎 q7: trim final map content nits
1 parent f4476c5 commit a8811fd

File tree

6 files changed

+172
-27
lines changed

6 files changed

+172
-27
lines changed

roborock/devices/traits/b01/q7/__init__.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,6 @@ def __init__(self, channel: MqttChannel, *, device: HomeDataDevice, product: Hom
6161
self.map = MapTrait(channel)
6262
self.map_content = MapContentTrait(
6363
self.map,
64-
local_key=device.local_key,
6564
serial=device.sn,
6665
model=product.model,
6766
)

roborock/devices/traits/b01/q7/map_content.py

Lines changed: 0 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,6 @@ class MapContent(RoborockBase):
3333
map_data: MapData | None = None
3434
"""Parsed map data (metadata for points on the map)."""
3535

36-
room_names: dict[int, str] | None = None
37-
"""Observed room-id to room-name mapping for the current map, if available."""
38-
3936
raw_api_response: bytes | None = None
4037
"""Raw bytes of the map payload from the device.
4138
@@ -57,14 +54,12 @@ def __init__(
5754
self,
5855
map_trait: MapTrait,
5956
*,
60-
local_key: str,
6157
serial: str | None,
6258
model: str | None,
6359
map_parser_config: B01MapParserConfig | None = None,
6460
) -> None:
6561
super().__init__()
6662
self._map_trait = map_trait
67-
self._local_key = local_key
6863
self._serial = serial
6964
self._model = model
7065
self._map_parser = B01MapParser(map_parser_config)
@@ -75,7 +70,6 @@ async def refresh(self) -> None:
7570
parsed = self.parse_map_content(raw_payload)
7671
self.image_content = parsed.image_content
7772
self.map_data = parsed.map_data
78-
self.room_names = parsed.room_names
7973
self.raw_api_response = parsed.raw_api_response
8074

8175
def parse_map_content(self, response: bytes) -> MapContent:
@@ -92,7 +86,6 @@ def parse_map_content(self, response: bytes) -> MapContent:
9286
try:
9387
parsed_data = self._map_parser.parse(
9488
response,
95-
local_key=self._local_key,
9689
serial=self._serial,
9790
model=self._model,
9891
)
@@ -102,13 +95,8 @@ def parse_map_content(self, response: bytes) -> MapContent:
10295
if parsed_data.image_content is None:
10396
raise RoborockException("Failed to render B01 map image")
10497

105-
room_names = None
106-
if parsed_data.map_data is not None:
107-
room_names = parsed_data.map_data.additional_parameters.get("room_names")
108-
10998
return MapContent(
11099
image_content=parsed_data.image_content,
111100
map_data=parsed_data.map_data,
112-
room_names=room_names,
113101
raw_api_response=response,
114102
)

roborock/map/b01_map_parser.py

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,8 @@ class B01MapParser:
5353
def __init__(self, config: B01MapParserConfig | None = None) -> None:
5454
self._config = config or B01MapParserConfig()
5555

56-
def parse(self, raw_payload: bytes, *, local_key: str, serial: str, model: str) -> ParsedMapData:
56+
def parse(self, raw_payload: bytes, *, serial: str, model: str) -> ParsedMapData:
5757
"""Parse a raw MAP_RESPONSE payload and return a PNG + MapData."""
58-
_ = local_key # Current observed Q7 MAP_RESPONSE payloads do not use the local key.
5958
inflated = _decode_b01_map_payload(raw_payload, serial=serial, model=model)
6059
size_x, size_y, grid, room_names = _parse_scmap_payload(inflated)
6160

tests/devices/traits/b01/q7/test_map_content.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,6 @@ async def test_q7_map_content_refresh_populates_cached_values(
2727
)
2828

2929
dummy_map_data = MapData()
30-
dummy_map_data.additional_parameters["room_names"] = {17: "room8"}
3130
with patch(
3231
"roborock.devices.traits.b01.q7.map_content.B01MapParser.parse",
3332
return_value=type("X", (), {"image_content": b"pngbytes", "map_data": dummy_map_data})(),
@@ -36,7 +35,6 @@ async def test_q7_map_content_refresh_populates_cached_values(
3635

3736
assert q7_api.map_content.image_content == b"pngbytes"
3837
assert q7_api.map_content.map_data is dummy_map_data
39-
assert q7_api.map_content.room_names == {17: "room8"}
4038
assert q7_api.map_content.raw_api_response == b"raw-map-payload"
4139

4240
parse.assert_called_once()

tests/map/debug_b01_scmap.py

Lines changed: 169 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -13,24 +13,186 @@
1313
So the library keeps a tiny schema-free parser for the fields it needs, while
1414
this script provides a convenient place to inspect unknown payloads during
1515
future debugging.
16+
17+
This helper is intentionally standalone and does not import private runtime
18+
helpers. That keeps it useful for debugging without coupling test/dev tooling to
19+
internal implementation details.
1620
"""
1721

1822
from __future__ import annotations
1923

2024
import argparse
25+
import base64
26+
import binascii
2127
import gzip
28+
import hashlib
29+
import zlib
2230
from pathlib import Path
2331

24-
from roborock.map.b01_map_parser import (
25-
_decode_b01_map_payload,
26-
_parse_scmap_payload,
27-
_read_len_delimited,
28-
_read_varint,
29-
)
32+
from Crypto.Cipher import AES
33+
from Crypto.Util.Padding import pad, unpad
34+
35+
_B64_CHARS = set(b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=")
36+
37+
38+
def _derive_map_key(serial: str, model: str) -> bytes:
39+
model_suffix = model.split(".")[-1]
40+
model_key = (model_suffix + "0" * 16)[:16].encode()
41+
material = f"{serial}+{model_suffix}+{serial}".encode()
42+
encrypted = AES.new(model_key, AES.MODE_ECB).encrypt(pad(material, AES.block_size))
43+
md5 = hashlib.md5(base64.b64encode(encrypted), usedforsecurity=False).hexdigest()
44+
return md5[8:24].encode()
45+
46+
47+
def _decode_base64_payload(raw_payload: bytes) -> bytes:
48+
blob = raw_payload.strip()
49+
if len(blob) < 32 or any(b not in _B64_CHARS for b in blob):
50+
raise ValueError("Unexpected B01 map payload format")
51+
52+
padded = blob + b"=" * (-len(blob) % 4)
53+
try:
54+
return base64.b64decode(padded, validate=True)
55+
except binascii.Error as err:
56+
raise ValueError("Failed to decode B01 map payload") from err
57+
58+
59+
def _decode_b01_map_payload(raw_payload: bytes, *, serial: str, model: str) -> bytes:
60+
encrypted_payload = _decode_base64_payload(raw_payload)
61+
if len(encrypted_payload) % AES.block_size != 0:
62+
raise ValueError("Unexpected encrypted B01 map payload length")
63+
64+
map_key = _derive_map_key(serial, model)
65+
decrypted_hex = AES.new(map_key, AES.MODE_ECB).decrypt(encrypted_payload)
66+
67+
try:
68+
compressed_hex = unpad(decrypted_hex, AES.block_size).decode("ascii")
69+
compressed_payload = bytes.fromhex(compressed_hex)
70+
return zlib.decompress(compressed_payload)
71+
except (ValueError, UnicodeDecodeError, zlib.error) as err:
72+
raise ValueError("Failed to decode B01 map payload") from err
73+
74+
75+
def _read_varint(buf: bytes, idx: int) -> tuple[int, int]:
76+
value = 0
77+
shift = 0
78+
while True:
79+
if idx >= len(buf):
80+
raise ValueError("Truncated varint")
81+
byte = buf[idx]
82+
idx += 1
83+
value |= (byte & 0x7F) << shift
84+
if not byte & 0x80:
85+
return value, idx
86+
shift += 7
87+
if shift > 63:
88+
raise ValueError("Invalid varint")
89+
90+
91+
def _read_len_delimited(buf: bytes, idx: int) -> tuple[bytes, int]:
92+
length, idx = _read_varint(buf, idx)
93+
end = idx + length
94+
if end > len(buf):
95+
raise ValueError("Invalid length-delimited field")
96+
return buf[idx:end], end
97+
98+
99+
def _parse_map_data_info(blob: bytes) -> bytes:
100+
idx = 0
101+
while idx < len(blob):
102+
key, idx = _read_varint(blob, idx)
103+
field_no = key >> 3
104+
wire = key & 0x07
105+
if wire == 0:
106+
_, idx = _read_varint(blob, idx)
107+
elif wire == 2:
108+
value, idx = _read_len_delimited(blob, idx)
109+
if field_no == 1:
110+
try:
111+
return zlib.decompress(value)
112+
except zlib.error:
113+
return value
114+
elif wire == 5:
115+
idx += 4
116+
else:
117+
raise ValueError(f"Unsupported wire type {wire} in mapDataInfo")
118+
raise ValueError("SCMap missing mapData")
119+
120+
121+
def _parse_room_data_info(blob: bytes) -> tuple[int | None, str | None]:
122+
room_id: int | None = None
123+
room_name: str | None = None
124+
idx = 0
125+
while idx < len(blob):
126+
key, idx = _read_varint(blob, idx)
127+
field_no = key >> 3
128+
wire = key & 0x07
129+
if wire == 0:
130+
value, idx = _read_varint(blob, idx)
131+
if field_no == 1:
132+
room_id = int(value)
133+
elif wire == 2:
134+
value, idx = _read_len_delimited(blob, idx)
135+
if field_no == 2:
136+
room_name = value.decode("utf-8", errors="replace")
137+
elif wire == 5:
138+
idx += 4
139+
else:
140+
raise ValueError(f"Unsupported wire type {wire} in roomDataInfo")
141+
return room_id, room_name
142+
143+
144+
def _parse_scmap_payload(payload: bytes) -> tuple[int, int, bytes, dict[int, str]]:
145+
size_x = 0
146+
size_y = 0
147+
grid = b""
148+
room_names: dict[int, str] = {}
149+
150+
idx = 0
151+
while idx < len(payload):
152+
key, idx = _read_varint(payload, idx)
153+
field_no = key >> 3
154+
wire = key & 0x07
155+
156+
if wire == 0:
157+
_, idx = _read_varint(payload, idx)
158+
continue
159+
160+
if wire != 2:
161+
if wire == 5:
162+
idx += 4
163+
continue
164+
raise ValueError(f"Unsupported wire type {wire} in SCMap payload")
165+
166+
value, idx = _read_len_delimited(payload, idx)
167+
if field_no == 3:
168+
hidx = 0
169+
while hidx < len(value):
170+
hkey, hidx = _read_varint(value, hidx)
171+
hfield = hkey >> 3
172+
hwire = hkey & 0x07
173+
if hwire == 0:
174+
hvalue, hidx = _read_varint(value, hidx)
175+
if hfield == 2:
176+
size_x = int(hvalue)
177+
elif hfield == 3:
178+
size_y = int(hvalue)
179+
elif hwire == 5:
180+
hidx += 4
181+
elif hwire == 2:
182+
_, hidx = _read_len_delimited(value, hidx)
183+
else:
184+
raise ValueError(f"Unsupported wire type {hwire} in map header")
185+
elif field_no == 4:
186+
grid = _parse_map_data_info(value)
187+
elif field_no == 12:
188+
room_id, room_name = _parse_room_data_info(value)
189+
if room_id is not None:
190+
room_names[room_id] = room_name or f"Room {room_id}"
191+
192+
return size_x, size_y, grid, room_names
30193

31194

32195
def _looks_like_message(blob: bytes) -> bool:
33-
"""Return True if the blob plausibly looks like a protobuf-style message."""
34196
if not blob or len(blob) > 4096:
35197
return False
36198

tests/map/test_b01_map_parser.py

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,6 @@ def _derive_map_key(serial: str, model: str) -> bytes:
2626

2727

2828
def test_b01_map_parser_decodes_and_renders_fixture() -> None:
29-
local_key = "abcdefghijklmnop"
3029
serial = "testsn012345"
3130
model = "roborock.vacuum.sc05"
3231
inflated = gzip.decompress(FIXTURE.read_bytes())
@@ -37,7 +36,7 @@ def test_b01_map_parser_decodes_and_renders_fixture() -> None:
3736
payload = base64.b64encode(encrypted)
3837

3938
parser = B01MapParser()
40-
parsed = parser.parse(payload, local_key=local_key, serial=serial, model=model)
39+
parsed = parser.parse(payload, serial=serial, model=model)
4140

4241
assert parsed.image_content is not None
4342
assert parsed.image_content.startswith(b"\x89PNG\r\n\x1a\n")
@@ -65,4 +64,4 @@ def test_b01_map_parser_decodes_and_renders_fixture() -> None:
6564
def test_b01_map_parser_rejects_invalid_payload() -> None:
6665
parser = B01MapParser()
6766
with pytest.raises(RoborockException, match="Failed to decode B01 map payload"):
68-
parser.parse(b"not a map", local_key="abcdefghijklmnop", serial="testsn012345", model="roborock.vacuum.sc05")
67+
parser.parse(b"not a map", serial="testsn012345", model="roborock.vacuum.sc05")

0 commit comments

Comments
 (0)