Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions SolixBLE/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -493,7 +493,9 @@ def _encrypt_payload(self, payload: bytes) -> bytes:
)
return cipher.encrypt(padded_data)

async def _process_telemetry_packet(self, payload: bytes, cmd: bytes = None) -> None:
async def _process_telemetry_packet(
self, payload: bytes, cmd: bytes = None
) -> None:
"""Process a telemetry packet from the device.

This performs the default processing of telemetry packets in which
Expand Down Expand Up @@ -533,14 +535,12 @@ async def _process_telemetry_packet(self, payload: bytes, cmd: bytes = None) ->
)
del self._fragment_buffers[cmd_key]
del self._fragment_totals[cmd_key]
_LOGGER.debug(
f"Reassembled payload: {len(payload)} bytes"
)
_LOGGER.debug(f"Reassembled payload: {len(payload)} bytes")

else:
# Strip fragment info
payload = payload[1:]

decrypted_payload = self._decrypt_payload(payload)
_LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}")
parameters = self._parse_payload(decrypted_payload)
Expand Down Expand Up @@ -615,26 +615,45 @@ async def _process_notification(
# Match against common message types
match pattern.hex():

# Encryption negotiation
# Negotiation messages
case "030001":
_LOGGER.debug("Received encryption negotiation message!")
return await self._process_negotiation(cmd, payload)

# Encrypted messages
# Session messages
case "03010f" | "030111":

match cmd.hex():

# Telemetry messages
# Non-encrypted telemetry messages
case "0300":
_LOGGER.debug("Received non-encrypted telemetry message!")
parameters = self._parse_payload(payload)
return await self._process_telemetry(parameters)

# Encrypted telemetry messages
case "c402" | "4300" | "c405":
_LOGGER.debug("Received telemetry message!")
_LOGGER.debug("Received encrypted telemetry message!")
return await self._process_telemetry_packet(payload, cmd)

# Unknown messages
case _:
_LOGGER.debug(f"Received unknown message of type: {cmd.hex()}")

# Try to parse the message as if it were not encrypted
try:
parameters = self._parse_payload(payload)
_LOGGER.debug(
f"Non-encrypted parameters: {self._parameters_to_str(parameters, types=True)}"
)
return
except Exception:
_LOGGER.exception(
"Failed to parse unknown payload using no encryption approach"
)

# Else lets try to decrypt it
try:
# If the payload is one byte too short and we are
# using the default AES (CBC) then try putting the
# last byte of the cmd in front of it
Expand All @@ -656,6 +675,7 @@ async def _process_notification(
_LOGGER.debug(
f"Parameters: {self._parameters_to_str(parameters, types=True)}"
)
return
except Exception:
_LOGGER.exception(
"Exception decrypting unknown message type"
Expand Down
4 changes: 3 additions & 1 deletion SolixBLE/prime_device.py
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,9 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None:
# Packet processing #
#####################

async def _process_telemetry_packet(self, payload: bytes, cmd: bytes = None) -> None:
async def _process_telemetry_packet(
self, payload: bytes, cmd: bytes = None
) -> None:
"""
Process a telemetry packet from an Anker Prime device.

Expand Down
87 changes: 87 additions & 0 deletions tests/test_devices.py
Original file line number Diff line number Diff line change
Expand Up @@ -989,6 +989,18 @@ def test_payload_decryption(
"""{'a1': '31', 'a2': '02e805', 'a3': '020000', 'a4': '0100', 'a5': '0401a824fe0b3f0b', 'a6': '0400000000000000', 'a7': '0400000000000000', 'a8': '0103', 'a9': '0150', 'aa': '0100', 'ab': '0400000f0f0f000000', 'ac': '0401002c0100002c0100000203', 'ad': '0401002c0100002c0100000300', 'ae': '0401002c0100002c0100000300', 'af': '0100', 'b0': '0100', 'b1': '0101', 'b2': '0101', 'b3': '0101', 'b4': '04e8040000fafffbfffafffbff', 'b5': '04ffffffffffffffffffffffff', 'e0': '0408000000', 'e1': '0480034b53000000000000', 'fe': '0300000000'}""",
id="prime_telemetry_packet",
),
# Test an Anker Prime device (single payload device) with a single telemetry packet
# from the logs of someone elses unit which for some reason transmits telemetry
# unencrypted
pytest.param(
PrimeCharger160w,
[
"ff09ca000301110300a10131a203024606a303020000a4020100a5080401d8459906bb0ba6080401e81300000000a7080400000000000000a8020103a9020150aa020100ab090400000000000b0b0bac0d0401002c0100002c0100000200ad0d0401002c0100002c0100000201ae0d0401002c0100002c0100000300af020100b0020100b1020100b2020101b30201ffb40d0400000000ac051573fafffbffb50d04ffffffffffffffffffffffffe0050448000000e10b0400000000000000000000fe0503000000006b"
],
"5609bc39f79166da75139feb7c335fb7524b3bf0d730db96bf6ebf450d3e165b",
"""{'a1': '31', 'a2': '024606', 'a3': '020000', 'a4': '0100', 'a5': '0401d8459906bb0b', 'a6': '0401e81300000000', 'a7': '0400000000000000', 'a8': '0103', 'a9': '0150', 'aa': '0100', 'ab': '0400000000000b0b0b', 'ac': '0401002c0100002c0100000200', 'ad': '0401002c0100002c0100000201', 'ae': '0401002c0100002c0100000300', 'af': '0100', 'b0': '0100', 'b1': '0100', 'b2': '0101', 'b3': '01ff', 'b4': '0400000000ac051573fafffbff', 'b5': '04ffffffffffffffffffffffff', 'e0': '0448000000', 'e1': '0400000000000000000000', 'fe': '0300000000'}""",
id="prime_telemetry_packet_plain_text",
),
],
)
async def test_telemetry_packet_processing(
Expand Down Expand Up @@ -1045,6 +1057,81 @@ async def test_telemetry_packet_processing(
assert parameters == device_parameters, "Parameters do not match expected!"


@pytest.mark.asyncio
@pytest.mark.parametrize(
"device_class, packets, secret, expected_logs",
[
# Telemetry packet from logs of someone elses Prime 160w charger.
# Interestingly this packet is not encrypted at all
pytest.param(
PrimeCharger160w,
[
"ff09ca000301110300a10131a203024606a303020000a4020100a5080401e042b105b209a6080401e81300000000a7080400000000000000a8020103a9020150aa020100ab090400000000000b0b0bac0d0401002c0100002c0100000200ad0d0401002c0100002c0100000201ae0d0401002c0100002c0100000300af020100b0020100b1020100b2020101b30201ffb40d0400000000ac051573fafffbffb50d04ffffffffffffffffffffffffe0050448000000e10b0400000000000000000000fe05030000000074"
],
"5609bc39f79166da75139feb7c335fb7524b3bf0d730db96bf6ebf450d3e165b",
[
"Received non-encrypted telemetry message",
"Telemetry parameters: {'a1': '31', 'a2': '024606'",
],
id="prime_160w_other",
),
],
)
async def test_generic_packet_processing(
caplog,
fast_sleep,
fast_timeouts,
device_class: type[SolixBLEDevice],
packets: list[str],
secret: str,
expected_logs: list[str],
):
"""
Test the _process_notification function when processing arbitrary
packets and check for expected log entries.

:param device_class: Class of device under test.
:param packets: List of packets to send to device.
:param secret: Shared secret used as AES key and IV.
:param expected_logs: List of expected entries in the debug log.
"""

device = device_class(MOCK_BLE_DEVICE)

negotiation_responses = (
NEGOTIATION_RESPONSES_PRIME
if issubclass(device_class, PrimeDevice)
else NEGOTIATION_RESPONSES_SOLIX
)

async with MockDevice() as mock_bluetooth:
with caplog.at_level(logging.DEBUG):

# We first expect a negotiation
for expected, response in negotiation_responses.items():
mock_bluetooth.expect_ordered(
bytes.fromhex(expected),
[bytes.fromhex(x) for x in response],
)

# We expect the negotiations to succeed
assert await device.connect(), "Expected connect to return True"
await asyncio.sleep(0.5)
assert device.connected, "Expected connected to be True"
assert device.negotiated, "Expected connected to be True"
mock_bluetooth.check_assertions()

device._shared_secret = bytes.fromhex(secret)

for packet in packets:
await mock_bluetooth.send_data([bytes.fromhex(packet)])

for expected_log_entry in expected_logs:
assert (
expected_log_entry in caplog.text
), f"Expected to find '{expected_log_entry}' in logs but it was not found!"


@pytest.mark.asyncio
@pytest.mark.parametrize(
"device_class,payload,mapping,errors",
Expand Down
Loading