From c1627096f400ee88d7952c5ee69f8ec6185d8296 Mon Sep 17 00:00:00 2001 From: Harvey Lelliott <42912136+flip-dots@users.noreply.github.com> Date: Wed, 22 Apr 2026 16:43:51 +0100 Subject: [PATCH] Add support for reading unencrypted telemetry on Anker Prime devices --- SolixBLE/device.py | 38 +++++++++++++----- SolixBLE/prime_device.py | 4 +- tests/test_devices.py | 87 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 119 insertions(+), 10 deletions(-) diff --git a/SolixBLE/device.py b/SolixBLE/device.py index a1b7da1..b3873a1 100644 --- a/SolixBLE/device.py +++ b/SolixBLE/device.py @@ -493,7 +493,9 @@ def _encrypt_payload(self, payload: bytes) -> bytes: ) return cipher.encrypt(padded_data) - async def _process_telemetry_packet(self, payload: bytes, cmd: bytes = None) -> None: + async def _process_telemetry_packet( + self, payload: bytes, cmd: bytes = None + ) -> None: """Process a telemetry packet from the device. This performs the default processing of telemetry packets in which @@ -533,14 +535,12 @@ async def _process_telemetry_packet(self, payload: bytes, cmd: bytes = None) -> ) del self._fragment_buffers[cmd_key] del self._fragment_totals[cmd_key] - _LOGGER.debug( - f"Reassembled payload: {len(payload)} bytes" - ) + _LOGGER.debug(f"Reassembled payload: {len(payload)} bytes") else: # Strip fragment info payload = payload[1:] - + decrypted_payload = self._decrypt_payload(payload) _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") parameters = self._parse_payload(decrypted_payload) @@ -615,26 +615,45 @@ async def _process_notification( # Match against common message types match pattern.hex(): - # Encryption negotiation + # Negotiation messages case "030001": _LOGGER.debug("Received encryption negotiation message!") return await self._process_negotiation(cmd, payload) - # Encrypted messages + # Session messages case "03010f" | "030111": match cmd.hex(): - # Telemetry messages + # Non-encrypted telemetry messages + case "0300": + _LOGGER.debug("Received non-encrypted telemetry message!") + parameters = self._parse_payload(payload) + return await self._process_telemetry(parameters) + + # Encrypted telemetry messages case "c402" | "4300" | "c405": - _LOGGER.debug("Received telemetry message!") + _LOGGER.debug("Received encrypted telemetry message!") return await self._process_telemetry_packet(payload, cmd) # Unknown messages case _: _LOGGER.debug(f"Received unknown message of type: {cmd.hex()}") + + # Try to parse the message as if it were not encrypted try: + parameters = self._parse_payload(payload) + _LOGGER.debug( + f"Non-encrypted parameters: {self._parameters_to_str(parameters, types=True)}" + ) + return + except Exception: + _LOGGER.exception( + "Failed to parse unknown payload using no encryption approach" + ) + # Else lets try to decrypt it + try: # If the payload is one byte too short and we are # using the default AES (CBC) then try putting the # last byte of the cmd in front of it @@ -656,6 +675,7 @@ async def _process_notification( _LOGGER.debug( f"Parameters: {self._parameters_to_str(parameters, types=True)}" ) + return except Exception: _LOGGER.exception( "Exception decrypting unknown message type" diff --git a/SolixBLE/prime_device.py b/SolixBLE/prime_device.py index 64a6959..7064ebe 100644 --- a/SolixBLE/prime_device.py +++ b/SolixBLE/prime_device.py @@ -487,7 +487,9 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: # Packet processing # ##################### - async def _process_telemetry_packet(self, payload: bytes, cmd: bytes = None) -> None: + async def _process_telemetry_packet( + self, payload: bytes, cmd: bytes = None + ) -> None: """ Process a telemetry packet from an Anker Prime device. diff --git a/tests/test_devices.py b/tests/test_devices.py index 084a35f..0aaf855 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -989,6 +989,18 @@ def test_payload_decryption( """{'a1': '31', 'a2': '02e805', 'a3': '020000', 'a4': '0100', 'a5': '0401a824fe0b3f0b', 'a6': '0400000000000000', 'a7': '0400000000000000', 'a8': '0103', 'a9': '0150', 'aa': '0100', 'ab': '0400000f0f0f000000', 'ac': '0401002c0100002c0100000203', 'ad': '0401002c0100002c0100000300', 'ae': '0401002c0100002c0100000300', 'af': '0100', 'b0': '0100', 'b1': '0101', 'b2': '0101', 'b3': '0101', 'b4': '04e8040000fafffbfffafffbff', 'b5': '04ffffffffffffffffffffffff', 'e0': '0408000000', 'e1': '0480034b53000000000000', 'fe': '0300000000'}""", id="prime_telemetry_packet", ), + # Test an Anker Prime device (single payload device) with a single telemetry packet + # from the logs of someone elses unit which for some reason transmits telemetry + # unencrypted + pytest.param( + PrimeCharger160w, + [ + "ff09ca000301110300a10131a203024606a303020000a4020100a5080401d8459906bb0ba6080401e81300000000a7080400000000000000a8020103a9020150aa020100ab090400000000000b0b0bac0d0401002c0100002c0100000200ad0d0401002c0100002c0100000201ae0d0401002c0100002c0100000300af020100b0020100b1020100b2020101b30201ffb40d0400000000ac051573fafffbffb50d04ffffffffffffffffffffffffe0050448000000e10b0400000000000000000000fe0503000000006b" + ], + "5609bc39f79166da75139feb7c335fb7524b3bf0d730db96bf6ebf450d3e165b", + """{'a1': '31', 'a2': '024606', 'a3': '020000', 'a4': '0100', 'a5': '0401d8459906bb0b', 'a6': '0401e81300000000', 'a7': '0400000000000000', 'a8': '0103', 'a9': '0150', 'aa': '0100', 'ab': '0400000000000b0b0b', 'ac': '0401002c0100002c0100000200', 'ad': '0401002c0100002c0100000201', 'ae': '0401002c0100002c0100000300', 'af': '0100', 'b0': '0100', 'b1': '0100', 'b2': '0101', 'b3': '01ff', 'b4': '0400000000ac051573fafffbff', 'b5': '04ffffffffffffffffffffffff', 'e0': '0448000000', 'e1': '0400000000000000000000', 'fe': '0300000000'}""", + id="prime_telemetry_packet_plain_text", + ), ], ) async def test_telemetry_packet_processing( @@ -1045,6 +1057,81 @@ async def test_telemetry_packet_processing( assert parameters == device_parameters, "Parameters do not match expected!" +@pytest.mark.asyncio +@pytest.mark.parametrize( + "device_class, packets, secret, expected_logs", + [ + # Telemetry packet from logs of someone elses Prime 160w charger. + # Interestingly this packet is not encrypted at all + pytest.param( + PrimeCharger160w, + [ + "ff09ca000301110300a10131a203024606a303020000a4020100a5080401e042b105b209a6080401e81300000000a7080400000000000000a8020103a9020150aa020100ab090400000000000b0b0bac0d0401002c0100002c0100000200ad0d0401002c0100002c0100000201ae0d0401002c0100002c0100000300af020100b0020100b1020100b2020101b30201ffb40d0400000000ac051573fafffbffb50d04ffffffffffffffffffffffffe0050448000000e10b0400000000000000000000fe05030000000074" + ], + "5609bc39f79166da75139feb7c335fb7524b3bf0d730db96bf6ebf450d3e165b", + [ + "Received non-encrypted telemetry message", + "Telemetry parameters: {'a1': '31', 'a2': '024606'", + ], + id="prime_160w_other", + ), + ], +) +async def test_generic_packet_processing( + caplog, + fast_sleep, + fast_timeouts, + device_class: type[SolixBLEDevice], + packets: list[str], + secret: str, + expected_logs: list[str], +): + """ + Test the _process_notification function when processing arbitrary + packets and check for expected log entries. + + :param device_class: Class of device under test. + :param packets: List of packets to send to device. + :param secret: Shared secret used as AES key and IV. + :param expected_logs: List of expected entries in the debug log. + """ + + device = device_class(MOCK_BLE_DEVICE) + + negotiation_responses = ( + NEGOTIATION_RESPONSES_PRIME + if issubclass(device_class, PrimeDevice) + else NEGOTIATION_RESPONSES_SOLIX + ) + + async with MockDevice() as mock_bluetooth: + with caplog.at_level(logging.DEBUG): + + # We first expect a negotiation + for expected, response in negotiation_responses.items(): + mock_bluetooth.expect_ordered( + bytes.fromhex(expected), + [bytes.fromhex(x) for x in response], + ) + + # We expect the negotiations to succeed + assert await device.connect(), "Expected connect to return True" + await asyncio.sleep(0.5) + assert device.connected, "Expected connected to be True" + assert device.negotiated, "Expected connected to be True" + mock_bluetooth.check_assertions() + + device._shared_secret = bytes.fromhex(secret) + + for packet in packets: + await mock_bluetooth.send_data([bytes.fromhex(packet)]) + + for expected_log_entry in expected_logs: + assert ( + expected_log_entry in caplog.text + ), f"Expected to find '{expected_log_entry}' in logs but it was not found!" + + @pytest.mark.asyncio @pytest.mark.parametrize( "device_class,payload,mapping,errors",