Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions SolixBLE/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
C1000,
C1000G2,
F2000,
F2600,
F3800,
Generic,
PrimeCharger160w,
Expand Down Expand Up @@ -40,6 +41,7 @@
"C1000",
"C1000G2",
"F2000",
"F2600",
"F3800",
"Solarbank2",
"Solarbank3",
Expand Down
2 changes: 2 additions & 0 deletions SolixBLE/devices/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
from .c1000 import C1000
from .c1000g2 import C1000G2
from .f2000 import F2000
from .f2600 import F2600
from .f3800 import F3800
from .generic import Generic
from .prime_charger_160w import PrimeCharger160w
Expand All @@ -24,6 +25,7 @@
"C1000",
"C1000G2",
"F2000",
"F2600",
"F3800",
"Solarbank2",
"Solarbank3",
Expand Down
321 changes: 321 additions & 0 deletions SolixBLE/devices/f2600.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,321 @@
"""F2600 power station model.

.. moduleauthor:: Harvey Lelliott (flip-dots) <harveylelliott@duck.com>
.. moduleauthor:: github.com/unex

"""

import logging

from ..const import (
DEFAULT_METADATA_BOOL,
DEFAULT_METADATA_INT,
)
from ..states import DisplayTimeout, LightStatus, PortStatus

from . import F2000
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since this is a new device a page needs to be added to the docs for it as well as updating the index page with the supported capabilities, #14 should be a good example of how to do it.


CMD_AC_CHARGING_POWER = "4044"
CMD_DISPLAY_TIMEOUT = "4046"
CMD_DISPLAY_ON_OFF = "4052"
CMD_AC_OUTPUT = "404a"
CMD_DC_OUTPUT = "404b"
CMD_DISPLAY_MODE = "404c"
CMD_POWER_SAVING_MODE = "404e"
CMD_LIGHT_MODE = "404f"

PAYLOAD_ON = "a10121a2020101"
PAYLOAD_OFF = "a10121a2020100"
PAYLOAD_LIGHT_MODE = "a10121a20201"
PAYLOAD_TIMEOUT_TIME = "a10121a20302"
PAYLOAD_AC_CHARGING_POWER = "a10121a20302"

_LOGGER = logging.getLogger(__name__)


class F2600(F2000):
"""
F2600 Power Station.

Use this class to connect and monitor a F2600 power station.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This could also use the model number like in the other device classes. This manual seems to suggest its A1781.

"""

_EXPECTED_TELEMETRY_LENGTH: int = 253

@property
def ac_output(self) -> PortStatus:
"""AC Port Status.

PortStatus.NOT_CONNECTED signifies off.
PortStatus.OUTPUT signifies on.

:returns: Status of the AC port.
"""
return PortStatus(self._parse_int("bb", begin=1))

@property
def dc_output(self) -> PortStatus:
"""DC Port Status.

PortStatus.NOT_CONNECTED signifies off.
PortStatus.OUTPUT signifies on.

Based on observed F2600 telemetry, key ``cb`` tracks DC output state.

:returns: Status of the DC port.
"""
return PortStatus(self._parse_int("cb", begin=1))

@property
def usb_c1(self) -> PortStatus:
"""USB C1 Port Status.

:returns: Status of the USB C1 port.
"""
return PortStatus(self._parse_int("c6", begin=1))

@property
def usb_c2(self) -> PortStatus:
"""USB C2 Port Status.

:returns: Status of the USB C2 port.
"""
return PortStatus(self._parse_int("c7", begin=1))
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Needs to use the same naming scheme as other devices (e.g c300)


@property
def usb_c3(self) -> PortStatus:
"""USB C3 Port Status.

:returns: Status of the USB C3 port.
"""
return PortStatus(self._parse_int("c8", begin=1))

@property
def usb_a1(self) -> PortStatus:
"""USB A1 Port Status.

:returns: Status of the USB A1 port.
"""
return PortStatus(self._parse_int("c9", begin=1))

@property
def usb_a2(self) -> PortStatus:
"""USB A2 Port Status.

:returns: Status of the USB A2 port.
"""
return PortStatus(self._parse_int("ca", begin=1))

@property
def ac_charging_power(self) -> int:
"""Configured AC charging power limit in watts.

:returns: AC charging power limit or default int value.
"""
if self._data is None or "d1" not in self._data:
return DEFAULT_METADATA_INT
return self._parse_int("d1", begin=1)

@property
def display_timeout_seconds(self) -> int:
"""Configured display timeout in seconds.

:returns: Display timeout in seconds or default int value.
"""
if self._data is None or "d3" not in self._data:
return DEFAULT_METADATA_INT
return self._parse_int("d3", begin=1)

@property
def power_saving_mode_enabled(self) -> bool | None:
"""Whether power saving mode is enabled.

:returns: True if enabled, False if disabled, or default bool value.
"""
return (
bool(self._parse_int("db", begin=1))
if self._data is not None and "db" in self._data
else DEFAULT_METADATA_BOOL
)


async def turn_ac_on(self) -> None:
"""Turn the AC output on.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_AC_OUTPUT), payload=bytes.fromhex(PAYLOAD_ON)
)

async def turn_ac_off(self) -> None:
"""Turn the AC output off.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_AC_OUTPUT), payload=bytes.fromhex(PAYLOAD_OFF)
)

async def turn_dc_on(self) -> None:
"""Turn the DC output on.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_DC_OUTPUT), payload=bytes.fromhex(PAYLOAD_ON)
)

async def turn_dc_off(self) -> None:
"""Turn the DC output off.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_DC_OUTPUT), payload=bytes.fromhex(PAYLOAD_OFF)
)

async def set_light_mode(self, mode: LightStatus) -> None:
"""Set the light mode of the LED bar.

:param mode: Mode to set light bar to.
:raises ValueError: If requested mode is invalid.
:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
if mode is LightStatus.UNKNOWN:
raise ValueError("You cannot set the light status to unknown")
await self._send_command(
cmd=bytes.fromhex(CMD_LIGHT_MODE),
payload=bytes.fromhex(PAYLOAD_LIGHT_MODE) + mode.value.to_bytes(),
)

async def set_display_mode(self, mode: LightStatus) -> None:
"""Set the status/mode of the LCD display.

:param mode: Mode/status to set display to (off/low/med/high).
:raises ValueError: If requested mode is invalid.
:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
if mode is LightStatus.UNKNOWN:
raise ValueError("You cannot set the display brightness status to unknown")
if mode is LightStatus.SOS:
raise ValueError("You cannot set the display brightness status to SOS")
await self._send_command(
cmd=bytes.fromhex(CMD_DISPLAY_MODE),
payload=bytes.fromhex(PAYLOAD_LIGHT_MODE) + mode.value.to_bytes(),
)

async def set_display_timeout(self, timeout: DisplayTimeout) -> None:
"""Set the status/mode of the LCD display.

:param mode: Mode/timeout to set display to (30s, 5m, 30m, etc).
:raises ValueError: If requested mode is invalid.
:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""

if timeout is DisplayTimeout.UNKNOWN:
raise ValueError("You cannot set the display timeout to unknown")
await self._send_command(
cmd=bytes.fromhex(CMD_DISPLAY_TIMEOUT),
payload=bytes.fromhex(PAYLOAD_TIMEOUT_TIME)
+ timeout.value.to_bytes(length=2, byteorder="little", signed=False),
)

async def turn_display_on(self) -> None:
"""Turn the display on.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_DISPLAY_ON_OFF), payload=bytes.fromhex(PAYLOAD_ON)
)

async def turn_display_off(self) -> None:
"""Turn the display off.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_DISPLAY_ON_OFF), payload=bytes.fromhex(PAYLOAD_OFF)
)

async def turn_power_saving_mode_on(self) -> None:
"""Turn the power saving mode on.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_POWER_SAVING_MODE),
payload=bytes.fromhex(PAYLOAD_ON),
)

async def turn_power_saving_mode_off(self) -> None:
"""Turn the power saving mode off.

:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
await self._send_command(
cmd=bytes.fromhex(CMD_POWER_SAVING_MODE),
payload=bytes.fromhex(PAYLOAD_OFF),
)

async def set_ac_charging_power(self, watts: int) -> None:
"""Set the AC charging power limit in watts.

:param watts: AC charging power limit in watts.
:raises ValueError: If power value is out of valid range.
:raises ConnectionError: If not connected to device.
:raises BleakError: If command transmission fails.
"""
if watts < 100 or watts > 1440: # below 100 causes max charge, 1440 is max in app.
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does it let you set arbitrary values?, in my experience these things are limited to specific values like 100, 200, 300, etc.

raise ValueError("AC charging power must be between 100 and 1440 W")

await self._send_command(
cmd=bytes.fromhex(CMD_AC_CHARGING_POWER),
payload=bytes.fromhex(PAYLOAD_AC_CHARGING_POWER)
+ watts.to_bytes(length=2, byteorder="little", signed=False),
)

async def get_status_update(self) -> dict[str, bytes]:
"""Request and retrieve a status update from the device.

:raises ConnectionError: If not connected to device.
:raises TimeoutError: If no response from device.
:raises BleakError: If command transmission fails.
:returns: Dictionary containing telemetry parameters.
"""
await self._send_command(
cmd=bytes.fromhex("4040"),
payload=bytes.fromhex("a10121"),
)

packet_1 = await self._listen_for_packet(
bytes.fromhex("03010f"), bytes.fromhex("c840")
)
if not packet_1:
raise TimeoutError("Timed out waiting for packet 1!")

packet_2 = await self._listen_for_packet(
bytes.fromhex("03010f"), bytes.fromhex("c840")
)
if not packet_2:
raise TimeoutError("Timed out waiting for packet 2!")

# We need to ignore the first byte of each packet with these types
new_payload = packet_1[1:] + packet_2[1:]
decrypted_payload = self._decrypt_payload(new_payload)
parameters = self._parse_payload(decrypted_payload)
_LOGGER.debug(f"Parameters: {self._parameters_to_str(parameters, types=True)}")
await self._process_telemetry(parameters) # update the internal parameters as well
return parameters