From b4dabcffa0556c0326bf9ae6dcbce89040df2960 Mon Sep 17 00:00:00 2001 From: unex <7575866+unex@users.noreply.github.com> Date: Tue, 28 Apr 2026 20:20:57 -0600 Subject: [PATCH] Add F2600 support --- SolixBLE/__init__.py | 2 + SolixBLE/devices/__init__.py | 2 + SolixBLE/devices/f2600.py | 321 +++++++++++++++++++++++++++++++++++ 3 files changed, 325 insertions(+) create mode 100644 SolixBLE/devices/f2600.py diff --git a/SolixBLE/__init__.py b/SolixBLE/__init__.py index fd7e1b2..38fb6e2 100644 --- a/SolixBLE/__init__.py +++ b/SolixBLE/__init__.py @@ -12,6 +12,7 @@ C1000, C1000G2, F2000, + F2600, F3800, Generic, PrimeCharger160w, @@ -40,6 +41,7 @@ "C1000", "C1000G2", "F2000", + "F2600", "F3800", "Solarbank2", "Solarbank3", diff --git a/SolixBLE/devices/__init__.py b/SolixBLE/devices/__init__.py index 030b7ea..30aade1 100644 --- a/SolixBLE/devices/__init__.py +++ b/SolixBLE/devices/__init__.py @@ -10,6 +10,7 @@ from .c1000 import C1000 from .c1000g2 import C1000G2 from .f2000 import F2000 +from .f2600 import F2600 from .f3800 import F3800 from .generic import Generic from .prime_charger_160w import PrimeCharger160w @@ -24,6 +25,7 @@ "C1000", "C1000G2", "F2000", + "F2600", "F3800", "Solarbank2", "Solarbank3", diff --git a/SolixBLE/devices/f2600.py b/SolixBLE/devices/f2600.py new file mode 100644 index 0000000..8f06fad --- /dev/null +++ b/SolixBLE/devices/f2600.py @@ -0,0 +1,321 @@ +"""F2600 power station model. + +.. moduleauthor:: Harvey Lelliott (flip-dots) +.. moduleauthor:: github.com/unex + +""" + +import logging + +from ..const import ( + DEFAULT_METADATA_BOOL, + DEFAULT_METADATA_INT, +) +from ..states import DisplayTimeout, LightStatus, PortStatus + +from . import F2000 + +CMD_AC_CHARGING_POWER = "4044" +CMD_DISPLAY_TIMEOUT = "4046" +CMD_DISPLAY_ON_OFF = "4052" +CMD_AC_OUTPUT = "404a" +CMD_DC_OUTPUT = "404b" +CMD_DISPLAY_MODE = "404c" +CMD_POWER_SAVING_MODE = "404e" +CMD_LIGHT_MODE = "404f" + +PAYLOAD_ON = "a10121a2020101" +PAYLOAD_OFF = "a10121a2020100" +PAYLOAD_LIGHT_MODE = "a10121a20201" +PAYLOAD_TIMEOUT_TIME = "a10121a20302" +PAYLOAD_AC_CHARGING_POWER = "a10121a20302" + +_LOGGER = logging.getLogger(__name__) + + +class F2600(F2000): + """ + F2600 Power Station. + + Use this class to connect and monitor a F2600 power station. + """ + + _EXPECTED_TELEMETRY_LENGTH: int = 253 + + @property + def ac_output(self) -> PortStatus: + """AC Port Status. + + PortStatus.NOT_CONNECTED signifies off. + PortStatus.OUTPUT signifies on. + + :returns: Status of the AC port. + """ + return PortStatus(self._parse_int("bb", begin=1)) + + @property + def dc_output(self) -> PortStatus: + """DC Port Status. + + PortStatus.NOT_CONNECTED signifies off. + PortStatus.OUTPUT signifies on. + + Based on observed F2600 telemetry, key ``cb`` tracks DC output state. + + :returns: Status of the DC port. + """ + return PortStatus(self._parse_int("cb", begin=1)) + + @property + def usb_c1(self) -> PortStatus: + """USB C1 Port Status. + + :returns: Status of the USB C1 port. + """ + return PortStatus(self._parse_int("c6", begin=1)) + + @property + def usb_c2(self) -> PortStatus: + """USB C2 Port Status. + + :returns: Status of the USB C2 port. + """ + return PortStatus(self._parse_int("c7", begin=1)) + + @property + def usb_c3(self) -> PortStatus: + """USB C3 Port Status. + + :returns: Status of the USB C3 port. + """ + return PortStatus(self._parse_int("c8", begin=1)) + + @property + def usb_a1(self) -> PortStatus: + """USB A1 Port Status. + + :returns: Status of the USB A1 port. + """ + return PortStatus(self._parse_int("c9", begin=1)) + + @property + def usb_a2(self) -> PortStatus: + """USB A2 Port Status. + + :returns: Status of the USB A2 port. + """ + return PortStatus(self._parse_int("ca", begin=1)) + + @property + def ac_charging_power(self) -> int: + """Configured AC charging power limit in watts. + + :returns: AC charging power limit or default int value. + """ + if self._data is None or "d1" not in self._data: + return DEFAULT_METADATA_INT + return self._parse_int("d1", begin=1) + + @property + def display_timeout_seconds(self) -> int: + """Configured display timeout in seconds. + + :returns: Display timeout in seconds or default int value. + """ + if self._data is None or "d3" not in self._data: + return DEFAULT_METADATA_INT + return self._parse_int("d3", begin=1) + + @property + def power_saving_mode_enabled(self) -> bool | None: + """Whether power saving mode is enabled. + + :returns: True if enabled, False if disabled, or default bool value. + """ + return ( + bool(self._parse_int("db", begin=1)) + if self._data is not None and "db" in self._data + else DEFAULT_METADATA_BOOL + ) + + + async def turn_ac_on(self) -> None: + """Turn the AC output on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_AC_OUTPUT), payload=bytes.fromhex(PAYLOAD_ON) + ) + + async def turn_ac_off(self) -> None: + """Turn the AC output off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_AC_OUTPUT), payload=bytes.fromhex(PAYLOAD_OFF) + ) + + async def turn_dc_on(self) -> None: + """Turn the DC output on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_DC_OUTPUT), payload=bytes.fromhex(PAYLOAD_ON) + ) + + async def turn_dc_off(self) -> None: + """Turn the DC output off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_DC_OUTPUT), payload=bytes.fromhex(PAYLOAD_OFF) + ) + + async def set_light_mode(self, mode: LightStatus) -> None: + """Set the light mode of the LED bar. + + :param mode: Mode to set light bar to. + :raises ValueError: If requested mode is invalid. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + if mode is LightStatus.UNKNOWN: + raise ValueError("You cannot set the light status to unknown") + await self._send_command( + cmd=bytes.fromhex(CMD_LIGHT_MODE), + payload=bytes.fromhex(PAYLOAD_LIGHT_MODE) + mode.value.to_bytes(), + ) + + async def set_display_mode(self, mode: LightStatus) -> None: + """Set the status/mode of the LCD display. + + :param mode: Mode/status to set display to (off/low/med/high). + :raises ValueError: If requested mode is invalid. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + if mode is LightStatus.UNKNOWN: + raise ValueError("You cannot set the display brightness status to unknown") + if mode is LightStatus.SOS: + raise ValueError("You cannot set the display brightness status to SOS") + await self._send_command( + cmd=bytes.fromhex(CMD_DISPLAY_MODE), + payload=bytes.fromhex(PAYLOAD_LIGHT_MODE) + mode.value.to_bytes(), + ) + + async def set_display_timeout(self, timeout: DisplayTimeout) -> None: + """Set the status/mode of the LCD display. + + :param mode: Mode/timeout to set display to (30s, 5m, 30m, etc). + :raises ValueError: If requested mode is invalid. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + + if timeout is DisplayTimeout.UNKNOWN: + raise ValueError("You cannot set the display timeout to unknown") + await self._send_command( + cmd=bytes.fromhex(CMD_DISPLAY_TIMEOUT), + payload=bytes.fromhex(PAYLOAD_TIMEOUT_TIME) + + timeout.value.to_bytes(length=2, byteorder="little", signed=False), + ) + + async def turn_display_on(self) -> None: + """Turn the display on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_DISPLAY_ON_OFF), payload=bytes.fromhex(PAYLOAD_ON) + ) + + async def turn_display_off(self) -> None: + """Turn the display off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_DISPLAY_ON_OFF), payload=bytes.fromhex(PAYLOAD_OFF) + ) + + async def turn_power_saving_mode_on(self) -> None: + """Turn the power saving mode on. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_POWER_SAVING_MODE), + payload=bytes.fromhex(PAYLOAD_ON), + ) + + async def turn_power_saving_mode_off(self) -> None: + """Turn the power saving mode off. + + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + await self._send_command( + cmd=bytes.fromhex(CMD_POWER_SAVING_MODE), + payload=bytes.fromhex(PAYLOAD_OFF), + ) + + async def set_ac_charging_power(self, watts: int) -> None: + """Set the AC charging power limit in watts. + + :param watts: AC charging power limit in watts. + :raises ValueError: If power value is out of valid range. + :raises ConnectionError: If not connected to device. + :raises BleakError: If command transmission fails. + """ + if watts < 100 or watts > 1440: # below 100 causes max charge, 1440 is max in app. + raise ValueError("AC charging power must be between 100 and 1440 W") + + await self._send_command( + cmd=bytes.fromhex(CMD_AC_CHARGING_POWER), + payload=bytes.fromhex(PAYLOAD_AC_CHARGING_POWER) + + watts.to_bytes(length=2, byteorder="little", signed=False), + ) + + async def get_status_update(self) -> dict[str, bytes]: + """Request and retrieve a status update from the device. + + :raises ConnectionError: If not connected to device. + :raises TimeoutError: If no response from device. + :raises BleakError: If command transmission fails. + :returns: Dictionary containing telemetry parameters. + """ + await self._send_command( + cmd=bytes.fromhex("4040"), + payload=bytes.fromhex("a10121"), + ) + + packet_1 = await self._listen_for_packet( + bytes.fromhex("03010f"), bytes.fromhex("c840") + ) + if not packet_1: + raise TimeoutError("Timed out waiting for packet 1!") + + packet_2 = await self._listen_for_packet( + bytes.fromhex("03010f"), bytes.fromhex("c840") + ) + if not packet_2: + raise TimeoutError("Timed out waiting for packet 2!") + + # We need to ignore the first byte of each packet with these types + new_payload = packet_1[1:] + packet_2[1:] + decrypted_payload = self._decrypt_payload(new_payload) + parameters = self._parse_payload(decrypted_payload) + _LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters, types=True)}") + await self._process_telemetry(parameters) # update the internal parameters as well + return parameters