diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 755f4f90..57e8d890 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -1,6 +1,10 @@ # See https://pre-commit.com for more information # See https://pre-commit.com/hooks.html for more hooks -exclude: "CHANGELOG.md" +exclude: > + (?x)^( + CHANGELOG\.md| + roborock/map/proto/.*_pb2\.py + )$ default_stages: [ pre-commit ] repos: @@ -42,7 +46,7 @@ repos: hooks: - id: mypy exclude: cli.py - additional_dependencies: [ "types-paho-mqtt" ] + additional_dependencies: [ "types-paho-mqtt", "types-protobuf" ] - repo: https://github.com/alessandrojcm/commitlint-pre-commit-hook rev: v9.23.0 hooks: diff --git a/pyproject.toml b/pyproject.toml index 595d6ee6..863523e5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -25,6 +25,7 @@ dependencies = [ "pycryptodomex~=3.18 ; sys_platform == 'darwin'", "paho-mqtt>=1.6.1,<3.0.0", "construct>=2.10.57,<3", + "protobuf>=5,<7", "vacuum-map-parser-roborock", "pyrate-limiter>=4.0.0,<5", "aiomqtt>=2.5.0,<3", @@ -97,9 +98,15 @@ major_tags= ["refactor"] lint.ignore = ["F403", "E741"] lint.select=["E", "F", "UP", "I"] line-length = 120 +extend-exclude = ["roborock/map/proto/*_pb2.py"] [tool.ruff.lint.per-file-ignores] "*/__init__.py" = ["F401"] +"roborock/map/proto/*_pb2.py" = ["E501", "I001", "UP009"] + +[[tool.mypy.overrides]] +module = ["roborock.map.proto.*"] +ignore_errors = true [tool.pytest.ini_options] asyncio_mode = "auto" diff --git a/roborock/devices/device_manager.py b/roborock/devices/device_manager.py index b1ef6626..0be98ea1 100644 --- a/roborock/devices/device_manager.py +++ b/roborock/devices/device_manager.py @@ -251,7 +251,7 @@ def device_creator(home_data: HomeData, device: HomeDataDevice, product: HomeDat trait = b01.q10.create(channel) elif "sc" in model_part: # Q7 devices start with 'sc' in their model naming. - trait = b01.q7.create(channel) + trait = b01.q7.create(product, device, channel) else: raise UnsupportedDeviceError(f"Device {device.name} has unsupported B01 model: {product.model}") case _: diff --git a/roborock/devices/traits/b01/q7/__init__.py b/roborock/devices/traits/b01/q7/__init__.py index 9bbe6818..f29f287b 100644 --- a/roborock/devices/traits/b01/q7/__init__.py +++ b/roborock/devices/traits/b01/q7/__init__.py @@ -1,10 +1,14 @@ """Traits for Q7 B01 devices. -Potentially other devices may fall into this category in the future.""" + +Potentially other devices may fall into this category in the future. +""" + +from __future__ import annotations from typing import Any from roborock import B01Props -from roborock.data import Q7MapList, Q7MapListEntry +from roborock.data import HomeDataDevice, HomeDataProduct, Q7MapList, Q7MapListEntry from roborock.data.b01_q7.b01_q7_code_mappings import ( CleanPathPreferenceMapping, CleanRepeatMapping, @@ -23,18 +27,20 @@ from .clean_summary import CleanSummaryTrait from .map import MapTrait +from .map_content import MapContentTrait __all__ = [ "Q7PropertiesApi", "CleanSummaryTrait", "MapTrait", + "MapContentTrait", "Q7MapList", "Q7MapListEntry", ] class Q7PropertiesApi(Trait): - """API for interacting with B01 devices.""" + """API for interacting with B01 Q7 devices.""" clean_summary: CleanSummaryTrait """Trait for clean records / clean summary (Q7 `service.get_record_list`).""" @@ -42,11 +48,25 @@ class Q7PropertiesApi(Trait): map: MapTrait """Trait for map list metadata + raw map payload retrieval.""" - def __init__(self, channel: MqttChannel) -> None: - """Initialize the B01Props API.""" + map_content: MapContentTrait + """Trait for fetching parsed current map content.""" + + def __init__(self, channel: MqttChannel, *, device: HomeDataDevice, product: HomeDataProduct) -> None: + """Initialize the Q7 API.""" self._channel = channel + self._device = device + self._product = product + + if not device.sn or not product.model: + raise ValueError("B01 Q7 map content requires device serial number and product model metadata") + self.clean_summary = CleanSummaryTrait(channel) self.map = MapTrait(channel) + self.map_content = MapContentTrait( + self.map, + serial=device.sn, + model=product.model, + ) async def query_values(self, props: list[RoborockB01Props]) -> B01Props | None: """Query the device for the values of the given Q7 properties.""" @@ -151,6 +171,6 @@ async def send(self, command: CommandType, params: ParamsType) -> Any: ) -def create(channel: MqttChannel) -> Q7PropertiesApi: - """Create traits for B01 devices.""" - return Q7PropertiesApi(channel) +def create(product: HomeDataProduct, device: HomeDataDevice, channel: MqttChannel) -> Q7PropertiesApi: + """Create traits for B01 Q7 devices.""" + return Q7PropertiesApi(channel, device=device, product=product) diff --git a/roborock/devices/traits/b01/q7/map_content.py b/roborock/devices/traits/b01/q7/map_content.py new file mode 100644 index 00000000..db00119b --- /dev/null +++ b/roborock/devices/traits/b01/q7/map_content.py @@ -0,0 +1,98 @@ +"""Trait for fetching parsed map content from B01/Q7 devices. + +This intentionally mirrors the v1 `MapContentTrait` contract: +- `refresh()` performs I/O and populates cached fields +- `parse_map_content()` reparses cached raw bytes without I/O +- fields `image_content`, `map_data`, and `raw_api_response` are then readable + +For B01/Q7 devices, the underlying raw map payload is retrieved via `MapTrait`. +""" + +from dataclasses import dataclass + +from vacuum_map_parser_base.map_data import MapData + +from roborock.data import RoborockBase +from roborock.devices.traits import Trait +from roborock.exceptions import RoborockException +from roborock.map.b01_map_parser import B01MapParser, B01MapParserConfig + +from .map import MapTrait + +_TRUNCATE_LENGTH = 20 + + +@dataclass +class MapContent(RoborockBase): + """Dataclass representing map content.""" + + image_content: bytes | None = None + """The rendered image of the map in PNG format.""" + + map_data: MapData | None = None + """Parsed map data (metadata for points on the map).""" + + raw_api_response: bytes | None = None + """Raw bytes of the map payload from the device. + + This should be treated as an opaque blob used only internally by this + library to re-parse the map data when needed. + """ + + def __repr__(self) -> str: + img = self.image_content + if img and len(img) > _TRUNCATE_LENGTH: + img = img[: _TRUNCATE_LENGTH - 3] + b"..." + return f"MapContent(image_content={img!r}, map_data={self.map_data!r})" + + +class MapContentTrait(MapContent, Trait): + """Trait for fetching parsed map content for Q7 devices.""" + + def __init__( + self, + map_trait: MapTrait, + *, + serial: str, + model: str, + map_parser_config: B01MapParserConfig | None = None, + ) -> None: + super().__init__() + self._map_trait = map_trait + self._serial = serial + self._model = model + self._map_parser = B01MapParser(map_parser_config) + + async def refresh(self) -> None: + """Fetch, decode, and parse the current map payload.""" + raw_payload = await self._map_trait.get_current_map_payload() + parsed = self.parse_map_content(raw_payload) + self.image_content = parsed.image_content + self.map_data = parsed.map_data + self.raw_api_response = parsed.raw_api_response + + def parse_map_content(self, response: bytes) -> MapContent: + """Parse map content from raw bytes. + + This mirrors the v1 trait behavior so cached map payload bytes can be + reparsed without going back to the device. + """ + try: + parsed_data = self._map_parser.parse( + response, + serial=self._serial, + model=self._model, + ) + except RoborockException: + raise + except Exception as ex: + raise RoborockException("Failed to parse B01 map data") from ex + + if parsed_data.image_content is None: + raise RoborockException("Failed to render B01 map image") + + return MapContent( + image_content=parsed_data.image_content, + map_data=parsed_data.map_data, + raw_api_response=response, + ) diff --git a/roborock/map/b01_map_parser.py b/roborock/map/b01_map_parser.py new file mode 100644 index 00000000..57b5534a --- /dev/null +++ b/roborock/map/b01_map_parser.py @@ -0,0 +1,188 @@ +"""Module for parsing B01/Q7 map content. + +Observed Q7 `MAP_RESPONSE` payloads follow this decode pipeline: +- base64-encoded ASCII +- AES-ECB encrypted with the derived map key +- PKCS7 padded +- ASCII hex for a zlib-compressed SCMap payload + +The inner SCMap blob is parsed with protobuf messages generated from +`roborock/map/proto/b01_scmap.proto`. +""" + +import base64 +import binascii +import hashlib +import io +import zlib +from dataclasses import dataclass + +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad, unpad +from google.protobuf.message import DecodeError, Message +from PIL import Image +from vacuum_map_parser_base.config.image_config import ImageConfig +from vacuum_map_parser_base.map_data import ImageData, MapData + +from roborock.exceptions import RoborockException +from roborock.map.proto.b01_scmap_pb2 import RobotMap # type: ignore[attr-defined] + +from .map_parser import ParsedMapData + +_B64_CHARS = set(b"ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklmnopqrstuvwxyz0123456789+/=") +_MAP_FILE_FORMAT = "PNG" + + +@dataclass +class B01MapParserConfig: + """Configuration for the B01/Q7 map parser.""" + + map_scale: int = 4 + """Scale factor for the rendered map image.""" + + +class B01MapParser: + """Decoder/parser for B01/Q7 SCMap payloads.""" + + def __init__(self, config: B01MapParserConfig | None = None) -> None: + self._config = config or B01MapParserConfig() + + def parse(self, raw_payload: bytes, *, serial: str, model: str) -> ParsedMapData: + """Parse a raw MAP_RESPONSE payload and return a PNG + MapData.""" + inflated = _decode_b01_map_payload(raw_payload, serial=serial, model=model) + parsed = _parse_scmap_payload(inflated) + size_x, size_y, grid = _extract_grid(parsed) + room_names = _extract_room_names(parsed) + + image = _render_occupancy_image(grid, size_x=size_x, size_y=size_y, scale=self._config.map_scale) + + map_data = MapData() + map_data.image = ImageData( + size=size_x * size_y, + top=0, + left=0, + height=size_y, + width=size_x, + image_config=ImageConfig(scale=self._config.map_scale), + data=image, + img_transformation=lambda p: p, + ) + if room_names: + map_data.additional_parameters["room_names"] = room_names + + image_bytes = io.BytesIO() + image.save(image_bytes, format=_MAP_FILE_FORMAT) + + return ParsedMapData( + image_content=image_bytes.getvalue(), + map_data=map_data, + ) + + +def _derive_map_key(serial: str, model: str) -> bytes: + """Derive the B01/Q7 map decrypt key from serial + model.""" + model_suffix = model.split(".")[-1] + model_key = (model_suffix + "0" * 16)[:16].encode() + material = f"{serial}+{model_suffix}+{serial}".encode() + encrypted = AES.new(model_key, AES.MODE_ECB).encrypt(pad(material, AES.block_size)) + md5 = hashlib.md5(base64.b64encode(encrypted), usedforsecurity=False).hexdigest() + return md5[8:24].encode() + + +def _decode_base64_payload(raw_payload: bytes) -> bytes: + blob = raw_payload.strip() + if len(blob) < 32 or any(b not in _B64_CHARS for b in blob): + raise RoborockException("Failed to decode B01 map payload") + + padded = blob + b"=" * (-len(blob) % 4) + try: + return base64.b64decode(padded, validate=True) + except binascii.Error as err: + raise RoborockException("Failed to decode B01 map payload") from err + + +def _decode_b01_map_payload(raw_payload: bytes, *, serial: str, model: str) -> bytes: + """Decode raw B01 `MAP_RESPONSE` payload into inflated SCMap bytes.""" + encrypted_payload = _decode_base64_payload(raw_payload) + if len(encrypted_payload) % AES.block_size != 0: + raise RoborockException("Unexpected encrypted B01 map payload length") + + map_key = _derive_map_key(serial, model) + decrypted_hex = AES.new(map_key, AES.MODE_ECB).decrypt(encrypted_payload) + + try: + compressed_hex = unpad(decrypted_hex, AES.block_size).decode("ascii") + compressed_payload = bytes.fromhex(compressed_hex) + return zlib.decompress(compressed_payload) + except (ValueError, UnicodeDecodeError, zlib.error) as err: + raise RoborockException("Failed to decode B01 map payload") from err + + +def _parse_proto(blob: bytes, message: Message, *, context: str) -> None: + try: + message.ParseFromString(blob) + except DecodeError as err: + raise RoborockException(f"Failed to parse {context}") from err + + +def _decode_map_data_bytes(value: bytes) -> bytes: + try: + return zlib.decompress(value) + except zlib.error: + return value + + +def _parse_scmap_payload(payload: bytes) -> RobotMap: + """Parse inflated SCMap bytes into a generated protobuf message.""" + parsed = RobotMap() + _parse_proto(payload, parsed, context="B01 SCMap") + return parsed + + +def _extract_grid(parsed: RobotMap) -> tuple[int, int, bytes]: + if not parsed.HasField("mapHead") or not parsed.HasField("mapData"): + raise RoborockException("Failed to parse B01 map header/grid") + + size_x = parsed.mapHead.sizeX if parsed.mapHead.HasField("sizeX") else 0 + size_y = parsed.mapHead.sizeY if parsed.mapHead.HasField("sizeY") else 0 + if not size_x or not size_y or not parsed.mapData.HasField("mapData"): + raise RoborockException("Failed to parse B01 map header/grid") + + map_data = _decode_map_data_bytes(parsed.mapData.mapData) + expected_len = size_x * size_y + if len(map_data) < expected_len: + raise RoborockException("B01 map data shorter than expected dimensions") + + return size_x, size_y, map_data[:expected_len] + + +def _extract_room_names(parsed: RobotMap) -> dict[int, str]: + # Expose room id/name mapping without inventing room geometry/polygons. + room_names: dict[int, str] = {} + for room in parsed.roomDataInfo: + if room.HasField("roomId"): + room_id = room.roomId + room_names[room_id] = room.roomName if room.HasField("roomName") else f"Room {room_id}" + return room_names + + +def _render_occupancy_image(grid: bytes, *, size_x: int, size_y: int, scale: int) -> Image.Image: + """Render the B01 occupancy grid into a simple image.""" + + # The observed occupancy grid contains only: + # - 0: outside/unknown + # - 127: wall/obstacle + # - 128: floor/free + table = bytearray(range(256)) + table[0] = 0 + table[127] = 180 + table[128] = 255 + + mapped = grid.translate(bytes(table)) + img = Image.frombytes("L", (size_x, size_y), mapped) + img = img.transpose(Image.Transpose.FLIP_TOP_BOTTOM).convert("RGB") + + if scale > 1: + img = img.resize((size_x * scale, size_y * scale), resample=Image.Resampling.NEAREST) + + return img diff --git a/roborock/map/proto/__init__.py b/roborock/map/proto/__init__.py new file mode 100644 index 00000000..81a4f8b1 --- /dev/null +++ b/roborock/map/proto/__init__.py @@ -0,0 +1 @@ +"""Generated protobuf modules for Roborock map payloads.""" diff --git a/roborock/map/proto/b01_scmap.proto b/roborock/map/proto/b01_scmap.proto new file mode 100644 index 00000000..b3659813 --- /dev/null +++ b/roborock/map/proto/b01_scmap.proto @@ -0,0 +1,70 @@ +// Checked-in B01/Q7 SCMap schema for the generated runtime protobuf module. +// Regenerate the checked-in Python module after edits with: +// python -m grpc_tools.protoc -I./roborock/map/proto --python_out=./roborock/map/proto roborock/map/proto/b01_scmap.proto +// The generated file `b01_scmap_pb2.py` is checked in for runtime use and should +// not be edited by hand. +syntax = "proto2"; + +package b01.scmap; + +message DevicePointInfo { + optional float x = 1; + optional float y = 2; +} + +message MapBoundaryInfo { + optional string mapMd5 = 1; + optional uint32 vMinX = 2; + optional uint32 vMaxX = 3; + optional uint32 vMinY = 4; + optional uint32 vMaxY = 5; +} + +message MapExtInfo { + optional uint32 taskBeginDate = 1; + optional uint32 mapUploadDate = 2; + optional uint32 mapValid = 3; + optional uint32 radian = 4; + optional uint32 force = 5; + optional uint32 cleanPath = 6; + optional MapBoundaryInfo boudaryInfo = 7; + optional uint32 mapVersion = 8; + optional uint32 mapValueType = 9; +} + +message MapHeadInfo { + optional uint32 mapHeadId = 1; + optional uint32 sizeX = 2; + optional uint32 sizeY = 3; + optional float minX = 4; + optional float minY = 5; + optional float maxX = 6; + optional float maxY = 7; + optional float resolution = 8; +} + +message MapDataInfo { + optional bytes mapData = 1; +} + +message RoomDataInfo { + optional uint32 roomId = 1; + optional string roomName = 2; + optional uint32 roomTypeId = 3; + optional uint32 meterialId = 4; + optional uint32 cleanState = 5; + optional uint32 roomClean = 6; + optional uint32 roomCleanIndex = 7; + optional DevicePointInfo roomNamePost = 8; + optional uint32 colorId = 10; + optional uint32 floor_direction = 11; + optional uint32 global_seq = 12; +} + +message RobotMap { + optional uint32 mapType = 1; + optional MapExtInfo mapExtInfo = 2; + optional MapHeadInfo mapHead = 3; + optional MapDataInfo mapData = 4; + repeated RoomDataInfo roomDataInfo = 12; +} diff --git a/roborock/map/proto/b01_scmap_pb2.py b/roborock/map/proto/b01_scmap_pb2.py new file mode 100644 index 00000000..66cc0843 --- /dev/null +++ b/roborock/map/proto/b01_scmap_pb2.py @@ -0,0 +1,48 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: roborock/map/proto/b01_scmap.proto +# Protobuf Python Version: 6.31.1 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder +_runtime_version.ValidateProtobufRuntimeVersion( + _runtime_version.Domain.PUBLIC, + 6, + 31, + 1, + '', + 'roborock/map/proto/b01_scmap.proto' +) +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile(b'\n\"roborock/map/proto/b01_scmap.proto\x12\tb01.scmap\"\'\n\x0f\x44\x65vicePointInfo\x12\t\n\x01x\x18\x01 \x01(\x02\x12\t\n\x01y\x18\x02 \x01(\x02\"]\n\x0fMapBoundaryInfo\x12\x0e\n\x06mapMd5\x18\x01 \x01(\t\x12\r\n\x05vMinX\x18\x02 \x01(\r\x12\r\n\x05vMaxX\x18\x03 \x01(\r\x12\r\n\x05vMinY\x18\x04 \x01(\r\x12\r\n\x05vMaxY\x18\x05 \x01(\r\"\xd9\x01\n\nMapExtInfo\x12\x15\n\rtaskBeginDate\x18\x01 \x01(\r\x12\x15\n\rmapUploadDate\x18\x02 \x01(\r\x12\x10\n\x08mapValid\x18\x03 \x01(\r\x12\x0e\n\x06radian\x18\x04 \x01(\r\x12\r\n\x05\x66orce\x18\x05 \x01(\r\x12\x11\n\tcleanPath\x18\x06 \x01(\r\x12/\n\x0b\x62oudaryInfo\x18\x07 \x01(\x0b\x32\x1a.b01.scmap.MapBoundaryInfo\x12\x12\n\nmapVersion\x18\x08 \x01(\r\x12\x14\n\x0cmapValueType\x18\t \x01(\r\"\x8a\x01\n\x0bMapHeadInfo\x12\x11\n\tmapHeadId\x18\x01 \x01(\r\x12\r\n\x05sizeX\x18\x02 \x01(\r\x12\r\n\x05sizeY\x18\x03 \x01(\r\x12\x0c\n\x04minX\x18\x04 \x01(\x02\x12\x0c\n\x04minY\x18\x05 \x01(\x02\x12\x0c\n\x04maxX\x18\x06 \x01(\x02\x12\x0c\n\x04maxY\x18\x07 \x01(\x02\x12\x12\n\nresolution\x18\x08 \x01(\x02\"\x1e\n\x0bMapDataInfo\x12\x0f\n\x07mapData\x18\x01 \x01(\x0c\"\x87\x02\n\x0cRoomDataInfo\x12\x0e\n\x06roomId\x18\x01 \x01(\r\x12\x10\n\x08roomName\x18\x02 \x01(\t\x12\x12\n\nroomTypeId\x18\x03 \x01(\r\x12\x12\n\nmeterialId\x18\x04 \x01(\r\x12\x12\n\ncleanState\x18\x05 \x01(\r\x12\x11\n\troomClean\x18\x06 \x01(\r\x12\x16\n\x0eroomCleanIndex\x18\x07 \x01(\r\x12\x30\n\x0croomNamePost\x18\x08 \x01(\x0b\x32\x1a.b01.scmap.DevicePointInfo\x12\x0f\n\x07\x63olorId\x18\n \x01(\r\x12\x17\n\x0f\x66loor_direction\x18\x0b \x01(\r\x12\x12\n\nglobal_seq\x18\x0c \x01(\r\"\xc7\x01\n\x08RobotMap\x12\x0f\n\x07mapType\x18\x01 \x01(\r\x12)\n\nmapExtInfo\x18\x02 \x01(\x0b\x32\x15.b01.scmap.MapExtInfo\x12\'\n\x07mapHead\x18\x03 \x01(\x0b\x32\x16.b01.scmap.MapHeadInfo\x12\'\n\x07mapData\x18\x04 \x01(\x0b\x32\x16.b01.scmap.MapDataInfo\x12-\n\x0croomDataInfo\x18\x0c \x03(\x0b\x32\x17.b01.scmap.RoomDataInfo') + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, 'roborock.map.proto.b01_scmap_pb2', _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals['_DEVICEPOINTINFO']._serialized_start=49 + _globals['_DEVICEPOINTINFO']._serialized_end=88 + _globals['_MAPBOUNDARYINFO']._serialized_start=90 + _globals['_MAPBOUNDARYINFO']._serialized_end=183 + _globals['_MAPEXTINFO']._serialized_start=186 + _globals['_MAPEXTINFO']._serialized_end=403 + _globals['_MAPHEADINFO']._serialized_start=406 + _globals['_MAPHEADINFO']._serialized_end=544 + _globals['_MAPDATAINFO']._serialized_start=546 + _globals['_MAPDATAINFO']._serialized_end=576 + _globals['_ROOMDATAINFO']._serialized_start=579 + _globals['_ROOMDATAINFO']._serialized_end=842 + _globals['_ROBOTMAP']._serialized_start=845 + _globals['_ROBOTMAP']._serialized_end=1044 +# @@protoc_insertion_point(module_scope) diff --git a/tests/devices/traits/b01/q7/conftest.py b/tests/devices/traits/b01/q7/conftest.py index 5dc476f6..4f55ba0f 100644 --- a/tests/devices/traits/b01/q7/conftest.py +++ b/tests/devices/traits/b01/q7/conftest.py @@ -5,6 +5,7 @@ import pytest +from roborock.data import HomeDataDevice, HomeDataProduct, RoborockCategory from roborock.devices.traits.b01.q7 import Q7PropertiesApi from tests.fixtures.channel_fixtures import FakeChannel @@ -16,9 +17,30 @@ def fake_channel_fixture() -> FakeChannel: return FakeChannel() +@pytest.fixture(name="product") +def product_fixture() -> HomeDataProduct: + return HomeDataProduct( + id="product-id-q7", + name="Roborock Q7", + model="roborock.vacuum.sc05", + category=RoborockCategory.VACUUM, + ) + + +@pytest.fixture(name="device") +def device_fixture() -> HomeDataDevice: + return HomeDataDevice( + duid="abc123", + name="Q7", + local_key="key123key123key1", + product_id="product-id-q7", + sn="testsn012345", + ) + + @pytest.fixture(name="q7_api") -def q7_api_fixture(fake_channel: FakeChannel) -> Q7PropertiesApi: - return Q7PropertiesApi(fake_channel) # type: ignore[arg-type] +def q7_api_fixture(fake_channel: FakeChannel, device: HomeDataDevice, product: HomeDataProduct) -> Q7PropertiesApi: + return Q7PropertiesApi(fake_channel, device=device, product=product) # type: ignore[arg-type] @pytest.fixture(name="expected_msg_id", autouse=True) @@ -28,6 +50,7 @@ def next_message_id_fixture() -> Generator[int, None, None]: We pick an arbitrary number, but just need it to ensure we can craft a fake response with the message id matched to the outgoing RPC. """ + expected_msg_id = math.floor(time.time()) # Patch get_next_int to return our expected msg_id so the channel waits for it diff --git a/tests/devices/traits/b01/q7/test_map_content.py b/tests/devices/traits/b01/q7/test_map_content.py new file mode 100644 index 00000000..c0b62a4f --- /dev/null +++ b/tests/devices/traits/b01/q7/test_map_content.py @@ -0,0 +1,79 @@ +from typing import cast +from unittest.mock import patch + +import pytest +from vacuum_map_parser_base.map_data import MapData + +from roborock.devices.traits.b01.q7 import Q7PropertiesApi +from roborock.devices.transport.mqtt_channel import MqttChannel +from roborock.exceptions import RoborockException +from roborock.roborock_message import RoborockMessage, RoborockMessageProtocol +from tests.fixtures.channel_fixtures import FakeChannel + +from . import B01MessageBuilder + + +async def test_q7_map_content_refresh_populates_cached_values( + q7_api: Q7PropertiesApi, + fake_channel: FakeChannel, + message_builder: B01MessageBuilder, +): + fake_channel.response_queue.append(message_builder.build({"map_list": [{"id": 1772093512, "cur": True}]})) + fake_channel.response_queue.append( + RoborockMessage( + protocol=RoborockMessageProtocol.MAP_RESPONSE, + payload=b"raw-map-payload", + version=b"B01", + seq=message_builder.seq + 1, + ) + ) + + dummy_map_data = MapData() + with patch( + "roborock.devices.traits.b01.q7.map_content.B01MapParser.parse", + return_value=type("X", (), {"image_content": b"pngbytes", "map_data": dummy_map_data})(), + ) as parse: + await q7_api.map_content.refresh() + + assert q7_api.map_content.image_content == b"pngbytes" + assert q7_api.map_content.map_data is dummy_map_data + assert q7_api.map_content.raw_api_response == b"raw-map-payload" + + parse.assert_called_once() + + +def test_q7_map_content_parse_errors_cleanly(q7_api: Q7PropertiesApi): + with patch("roborock.devices.traits.b01.q7.map_content.B01MapParser.parse", side_effect=ValueError("boom")): + with pytest.raises(RoborockException, match="Failed to parse B01 map data"): + q7_api.map_content.parse_map_content(b"raw") + + +def test_q7_map_content_preserves_specific_roborock_errors(q7_api: Q7PropertiesApi): + with patch( + "roborock.devices.traits.b01.q7.map_content.B01MapParser.parse", + side_effect=RoborockException("Specific decoder failure"), + ): + with pytest.raises(RoborockException, match="Specific decoder failure"): + q7_api.map_content.parse_map_content(b"raw") + + +def test_q7_map_content_requires_metadata_at_init(fake_channel: FakeChannel): + from roborock.data import HomeDataDevice, HomeDataProduct, RoborockCategory + + with pytest.raises(ValueError, match="requires device serial number and product model metadata"): + Q7PropertiesApi( + cast(MqttChannel, fake_channel), + device=HomeDataDevice( + duid="abc123", + name="Q7", + local_key="key123key123key1", + product_id="product-id-q7", + sn=None, + ), + product=HomeDataProduct( + id="product-id-q7", + name="Roborock Q7", + model="roborock.vacuum.sc05", + category=RoborockCategory.VACUUM, + ), + ) diff --git a/tests/map/test_b01_map_parser.py b/tests/map/test_b01_map_parser.py new file mode 100644 index 00000000..6c91dba0 --- /dev/null +++ b/tests/map/test_b01_map_parser.py @@ -0,0 +1,175 @@ +import base64 +import gzip +import hashlib +import io +import struct +import zlib +from pathlib import Path + +import pytest +from Crypto.Cipher import AES +from Crypto.Util.Padding import pad +from PIL import Image + +from roborock.exceptions import RoborockException +from roborock.map.b01_map_parser import B01MapParser, _parse_scmap_payload + +FIXTURE = Path(__file__).resolve().parent / "testdata" / "raw-mqtt-map301.bin.inflated.bin.gz" + + +def _derive_map_key(serial: str, model: str) -> bytes: + model_suffix = model.split(".")[-1] + model_key = (model_suffix + "0" * 16)[:16].encode() + material = f"{serial}+{model_suffix}+{serial}".encode() + encrypted = AES.new(model_key, AES.MODE_ECB).encrypt(pad(material, AES.block_size)) + md5 = hashlib.md5(base64.b64encode(encrypted), usedforsecurity=False).hexdigest() + return md5[8:24].encode() + + +def _encode_varint(value: int) -> bytes: + encoded = bytearray() + while True: + to_write = value & 0x7F + value >>= 7 + if value: + encoded.append(to_write | 0x80) + else: + encoded.append(to_write) + return bytes(encoded) + + +def _field_varint(field_no: int, value: int) -> bytes: + return _encode_varint((field_no << 3) | 0) + _encode_varint(value) + + +def _field_len(field_no: int, value: bytes) -> bytes: + return _encode_varint((field_no << 3) | 2) + _encode_varint(len(value)) + value + + +def _field_float32(field_no: int, value: float) -> bytes: + return _encode_varint((field_no << 3) | 5) + struct.pack(" None: + serial = "testsn012345" + model = "roborock.vacuum.sc05" + inflated = gzip.decompress(FIXTURE.read_bytes()) + + compressed = zlib.compress(inflated) + map_key = _derive_map_key(serial, model) + encrypted = AES.new(map_key, AES.MODE_ECB).encrypt(pad(compressed.hex().encode(), AES.block_size)) + payload = base64.b64encode(encrypted) + + parser = B01MapParser() + parsed = parser.parse(payload, serial=serial, model=model) + + assert parsed.image_content is not None + assert parsed.image_content.startswith(b"\x89PNG\r\n\x1a\n") + assert parsed.map_data is not None + + # The fixture includes 10 rooms with names room1..room10. + assert parsed.map_data.additional_parameters["room_names"] == { + 10: "room1", + 11: "room2", + 12: "room3", + 13: "room4", + 14: "room5", + 15: "room6", + 16: "room7", + 17: "room8", + 18: "room9", + 19: "room10", + } + + # Image should be scaled by default. + img = Image.open(io.BytesIO(parsed.image_content)) + assert img.size == (340 * 4, 300 * 4) + + +def test_b01_scmap_parser_maps_observed_schema_fields() -> None: + room_name_post = _field_float32(1, 11.25) + _field_float32(2, 22.5) + room_one = b"".join( + [ + _field_varint(1, 42), + _field_len(2, b"Kitchen"), + _field_varint(5, 1), + _field_len(8, room_name_post), + _field_varint(10, 7), + _field_varint(12, 9), + ] + ) + room_two = b"".join([_field_varint(1, 99), _field_varint(5, 0)]) + + boundary_info = b"".join( + [ + _field_len(1, b"md5"), + _field_varint(2, 10), + _field_varint(3, 20), + _field_varint(4, 30), + _field_varint(5, 40), + ] + ) + map_ext_info = b"".join( + [ + _field_varint(1, 100), + _field_varint(2, 200), + _field_varint(3, 1), + _field_varint(8, 3), + _field_len(7, boundary_info), + ] + ) + map_head = b"".join( + [ + _field_varint(1, 7), + _field_varint(2, 2), + _field_varint(3, 2), + _field_float32(4, 1.5), + _field_float32(5, 2.5), + _field_float32(6, 3.5), + _field_float32(7, 4.5), + _field_float32(8, 0.05), + ] + ) + map_data = _field_len(1, zlib.compress(bytes([0, 127, 128, 128]))) + payload = b"".join( + [ + _field_varint(1, 1), + _field_len(2, map_ext_info), + _field_len(3, map_head), + _field_len(4, map_data), + _field_len(12, room_one), + _field_len(12, room_two), + ] + ) + + parsed = _parse_scmap_payload(payload) + + assert parsed.mapType == 1 + assert parsed.HasField("mapExtInfo") + assert parsed.mapExtInfo.taskBeginDate == 100 + assert parsed.mapExtInfo.mapUploadDate == 200 + assert parsed.mapExtInfo.HasField("boudaryInfo") + assert parsed.mapExtInfo.boudaryInfo.vMaxY == 40 + assert parsed.HasField("mapHead") + assert parsed.mapHead.mapHeadId == 7 + assert parsed.mapHead.sizeX == 2 + assert parsed.mapHead.sizeY == 2 + assert parsed.mapHead.resolution == pytest.approx(0.05) + assert parsed.HasField("mapData") + assert parsed.mapData.HasField("mapData") + assert zlib.decompress(parsed.mapData.mapData) == bytes([0, 127, 128, 128]) + assert parsed.roomDataInfo[0].roomId == 42 + assert parsed.roomDataInfo[0].roomName == "Kitchen" + assert parsed.roomDataInfo[0].HasField("roomNamePost") + assert parsed.roomDataInfo[0].roomNamePost.x == pytest.approx(11.25) + assert parsed.roomDataInfo[0].roomNamePost.y == pytest.approx(22.5) + assert parsed.roomDataInfo[0].colorId == 7 + assert parsed.roomDataInfo[0].global_seq == 9 + assert parsed.roomDataInfo[1].roomId == 99 + assert not parsed.roomDataInfo[1].HasField("roomName") + + +def test_b01_map_parser_rejects_invalid_payload() -> None: + parser = B01MapParser() + with pytest.raises(RoborockException, match="Failed to decode B01 map payload"): + parser.parse(b"not a map", serial="testsn012345", model="roborock.vacuum.sc05") diff --git a/tests/map/testdata/raw-mqtt-map301.bin.inflated.bin.gz b/tests/map/testdata/raw-mqtt-map301.bin.inflated.bin.gz new file mode 100644 index 00000000..63b67ddb Binary files /dev/null and b/tests/map/testdata/raw-mqtt-map301.bin.inflated.bin.gz differ