Skip to content

Commit f4476c5

Browse files
committed
🦎 docs: clarify B01 protobuf choice
1 parent c4f12d2 commit f4476c5

File tree

2 files changed

+148
-0
lines changed

2 files changed

+148
-0
lines changed

roborock/map/b01_map_parser.py

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,12 @@
66
- PKCS7 padded
77
- ASCII hex for a zlib-compressed SCMap payload
88
9+
The inner SCMap blob appears to use protobuf wire encoding, but we do not have
10+
an upstream `.proto` schema for it. We intentionally keep a tiny schema-free
11+
wire parser here instead of introducing generated protobuf classes from a
12+
reverse-engineered schema, because that would add maintenance/guesswork without
13+
meaningfully reducing complexity for the small set of fields we actually use.
14+
915
This module keeps the decode path narrow and explicit to match the observed
1016
payload shape as closely as possible.
1117
"""

tests/map/debug_b01_scmap.py

Lines changed: 142 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,142 @@
1+
"""Developer helper for inspecting B01/Q7 SCMap payloads.
2+
3+
This script is intentionally kept outside the runtime package so it stays
4+
non-obtrusive. It is useful when reverse-engineering new payload samples or
5+
validating assumptions about the current parser.
6+
7+
Why not generated protobuf classes here?
8+
- The inflated SCMap payload looks like protobuf wire format.
9+
- We do not have an upstream `.proto` schema.
10+
- For runtime code, reverse-engineering and committing guessed schema files
11+
would imply more certainty than we actually have.
12+
13+
So the library keeps a tiny schema-free parser for the fields it needs, while
14+
this script provides a convenient place to inspect unknown payloads during
15+
future debugging.
16+
"""
17+
18+
from __future__ import annotations
19+
20+
import argparse
21+
import gzip
22+
from pathlib import Path
23+
24+
from roborock.map.b01_map_parser import (
25+
_decode_b01_map_payload,
26+
_parse_scmap_payload,
27+
_read_len_delimited,
28+
_read_varint,
29+
)
30+
31+
32+
def _looks_like_message(blob: bytes) -> bool:
33+
"""Return True if the blob plausibly looks like a protobuf-style message."""
34+
if not blob or len(blob) > 4096:
35+
return False
36+
37+
idx = 0
38+
seen = 0
39+
try:
40+
while idx < len(blob):
41+
key, idx = _read_varint(blob, idx)
42+
wire = key & 0x07
43+
seen += 1
44+
if wire == 0:
45+
_, idx = _read_varint(blob, idx)
46+
elif wire == 1:
47+
idx += 8
48+
elif wire == 2:
49+
_, idx = _read_len_delimited(blob, idx)
50+
elif wire == 5:
51+
idx += 4
52+
else:
53+
return False
54+
return seen > 0 and idx == len(blob)
55+
except Exception:
56+
return False
57+
58+
59+
def _preview(blob: bytes, limit: int = 24) -> str:
60+
text = blob[:limit].hex()
61+
if len(blob) > limit:
62+
return f"{text}... ({len(blob)} bytes)"
63+
return f"{text} ({len(blob)} bytes)"
64+
65+
66+
def _dump_message(blob: bytes, *, indent: str = "", max_depth: int = 2, depth: int = 0) -> None:
67+
idx = 0
68+
while idx < len(blob):
69+
start = idx
70+
key, idx = _read_varint(blob, idx)
71+
field_no = key >> 3
72+
wire = key & 0x07
73+
74+
if wire == 0:
75+
value, idx = _read_varint(blob, idx)
76+
print(f"{indent}field {field_no} @ {start}: varint {value}")
77+
elif wire == 1:
78+
value = blob[idx : idx + 8]
79+
idx += 8
80+
print(f"{indent}field {field_no} @ {start}: fixed64 {_preview(value, 8)}")
81+
elif wire == 2:
82+
value, idx = _read_len_delimited(blob, idx)
83+
print(f"{indent}field {field_no} @ {start}: len-delimited {_preview(value)}")
84+
if depth < max_depth and _looks_like_message(value):
85+
_dump_message(value, indent=indent + " ", max_depth=max_depth, depth=depth + 1)
86+
elif wire == 5:
87+
value = blob[idx : idx + 4]
88+
idx += 4
89+
print(f"{indent}field {field_no} @ {start}: fixed32 {_preview(value, 4)}")
90+
else:
91+
print(f"{indent}field {field_no} @ {start}: unsupported wire type {wire}")
92+
return
93+
94+
95+
def _load_payload(args: argparse.Namespace) -> bytes:
96+
if args.inflated_gzip is not None:
97+
return gzip.decompress(args.inflated_gzip.read_bytes())
98+
if args.inflated_bin is not None:
99+
return args.inflated_bin.read_bytes()
100+
if args.raw_map_response is not None:
101+
if not args.serial or not args.model:
102+
raise SystemExit("--raw-map-response requires --serial and --model")
103+
return _decode_b01_map_payload(
104+
args.raw_map_response.read_bytes(),
105+
serial=args.serial,
106+
model=args.model,
107+
)
108+
raise SystemExit("one of --inflated-gzip, --inflated-bin, or --raw-map-response is required")
109+
110+
111+
def _build_parser() -> argparse.ArgumentParser:
112+
parser = argparse.ArgumentParser(description=__doc__)
113+
group = parser.add_mutually_exclusive_group(required=True)
114+
group.add_argument("--inflated-gzip", type=Path, help="Path to gzipped inflated SCMap payload")
115+
group.add_argument("--inflated-bin", type=Path, help="Path to raw inflated SCMap payload")
116+
group.add_argument("--raw-map-response", type=Path, help="Path to raw MAP_RESPONSE payload bytes")
117+
parser.add_argument("--serial", help="Device serial number (required for --raw-map-response)")
118+
parser.add_argument("--model", help="Device model, e.g. roborock.vacuum.sc05 (required for --raw-map-response)")
119+
parser.add_argument(
120+
"--max-depth",
121+
type=int,
122+
default=2,
123+
help="Maximum recursive dump depth for protobuf-like messages",
124+
)
125+
return parser
126+
127+
128+
def main() -> None:
129+
args = _build_parser().parse_args()
130+
payload = _load_payload(args)
131+
132+
size_x, size_y, grid, room_names = _parse_scmap_payload(payload)
133+
print(f"Inflated payload: {len(payload)} bytes")
134+
print(f"Map size: {size_x} x {size_y}")
135+
print(f"Grid bytes: {len(grid)}")
136+
print(f"Room names: {room_names}")
137+
print("\nTop-level field dump:")
138+
_dump_message(payload, max_depth=args.max_depth)
139+
140+
141+
if __name__ == "__main__":
142+
main()

0 commit comments

Comments
 (0)