-
-
Notifications
You must be signed in to change notification settings - Fork 13
Add F2600 support #35
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,321 @@ | ||
| """F2600 power station model. | ||
|
|
||
| .. moduleauthor:: Harvey Lelliott (flip-dots) <harveylelliott@duck.com> | ||
| .. moduleauthor:: github.com/unex | ||
|
|
||
| """ | ||
|
|
||
| import logging | ||
|
|
||
| from ..const import ( | ||
| DEFAULT_METADATA_BOOL, | ||
| DEFAULT_METADATA_INT, | ||
| ) | ||
| from ..states import DisplayTimeout, LightStatus, PortStatus | ||
|
|
||
| from . import F2000 | ||
|
|
||
| CMD_AC_CHARGING_POWER = "4044" | ||
| CMD_DISPLAY_TIMEOUT = "4046" | ||
| CMD_DISPLAY_ON_OFF = "4052" | ||
| CMD_AC_OUTPUT = "404a" | ||
| CMD_DC_OUTPUT = "404b" | ||
| CMD_DISPLAY_MODE = "404c" | ||
| CMD_POWER_SAVING_MODE = "404e" | ||
| CMD_LIGHT_MODE = "404f" | ||
|
|
||
| PAYLOAD_ON = "a10121a2020101" | ||
| PAYLOAD_OFF = "a10121a2020100" | ||
| PAYLOAD_LIGHT_MODE = "a10121a20201" | ||
| PAYLOAD_TIMEOUT_TIME = "a10121a20302" | ||
| PAYLOAD_AC_CHARGING_POWER = "a10121a20302" | ||
|
|
||
| _LOGGER = logging.getLogger(__name__) | ||
|
|
||
|
|
||
| class F2600(F2000): | ||
| """ | ||
| F2600 Power Station. | ||
|
|
||
| Use this class to connect and monitor a F2600 power station. | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. |
||
| """ | ||
|
|
||
| _EXPECTED_TELEMETRY_LENGTH: int = 253 | ||
|
|
||
| @property | ||
| def ac_output(self) -> PortStatus: | ||
| """AC Port Status. | ||
|
|
||
| PortStatus.NOT_CONNECTED signifies off. | ||
| PortStatus.OUTPUT signifies on. | ||
|
|
||
| :returns: Status of the AC port. | ||
| """ | ||
| return PortStatus(self._parse_int("bb", begin=1)) | ||
|
|
||
| @property | ||
| def dc_output(self) -> PortStatus: | ||
| """DC Port Status. | ||
|
|
||
| PortStatus.NOT_CONNECTED signifies off. | ||
| PortStatus.OUTPUT signifies on. | ||
|
|
||
| Based on observed F2600 telemetry, key ``cb`` tracks DC output state. | ||
|
|
||
| :returns: Status of the DC port. | ||
| """ | ||
| return PortStatus(self._parse_int("cb", begin=1)) | ||
|
|
||
| @property | ||
| def usb_c1(self) -> PortStatus: | ||
| """USB C1 Port Status. | ||
|
|
||
| :returns: Status of the USB C1 port. | ||
| """ | ||
| return PortStatus(self._parse_int("c6", begin=1)) | ||
|
|
||
| @property | ||
| def usb_c2(self) -> PortStatus: | ||
| """USB C2 Port Status. | ||
|
|
||
| :returns: Status of the USB C2 port. | ||
| """ | ||
| return PortStatus(self._parse_int("c7", begin=1)) | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs to use the same naming scheme as other devices (e.g c300) |
||
|
|
||
| @property | ||
| def usb_c3(self) -> PortStatus: | ||
| """USB C3 Port Status. | ||
|
|
||
| :returns: Status of the USB C3 port. | ||
| """ | ||
| return PortStatus(self._parse_int("c8", begin=1)) | ||
|
|
||
| @property | ||
| def usb_a1(self) -> PortStatus: | ||
| """USB A1 Port Status. | ||
|
|
||
| :returns: Status of the USB A1 port. | ||
| """ | ||
| return PortStatus(self._parse_int("c9", begin=1)) | ||
|
|
||
| @property | ||
| def usb_a2(self) -> PortStatus: | ||
| """USB A2 Port Status. | ||
|
|
||
| :returns: Status of the USB A2 port. | ||
| """ | ||
| return PortStatus(self._parse_int("ca", begin=1)) | ||
|
|
||
| @property | ||
| def ac_charging_power(self) -> int: | ||
| """Configured AC charging power limit in watts. | ||
|
|
||
| :returns: AC charging power limit or default int value. | ||
| """ | ||
| if self._data is None or "d1" not in self._data: | ||
| return DEFAULT_METADATA_INT | ||
| return self._parse_int("d1", begin=1) | ||
|
|
||
| @property | ||
| def display_timeout_seconds(self) -> int: | ||
| """Configured display timeout in seconds. | ||
|
|
||
| :returns: Display timeout in seconds or default int value. | ||
| """ | ||
| if self._data is None or "d3" not in self._data: | ||
| return DEFAULT_METADATA_INT | ||
| return self._parse_int("d3", begin=1) | ||
|
|
||
| @property | ||
| def power_saving_mode_enabled(self) -> bool | None: | ||
| """Whether power saving mode is enabled. | ||
|
|
||
| :returns: True if enabled, False if disabled, or default bool value. | ||
| """ | ||
| return ( | ||
| bool(self._parse_int("db", begin=1)) | ||
| if self._data is not None and "db" in self._data | ||
| else DEFAULT_METADATA_BOOL | ||
| ) | ||
|
|
||
|
|
||
| async def turn_ac_on(self) -> None: | ||
| """Turn the AC output on. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_AC_OUTPUT), payload=bytes.fromhex(PAYLOAD_ON) | ||
| ) | ||
|
|
||
| async def turn_ac_off(self) -> None: | ||
| """Turn the AC output off. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_AC_OUTPUT), payload=bytes.fromhex(PAYLOAD_OFF) | ||
| ) | ||
|
|
||
| async def turn_dc_on(self) -> None: | ||
| """Turn the DC output on. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_DC_OUTPUT), payload=bytes.fromhex(PAYLOAD_ON) | ||
| ) | ||
|
|
||
| async def turn_dc_off(self) -> None: | ||
| """Turn the DC output off. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_DC_OUTPUT), payload=bytes.fromhex(PAYLOAD_OFF) | ||
| ) | ||
|
|
||
| async def set_light_mode(self, mode: LightStatus) -> None: | ||
| """Set the light mode of the LED bar. | ||
|
|
||
| :param mode: Mode to set light bar to. | ||
| :raises ValueError: If requested mode is invalid. | ||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| if mode is LightStatus.UNKNOWN: | ||
| raise ValueError("You cannot set the light status to unknown") | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_LIGHT_MODE), | ||
| payload=bytes.fromhex(PAYLOAD_LIGHT_MODE) + mode.value.to_bytes(), | ||
| ) | ||
|
|
||
| async def set_display_mode(self, mode: LightStatus) -> None: | ||
| """Set the status/mode of the LCD display. | ||
|
|
||
| :param mode: Mode/status to set display to (off/low/med/high). | ||
| :raises ValueError: If requested mode is invalid. | ||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| if mode is LightStatus.UNKNOWN: | ||
| raise ValueError("You cannot set the display brightness status to unknown") | ||
| if mode is LightStatus.SOS: | ||
| raise ValueError("You cannot set the display brightness status to SOS") | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_DISPLAY_MODE), | ||
| payload=bytes.fromhex(PAYLOAD_LIGHT_MODE) + mode.value.to_bytes(), | ||
| ) | ||
|
|
||
| async def set_display_timeout(self, timeout: DisplayTimeout) -> None: | ||
| """Set the status/mode of the LCD display. | ||
|
|
||
| :param mode: Mode/timeout to set display to (30s, 5m, 30m, etc). | ||
| :raises ValueError: If requested mode is invalid. | ||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
|
|
||
| if timeout is DisplayTimeout.UNKNOWN: | ||
| raise ValueError("You cannot set the display timeout to unknown") | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_DISPLAY_TIMEOUT), | ||
| payload=bytes.fromhex(PAYLOAD_TIMEOUT_TIME) | ||
| + timeout.value.to_bytes(length=2, byteorder="little", signed=False), | ||
| ) | ||
|
|
||
| async def turn_display_on(self) -> None: | ||
| """Turn the display on. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_DISPLAY_ON_OFF), payload=bytes.fromhex(PAYLOAD_ON) | ||
| ) | ||
|
|
||
| async def turn_display_off(self) -> None: | ||
| """Turn the display off. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_DISPLAY_ON_OFF), payload=bytes.fromhex(PAYLOAD_OFF) | ||
| ) | ||
|
|
||
| async def turn_power_saving_mode_on(self) -> None: | ||
| """Turn the power saving mode on. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_POWER_SAVING_MODE), | ||
| payload=bytes.fromhex(PAYLOAD_ON), | ||
| ) | ||
|
|
||
| async def turn_power_saving_mode_off(self) -> None: | ||
| """Turn the power saving mode off. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_POWER_SAVING_MODE), | ||
| payload=bytes.fromhex(PAYLOAD_OFF), | ||
| ) | ||
|
|
||
| async def set_ac_charging_power(self, watts: int) -> None: | ||
| """Set the AC charging power limit in watts. | ||
|
|
||
| :param watts: AC charging power limit in watts. | ||
| :raises ValueError: If power value is out of valid range. | ||
| :raises ConnectionError: If not connected to device. | ||
| :raises BleakError: If command transmission fails. | ||
| """ | ||
| if watts < 100 or watts > 1440: # below 100 causes max charge, 1440 is max in app. | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Does it let you set arbitrary values?, in my experience these things are limited to specific values like 100, 200, 300, etc. |
||
| raise ValueError("AC charging power must be between 100 and 1440 W") | ||
|
|
||
| await self._send_command( | ||
| cmd=bytes.fromhex(CMD_AC_CHARGING_POWER), | ||
| payload=bytes.fromhex(PAYLOAD_AC_CHARGING_POWER) | ||
| + watts.to_bytes(length=2, byteorder="little", signed=False), | ||
| ) | ||
|
|
||
| async def get_status_update(self) -> dict[str, bytes]: | ||
| """Request and retrieve a status update from the device. | ||
|
|
||
| :raises ConnectionError: If not connected to device. | ||
| :raises TimeoutError: If no response from device. | ||
| :raises BleakError: If command transmission fails. | ||
| :returns: Dictionary containing telemetry parameters. | ||
| """ | ||
| await self._send_command( | ||
| cmd=bytes.fromhex("4040"), | ||
| payload=bytes.fromhex("a10121"), | ||
| ) | ||
|
|
||
| packet_1 = await self._listen_for_packet( | ||
| bytes.fromhex("03010f"), bytes.fromhex("c840") | ||
| ) | ||
| if not packet_1: | ||
| raise TimeoutError("Timed out waiting for packet 1!") | ||
|
|
||
| packet_2 = await self._listen_for_packet( | ||
| bytes.fromhex("03010f"), bytes.fromhex("c840") | ||
| ) | ||
| if not packet_2: | ||
| raise TimeoutError("Timed out waiting for packet 2!") | ||
|
|
||
| # We need to ignore the first byte of each packet with these types | ||
| new_payload = packet_1[1:] + packet_2[1:] | ||
| decrypted_payload = self._decrypt_payload(new_payload) | ||
| parameters = self._parse_payload(decrypted_payload) | ||
| _LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters, types=True)}") | ||
| await self._process_telemetry(parameters) # update the internal parameters as well | ||
| return parameters | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Since this is a new device a page needs to be added to the docs for it as well as updating the index page with the supported capabilities, #14 should be a good example of how to do it.