diff --git a/.vscode/settings.json b/.vscode/settings.json index 4b06c16..3b9671c 100644 --- a/.vscode/settings.json +++ b/.vscode/settings.json @@ -8,6 +8,7 @@ }, "cSpell.enabled": true, "cSpell.words": [ + "Anker", "Solix", "stringifying" ], diff --git a/README.md b/README.md index 4933b61..9867dd3 100644 --- a/README.md +++ b/README.md @@ -51,6 +51,7 @@ See the [support table](https://solixble.readthedocs.io/en/latest) in the docume - F3800 - Solarbank 2 - Solarbank 3 +- Prime Charger 160w - Prime Charger 250w - Potentially more! diff --git a/SolixBLE/__init__.py b/SolixBLE/__init__.py index 230e37e..fd7e1b2 100644 --- a/SolixBLE/__init__.py +++ b/SolixBLE/__init__.py @@ -14,10 +14,12 @@ F2000, F3800, Generic, + PrimeCharger160w, PrimeCharger250w, Solarbank2, Solarbank3, ) +from .prime_device import PrimeDevice from .states import ( ChargingStatus, ChargingStatusF3800, @@ -31,6 +33,7 @@ __all__ = [ "SolixBLEDevice", + "PrimeDevice", "C300", "C300DC", "C800", @@ -40,6 +43,7 @@ "F3800", "Solarbank2", "Solarbank3", + "PrimeCharger160w", "PrimeCharger250w", "Generic", "ChargingStatus", diff --git a/SolixBLE/const.py b/SolixBLE/const.py index 6bfd647..f85cffd 100644 --- a/SolixBLE/const.py +++ b/SolixBLE/const.py @@ -10,7 +10,7 @@ #: GATT Service UUID for sending commands / negotiating. UUID_COMMAND = "8c850002-0302-41c5-b46e-cf057c562025" -#: GATT Service UUID for identifying Solix devices (Tested on C300X and C1000). +#: GATT Service UUID for identifying Solix/Prime devices (Tested on C300X, C1000, and Prime 160w Charger). UUID_IDENTIFIER = "0000ff09-0000-1000-8000-00805f9b34fb" #: Time to wait before re-connecting on an unexpected disconnect. diff --git a/SolixBLE/device.py b/SolixBLE/device.py index a563ad9..c5cbb93 100644 --- a/SolixBLE/device.py +++ b/SolixBLE/device.py @@ -73,8 +73,7 @@ def __init__(self, ble_device: BLEDevice) -> None: self._auto_reconnect_task: asyncio.Task | None = None self._disconnect_event: asyncio.Event = asyncio.Event() self._connection_attempts: int = 0 - self._shared_key: bytes | None = None - self._iv: bytes | None = None + self._shared_secret: bytes | None = None def add_callback(self, function: Callable[[], None]) -> None: """Register a callback to be run on state updates. @@ -94,6 +93,14 @@ def remove_callback(self, function: Callable[[], None]) -> None: """ self._state_changed_callbacks.remove(function) + async def _initiate_negotiations(self) -> None: + """Send the negotiation initiation command.""" + await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_0), + response=True, + ) + async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bool: """Connect to device. @@ -108,12 +115,8 @@ async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bo try: # If we have an old client get rid of it - if self._client is not None and self._client.is_connected: - _LOGGER.debug( - f"Disposing of old client '{self._client}' in order to connect to '{self.name}'!" - ) - await self._client.disconnect() - self._client = None + if self._client is not None: + await self._dispose_of_client() # Reset negotiated details but keep any data self._reset_session(reset_data=False) @@ -170,11 +173,7 @@ async def connect(self, max_attempts: int = 3, run_callbacks: bool = True) -> bo _LOGGER.debug( f"Sending negotiation initiation request to '{self.name}'..." ) - await self._client.write_gatt_char( - UUID_COMMAND, - bytes.fromhex(NEGOTIATION_COMMAND_0), - response=True, - ) + await self._initiate_negotiations() # Wait at this long to see if we get any response to # our initial request in stage 0. This weird layout @@ -218,14 +217,14 @@ async def disconnect(self) -> None: if self._auto_reconnect_task is not None: self._auto_reconnect_task.cancel() + # If there is a client disconnect and throw it away + if self._client is not None: + await self._dispose_of_client() + + # Reset session self._connection_attempts = 0 self._reset_session() - # If there is a client disconnect and throw it away - if self._client: - await self._client.disconnect() - self._client = None - @property def connected(self) -> bool: """Connected to device. @@ -247,12 +246,7 @@ def negotiated(self) -> bool: :returns: True/False if session has been negotiated and connected. """ - return ( - self.connected - and self._shared_key is not None - and self._iv is not None - and self._negotiation_timestamp is not None - ) + return self.connected and self._shared_secret is not None @property def available(self) -> bool: @@ -363,48 +357,88 @@ def _split_packet(self, packet: bytes) -> tuple[bytes, bytes, bytes]: return packet_pattern, packet_cmd, packet_payload - def _parse_payload(self, payload: bytearray) -> dict[str, bytes]: - """Parse payload bytes into parameters.""" + def _parse_payload(self, payload: bytearray | bytes) -> dict[str, bytes]: + """ + Parse payload bytes into parameters. + + Payloads contain a list of parameters and these parameters + have a format of: . + + If an error occurs when decoding a parameter it prevents all + further parameters from being parsed and logs an exception, + but the successfully parsed parameters (if any) will be returned. + + :param payload: Payload to parse into parameters. + :returns: Dictionary mapping parameter ids (a1, a2, ...) to data. + """ + + def _verbose_pop(data: bytearray, length: int, name: str) -> bytes: + """ + Pop specified number of bytes from bytearray and log if error. + + :param data: Data to be popped. + :param length: Number of bytes to pop and return. + :param name: Name of value being popped to put in logs if error. + :raises IndexError: If popping fails. + """ + + # Copy of bytes to use in error message if needed + data_copy = bytes(data) + + # Bytes extracted so far + new_bytes = bytes([]) + + try: + # Pop length bytes from data and return + for _ in range(length): + new_bytes = new_bytes + bytes([data.pop(0)]) + return new_bytes + + # Build error message + except IndexError as e: + message = ( + f"Error extracting {name} (len={length}) from '{data_copy.hex()}'" + f" (len={len(data_copy)}) at index {len(new_bytes)}. We extracted:" + f" '{new_bytes.hex()}' but expected {length - len(data_copy)}" + f" more bytes!" + ) + _LOGGER.exception(message) + raise IndexError(message) from e parsed_data: dict[str, bytes] = {} remaining_data = bytearray(payload) - # Packets sometimes start with 00 and we must strip that + # Payloads sometimes start with 00 and we must strip that if remaining_data.startswith(bytes.fromhex("00")): - remaining_data.pop(0) + _LOGGER.debug("Stripped 00 from start of payload") + _verbose_pop(remaining_data, 1, "special 00 header") while len(remaining_data) != 0: try: # Extract param id (e.g a1, a2, ...) - param_id = bytes([remaining_data.pop(0)]).hex() + param_id = _verbose_pop(remaining_data, 1, "param_id").hex() # Sometimes there is just a param_id with no length or values - # and then padding after that. This has been observed during - # the optional stage 6 negotiation stage that only sometimes - # seems to happen with the C300X (~ 1/20 chance). - # - # If we have reached PKCS7 padding then we have - # reached the end of the payload - if len(remaining_data) < 16 and remaining_data == bytearray( - len(remaining_data) * len(remaining_data).to_bytes(1) - ): + if len(remaining_data) == 0: parsed_data[param_id] = bytes() break - param_len = remaining_data.pop(0) - param_data = bytes([remaining_data.pop(0) for _ in range(0, param_len)]) - parsed_data[param_id] = param_data + # Extract encoded length of parameter + param_len = int.from_bytes( + _verbose_pop(remaining_data, 1, f"param_len (id={param_id})") + ) - # If we have reached PKCS7 padding then we have - # reached the end of the payload - if len(remaining_data) < 16 and remaining_data == bytearray( - len(remaining_data) * len(remaining_data).to_bytes(1) - ): - break + # Extract data/body from parameter + param_data = _verbose_pop( + remaining_data, param_len, f"param_data (id={param_id})" + ) + parsed_data[param_id] = param_data except IndexError: _LOGGER.exception( - f"Unexpected end of packet! Data may be missing or invalid! Payload: '{payload.hex()}'" + f"Unexpected end of packet! Data may be missing or invalid!" + f" Extracted so far: '{self._parameters_to_str(parsed_data)}'." + f" Payload: '{payload.hex()}'" ) return parsed_data @@ -444,12 +478,74 @@ def _log_diff(self, old: dict[str, bytes], new: dict[str, bytes]) -> None: def _decrypt_payload(self, payload: bytes) -> bytes: """Decrypt telemetry packet using negotiated shared secret and IV.""" - cipher = AES.new(self._shared_key, AES.MODE_CBC, iv=self._iv) - return cipher.decrypt(payload) + cipher = AES.new( + self._shared_secret[:16], AES.MODE_CBC, iv=self._shared_secret[16:] + ) + decrypted = cipher.decrypt(payload) + unpadder = PKCS7(128).unpadder() + unpadded_data = unpadder.update(decrypted) + return unpadded_data + unpadder.finalize() - async def _process_telemetry( - self, cmd: bytes, parameters: dict[str, bytes] - ) -> None: + def _encrypt_payload(self, payload: bytes) -> bytes: + """Encrypt telemetry packet using negotiated shared secret and IV.""" + + # Pad and encrypt payload + padder = PKCS7(128).padder() + padded_data = padder.update(payload) + padded_data += padder.finalize() + cipher = AES.new( + self._shared_secret[:16], AES.MODE_CBC, iv=self._shared_secret[16:] + ) + return cipher.encrypt(padded_data) + + async def _process_telemetry_packet(self, payload: bytes) -> None: + """Process a telemetry packet from the device. + + This performs the default processing of telemetry packets in which + telemetry payloads are spread across multiple packets. This is + overridden for devices which do not use multi-packet payloads for + telemetry. + """ + + # Anker devices seem to split data across multiple + # packets so we need to wait until we have both + # packets before we can decrypt all of the data + if len(payload) < 50: + self._telemetry_payload_small = payload + + # If we receive a big packet it invalidates the + # last small one since the big one comes before + # the small one + elif len(payload) > 230: + self._telemetry_payload_large = payload + self._telemetry_payload_small = None + + else: + _LOGGER.warning( + f"Telemetry payload has an unexpected length of {len(payload)}!" + ) + + if ( + self._telemetry_payload_small is None + or self._telemetry_payload_large is None + ): + _LOGGER.debug("Missing other payload!") + return + + new_payload = self._telemetry_payload_large + self._telemetry_payload_small + + # If we are accepting the new payload we invalidate + # the partial payloads + self._telemetry_payload_large = None + self._telemetry_payload_small = None + + _LOGGER.debug(f"Merged payload: {new_payload.hex()}") + decrypted_payload = self._decrypt_payload(new_payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + return await self._process_telemetry(parameters) + + async def _process_telemetry(self, parameters: dict[str, bytes]) -> None: """Process telemetry data from the device.""" state_changed = self._data is None or parameters != self._data @@ -524,63 +620,31 @@ async def _process_notification( return await self._process_negotiation(cmd, payload) # Encrypted messages - case "03010f": + case "03010f" | "030111": match cmd.hex(): # Telemetry messages - case "c402": + case "c402" | "4300": _LOGGER.debug("Received telemetry message!") - - # Anker devices seem to split data across multiple - # packets so we need to wait until we have both - # packets before we can decrypt all of the data - if len(payload) < 50: - self._telemetry_payload_small = payload - - # If we receive a big packet it invalidates the - # last small one since the big one comes before - # the small one - elif len(payload) > 230: - self._telemetry_payload_large = payload - self._telemetry_payload_small = None - - else: - _LOGGER.warning( - f"Telemetry payload has an unexpected length of {len(payload)}!" - ) - - if ( - self._telemetry_payload_small is None - or self._telemetry_payload_large is None - ): - _LOGGER.debug("Missing other payload!") - return - - new_payload = ( - self._telemetry_payload_large - + self._telemetry_payload_small - ) - - # If we are accepting the new payload we invalidate - # the partial payloads - self._telemetry_payload_large = None - self._telemetry_payload_small = None - - _LOGGER.debug(f"Merged payload: {new_payload.hex()}") - decrypted_payload = self._decrypt_payload(new_payload) - _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") - parameters = self._parse_payload(decrypted_payload) - return await self._process_telemetry(cmd, parameters) + return await self._process_telemetry_packet(payload) # Unknown messages case _: _LOGGER.debug(f"Received unknown message of type: {cmd.hex()}") try: - # If the payload is one byte too short try putting the + # If the payload is one byte too short and we are + # using the default AES (CBC) then try putting the # last byte of the cmd in front of it - if len(payload) % 16 == 15: + if ( + len(payload) % 16 == 15 + and self._decrypt_payload + is SolixBLEDevice._decrypt_payload + ): + _LOGGER.debug( + "Using special trick of embedded part of CMD in payload..." + ) payload = cmd[1].to_bytes() + payload decrypted_payload = self._decrypt_payload(payload) @@ -682,12 +746,8 @@ async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: bytes.fromhex(PRIVATE_KEY), byteorder="big" ) private_key = derive_private_key(private_value, SECP256R1()) - shared_secret = private_key.exchange(ECDH(), device_public_key) - self._shared_key = shared_secret[:16] - self._iv = shared_secret[16:] - _LOGGER.debug(f"Shared secret: {shared_secret.hex()}") - _LOGGER.debug(f"AES key: {self._shared_key.hex()}") - _LOGGER.debug(f"AES IV: {self._iv.hex()}") + self._shared_secret = private_key.exchange(ECDH(), device_public_key) + _LOGGER.debug(f"Shared secret: {self._shared_secret.hex()}") _LOGGER.debug("Sending stage 5 response message...") return await self._client.write_gatt_char( @@ -740,34 +800,34 @@ async def _send_command(self, cmd: bytes, payload: bytes) -> None: new_payload = payload + bytes.fromhex("fe0503") + new_timestamp await self._send_encrypted_packet(cmd, new_payload) - async def _send_encrypted_packet(self, cmd: bytes, payload: bytes) -> None: - """Send an encrypted packet using negotiated shared secret and IV.""" - _LOGGER.debug( - f"Building packet with cmd: {cmd.hex()} and payload: {payload.hex()}" - ) + def _build_packet(self, pattern: bytes, cmd: bytes, payload: bytes) -> bytes: + """ + Build a packet to be send to a device. - # Pad payload - padder = PKCS7(128).padder() - padded_data = padder.update(payload) - padded_data += padder.finalize() + Packet format:
. - # Encrypt payload - cipher = AES.new(self._shared_key, AES.MODE_CBC, iv=self._iv) - encrypted_payload = cipher.encrypt(padded_data) + :param pattern: Pattern of packet (e.g encrypted, negotiation, etc). + :param cmd: Command in packet (e.g telemetry, power on, etc). + :param payload: Payload of command (e.g a1...). + :returns: Packet bytes ready to be sent. + """ # Calculate length of message - length = 2 + 2 + 3 + 2 + len(encrypted_payload) + 1 + length = 2 + 2 + 3 + 2 + len(payload) + 1 length_bytes = length.to_bytes(length=2, byteorder="little") # Build packet - packet = ( - bytes.fromhex("ff09") - + length_bytes - + bytes.fromhex("03000f") - + cmd - + encrypted_payload + packet = bytes.fromhex("ff09") + length_bytes + pattern + cmd + payload + return packet + self._checksum(packet) + + async def _send_encrypted_packet(self, cmd: bytes, payload: bytes) -> None: + """Send an encrypted packet using negotiated shared secret and IV.""" + _LOGGER.debug( + f"Building packet with cmd: {cmd.hex()} and payload: {payload.hex()}" ) - packet = packet + self._checksum(packet) + encrypted_payload = self._encrypt_payload(payload) + + packet = self._build_packet(bytes.fromhex("03000f"), cmd, encrypted_payload) _LOGGER.debug(f"Sending encrypted packet: {packet.hex()}") # Send packet @@ -963,7 +1023,18 @@ def _disconnect_callback(self, client: BaseBleakClient) -> None: # Trigger disconnection event self._disconnect_event.set() - def _reset_session(self, reset_data: bool = True): + async def _dispose_of_client(self) -> None: + """Dispose of current bleak client.""" + client = self._client + self._client = None + try: + await client.disconnect() + except Exception: + _LOGGER.exception( + f"Exception raised when disposing of bleak client '{client}'!" + ) + + def _reset_session(self, reset_data: bool = True) -> None: """Reset negotiated variables and data and futures.""" if reset_data: @@ -972,8 +1043,7 @@ def _reset_session(self, reset_data: bool = True): self._telemetry_payload_small = None self._telemetry_payload_large = None - self._shared_key = None - self._iv = None + self._shared_secret = None self._last_packet_timestamp = None self._negotiation_timestamp = None self._packet_futures: dict[bytes, list[asyncio.Future]] = {} diff --git a/SolixBLE/devices/__init__.py b/SolixBLE/devices/__init__.py index b68dd3a..030b7ea 100644 --- a/SolixBLE/devices/__init__.py +++ b/SolixBLE/devices/__init__.py @@ -12,6 +12,7 @@ from .f2000 import F2000 from .f3800 import F3800 from .generic import Generic +from .prime_charger_160w import PrimeCharger160w from .prime_charger_250w import PrimeCharger250w from .solarbank2 import Solarbank2 from .solarbank3 import Solarbank3 @@ -26,6 +27,7 @@ "F3800", "Solarbank2", "Solarbank3", + "PrimeCharger160w", "PrimeCharger250w", "Generic", ] diff --git a/SolixBLE/devices/prime_charger_160w.py b/SolixBLE/devices/prime_charger_160w.py new file mode 100644 index 0000000..6e8bd85 --- /dev/null +++ b/SolixBLE/devices/prime_charger_160w.py @@ -0,0 +1,261 @@ +"""Anker Prime Charger (160w) model. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" + +from ..const import DEFAULT_METADATA_FLOAT +from ..prime_device import PrimeDevice +from ..states import PortStatus + +CMD_USB_OUTPUT = "4207" +CMD_USB_TIMER = "4209" + +PAYLOAD_USB_C1_ON = "a10121a2020100a3020101" +PAYLOAD_USB_C1_OFF = "a10121a2020100a3020100" +PAYLOAD_USB_C1_TIMER = "a10121a2020100a30504" + +PAYLOAD_USB_C2_ON = "a10121a2020101a3020101" +PAYLOAD_USB_C2_OFF = "a10121a2020101a3020100" +PAYLOAD_USB_C2_TIMER = "a10121a2020101a30504" + +PAYLOAD_USB_C3_ON = "a10121a2020102a3020101" +PAYLOAD_USB_C3_OFF = "a10121a2020102a3020100" +PAYLOAD_USB_C3_TIMER = "a10121a2020102a30504" + + +class PrimeCharger160w(PrimeDevice): + """ + Anker Prime Charger (160w) model. + + Use this class to connect and monitor the 160w charger. + This model is also known as the A2687. + """ + + @property + def usb_port_c1(self) -> PortStatus: + """USB C1 Port Status. + + :returns: Status of the USB C1 port. + """ + return PortStatus(self._parse_int("a5", begin=1, end=2)) + + @property + def usb_c1_voltage(self) -> float: + """USB C1 Port voltage (V). + + :returns: Voltage of the USB C1 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a5", begin=2, end=4) / 1000.0 + + @property + def usb_c1_current(self) -> float: + """USB C1 Port current (A). + + :returns: Current of the USB C1 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a5", begin=4, end=6) / 1000.0 + + @property + def usb_c1_power(self) -> float: + """USB C1 Port power (W). + + :returns: Power of the USB C1 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a5", begin=6, end=8) / 100.0 + + @property + def usb_port_c2(self) -> PortStatus: + """USB C2 Port Status. + + :returns: Status of the USB C2 port. + """ + return PortStatus(self._parse_int("a6", begin=1, end=2)) + + @property + def usb_c2_voltage(self) -> float: + """USB C2 Port voltage (V). + + :returns: Voltage of the USB C2 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a6", begin=2, end=4) / 1000.0 + + @property + def usb_c2_current(self) -> float: + """USB C2 Port current (A). + + :returns: Current of the USB C2 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a6", begin=4, end=6) / 1000.0 + + @property + def usb_c2_power(self) -> float: + """USB C2 Port power (W). + + :returns: Power of the USB C2 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a6", begin=6, end=8) / 100.0 + + @property + def usb_port_c3(self) -> PortStatus: + """USB C3 Port Status. + + :returns: Status of the USB C3 port. + """ + return PortStatus(self._parse_int("a7", begin=1, end=2)) + + @property + def usb_c3_voltage(self) -> float: + """USB C3 Port voltage (V). + + :returns: Voltage of the USB C3 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a7", begin=2, end=4) / 1000.0 + + @property + def usb_c3_current(self) -> float: + """USB C3 Port current (A). + + :returns: Current of the USB C3 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a7", begin=4, end=6) / 1000.0 + + @property + def usb_c3_power(self) -> float: + """USB C3 Port power (W). + + :returns: Power of the USB C3 port or default float value. + """ + if self._data is None: + return DEFAULT_METADATA_FLOAT + + return self._parse_int("a7", begin=6, end=8) / 100.0 + + async def turn_usb_c1_on(self) -> None: + """Turn USB port C1 on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_OUTPUT), + payload=bytes.fromhex(PAYLOAD_USB_C1_ON), + ) + + async def turn_usb_c1_off(self) -> None: + """Turn USB port C1 off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_OUTPUT), + payload=bytes.fromhex(PAYLOAD_USB_C1_OFF), + ) + + async def set_timer_usb_c1(self, time: int) -> None: + """Set auto off timer for USB C1. + + :param time: Seconds until shutdown. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_TIMER), + payload=bytes.fromhex(PAYLOAD_USB_C1_TIMER) + + time.to_bytes(4, byteorder="little"), + ) + + async def turn_usb_c2_on(self) -> None: + """Turn USB port C2 on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_OUTPUT), + payload=bytes.fromhex(PAYLOAD_USB_C2_ON), + ) + + async def turn_usb_c2_off(self) -> None: + """Turn USB port C2 off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_OUTPUT), + payload=bytes.fromhex(PAYLOAD_USB_C2_OFF), + ) + + async def set_timer_usb_c2(self, time: int) -> None: + """Set auto off timer for USB C2. + + :param time: Seconds until shutdown. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_TIMER), + payload=bytes.fromhex(PAYLOAD_USB_C2_TIMER) + + time.to_bytes(4, byteorder="little"), + ) + + async def turn_usb_c3_on(self) -> None: + """Turn USB port C3 on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_OUTPUT), + payload=bytes.fromhex(PAYLOAD_USB_C3_ON), + ) + + async def turn_usb_c3_off(self) -> None: + """Turn USB port C3 off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_OUTPUT), + payload=bytes.fromhex(PAYLOAD_USB_C3_OFF), + ) + + async def set_timer_usb_c3(self, time: int) -> None: + """Set auto off timer for USB C3. + + :param time: Seconds until shutdown. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_USB_TIMER), + payload=bytes.fromhex(PAYLOAD_USB_C3_TIMER) + + time.to_bytes(4, byteorder="little"), + ) diff --git a/SolixBLE/devices/prime_charger_250w.py b/SolixBLE/devices/prime_charger_250w.py index 32754ce..45b7468 100644 --- a/SolixBLE/devices/prime_charger_250w.py +++ b/SolixBLE/devices/prime_charger_250w.py @@ -5,11 +5,11 @@ """ from ..const import DEFAULT_METADATA_FLOAT -from ..device import SolixBLEDevice +from ..prime_device import PrimeDevice from ..states import PortStatus -class PrimeCharger250w(SolixBLEDevice): +class PrimeCharger250w(PrimeDevice): """ Anker Prime Charger (250W) model. @@ -40,7 +40,7 @@ def usb_port_c1(self) -> PortStatus: return PortStatus(self._parse_int("a2", begin=1, end=2)) @property - def usb_port_c1_voltage(self) -> float: + def usb_c1_voltage(self) -> float: """USB C1 Port voltage (V). :returns: Voltage of the USB C1 port or default float value. @@ -51,7 +51,7 @@ def usb_port_c1_voltage(self) -> float: return self._parse_int("a2", begin=2, end=4) / 1000.0 @property - def usb_port_c1_current(self) -> float: + def usb_c1_current(self) -> float: """USB C1 Port current (A). :returns: Current of the USB C1 port or default float value. @@ -62,7 +62,7 @@ def usb_port_c1_current(self) -> float: return self._parse_int("a2", begin=4, end=6) / 1000.0 @property - def usb_port_c1_power(self) -> float: + def usb_c1_power(self) -> float: """USB C1 Port power (W). :returns: Power of the USB C1 port or default float value. @@ -81,7 +81,7 @@ def usb_port_c2(self) -> PortStatus: return PortStatus(self._parse_int("a3", begin=1, end=2)) @property - def usb_port_c2_voltage(self) -> float: + def usb_c2_voltage(self) -> float: """USB C2 Port voltage (V). :returns: Voltage of the USB C2 port or default float value. @@ -92,7 +92,7 @@ def usb_port_c2_voltage(self) -> float: return self._parse_int("a3", begin=2, end=4) / 1000.0 @property - def usb_port_c2_current(self) -> float: + def usb_c2_current(self) -> float: """USB C2 Port current (A). :returns: Current of the USB C2 port or default float value. @@ -103,7 +103,7 @@ def usb_port_c2_current(self) -> float: return self._parse_int("a3", begin=4, end=6) / 1000.0 @property - def usb_port_c2_power(self) -> float: + def usb_c2_power(self) -> float: """USB C2 Port power (W). :returns: Power of the USB C2 port or default float value. @@ -122,7 +122,7 @@ def usb_port_c3(self) -> PortStatus: return PortStatus(self._parse_int("a4", begin=1, end=2)) @property - def usb_port_c3_voltage(self) -> float: + def usb_c3_voltage(self) -> float: """USB C3 Port voltage (V). :returns: Voltage of the USB C3 port or default float value. @@ -133,7 +133,7 @@ def usb_port_c3_voltage(self) -> float: return self._parse_int("a4", begin=2, end=4) / 1000.0 @property - def usb_port_c3_current(self) -> float: + def usb_c3_current(self) -> float: """USB C3 Port current (A). :returns: Current of the USB C3 port or default float value. @@ -144,7 +144,7 @@ def usb_port_c3_current(self) -> float: return self._parse_int("a4", begin=4, end=6) / 1000.0 @property - def usb_port_c3_power(self) -> float: + def usb_c3_power(self) -> float: """USB C3 Port power (W). :returns: Power of the USB C3 port or default float value. @@ -163,7 +163,7 @@ def usb_port_c4(self) -> PortStatus: return PortStatus(self._parse_int("a5", begin=1, end=2)) @property - def usb_port_c4_voltage(self) -> float: + def usb_c4_voltage(self) -> float: """USB C4 Port voltage (V). :returns: Voltage of the USB C4 port or default float value. @@ -174,7 +174,7 @@ def usb_port_c4_voltage(self) -> float: return self._parse_int("a5", begin=2, end=4) / 1000.0 @property - def usb_port_c4_current(self) -> float: + def usb_c4_current(self) -> float: """USB C3 Port current (A). :returns: Current of the USB C4 port or default float value. @@ -185,7 +185,7 @@ def usb_port_c4_current(self) -> float: return self._parse_int("a5", begin=4, end=6) / 1000.0 @property - def usb_port_c4_power(self) -> float: + def usb_c4_power(self) -> float: """USB C4 Port power (W). :returns: Power of the USB C4 port or default float value. @@ -204,7 +204,7 @@ def usb_port_a1(self) -> PortStatus: return PortStatus(self._parse_int("a6", begin=1, end=2)) @property - def usb_port_a1_voltage(self) -> float: + def usb_a1_voltage(self) -> float: """USB A1 Port voltage (V). :returns: Voltage of the USB A1 port or default float value. @@ -215,7 +215,7 @@ def usb_port_a1_voltage(self) -> float: return self._parse_int("a6", begin=2, end=4) / 1000.0 @property - def usb_port_a1_current(self) -> float: + def usb_a1_current(self) -> float: """USB A1 Port current (A). :returns: Current of the USB A1 port or default float value. @@ -226,7 +226,7 @@ def usb_port_a1_current(self) -> float: return self._parse_int("a6", begin=4, end=6) / 1000.0 @property - def usb_port_a1_power(self) -> float: + def usb_a1_power(self) -> float: """USB A1 Port power (W). :returns: Power of the USB A1 port or default float value. @@ -245,7 +245,7 @@ def usb_port_a2(self) -> PortStatus: return PortStatus(self._parse_int("a7", begin=1, end=2)) @property - def usb_port_a2_voltage(self) -> float: + def usb_a2_voltage(self) -> float: """USB A2 Port voltage (V). :returns: Voltage of the USB A2 port or default float value. @@ -256,7 +256,7 @@ def usb_port_a2_voltage(self) -> float: return self._parse_int("a7", begin=2, end=4) / 1000.0 @property - def usb_port_a2_current(self) -> float: + def usb_a2_current(self) -> float: """USB A2 Port current (A). :returns: Current of the USB A2 port or default float value. @@ -267,7 +267,7 @@ def usb_port_a2_current(self) -> float: return self._parse_int("a7", begin=4, end=6) / 1000.0 @property - def usb_port_a2_power(self) -> float: + def usb_a2_power(self) -> float: """USB A2 Port power (W). :returns: Power of the USB A2 port or default float value. diff --git a/SolixBLE/prime_device.py b/SolixBLE/prime_device.py new file mode 100644 index 0000000..2869d05 --- /dev/null +++ b/SolixBLE/prime_device.py @@ -0,0 +1,522 @@ +"""Base Anker Prime device implementation of SolixBLE module. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" + +import logging +import time + +from Crypto.Cipher import AES +from cryptography.hazmat.primitives.asymmetric.ec import ( + ECDH, + SECP256R1, + EllipticCurvePublicKey, + derive_private_key, +) + +from SolixBLE.const import UUID_COMMAND +from SolixBLE.device import SolixBLEDevice + +_LOGGER = logging.getLogger(__name__) + +#: Command used to initiate negotiations +NEGOTIATION_COMMAND_0 = ( + "ff09200003000140010a82d0ab535303e3aa9f0c2f9c868465bc8476f556fb7d" +) + +#: Response to receiving 1st negotiation message +NEGOTIATION_COMMAND_1 = ( + "ff09270003000140030a82d0ab53538ab3de100ac9bb87a0b8e36c1dd8167a9c25a9839d9a14d5" +) + +#: Response to receiving 2nd negotiation message +NEGOTIATION_COMMAND_2 = ( + "ff09200003000140290a82d0ab535303e3aa9f0c2f9c868465bc8476f556fb55" +) + +#: Response to receiving 3rd negotiation message +NEGOTIATION_COMMAND_3 = "ff092d0003000140050a82d0ab53538ab3de100ae04aca6791257881a90164eac7460450e0c82f2c03de4f9604" + +#: Response to receiving 4th negotiation message +NEGOTIATION_COMMAND_4 = "ff095c0003000140210ac6ea31e4300bb2877d6ddeb628b0d7be8d768333f00ceab5454d20fbd97e091457b1f3b6efb6511eb9e98ac2b2c46eee211ae359ad246e1ae9886b4a29e41eddd5a5064d8b9ffdbfb43eb6b8e307fcde9de7" + +#: The cmd to put in the response to receiving 5th negotiation message +NEGOTIATION_COMMAND_5_CMD = "4022" + +#: The payload to put in the response to receiving 5th negotiation message +NEGOTIATION_COMMAND_5_PAYLOAD = ( + "a104f079b569a30400000000a518474d54304253542c4d332e352e302f312c4d31302e352e30" +) + +#: The cmd to put in the response to receiving 6th negotiation message +NEGOTIATION_COMMAND_6_CMD = "4027" + +#: The payload to put in the response to receiving 6th negotiation message +NEGOTIATION_COMMAND_6_PAYLOAD = "a104f079b569a22437396562656433352d646339632d343930342d623430632d373263346538363361613130" + +#: The cmd to put in the first response to receiving 7th negotiation message +NEGOTIATION_COMMAND_7_CMD = "4200" + +#: The payload to put in the first response to receiving 7th negotiation message +NEGOTIATION_COMMAND_7_PAYLOAD = "a10121fe04f079b569" + +#: The cmd to put in the second response to receiving 7th negotiation message +NEGOTIATION_COMMAND_8_CMD = "420a" + +#: The payload to put in the second response to receiving 7th negotiation message +NEGOTIATION_COMMAND_8_PAYLOAD = "a10121a203044742a3250437396562656433352d646339632d343930342d623430632d373263346538363361613130a5020101fe04f079b569" + +#: Anker Prime devices encrypt the negotiation using a static key +NEGOTIATION_KEY = "b8ff7422955d4eb6d554a2c470280559" + +#: Anker Prime devices encrypt the negotiation using a static nonce +NEGOTIATION_NONCE = "6ba3e3f2f3a60f2971ce5d1f" + +#: The pattern used in negotiation packets from Anker Prime devices +NEGOTIATION_PATTERN = "030001" + +#: The pattern used in telemetry packets from Anker Prime and Solix devices +TELEMETRY_PATTERN = "03000f" + +#: Additional Authenticated Data bytes used by protocol +AAD = "3322110077665544bbaa9988ffeeddcc" + +#: The private key this program uses to perform the ECDH negotiation to +#: get a shared secret which is then used as an AES key for encrypting +#: communications between the program and the power station. Yes I know it +#: is bad security practice to hardcode keys but its a freaking power station +#: talking over Bluetooth with a range of like 10m... I don't care. +PRIVATE_KEY = "754744d72984c378bc4fa77d7fcdf6bbb6d9df119fa9be4948eb8a3b4cd6071f" + +#: The unix timestamp that is agreed upon in the negotiations. This is used +#: by Anker to protect against replay attacks as commands must contain the +#: current encrypted time. +BASE_TIMESTAMP = "ef79b569" + + +class PrimeDevice(SolixBLEDevice): + """ + This is a base class based upon SolixBLEDevice which contains logic + unique to Anker Prime devices that is designed to be overridden for + specific implementations, e.g 160w, 250w, etc. + """ + + ########################### + # Encryption / Decryption # + ########################### + + def _encrypt_payload(self, payload: bytes) -> bytes: + """ + Encrypt the payload of a session message (e.g telemetry, commands, etc). + + Anker Prime devices use AES GCM with the first 16 bytes of the shared + secret as the AES key and next 12 bytes as the nonce. The MAC tag is + 16 bytes and appended to the end of the payload. + """ + cipher = AES.new( + self._shared_secret[:16], AES.MODE_GCM, nonce=self._shared_secret[16:28] + ) + cipher.update(bytes.fromhex(AAD)) + encrypted_payload, mac_bytes = cipher.encrypt_and_digest(payload) + return encrypted_payload + mac_bytes + + def _decrypt_payload(self, payload: bytes) -> bytes: + """ + Decrypt the payload of a message (e.g telemetry, commands, etc). + + If the shared secret has not been established then the static + negotiation key and nonce will be used. + + Anker Prime devices use AES GCM with the first 16 bytes of the shared + secret as the AES key and next 12 bytes as the nonce. The last 16 bytes + of the payload are a MAC used to ensure the message has not been tampered + with. + """ + mac = payload[-16:] + encrypted_payload = payload[:-16] + key = ( + self._shared_secret[:16] + if self._shared_secret is not None + else bytes.fromhex(NEGOTIATION_KEY) + ) + nonce = ( + self._shared_secret[16:28] + if self._shared_secret is not None + else bytes.fromhex(NEGOTIATION_NONCE) + ) + + # Try to decrypt and verify data + try: + cipher = AES.new(key, AES.MODE_GCM, nonce) + cipher.update(bytes.fromhex(AAD)) + return cipher.decrypt_and_verify(encrypted_payload, mac) + + # If validation fails decrypt anyway + except ValueError: + _LOGGER.exception( + "Failed to validate authenticity of payload, decoding anyway..." + ) + cipher = AES.new(key, AES.MODE_GCM, nonce) + return cipher.decrypt(encrypted_payload) + + ############### + # Negotiation # + ############### + + async def _initiate_negotiations(self) -> None: + """ + Send the negotiation initiation command. + """ + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + self._decrypt_payload( + self._split_packet(bytes.fromhex(NEGOTIATION_COMMAND_0))[2] + ) + ) + _LOGGER.debug( + f"Stage 0 message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + await self._client.write_gatt_char( + UUID_COMMAND, bytes.fromhex(NEGOTIATION_COMMAND_0) + ) + + async def _process_negotiation(self, cmd: bytes, payload: bytes) -> None: + """ + Negotiate encryption with the device. + """ + + match cmd.hex(): + + # There is a "stage 0" in which we automatically send a negotiation + # request as soon as we establish the initial connection. That + # should lead to the power station sending a response landing us + # in stage 1. + + # Negotiations at this point are encrypted using the static key and nonce + + # Negotiation stage 1 + case "4801": + _LOGGER.debug( + "Entered negotiation stage 1 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + self._decrypt_payload( + self._split_packet(bytes.fromhex(NEGOTIATION_COMMAND_1))[2] + ) + ) + _LOGGER.debug( + f"Stage 1 response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + _LOGGER.debug("Sending stage 1 response message...") + return await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_1), + ) + + # Negotiation stage 2 + case "4803": + _LOGGER.debug( + "Entered negotiation stage 2 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + self._decrypt_payload( + self._split_packet(bytes.fromhex(NEGOTIATION_COMMAND_2))[2] + ) + ) + _LOGGER.debug( + f"Stage 2 response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + _LOGGER.debug("Sending stage 2 response message...") + return await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_2), + ) + + # Negotiation stage 3 + case "4829": + _LOGGER.debug( + "Entered negotiation stage 3 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + self._decrypt_payload( + self._split_packet(bytes.fromhex(NEGOTIATION_COMMAND_3))[2] + ) + ) + _LOGGER.debug( + f"Stage 3 response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + _LOGGER.debug("Sending stage 3 response message...") + return await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_3), + ) + + # Negotiation stage 4 + case "4805": + _LOGGER.debug( + "Entered negotiation stage 4 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + self._decrypt_payload( + self._split_packet(bytes.fromhex(NEGOTIATION_COMMAND_4))[2] + ) + ) + _LOGGER.debug( + f"Stage 4 response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + _LOGGER.debug("Sending stage 4 response message...") + return await self._client.write_gatt_char( + UUID_COMMAND, + bytes.fromhex(NEGOTIATION_COMMAND_4), + ) + + # Negotiation stage 5 + case "4821": + _LOGGER.debug( + "Entered negotiation stage 5 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + self._negotiation_timestamp = time.time() + + # Extract public key of device from payload + device_public_key_bytes = bytes.fromhex("04") + parameters["a1"] + _LOGGER.debug(f"Public key of device: {device_public_key_bytes.hex()}") + device_public_key = EllipticCurvePublicKey.from_encoded_point( + SECP256R1(), device_public_key_bytes + ) + + # Calculate the shared secret + # The first half of the shared secret is the encryption key + # and the 12 bytes after that is the nonce + private_value = int.from_bytes( + bytes.fromhex(PRIVATE_KEY), + byteorder="big", + ) + private_key = derive_private_key(private_value, SECP256R1()) + self._shared_secret = private_key.exchange(ECDH(), device_public_key) + _LOGGER.debug(f"Shared secret: {self._shared_secret.hex()}") + + # All negotiation packets past this point use the + # shared secret for encryption rather than the static key. + # This means we need to build these messages instead of using + # pre-defined ones. + _LOGGER.debug("Sending stage 5 response message...") + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + bytes.fromhex(NEGOTIATION_COMMAND_5_PAYLOAD) + ) + _LOGGER.debug( + f"Stage 5 response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + new_payload = self._encrypt_payload( + bytes.fromhex(NEGOTIATION_COMMAND_5_PAYLOAD) + ) + new_packet = self._build_packet( + pattern=bytes.fromhex(NEGOTIATION_PATTERN), + cmd=bytes.fromhex(NEGOTIATION_COMMAND_5_CMD), + payload=new_payload, + ) + _LOGGER.debug(f"Built stage 5 response packet: {new_packet.hex()}") + return await self._client.write_gatt_char( + UUID_COMMAND, + new_packet, + ) + + # Negotiations past this point are encrypted using the shared secret + + # Negotiation stage 6 + case "4822": + _LOGGER.debug( + "Entered negotiation stage 6 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + _LOGGER.debug("Sending stage 6 response message...") + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + bytes.fromhex(NEGOTIATION_COMMAND_6_PAYLOAD) + ) + _LOGGER.debug( + f"Stage 6 response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + new_payload = self._encrypt_payload( + bytes.fromhex(NEGOTIATION_COMMAND_6_PAYLOAD) + ) + new_packet = self._build_packet( + pattern=bytes.fromhex(NEGOTIATION_PATTERN), + cmd=bytes.fromhex(NEGOTIATION_COMMAND_6_CMD), + payload=new_payload, + ) + _LOGGER.debug(f"Built stage 6 response packet: {new_packet.hex()}") + return await self._client.write_gatt_char( + UUID_COMMAND, + new_packet, + ) + + # Negotiation stage 7 + case "4827": + _LOGGER.debug( + "Entered negotiation stage 7 due to response from device!" + ) + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug( + f"Parameters: {self._parameters_to_str(parameters, types=True)}" + ) + + _LOGGER.debug("Sending stage 7 response messages...") + + # Packet A + new_payload_a = self._encrypt_payload( + bytes.fromhex(NEGOTIATION_COMMAND_7_PAYLOAD) + ) + new_packet_a = self._build_packet( + pattern=bytes.fromhex(TELEMETRY_PATTERN), + cmd=bytes.fromhex(NEGOTIATION_COMMAND_7_CMD), + payload=new_payload_a, + ) + _LOGGER.debug(f"Built stage 7a response packet: {new_packet_a.hex()}") + await self._client.write_gatt_char( + UUID_COMMAND, + new_packet_a, + ) + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + bytes.fromhex(NEGOTIATION_COMMAND_7_PAYLOAD) + ) + _LOGGER.debug( + f"Stage 7a response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + # Packet B + new_payload_b = self._encrypt_payload( + bytes.fromhex(NEGOTIATION_COMMAND_8_PAYLOAD) + ) + new_packet_b = self._build_packet( + pattern=bytes.fromhex(TELEMETRY_PATTERN), + cmd=bytes.fromhex(NEGOTIATION_COMMAND_8_CMD), + payload=new_payload_b, + ) + _LOGGER.debug(f"Built stage 7b response packet: {new_packet_b.hex()}") + await self._client.write_gatt_char( + UUID_COMMAND, + new_packet_b, + ) + + # Log parameters we will send if debugging (makes handshake easier to see in logs) + if _LOGGER.isEnabledFor(logging.DEBUG): + new_parameters = self._parse_payload( + bytes.fromhex(NEGOTIATION_COMMAND_8_PAYLOAD) + ) + _LOGGER.debug( + f"Stage 7b response message parameters: {self._parameters_to_str(new_parameters, types=True)}" + ) + + return + + case _: + _LOGGER.warning( + f"Received unexpected negotiation request response from device! cmd: '{cmd}', parameters: '{self._parameters_to_str(parameters, types=True)}'" + ) + + ##################### + # Packet processing # + ##################### + + async def _process_telemetry_packet(self, payload: bytes) -> None: + """ + Process a telemetry packet from an Anker Prime device. + + Anker Prime devices pack all telemetry data into a single packet + requiring no special logic to handle. + """ + decrypted_payload = self._decrypt_payload(payload) + _LOGGER.debug(f"Decrypted payload: {decrypted_payload.hex()}") + parameters = self._parse_payload(decrypted_payload) + return await self._process_telemetry(parameters) + + async def _send_command(self, cmd: bytes, payload: bytes) -> None: + """Send a command to the device. + + :param cmd: 2 bytes containing command type. + :param payload: Variable number of bytes containing arguments. + :raises ConnectionError: If not connected/negotiated to device. + """ + if not self.negotiated: + raise ConnectionError("Not connected to device") + + # Commands include a timestamp in the payload to prevent replay attacks + # and that timestamp is set during negotiations + time_passed = int(time.time() - self._negotiation_timestamp) + base_timestamp = int.from_bytes( + bytes.fromhex(BASE_TIMESTAMP), byteorder="little" + ) + new_timestamp = (base_timestamp + time_passed).to_bytes( + length=4, byteorder="little" + ) + new_payload = payload + bytes.fromhex("fe04") + new_timestamp + await self._send_encrypted_packet(cmd, new_payload) diff --git a/SolixBLE/utilities.py b/SolixBLE/utilities.py index 10de9f5..7c11909 100644 --- a/SolixBLE/utilities.py +++ b/SolixBLE/utilities.py @@ -5,11 +5,14 @@ """ import asyncio +import logging from bleak import BleakScanner, BLEDevice from .const import UUID_IDENTIFIER +_LOGGER = logging.getLogger(__name__) + async def discover_devices( scanner: BleakScanner | None = None, timeout: int = 5 @@ -29,7 +32,13 @@ async def discover_devices( devices = [] def callback(device, advertising_data): + _LOGGER.debug( + f"Found generic BT device '{device}' with advertising data: '{advertising_data}'" + ) if UUID_IDENTIFIER in advertising_data.service_uuids and device not in devices: + _LOGGER.debug( + f"Found Anker device '{device}' with advertising data: '{advertising_data}'" + ) devices.append(device) async with BleakScanner(callback) as scanner: diff --git a/docs/source/api.rst b/docs/source/api.rst index 7af4a49..8bcc67c 100644 --- a/docs/source/api.rst +++ b/docs/source/api.rst @@ -21,6 +21,7 @@ the list of properties for that class. f3800 solarbank2 solarbank3 + prime_charger_160w prime_charger_250w generic enums diff --git a/docs/source/index.rst b/docs/source/index.rst index c8b3ab2..8eba0d9 100644 --- a/docs/source/index.rst +++ b/docs/source/index.rst @@ -130,19 +130,21 @@ Expansion battery serial number ❌ ❌ Prime charger support --------------------- -======================= ============= -Parameter 250w (A2345) -======================= ============= -Display status ❌ -Total power out ❌ -Individual port status ✅ -Individual port voltage ✅ -Individual port current ✅ -Individual port power ✅ -Temperature ❌ -Firmware version ❌ -Serial number ❌ -======================= ============= +======================= ============= ============= +Parameter 250w (A2345) 160w (A2687) +======================= ============= ============= +Display status ❌ ❌ +Total power out ❌ ❌ +Port on/off control ❌ ✅ +Timer control ❌ ✅ +Individual port status ✅ ✅ +Individual port voltage ✅ ✅ +Individual port current ✅ ✅ +Individual port power ✅ ✅ +Temperature ❌ ❌ +Firmware version ❌ ❌ +Serial number ❌ ❌ +======================= ============= ============= diff --git a/docs/source/prime_charger_160w.rst b/docs/source/prime_charger_160w.rst new file mode 100644 index 0000000..8423eef --- /dev/null +++ b/docs/source/prime_charger_160w.rst @@ -0,0 +1,9 @@ +Prime Charger 160w +================== + +.. autoclass:: SolixBLE.PrimeCharger160w + :members: + :inherited-members: connect, disconnect, add_callback, remove_callback, connected, available, address, name, supports_telemetry, last_update + :special-members: __init__ + :member-order: groupwise + :no-index: diff --git a/examples/demo.py b/examples/demo.py index 7df6ee4..7972f0b 100644 --- a/examples/demo.py +++ b/examples/demo.py @@ -22,6 +22,7 @@ F2000, F3800, Generic, + PrimeCharger160w, PrimeCharger250w, Solarbank2, Solarbank3, @@ -39,6 +40,7 @@ "F3800": F3800, "Solarbank 2": Solarbank2, "Solarbank 3": Solarbank3, + "PrimeCharger160w": PrimeCharger160w, "PrimeCharger250w": PrimeCharger250w, "Unknown": Generic, } diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..f9dd3ce --- /dev/null +++ b/tests/__init__.py @@ -0,0 +1,5 @@ +"""Tests for SolixBLE module. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d5f3746 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,34 @@ +"""Fixtures for tests for SolixBLE. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" + +import asyncio +from unittest import mock + +import pytest + + +@pytest.fixture +def fast_timeouts(): + """Use to make asyncio.Timeout finish 100x faster.""" + original_timeout = asyncio.timeout + + def scaled_timeout(delay): + return original_timeout(delay / 100 if delay else None) + + with mock.patch("asyncio.timeout", side_effect=scaled_timeout): + yield + + +@pytest.fixture +def fast_sleep(): + """Use to make asyncio.sleep finish 100x faster.""" + original_sleep = asyncio.sleep + + async def scaled_sleep(delay): + return await original_sleep(delay / 100) + + with mock.patch("asyncio.sleep", side_effect=scaled_sleep): + yield diff --git a/tests/const.py b/tests/const.py new file mode 100644 index 0000000..42ca316 --- /dev/null +++ b/tests/const.py @@ -0,0 +1,82 @@ +""" +Constants used inside tests. + +.. moduleauthor:: Harvey Lelliott (flip-dots) +""" + +from bleak import BLEDevice + +from SolixBLE import const, prime_device + +MOCK_DEVICE_NAME = "Mock Device" +""" +Mock device name used to emulate Anker devices for tests. +""" + +MOCK_DEVICE_ADDRESS = "AA:BB:CC:DD:EE:FF" +""" +Mock bluetooth address used to emulate Anker devices for tests. +""" + +MOCK_BLE_DEVICE = BLEDevice( + address=MOCK_DEVICE_ADDRESS, name=MOCK_DEVICE_NAME, details={}, rssi=-1 +) +""" +Mock BLEDevice used to emulate Anker devices for tests. +""" + + +NEGOTIATION_RESPONSES_PRIME: dict[str, list[str]] = { + prime_device.NEGOTIATION_COMMAND_0: [ + "ff091e000300014801ab273ed3e27270c3f4d676ac7d69a00572793732a6" + ], + prime_device.NEGOTIATION_COMMAND_1: [ + "ff092b000300014803ab273ed0443800b35db54c6d4a6ec3d48171a04ea7ebce8bf749e5e48c5d991a5e67" + ], + prime_device.NEGOTIATION_COMMAND_2: [ + "ff0958000300014829ab273ed144326ada9fc66fa02508c5ddf549ade014d1eeb252fea1057c15b00985ab8a724fa3830e8e5b27acbaa1224fd2172c0439d27aaf9e62a66bda5c41c424f23c5c8d7df8d3b89422ddff2266" + ], + prime_device.NEGOTIATION_COMMAND_3: [ + "ff091b000300014805abab709a595a803dd04246b78a927453cf65" + ], + prime_device.NEGOTIATION_COMMAND_4: [ + "ff095d000300014821ab277fc01de436d341de628c79c1384d0aea25ce030622fa3ca0808ce5d1b7365ec1b1753a11ab78fba3ca07dda95cd57c93d1267b1222bef9908f7633a758ab924eba63ee01e715be5b9c3b082e6d81c2204241" + ], + # These packets below are dynamically generated by the library unlike previous + # negotiation packets as past stage 5 negotiations the packets use the shared + # secret for encryption which is different from device to device due to different + # public keys. This is why constants are not used. But we can use constants for + # the earlier packets which use static keys. + "ff094000030001402257ec69586f3500c8f858e0ba047f237f4e2ed8c50d2f39ba3587e4010275bea22242936f08788849272fb3f4cf7493be4a60bb9c9f0693": [ + "ff091b000300014822f60b45600839b2c171b33dc5790ed64ae32d" + ], + "ff094600030001402757ec69586f3501e8cf6185d8c4035707377af9af3a2e40b02b86e7531974f1c22440de6e43705566b77cf940e235b65abf4d413ece5f2c3781712f3742": [ + "ff091b000300014827f60b45600839b2c171b33dc5790ed64ae328" + ], + "ff09230003000f420057e9b8dfdeacda7991d3eb7f12093e55ff002aa9799bcc9216e3": [], + "ff09530003000f420a57e9b883d958e48e5b7de48d980206577e2dafbb3d604dea3686f3011969f0db2311906d142b5730ee2bfb11e3fbbe7485aac8877995310669156ec74645c962b419e579b385fd079967": [], +} +""" +This maps the expected commands sent by the library to what my Anker Prime 160w +charger sends in response. Its used to emulate it for testing negotiations. +""" + + +NEGOTIATION_RESPONSES_SOLIX: dict[str, list[str]] = { + const.NEGOTIATION_COMMAND_0: ["ff090e00030001080100a1010152"], + const.NEGOTIATION_COMMAND_1: [ + "ff091b00030001080300a10102a202fd00a30144a40101a50102ff" + ], + const.NEGOTIATION_COMMAND_2: [ + "ff093800030001082900a10103a2054553503332a307302e302e302e33a41041504339464530453237333030323735a506f49d8a104e0c9a" + ], + const.NEGOTIATION_COMMAND_3: ["ff090b00030001080500f2"], + const.NEGOTIATION_COMMAND_4: [ + "ff094d00030001082100a140b2ade5cac4f4a0c1307e44a0e9c5363cb21e4c8485ee324c23be949fa5d5929a75e57da3207c948a0c366ca9ea1ab2cb8e57d2d046a6ebefe5d96adb5d4cb35039" + ], + const.NEGOTIATION_COMMAND_5: [], +} +""" +This maps the expected commands sent by the library to what my Anker Solix C300 +sends in response. Its used to emulate it for testing negotiations. +""" diff --git a/tests/helpers.py b/tests/helpers.py index c5e78c8..0bd60b3 100644 --- a/tests/helpers.py +++ b/tests/helpers.py @@ -7,27 +7,15 @@ import asyncio import logging from collections.abc import Callable -from dataclasses import dataclass +from dataclasses import dataclass, field from typing import Any, Union from unittest import mock from bleak import BleakClient -from SolixBLE import const - _LOGGER = logging.getLogger(__name__) -NEGOTIATION_RESPONSES: dict[str, Union[str, None]] = { - const.NEGOTIATION_COMMAND_0: "ff090e00030001080100a1010152", - const.NEGOTIATION_COMMAND_1: "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", - const.NEGOTIATION_COMMAND_2: "ff093800030001082900a10103a2054553503332a307302e302e302e33a41041504339464530453237333030323735a506f49d8a104e0c9a", - const.NEGOTIATION_COMMAND_3: "ff090b00030001080500f2", - const.NEGOTIATION_COMMAND_4: "ff094d00030001082100a140b2ade5cac4f4a0c1307e44a0e9c5363cb21e4c8485ee324c23be949fa5d5929a75e57da3207c948a0c366ca9ea1ab2cb8e57d2d046a6ebefe5d96adb5d4cb35039", - const.NEGOTIATION_COMMAND_5: None, -} - - @dataclass class RequestResponse: """ @@ -36,17 +24,22 @@ class RequestResponse: the request is. """ - expected: bytes + name: str """ - The bytes expected by this request. + Name of request to produce more useful error messages. """ - response: Union[bytes, None] + expected: Union[bytes, None] + """ + The bytes expected by this request. Use none to accept any bytes. + """ + + response: list[bytes] """ The bytes (if any) that should be sent in response to a matching request. """ - called: bool + called: bool = field(default=False) """ Has this request been fulfilled. """ @@ -147,7 +140,8 @@ def new_connection_error(self, side_effect: Any): :param side_effect: Side effect to trigger (e.g exception). """ - + if self._establish is None: + raise ValueError("Context manager not active!") self._establish.side_effect = side_effect def allow_connect(self): @@ -166,7 +160,9 @@ def disconnect(self): for callback in dc_callbacks: callback(bleak_client) - def expect_ordered(self, value: bytes, response: Union[bytes, None] = None): + def expect_ordered( + self, value: Union[bytes, None] = None, response: list[bytes] = [] + ): """ Expect an ordered request to be made to the mock device with the specified value and optionally respond with bytes. @@ -174,10 +170,25 @@ def expect_ordered(self, value: bytes, response: Union[bytes, None] = None): If an unexpected or out of order request is made an error will be raised. - :param value: Expected bytes value. - :param response: Optional bytes value to respond with. + :param value: Expected bytes value or None to accept any. + :param response: List of bytes to respond with. """ - self._assertions.append(RequestResponse(value, response, False)) + self._assertions.append( + RequestResponse( + name=f"num {len(self._assertions)}", expected=value, response=response + ) + ) + + def expect_ordered_all(self, requests: list[RequestResponse]): + """ + Expect an list of requests. + + If an unexpected or out of order request is made an error will be + raised. + + :param request_response: Expected request and/or response. + """ + self._assertions.extend(requests) async def start_notify(self, uuid: bytes, callback: Callable): """ @@ -192,7 +203,7 @@ async def start_notify(self, uuid: bytes, callback: Callable): if client is self._current_mock_bleak_client: n_callbacks.append(callback) - async def send_data(self, data: bytes) -> None: + async def send_data(self, data: list[bytes]) -> None: """ Write the specified data as a notification to all clients registered callbacks. @@ -202,11 +213,12 @@ async def send_data(self, data: bytes) -> None: for client, _, n_callbacks in self._mock_bleak_clients: for callback in n_callbacks: - _LOGGER.debug( - f"Mock device sending '{data.hex()}' to client '{client}' for callback '{callback}'..." - ) - # Handle is not used - await callback(None, data) + for packet in data: + _LOGGER.debug( + f"Mock device sending '{packet.hex()}' to client '{client}' for callback '{callback}'..." + ) + # Handle is not used + await callback(None, packet) # Wait between sending await asyncio.sleep(0.1) @@ -236,10 +248,11 @@ async def write_gatt_char( False ), f"Received an unexpected request '{data.hex()}'. Number: {self._position+1}, Num expected: {len(self._assertions)}" - # Assert it matches - assert ( - request_response.expected == data - ), f"Expected bytes {request_response.expected.hex()}' but got '{data.hex()}'!" + if request_response.expected is not None: + # Assert it matches + assert ( + request_response.expected == data + ), f"Expected bytes {request_response.expected.hex()}' for request '{request_response.name}' ({self._position+1}) but got '{data.hex()}'!" # Increment position self._position = self._position + 1 @@ -262,7 +275,7 @@ def check_assertions(self): for i, item in enumerate(self._assertions): assert ( item.called - ), f"Request {i} with expected bytes '{item.expected.hex()}' was not called!" + ), f"Request '{item.name}' ({i}) with expected bytes '{item.expected.hex()}' was not called!" async def __aexit__(self, *exc): """ diff --git a/tests/test_connection.py b/tests/test_connection.py new file mode 100644 index 0000000..9659679 --- /dev/null +++ b/tests/test_connection.py @@ -0,0 +1,255 @@ +"""Tests for the automatic reconnection to devices. + +.. moduleauthor:: Harvey Lelliott (flip-dots) + +""" + +import asyncio + +import pytest + +from SolixBLE import C300, PrimeCharger160w, SolixBLEDevice +from tests.const import ( + MOCK_BLE_DEVICE, + NEGOTIATION_RESPONSES_PRIME, + NEGOTIATION_RESPONSES_SOLIX, +) +from tests.helpers import MockDevice + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "device_class,negotiation", + [ + pytest.param(C300, NEGOTIATION_RESPONSES_SOLIX, id="solix"), + pytest.param(PrimeCharger160w, NEGOTIATION_RESPONSES_PRIME, id="prime"), + ], +) +async def test_automatic_retry( + fast_sleep, fast_timeouts, device_class: type[SolixBLEDevice], negotiation: dict +): + """ + Test the automatic retrying of a lost connection when the + reconnection happens within the timeout. + + This test expects the module to connect the the mock device + and then the mock device drops the connection and we expect + the module to automatically reconnect and not run any callbacks. + + :param device_class: Device class under test (e.g C300). + :param negotiation: Expected negotiation for the device to mock it. + """ + + async with MockDevice() as mock_bluetooth: + + device = device_class(MOCK_BLE_DEVICE) + + def my_callback(*args, **kwargs): + """We do not expect this callback to be triggered.""" + assert False + + # We first expect a negotiation + for expected, responses in negotiation.items(): + mock_bluetooth.expect_ordered( + bytes.fromhex(expected) if expected is not None else None, + [bytes.fromhex(response) for response in responses], + ) + + # We expect the negotiations to succeed + assert await device.connect(), "Expected connect to return True" + await asyncio.sleep(0.5) + assert device.connected, "Expected connected to be True" + assert device.negotiated, "Expected connected to be True" + mock_bluetooth.check_assertions() + + # We then add our callback that should not be run as we should + # silently reconnect + device.add_callback(my_callback) + + # We will soon expect a renegotiation + for expected, responses in negotiation.items(): + mock_bluetooth.expect_ordered( + bytes.fromhex(expected) if expected is not None else None, + [bytes.fromhex(response) for response in responses], + ) + + # We then trigger a disconnect from the device + mock_bluetooth.disconnect() + await asyncio.sleep(0.5) + assert not device.connected, "Expected connected to be False" + assert not device.negotiated, "Expected connected to be False" + + # Set .is_connected to True + mock_bluetooth.allow_connect() + + # We expect to have been automatically reconnected + await asyncio.sleep(30) + assert device.connected, "Expected connected to be True" + assert device.negotiated, "Expected connected to be True" + mock_bluetooth.check_assertions() + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "device_class,negotiation", + [ + pytest.param(C300, NEGOTIATION_RESPONSES_SOLIX, id="solix"), + pytest.param(PrimeCharger160w, NEGOTIATION_RESPONSES_PRIME, id="prime"), + ], +) +async def test_automatic_retry_timeout( + fast_sleep, + fast_timeouts, + device_class: type[SolixBLEDevice], + negotiation: dict, +): + """ + Test the automatic retrying of a lost connection when + the reconnection takes longer than the timeout. + + This test expects the module to connect the the mock device + and then the mock device drops the connection and we expect + callbacks to be run as the module will not be able to establish + the connection within the silent reconnect timeout and then + we allow a reconnect and expect the module to automatically + reconnect and run callbacks again on successful connection. + + :param device_class: Device class under test (e.g C300). + :param negotiation: Expected negotiation for the device to mock it. + """ + + async with MockDevice() as mock_bluetooth: + + device = device_class(MOCK_BLE_DEVICE) + + num_calls = 0 + + def my_callback(*args, **kwargs): + """We expect this to be triggered on timeout limit and on reconnect.""" + nonlocal num_calls + num_calls = num_calls + 1 + + # We first expect a negotiation + for expected, responses in negotiation.items(): + mock_bluetooth.expect_ordered( + bytes.fromhex(expected) if expected is not None else None, + [bytes.fromhex(response) for response in responses], + ) + + # We expect the negotiations to succeed + assert await device.connect(), "Expected connect to return True" + await asyncio.sleep(1) + assert device.connected, "Expected connected to be True" + assert device.negotiated, "Expected connected to be True" + mock_bluetooth.check_assertions() + + # We then add our callback that should be run both when the timeout + # is exceeded and again when we successfully reconnect + device.add_callback(my_callback) + + # We then trigger a disconnect from the device + mock_bluetooth.disconnect() + await asyncio.sleep(160) + assert not device.connected, "Expected connected to be False" + assert not device.negotiated, "Expected connected to be False" + + # Expect callback to be triggered due to timeout limit being + # exceeded + assert num_calls == 1 + + # Set .is_connected to True + mock_bluetooth.allow_connect() + + # We then expect a renegotiation + for expected, responses in negotiation.items(): + mock_bluetooth.expect_ordered( + bytes.fromhex(expected) if expected is not None else None, + [bytes.fromhex(response) for response in responses], + ) + + # We expect to have been automatically reconnected + await asyncio.sleep(30) + assert device.connected, "Expected connected to be True" + assert device.negotiated, "Expected connected to be True" + mock_bluetooth.check_assertions() + + # Expect callback to have been triggered again due to + # successful reconnection after running callbacks due to + # disconnection + assert num_calls == 2 + + +@pytest.mark.asyncio +@pytest.mark.parametrize( + "device_class,negotiation", + [ + pytest.param(C300, NEGOTIATION_RESPONSES_SOLIX, id="solix"), + pytest.param(PrimeCharger160w, NEGOTIATION_RESPONSES_PRIME, id="prime"), + ], +) +async def test_disconnect( + fast_timeouts, + fast_sleep, + device_class: type[SolixBLEDevice], + negotiation: dict, +): + """ + Test the mock device is disconnected and no automatic + reconnection attempts are executed when disconnect is called. + + We also expect no callbacks to be run and multiple calls + to disconnect to do nothing. + + :param device_class: Device class under test (e.g C300). + :param negotiation: Expected negotiation for the device to mock it. + """ + + async with MockDevice() as mock_bluetooth: + + device = device_class(MOCK_BLE_DEVICE) + + async def assert_still_disconnected(): + """Assert that device is still disconnected.""" + for i in range(0, 100): + await asyncio.sleep(1) + assert ( + not device.connected + ), f"Expected connected to be False after {i} seconds" + assert ( + not device.negotiated + ), f"Expected negotiated to be False after {i} seconds" + assert ( + device._client is None + ), f"Expected client to be None after {i} seconds" + + def my_callback(*args, **kwargs): + """We expect this to not be called.""" + assert False + + # We first expect a negotiation + for expected, responses in negotiation.items(): + mock_bluetooth.expect_ordered( + bytes.fromhex(expected) if expected is not None else None, + [bytes.fromhex(response) for response in responses], + ) + + # We expect the negotiations to succeed + assert await device.connect(), "Expected connect to return True" + await asyncio.sleep(5) + assert device.connected, "Expected connected to be True" + assert device.negotiated, "Expected negotiated to be True" + mock_bluetooth.check_assertions() + + # We then add our callback that should not be run when we call + # disconnect + device.add_callback(my_callback) + + # We then call disconnect and expect to remain disconnected and that + # disconnect on the client was called + await device.disconnect() + await assert_still_disconnected() + mock_bluetooth._current_mock_bleak_client.disconnect.assert_called_once() + + # We call disconnect again and expect no changes (still disconnected) + await device.disconnect() + await assert_still_disconnected() diff --git a/tests/test_devices.py b/tests/test_devices.py index ba5179a..a730112 100644 --- a/tests/test_devices.py +++ b/tests/test_devices.py @@ -7,11 +7,8 @@ import asyncio import logging from typing import Any -from unittest import mock import pytest -from bleak import BLEDevice -from helpers import NEGOTIATION_RESPONSES, MockDevice from SolixBLE import ( C300, @@ -21,16 +18,19 @@ C1000G2, ChargingStatus, LightStatus, - PortStatus, - TemperatureUnit, PortOverload, + PortStatus, + PrimeCharger160w, + PrimeDevice, SolixBLEDevice, - const, + TemperatureUnit, ) - -MOCK_DEVICE_NAME = "Mock Device" -MOCK_DEVICE_ADDRESS = "AA:BB:CC:DD:EE:FF" -MOCK_BLE_DEVICE = BLEDevice(MOCK_DEVICE_ADDRESS, MOCK_DEVICE_NAME, {}) +from tests.const import ( + MOCK_BLE_DEVICE, + NEGOTIATION_RESPONSES_PRIME, + NEGOTIATION_RESPONSES_SOLIX, +) +from tests.helpers import MockDevice @pytest.mark.asyncio @@ -441,6 +441,44 @@ }, id="c300_charging_ac_and_light", ), + pytest.param( + PrimeCharger160w, + "a10131a20302e805a303020000a4020100a5080400000000000000a6080400000000000000a7080400000000000000a8020103a9020150aa020100ab090400000f0f0f000000ac0d0401002c0100002c0100000300ad0d0401002c0100002c0100000300ae0d0401002c0100002c0100000300af020100b0020100b1020101b2020101b3020101b40d04fafffbfffafffbfffafffbffb50d04ffffffffffffffffffffffffe0050408000000e10b0480034b53000000000000fe050300000000", + { + "usb_port_c1": PortStatus.NOT_CONNECTED, + "usb_c1_current": 0.0, + "usb_c1_power": 0.0, + "usb_c1_voltage": 0.0, + "usb_port_c2": PortStatus.NOT_CONNECTED, + "usb_c2_current": 0.0, + "usb_c2_power": 0.0, + "usb_c2_voltage": 0.0, + "usb_port_c3": PortStatus.NOT_CONNECTED, + "usb_c3_current": 0.0, + "usb_c3_power": 0.0, + "usb_c3_voltage": 0.0, + }, + id="prime_160w_idle", + ), + pytest.param( + PrimeCharger160w, + "a10131a20302e805a303020000a4020100a5080401e01374003700a608040108236c030b03a7080401d81364003200a8020103a9020150aa020100ab090400000f0f0f000000ac0d0401002c0100002c0100000000ad0d0401002c0100002c0100000203ae0d0401002c0100002c0100000000af020100b0020100b1020101b2020101b3020101b40d0400000000e804000000000000b50d04ffffffffffffffffffffffffe0050408000000e10b0480034b53000000000000fe050300000000", + { + "usb_port_c1": PortStatus.OUTPUT, + "usb_c1_current": 0.116, + "usb_c1_power": 0.55, + "usb_c1_voltage": 5.088, + "usb_port_c2": PortStatus.OUTPUT, + "usb_c2_current": 0.876, + "usb_c2_power": 7.79, + "usb_c2_voltage": 8.968, + "usb_port_c3": PortStatus.OUTPUT, + "usb_c3_current": 0.1, + "usb_c3_power": 0.5, + "usb_c3_voltage": 5.08, + }, + id="prime_160w_all_three_charging", + ), pytest.param( C300DC, "a10131a2050300000000a303020000a403020000a503020000a603020000a703020000a803020000a903020000aa03020000ab03020000ac03020000ad03020000ae03020000af03020000b003020000b103020000b203020000b303020000b403020000b5020180b6020100b7020100b8020100b9020100ba020100bb020100bc020100bd020100be020100bf020100c0020100c1020100c2020100c3110020202020202020202020202020202020c403020000c503020000c603020000c7020100c8020100c9020100ca020100cb03020000cc020100cd020100f7050300000000f815040000000000000000000000000000000000000000", @@ -579,7 +617,7 @@ ], ) async def test_values( - device_class: SolixBLEDevice, payload: str, mapping: dict[str, Any] + device_class: type[SolixBLEDevice], payload: str, mapping: dict[str, Any] ) -> None: """ Test that a payload is parsed into the correct values. @@ -590,7 +628,7 @@ async def test_values( """ device = device_class(MOCK_BLE_DEVICE) parameters = device._parse_payload(bytes.fromhex(payload)) - await device._process_telemetry(None, parameters) + await device._process_telemetry(parameters) for class_property, expected_value in mapping.items(): assert ( @@ -600,157 +638,160 @@ async def test_values( @pytest.mark.asyncio @pytest.mark.parametrize( - "device_class, packet_1, packet_2, packet_3, packet_4, packet_5, secret", + "device_class,packets,secret", [ pytest.param( C300, - "ff090e00030001080100a1010152", - "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", - "ff093800030001082900a10103a2054553503332a307302e302e302e33a410415a5653424a30453339323030303438a506f49d8a53a95a14", - "ff090b00030001080500f2", - "ff094d00030001082100a140c2a5a88fab34c1ac0f96a52e1b93354a47fb6c674b5afebacf5a2ed755435f41f0d26e97782e54e268b46d9f8a58a267cd7f7a239771e6289e55d94f7669ed448a", + [ + "ff090e00030001080100a1010152", + "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", + "ff093800030001082900a10103a2054553503332a307302e302e302e33a410415a5653424a30453339323030303438a506f49d8a53a95a14", + "ff090b00030001080500f2", + "ff094d00030001082100a140c2a5a88fab34c1ac0f96a52e1b93354a47fb6c674b5afebacf5a2ed755435f41f0d26e97782e54e268b46d9f8a58a267cd7f7a239771e6289e55d94f7669ed448a", + None, + ], "2e9edc471d11bd214d45c0a651ab42e3cd370e04f1b860fc85adfaf612aba33f", id="c300_1", ), pytest.param( C300, - "ff090e00030001080100a1010152", - "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", - "ff093800030001082900a10103a2054553503332a307302e302e302e33a410415a5653424a30453339323030303438a506f49d8a53a95a14", - "ff090b00030001080500f2", - "ff094d00030001082100a140a7b5d3824a36cae20bab9fc4d9358191e5351905a782eda157f376cc43f1f761ab772d437f33787188716d1bebd81719d1eb76b94f08499ee93895d5b43e75ef5f", + [ + "ff090e00030001080100a1010152", + "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", + "ff093800030001082900a10103a2054553503332a307302e302e302e33a410415a5653424a30453339323030303438a506f49d8a53a95a14", + "ff090b00030001080500f2", + "ff094d00030001082100a140a7b5d3824a36cae20bab9fc4d9358191e5351905a782eda157f376cc43f1f761ab772d437f33787188716d1bebd81719d1eb76b94f08499ee93895d5b43e75ef5f", + None, + ], "f97b0112a955846530c60e4cf95f941df76d86ab9ca106aa4bd00fe1c4fcb14f", id="c300_2", ), pytest.param( C1000, - "ff090e00030001080100a1010152", - "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", - "ff093800030001082900a10103a2054553503332a307302e302e302e33a41041504339464530453237333030323735a506f49d8a104e0c9a", - "ff090b00030001080500f2", - "ff094d00030001082100a140d3ef70a8faeb9ae7d9be034390108c2c7b177f3d549eb87318bd7a31703fc604664efb0e4600298ca9a905fb5af170955fb76229791dd583478b84d9950bd65420", + [ + "ff090e00030001080100a1010152", + "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", + "ff093800030001082900a10103a2054553503332a307302e302e302e33a41041504339464530453237333030323735a506f49d8a104e0c9a", + "ff090b00030001080500f2", + "ff094d00030001082100a140d3ef70a8faeb9ae7d9be034390108c2c7b177f3d549eb87318bd7a31703fc604664efb0e4600298ca9a905fb5af170955fb76229791dd583478b84d9950bd65420", + None, + ], "2bdc8c8bfecf40814f602e6547cf29bf125abcc1a93be0751d8f1065a2bb5570", id="c1000_1", ), pytest.param( C1000, - "ff090e00030001080100a1010152", - "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", - "ff093800030001082900a10103a2054553503332a307302e302e302e33a41041504339464530453237333030323735a506f49d8a104e0c9a", - "ff090b00030001080500f2", - "ff094d00030001082100a140b2ade5cac4f4a0c1307e44a0e9c5363cb21e4c8485ee324c23be949fa5d5929a75e57da3207c948a0c366ca9ea1ab2cb8e57d2d046a6ebefe5d96adb5d4cb35039", + [ + "ff090e00030001080100a1010152", + "ff091b00030001080300a10102a202fd00a30144a40101a50102ff", + "ff093800030001082900a10103a2054553503332a307302e302e302e33a41041504339464530453237333030323735a506f49d8a104e0c9a", + "ff090b00030001080500f2", + "ff094d00030001082100a140b2ade5cac4f4a0c1307e44a0e9c5363cb21e4c8485ee324c23be949fa5d5929a75e57da3207c948a0c366ca9ea1ab2cb8e57d2d046a6ebefe5d96adb5d4cb35039", + None, + ], "0c4d9db9ef376fcfe627b9b73089eda514315d4bf67fb7eb299f2894ef7a059c", id="c1000_2", ), ], ) async def test_negotiation( - device_class: SolixBLEDevice, - packet_1: str, - packet_2: str, - packet_3: str, - packet_4: str, - packet_5: str, + fast_sleep, + fast_timeouts, + device_class: type[SolixBLEDevice], + packets: list[str], secret: str, ): """ Test negotiation of the shared secret by mocking a device. :param device_class: The class of the device being tested. - :param packet_1: Packet sent by device in response to negotiation command 0. - :param packet_2: Packet sent by device in response to negotiation command 1. - :param packet_3: Packet sent by device in response to negotiation command 2. - :param packet_4: Packet sent by device in response to negotiation command 3. - :param packet_5: Packet sent by device in response to negotiation command 4. + :param packets: Packets sent by the mock device in response to our packets. :param secret: The expected shared secret. """ async with MockDevice() as mock_bluetooth: device = device_class(MOCK_BLE_DEVICE) - mock_bluetooth.expect_ordered( - bytes.fromhex(const.NEGOTIATION_COMMAND_0), - bytes.fromhex(packet_1), - ) - mock_bluetooth.expect_ordered( - bytes.fromhex(const.NEGOTIATION_COMMAND_1), - bytes.fromhex(packet_2), - ) - mock_bluetooth.expect_ordered( - bytes.fromhex(const.NEGOTIATION_COMMAND_2), - bytes.fromhex(packet_3), - ) - mock_bluetooth.expect_ordered( - bytes.fromhex(const.NEGOTIATION_COMMAND_3), - bytes.fromhex(packet_4), - ) - mock_bluetooth.expect_ordered( - bytes.fromhex(const.NEGOTIATION_COMMAND_4), - bytes.fromhex(packet_5), - ) - mock_bluetooth.expect_ordered(bytes.fromhex(const.NEGOTIATION_COMMAND_5), None) + for packet in packets: + mock_bluetooth.expect_ordered( + None, + [bytes.fromhex(packet)] if packet else [], + ) # Assert that the connection succeeds assert await device.connect(), "Expected connect to return True" # Assert that the correct shared secret is calculated assert ( - bytes.fromhex(secret)[:16] == device._shared_key - ), "Negotiated key does not match expected" - assert ( - bytes.fromhex(secret)[16:] == device._iv - ), "Negotiated IV does not match expected" + bytes.fromhex(secret) == device._shared_secret + ), "Shared secret does not match expected" mock_bluetooth.check_assertions() @pytest.mark.parametrize( - "payload,secret,iv,decrypted", + "device_class,payload,secret,decrypted", [ pytest.param( + C300, "5bc7c7b05cf74c1ba441a17a5568f4b25bc061d354f498e39ba509e2c7664ce36d6a9ee8280a40736b9b681f10ab6eb7c86bca4b88fe6fc39ca3391d7ede4e1c47b6b5f0e5ccc67c841a0eb0912039323c27f9e819244424914c9fb538e93a23bc9bfd0f4e9df1b59fec44b5236c75c6f45e42a1110152e56491f8381ae07e50113e3746ca9a16182bc8c9102bbb463eb42d27b1e6330feb3f76d21bf751fe4a1d469c64cd8c9bda426943d48fc7c583c665ea21c7ee23fdde9262d47727c9454d88dd30d291f9bc9b0936a66761846c729f898895d97c158c36e703626ea8499fbf2dc8962159f1b7380f5f84038240d5df00ce1a7eecb4f3ea0b7de9aac5b8637d78f0f3fcf6d600227148d5011bd765a99be6d6ab0e83b9ebe8dcb9ce5ba6", - "23a6446c34efb9f9ab1dbc43ffc8e289", - "fffdfed557f849c4e91bd7baec0c4814", - "a10131a2050300000000a3050300000000a40302ffffa503020000a603025b00a703020000a803020000a903020000aa03020000ab03020000ac03020000ad03020000ae03025b00af03020000b003020100b103021b04b20302fc01b30302fc01b403021c00b503027b00b603021b04b7020101b8020100b9020125ba020100bb020164bc020164bd020100be020100bf020100c0020101c1020100c2020100c3020100c4020100c51100415a5653424a30453339323030303438c603024a01c70302a005c803022c01c903023c00ca03020000cb020101cc020100cd020102ce020132cf020100d0020100d1020100d2020100f7050301000000f815040101010100010000000000000000000000000000f90201020a0a0a0a0a0a0a0a0a0a", + "23a6446c34efb9f9ab1dbc43ffc8e289fffdfed557f849c4e91bd7baec0c4814", + "a10131a2050300000000a3050300000000a40302ffffa503020000a603025b00a703020000a803020000a903020000aa03020000ab03020000ac03020000ad03020000ae03025b00af03020000b003020100b103021b04b20302fc01b30302fc01b403021c00b503027b00b603021b04b7020101b8020100b9020125ba020100bb020164bc020164bd020100be020100bf020100c0020101c1020100c2020100c3020100c4020100c51100415a5653424a30453339323030303438c603024a01c70302a005c803022c01c903023c00ca03020000cb020101cc020100cd020102ce020132cf020100d0020100d1020100d2020100f7050301000000f815040101010100010000000000000000000000000000f9020102", id="c300_telemetry", ), pytest.param( + C1000, "403d9e7311afd074672804704798c421db698f11a5a0fc4bd793c127871c6eea7a970666c9b614c494e62b15770b1dba3dc98019e34cf0eb0ebecb5a2c5bc9ae39441d5e5acad73a645112b779312966513b53ba6f78c0f82cda624cce3b08a1a83416bd52fa4caf37e05cfaa9b37ddea75447be949ba10b892c320398fae0191c1290af0e79791c56c0d2217aafb9259b13cd2ccb9e4d520548eb416f4f96b9d852231578d4d516495564215c297fce97549986ef47058168d77afddc8ac5c0b59c9bfaf681a4cd60eca4bfad743731ca81849b83689e452e68f82fcab9fa2404f05f22b557b73705d16bab42b8045ffcc8083f9cb4fa4acda9997de1a40a2eac55b5dfbc70d882874c1db1990b76ae009bb1997ab507d347c84f3fd39d6f6c", - "0c4d9db9ef376fcfe627b9b73089eda5", - "14315d4bf67fb7eb299f2894ef7a059c", - "a10131a2050300000000a3050300000000a40302d104a503020000a603020000a703020000a803020000a903020000aa03020000ab03020000ac03020000ad03020000ae03020000af03020000b003020300b103020000b203020100b30302a600b403020000b50302ff01b60302ff01b703020000b803029a00b903020000ba0302a600bb03020000bc020100bd020123be020100bf020101c0020100c1020164c2020100c3020164c4020100c5020100c6020100c7020100c8020100c9020100ca020100cb020100cc020100cd020100ce020100cf020100d0110041504339464530453237333030323735e5020100f7050301000000f815040202010100010000000000000000000000000000f9020102fd0b0041313736315f3330416801", + "0c4d9db9ef376fcfe627b9b73089eda514315d4bf67fb7eb299f2894ef7a059c", + "a10131a2050300000000a3050300000000a40302d104a503020000a603020000a703020000a803020000a903020000aa03020000ab03020000ac03020000ad03020000ae03020000af03020000b003020300b103020000b203020100b30302a600b403020000b50302ff01b60302ff01b703020000b803029a00b903020000ba0302a600bb03020000bc020100bd020123be020100bf020101c0020100c1020164c2020100c3020164c4020100c5020100c6020100c7020100c8020100c9020100ca020100cb020100cc020100cd020100ce020100cf020100d0110041504339464530453237333030323735e5020100f7050301000000f815040202010100010000000000000000000000000000f9020102fd0b0041313736315f33304168", id="c1000_telemetry", ), pytest.param( + C1000, "a9fdb7f5f88e0d7ec2c3a36f9cb4f226", - "cf9b34f93bc679b84c9754a9484a5699", - "1cef242c586b23dbef195ba0f2ee02cb", - "00a101310c0c0c0c0c0c0c0c0c0c0c0c", + "cf9b34f93bc679b84c9754a9484a56991cef242c586b23dbef195ba0f2ee02cb", + "00a10131", id="c1000_cmd_ack_ac_on", ), pytest.param( + C1000, "2eb0fc833d00ca9e33491eab73ccfda202cfdedb86599ba5d0e3c2c059652818", - "cf9b34f93bc679b84c9754a9484a5699", - "1cef242c586b23dbef195ba0f2ee02cb", - "a10131a2020101a3020100a4020100a5020103a6020101e50201000505050505", + "cf9b34f93bc679b84c9754a9484a56991cef242c586b23dbef195ba0f2ee02cb", + "a10131a2020101a3020100a4020100a5020103a6020101e5020100", id="c1000_unknown", ), + pytest.param( + PrimeCharger160w, + "57e9a883d95e4bc95b5be2baa1c366331abb9292585357de1f59c997254092ef1372bd5a26ef6b51d61dc87082ca8e7985aacad07f64181902c70c0502de2418e366f5f700b13049d9b857e95c85c66a32d64fcf31c8eead9e025ed69c1440170cca149e038501a9544b1baa044a6a65392e154357e137d917fc834e019012a01b9bd18d5ca7dc22bdb0204b0629b3f738f34bafdc26f6bb0781cec80fe547674a6a7a341a018ce3ac81e6eb6b5110d3311db692d174fe363acec5ba606a24b975c2bb2a43ddfe5351f54d9fcd295709", + "09486817d949a232b58b47a43cc72d045a617a26f3999d30e1d27e38eae52265", + "a10131a20302e805a303020000a4020100a508040150235704eb03a6080400000000000000a7080400000000000000a8020103a9020150aa020100ab090400000f0f0f000000ac0d0401002c0100002c0100000203ad0d0401002c0100002c0100000300ae0d0401002c0100002c0100000300af020100b0020100b1020101b2020101b3020101b40d04e8040000fafffbfffafffbffb50d04ffffffffffffffffffffffffe0050408000000e10b0480034b53000000000000fe050300000000", + id="prime_160w_telemetry", + ), + # Different anker prime charger from other tests + pytest.param( + PrimeCharger160w, + "14676a53fc1315457c58163660d5b7bb4a6c83be2f8511d2bc79e2428827907a591b28a709df413e4fa633dc943dd7d2902c46bdcd69ea2bfe4c529f577dfe492d3192aa04f2b2a66fa745b4ed64d34a0a8100d4dd165514edd14499cf1243fbc9d1c216239bc53b756256f4dc04723c470a10434d49e3e38c6d6e1c2054a4890ea244a14964ef6b69eecc3ce8debc0f50537a6be461f3a1b9eb6cc1f1303d8dcf9488a8d4c8bc60729fa669974a4b84a50a0d5f75833c157e5e5c54cf19f944e731932e076b25892c13e0b3979ccd11", + "c0779a39bfa7b290ba9cd3d96b6fdc22a1f6a9746d4fc81e942c3d95", + "a10131a20302e805a303020000a4020100a5080400000000000000a6080401d84e00000000a7080400000000000000a8020100a9020150aa020100ab090400001c50343b3b3bac0d0401002c0100002c0100000300ad0d0401002c0100002c0100000100ae0d0401002c0100002c0100000300af020101b0020101b1020100b2020101b30201ffb40d04fafffbff00000000fafffbffb50d04ffffffffffffffffffffffffe0050408000000e10b0400000000000000000000fe050300000000", + id="prime_160w_telemetry_alt", + ), ], ) -def test_payload_decryption(payload: str, secret: str, iv: str, decrypted: str): +def test_payload_decryption( + device_class: type[SolixBLEDevice], payload: str, secret: str, decrypted: str +): """ Test the decryption of a payload only. This does not test the splitting of a packet. + :param device_class: Class of device under test. :param payload: Payload to be decrypted. - :param secret: AES secret payload is encrypted with. - :param iv: IV used to encrypt payload. + :param secret: Shared secret used for AES key and IV. :param decrypted: Expected content of decrypted payload. """ - device = SolixBLEDevice(BLEDevice) - device._shared_key = bytes.fromhex(secret) - device._iv = bytes.fromhex(iv) + device = device_class(MOCK_BLE_DEVICE) + device._shared_secret = bytes.fromhex(secret) decrypted_bytes = device._decrypt_payload(bytes.fromhex(payload)) assert decrypted_bytes.hex() == decrypted, "Payloads do not match!" @@ -758,149 +799,170 @@ def test_payload_decryption(payload: str, secret: str, iv: str, decrypted: str): @pytest.mark.asyncio @pytest.mark.parametrize( - "packets, secret, iv, parameters", + "device_class, packets, secret, parameters", [ # Test that when there are no packets device._ data is None pytest.param( + SolixBLEDevice, [], "", - "", None, id="no_packets", ), # Test that when there there are 0/2 required packets device._data is None pytest.param( + C1000, [ "ff092a0003010f440156ecb95eb746de03d40ee711ce99f42837a9554c6382d3f5298a3b0648d8536936" ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", None, id="irrelevant_packet_only", ), # Test that when there there is only 1/2 required packets device._data is None pytest.param( + C1000, [ "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c" ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", None, - id="packet_1_missing", + id="solix_packet_1_missing", ), # Test that when there there is only 1/2 required packets device._data is None pytest.param( + C1000, [ "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88" ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", None, - id="packet_2_missing", + id="solix_packet_2_missing", ), # Test that when the 1st packet arrives after the 2nd packet is it ignored pytest.param( + C1000, [ "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c", "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88", ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", None, - id="both_packets_reversed", + id="solix_both_packets_reversed", ), # Test that when the packets arrive in order they are parsed and device._data is populated pytest.param( + C1000, [ "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88", "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c", ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", """{'a1': '31', 'a2': '0300000000', 'a3': '0300000000', 'a4': '02720f', 'a5': '020000', 'a6': '020000', 'a7': '020000', 'a8': '020000', 'a9': '020000', 'aa': '020000', 'ab': '020000', 'ac': '020000', 'ad': '020000', 'ae': '020000', 'af': '020000', 'b0': '020100', 'b1': '020000', 'b2': '020100', 'b3': '02a600', 'b4': '020000', 'b5': '02ff01', 'b6': '02ff01', 'b7': '020000', 'b8': '029a00', 'b9': '020000', 'ba': '02a600', 'bb': '020000', 'bc': '0100', 'bd': '0122', 'be': '0100', 'bf': '0101', 'c0': '0100', 'c1': '0164', 'c2': '0100', 'c3': '0164', 'c4': '0100', 'c5': '0100', 'c6': '0100', 'c7': '0100', 'c8': '0100', 'c9': '0100', 'ca': '0100', 'cb': '0100', 'cc': '0100', 'cd': '0100', 'ce': '0100', 'cf': '0100', 'd0': '0041504339464530453237333030323735', 'e5': '0100', 'f7': '0301000000', 'f8': '040202010100010000000000000000000000000000', 'f9': '0102', 'fd': '0041313736315f33304168'}""", - id="both_packets", + id="solix_both_packets", ), # Test that when the packets arrive in order they are parsed and device._data is populated # but that the later packet does not result in any changes to the data because it is not # valid until the next telemetry packet arrives pytest.param( + C1000, [ "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88", "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c", "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3218e598b95b4b8aa7ff3483fd3cfc72612b49fad1e5e27b50be913da3b73328c0db3e5f58c5a86dce0f36a9c080db786c1b917a8541d43aec30c6cbd2b229876255894ac5269fb9f3d4258450905bbe28781c5544d7eb57553bc5c39418d02fba353983a9b0f318e951d57ccc019cea984f9a64b0cb793bec8c696936b16fac2d72c59c4b95561f5f534c448f911d5e1c9ac30601e04fb2338313498d083cc6f676b0797b587ebc5e2fc32e60562f5e41e44682b5f8f094bcbea33e0926f304366d5df28c4868d00ba37eb754c9921e9b63ebb0bb1fb76f644c0760636df1303362106", ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", """{'a1': '31', 'a2': '0300000000', 'a3': '0300000000', 'a4': '02720f', 'a5': '020000', 'a6': '020000', 'a7': '020000', 'a8': '020000', 'a9': '020000', 'aa': '020000', 'ab': '020000', 'ac': '020000', 'ad': '020000', 'ae': '020000', 'af': '020000', 'b0': '020100', 'b1': '020000', 'b2': '020100', 'b3': '02a600', 'b4': '020000', 'b5': '02ff01', 'b6': '02ff01', 'b7': '020000', 'b8': '029a00', 'b9': '020000', 'ba': '02a600', 'bb': '020000', 'bc': '0100', 'bd': '0122', 'be': '0100', 'bf': '0101', 'c0': '0100', 'c1': '0164', 'c2': '0100', 'c3': '0164', 'c4': '0100', 'c5': '0100', 'c6': '0100', 'c7': '0100', 'c8': '0100', 'c9': '0100', 'ca': '0100', 'cb': '0100', 'cc': '0100', 'cd': '0100', 'ce': '0100', 'cf': '0100', 'd0': '0041504339464530453237333030323735', 'e5': '0100', 'f7': '0301000000', 'f8': '040202010100010000000000000000000000000000', 'f9': '0102', 'fd': '0041313736315f33304168'}""", - id="both_packets_later_invalidates", + id="solix_both_packets_later_invalidates", ), # Test that when the packets arrive in order they are parsed and device._data is populated # but that the later packet does not result in any changes to the data because it is out # of order pytest.param( + C1000, [ "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88", "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c", "ff09390003010fc40222922d054e0b6cd682ba63ba7cc0e158113a569150aa95c5a21bc3142c1ba2e95c06a7ce78547448520ae8cc1a2844fa", "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3218e598b95b4b8aa7ff3483fd3cfc72612b49fad1e5e27b50be913da3b73328c0db3e5f58c5a86dce0f36a9c080db786c1b917a8541d43aec30c6cbd2b229876255894ac5269fb9f3d4258450905bbe28781c5544d7eb57553bc5c39418d02fba353983a9b0f318e951d57ccc019cea984f9a64b0cb793bec8c696936b16fac2d72c59c4b95561f5f534c448f911d5e1c9ac30601e04fb2338313498d083cc6f676b0797b587ebc5e2fc32e60562f5e41e44682b5f8f094bcbea33e0926f304366d5df28c4868d00ba37eb754c9921e9b63ebb0bb1fb76f644c0760636df1303362106", ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", """{'a1': '31', 'a2': '0300000000', 'a3': '0300000000', 'a4': '02720f', 'a5': '020000', 'a6': '020000', 'a7': '020000', 'a8': '020000', 'a9': '020000', 'aa': '020000', 'ab': '020000', 'ac': '020000', 'ad': '020000', 'ae': '020000', 'af': '020000', 'b0': '020100', 'b1': '020000', 'b2': '020100', 'b3': '02a600', 'b4': '020000', 'b5': '02ff01', 'b6': '02ff01', 'b7': '020000', 'b8': '029a00', 'b9': '020000', 'ba': '02a600', 'bb': '020000', 'bc': '0100', 'bd': '0122', 'be': '0100', 'bf': '0101', 'c0': '0100', 'c1': '0164', 'c2': '0100', 'c3': '0164', 'c4': '0100', 'c5': '0100', 'c6': '0100', 'c7': '0100', 'c8': '0100', 'c9': '0100', 'ca': '0100', 'cb': '0100', 'cc': '0100', 'cd': '0100', 'ce': '0100', 'cf': '0100', 'd0': '0041504339464530453237333030323735', 'e5': '0100', 'f7': '0301000000', 'f8': '040202010100010000000000000000000000000000', 'f9': '0102', 'fd': '0041313736315f33304168'}""", - id="both_packets_later_out_of_order", + id="solix_both_packets_later_out_of_order", ), # Test that when the packets arrive in order they are parsed and device._data is populated # but that the later non-telemetry packet does not result in any changes because it is # not a telemetry packet pytest.param( + C1000, [ "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88", "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c", "ff091a0003010f484a6e744378c57c16ca8ab3a40bebb6f39807", ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", """{'a1': '31', 'a2': '0300000000', 'a3': '0300000000', 'a4': '02720f', 'a5': '020000', 'a6': '020000', 'a7': '020000', 'a8': '020000', 'a9': '020000', 'aa': '020000', 'ab': '020000', 'ac': '020000', 'ad': '020000', 'ae': '020000', 'af': '020000', 'b0': '020100', 'b1': '020000', 'b2': '020100', 'b3': '02a600', 'b4': '020000', 'b5': '02ff01', 'b6': '02ff01', 'b7': '020000', 'b8': '029a00', 'b9': '020000', 'ba': '02a600', 'bb': '020000', 'bc': '0100', 'bd': '0122', 'be': '0100', 'bf': '0101', 'c0': '0100', 'c1': '0164', 'c2': '0100', 'c3': '0164', 'c4': '0100', 'c5': '0100', 'c6': '0100', 'c7': '0100', 'c8': '0100', 'c9': '0100', 'ca': '0100', 'cb': '0100', 'cc': '0100', 'cd': '0100', 'ce': '0100', 'cf': '0100', 'd0': '0041504339464530453237333030323735', 'e5': '0100', 'f7': '0301000000', 'f8': '040202010100010000000000000000000000000000', 'f9': '0102', 'fd': '0041313736315f33304168'}""", - id="both_packets_irrelevant_ignored", + id="solix_both_packets_irrelevant_ignored", ), # Test that when the packets arrive in order they are parsed and device._data is populated # and that once both of the next packets are received the device._data changes. pytest.param( + C1000, [ "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3a9ea3cb5b222b0152438ccd3b980eda40fbde184fa66c80c3372dad179f11cad8799858ab95696e52c7e729af87c1106343ed5be9c042c8912b14f3a0d94b32afbed432e66616e1895ba0ff5e74a6da9401117070c926631e5d7886a07bec0de35aeb689e8bb289f1d7854143dc413f25d4b57d290ca4378cfb8efc275aa779145f98956e934eaced2d1f51cef7dd21a340318bfc14fb5f90ffd33e0e484175512af33593b1f91eb9801d7c2e1ac6d56e8fe7e8883d62226484ed6f1af711d042c5e3d0c186b3f2222293bc71ccf4a156a544d5171e90ee9b6b9b8f36ae058b96e3b88", "ff09390003010fc40222788d127d8418b41a81719975719a26b32734ea4e44ce244683e31928bb9a2736f9ede939567cddce6b3fb0de68116c", "ff09fd0003010fc402121e0e23790307a57d4adabcd8d5ad56c3218e598b95b4b8aa7ff3483fd3cfc72612b49fad1e5e27b50be913da3b73328c0db3e5f58c5a86dce0f36a9c080db786c1b917a8541d43aec30c6cbd2b229876255894ac5269fb9f3d4258450905bbe28781c5544d7eb57553bc5c39418d02fba353983a9b0f318e951d57ccc019cea984f9a64b0cb793bec8c696936b16fac2d72c59c4b95561f5f534c448f911d5e1c9ac30601e04fb2338313498d083cc6f676b0797b587ebc5e2fc32e60562f5e41e44682b5f8f094bcbea33e0926f304366d5df28c4868d00ba37eb754c9921e9b63ebb0bb1fb76f644c0760636df1303362106", "ff09390003010fc40222922d054e0b6cd682ba63ba7cc0e158113a569150aa95c5a21bc3142c1ba2e95c06a7ce78547448520ae8cc1a2844fa", ], - "645ca871528991eb38ebb327a781e932", - "b1d9d7a613b04c966b317db056c83428", + "645ca871528991eb38ebb327a781e932b1d9d7a613b04c966b317db056c83428", """{'a1': '31', 'a2': '0300000000', 'a3': '0300000000', 'a4': '02d80e', 'a5': '020000', 'a6': '020000', 'a7': '020000', 'a8': '020000', 'a9': '020000', 'aa': '020000', 'ab': '020000', 'ac': '020000', 'ad': '020000', 'ae': '020000', 'af': '020000', 'b0': '020100', 'b1': '020000', 'b2': '020100', 'b3': '02a600', 'b4': '020000', 'b5': '02ff01', 'b6': '02ff01', 'b7': '020000', 'b8': '029a00', 'b9': '020000', 'ba': '02a600', 'bb': '020100', 'bc': '0100', 'bd': '0122', 'be': '0100', 'bf': '0101', 'c0': '0100', 'c1': '0164', 'c2': '0100', 'c3': '0164', 'c4': '0100', 'c5': '0100', 'c6': '0100', 'c7': '0100', 'c8': '0100', 'c9': '0100', 'ca': '0100', 'cb': '0100', 'cc': '0100', 'cd': '0100', 'ce': '0100', 'cf': '0100', 'd0': '0041504339464530453237333030323735', 'e5': '0100', 'f7': '0301000000', 'f8': '040202010100010000000000000000000000000000', 'f9': '0102', 'fd': '0041313736315f33304168'}""", - id="both_packets_with_update", + id="solix_both_packets_with_update", + ), + # Test an Anker Prime device (single payload device) with a single telemetry packet. + pytest.param( + PrimeCharger160w, + [ + "ff09da00030111430057e9a883d95e4bc95b5be2baa1c366331abb929258ab5077108dc197254092ef1372bd5a26ef6b51d61dc87082ca8e7985aacad07f64181902c70c0502de2418e366f5f700b13049d9b857e95c85c66a32d64fcf31c8eead9e025ed69c1440170cca149e038501a9544b1baa044a6a65392e154357e137d917fc834e019012a01b9bd18d5ca7dc22bdb0204b0629b3f738f34bafdc26f6bb0781cec80fe547674a6a7a341a018ce3ac81e6eb6b5110d3311db692d174fe363acec5ba606a24b92dcc95a6cdd8fee1843a26694ddd23ac74" + ], + "09486817d949a232b58b47a43cc72d045a617a26f3999d30e1d27e38eae52265", + """{'a1': '31', 'a2': '02e805', 'a3': '020000', 'a4': '0100', 'a5': '0401a824fe0b3f0b', 'a6': '0400000000000000', 'a7': '0400000000000000', 'a8': '0103', 'a9': '0150', 'aa': '0100', 'ab': '0400000f0f0f000000', 'ac': '0401002c0100002c0100000203', 'ad': '0401002c0100002c0100000300', 'ae': '0401002c0100002c0100000300', 'af': '0100', 'b0': '0100', 'b1': '0101', 'b2': '0101', 'b3': '0101', 'b4': '04e8040000fafffbfffafffbff', 'b5': '04ffffffffffffffffffffffff', 'e0': '0408000000', 'e1': '0480034b53000000000000', 'fe': '0300000000'}""", + id="prime_telemetry_packet", ), ], ) async def test_telemetry_packet_processing( - packets: list[str], secret: str, iv: str, parameters: str | None + fast_sleep, + fast_timeouts, + device_class: type[SolixBLEDevice], + packets: list[str], + secret: str, + parameters: str | None, ): """ Test the _process_notification function when processing telemetry packets end to end. + :param device_class: Class of device under test. :param packets: List of packets to send to device. - :param secret: AES secret payloads are encrypted with. - :param iv: IV used to encrypt payloads. + :param secret: Shared secret used as AES key and IV. :param parameters: Expected parameters in string form. """ - device = SolixBLEDevice(BLEDevice) + device = device_class(MOCK_BLE_DEVICE) + + negotiation_responses = ( + NEGOTIATION_RESPONSES_PRIME + if issubclass(device_class, PrimeDevice) + else NEGOTIATION_RESPONSES_SOLIX + ) async with MockDevice() as mock_bluetooth: # We first expect a negotiation - for expected, response in NEGOTIATION_RESPONSES.items(): + for expected, response in negotiation_responses.items(): mock_bluetooth.expect_ordered( bytes.fromhex(expected), - bytes.fromhex(response) if response is not None else None, + [bytes.fromhex(x) for x in response], ) # We expect the negotiations to succeed @@ -910,11 +972,10 @@ async def test_telemetry_packet_processing( assert device.negotiated, "Expected connected to be True" mock_bluetooth.check_assertions() - device._shared_key = bytes.fromhex(secret) - device._iv = bytes.fromhex(iv) + device._shared_secret = bytes.fromhex(secret) for packet in packets: - await mock_bluetooth.send_data(bytes.fromhex(packet)) + await mock_bluetooth.send_data([bytes.fromhex(packet)]) device_parameters = ( device._parameters_to_str(device._data) if device._data else None @@ -984,7 +1045,7 @@ async def test_telemetry_packet_processing( ) async def test_bad_values( caplog, - device_class: SolixBLEDevice, + device_class: type[SolixBLEDevice], payload: str, mapping: dict[str, Any], errors: list[str], @@ -1008,12 +1069,9 @@ async def test_bad_values( device = device_class(MOCK_BLE_DEVICE) parameters = device._parse_payload(bytes.fromhex(payload)) - await device._process_telemetry(None, parameters) + await device._process_telemetry(parameters) for class_property, expected_value in mapping.items(): assert ( getattr(device, class_property) == expected_value ), f"Mismatch for property '{class_property}'!" - - for error_message in errors: - assert error_message in caplog.text diff --git a/tests/test_prime.py b/tests/test_prime.py new file mode 100644 index 0000000..3d85c0c --- /dev/null +++ b/tests/test_prime.py @@ -0,0 +1,74 @@ +""" +Tests for the Anker Prime specific functionality. + +.. moduleauthor:: Harvey Lelliott (flip-dots) +""" + +import pytest + +from SolixBLE import prime_device +from SolixBLE.prime_device import PrimeDevice +from tests.const import MOCK_BLE_DEVICE + + +@pytest.mark.parametrize( + "packet,decrypted_payload,shared_secret", + [ + pytest.param( + "ff094000030001402257ec69586f3500c8f858e0ba047f237f4e2ed8c50d2f39ba3587e4010275bea22242936f08788849272fb3f4cf7493be4a60bb9c9f0693", + prime_device.NEGOTIATION_COMMAND_5_PAYLOAD, + "09486817d949a232b58b47a43cc72d045a617a26f3999d30e1d27e38eae52265", + id="stage_5_response", + ), + pytest.param( + "ff094600030001402757ec69586f3501e8cf6185d8c4035707377af9af3a2e40b02b86e7531974f1c22440de6e43705566b77cf940e235b65abf4d413ece5f2c3781712f3742", + prime_device.NEGOTIATION_COMMAND_6_PAYLOAD, + "09486817d949a232b58b47a43cc72d045a617a26f3999d30e1d27e38eae52265", + id="stage_6_response", + ), + pytest.param( + "ff09230003000f420057e9b8dfdeacda7991d3eb7f12093e55ff002aa9799bcc9216e3", + prime_device.NEGOTIATION_COMMAND_7_PAYLOAD, + "09486817d949a232b58b47a43cc72d045a617a26f3999d30e1d27e38eae52265", + id="stage_7a_response", + ), + pytest.param( + "ff09530003000f420a57e9b883d958e48e5b7de48d980206577e2dafbb3d604dea3686f3011969f0db2311906d142b5730ee2bfb11e3fbbe7485aac8877995310669156ec74645c962b419e579b385fd079967", + prime_device.NEGOTIATION_COMMAND_8_PAYLOAD, + "09486817d949a232b58b47a43cc72d045a617a26f3999d30e1d27e38eae52265", + id="stage_7b_response", + ), + pytest.param( + "ff092d0003000140221462ecff54785e445fd4ebc9c574f6e91ee4b316f4458b9bd1af3515b6b0820cdb4f1c4f", + "a104b70eab69a304808fffffa5054353542d38", + "c0779a39bfa7b290ba9cd3d96b6fdc22a1f6a9746d4fc81e942c3d95a3892d2f", + id="from_external_logs", + ), + ], +) +def test_negotiation_encryption_session( + packet: str, decrypted_payload: str, shared_secret: str +): + """ + Test that the encrypted packets produced by the library + for negotiation are correct. + + This test takes a packet, extracts its payload, decrypts the + payload, and then re-encrypts the payload and asserts that + the encrypted and decrypted-then-re-encrypted payload are + identical. + + This test also asserts that the decrypted payload matches + the expected one. + """ + + prime = PrimeDevice(MOCK_BLE_DEVICE) + + _, _, payload = prime._split_packet(bytes.fromhex(packet)) + prime._shared_secret = bytes.fromhex(shared_secret) + + decrypted = prime._decrypt_payload(payload) + assert decrypted.hex() == decrypted_payload + + re_encrypted = prime._encrypt_payload(decrypted) + assert payload.hex() == re_encrypted.hex() diff --git a/tests/test_reconnect.py b/tests/test_reconnect.py deleted file mode 100644 index 2695773..0000000 --- a/tests/test_reconnect.py +++ /dev/null @@ -1,155 +0,0 @@ -"""Tests for the automatic reconnection to devices. - -.. moduleauthor:: Harvey Lelliott (flip-dots) - -""" - -import asyncio -from typing import Union -from unittest.mock import patch - -import pytest -from bleak import BLEDevice -from helpers import NEGOTIATION_RESPONSES, MockDevice - -from SolixBLE import SolixBLEDevice, const - -MOCK_DEVICE_NAME = "Mock Device" -MOCK_DEVICE_ADDRESS = "AA:BB:CC:DD:EE:FF" -MOCK_BLE_DEVICE = BLEDevice(MOCK_DEVICE_ADDRESS, MOCK_DEVICE_NAME, {}) - - -@pytest.mark.asyncio -async def test_automatic_retry(): - """ - Test the automatic retrying of a lost connection when the - reconnection happens within the timeout. - - This test expects the module to connect the the mock device - and then the mock device drops the connection and we expect - the module to automatically reconnect and not run any callbacks. - """ - - async with MockDevice() as mock_bluetooth: - - device = SolixBLEDevice(MOCK_BLE_DEVICE) - - def my_callback(*args, **kwargs): - """We do not expect this callback to be triggered.""" - assert False - - # We first expect a negotiation - for expected, response in NEGOTIATION_RESPONSES.items(): - mock_bluetooth.expect_ordered( - bytes.fromhex(expected), - bytes.fromhex(response) if response is not None else None, - ) - - # We expect the negotiations to succeed - assert await device.connect(), "Expected connect to return True" - await asyncio.sleep(0.5) - assert device.connected, "Expected connected to be True" - assert device.negotiated, "Expected connected to be True" - mock_bluetooth.check_assertions() - - # We then add our callback that should not be run as we should - # silently reconnect - device.add_callback(my_callback) - - for expected, response in NEGOTIATION_RESPONSES.items(): - mock_bluetooth.expect_ordered( - bytes.fromhex(expected), - bytes.fromhex(response) if response is not None else None, - ) - - # We then trigger a disconnect from the device - mock_bluetooth.disconnect() - await asyncio.sleep(0.5) - assert not device.connected, "Expected connected to be False" - assert not device.negotiated, "Expected connected to be False" - - # Set .is_connected to True - mock_bluetooth.allow_connect() - - # We expect to have been automatically reconnected - await asyncio.sleep(5) - assert device.connected, "Expected connected to be True" - assert device.negotiated, "Expected connected to be True" - mock_bluetooth.check_assertions() - - -@pytest.mark.asyncio -@patch("SolixBLE.device.DISCONNECT_TIMEOUT", 5) -@patch("SolixBLE.device.RECONNECT_DELAY", 1) -async def test_automatic_retry_timeout(): - """ - Test the automatic retrying of a lost connection when - the reconnection takes longer than the timeout. - - This test expects the module to connect the the mock device - and then the mock device drops the connection and we expect - callbacks to be run as the module will not be able to establish - the connection within the silent reconnect timeout and then - we allow a reconnect and expect the module to automatically - reconnect and run callbacks again on successful connection. - """ - - async with MockDevice() as mock_bluetooth: - - device = SolixBLEDevice(MOCK_BLE_DEVICE) - - num_calls = 0 - - def my_callback(*args, **kwargs): - """We expect this to be triggered on timeout limit and on reconnect.""" - nonlocal num_calls - num_calls = num_calls + 1 - - # We first expect a negotiation - for expected, response in NEGOTIATION_RESPONSES.items(): - mock_bluetooth.expect_ordered( - bytes.fromhex(expected), - bytes.fromhex(response) if response is not None else None, - ) - - # We expect the negotiations to succeed - assert await device.connect(), "Expected connect to return True" - await asyncio.sleep(0.5) - assert device.connected, "Expected connected to be True" - assert device.negotiated, "Expected connected to be True" - mock_bluetooth.check_assertions() - - # We then add our callback that should be run both when the timeout - # is exceeded and again when we successfully reconnect - device.add_callback(my_callback) - - # We then trigger a disconnect from the device - mock_bluetooth.disconnect() - await asyncio.sleep(7) - assert not device.connected, "Expected connected to be False" - assert not device.negotiated, "Expected connected to be False" - - # Expect callback to be triggered due to timeout limit being - # exceeded - assert num_calls == 1 - - # Set .is_connected to True - mock_bluetooth.allow_connect() - - # We then expect a renegotiation - for expected, response in NEGOTIATION_RESPONSES.items(): - mock_bluetooth.expect_ordered( - bytes.fromhex(expected), - bytes.fromhex(response) if response is not None else None, - ) - - # We expect to have been automatically reconnected - await asyncio.sleep(7) - assert device.connected, "Expected connected to be True" - assert device.negotiated, "Expected connected to be True" - mock_bluetooth.check_assertions() - - # Expect callback to have been triggered again due to - # successful reconnection after running callbacks due to - # disconnection - assert num_calls == 2