From dcf3454fe548087e39d1b87debf67e8955063dac Mon Sep 17 00:00:00 2001 From: Guzz-T Date: Tue, 6 Jan 2026 21:56:06 +0100 Subject: [PATCH] Restructure source code and imports. No effective code change. --- luxtronik/__init__.py | 266 +-------- luxtronik/cfi/__init__.py | 12 + luxtronik/{ => cfi}/calculations.py | 4 +- luxtronik/cfi/constants.py | 23 + luxtronik/cfi/interface.py | 262 +++++++++ luxtronik/{ => cfi}/parameters.py | 4 +- luxtronik/{ => cfi}/visibilities.py | 4 +- luxtronik/common.py | 69 +++ luxtronik/constants.py | 22 - luxtronik/definitions/__init__.py | 540 ++++++++++++++++++ luxtronik/shi/__init__.py | 12 +- luxtronik/shi/common.py | 61 -- luxtronik/shi/definitions.py | 529 ----------------- luxtronik/shi/holdings.py | 2 +- luxtronik/shi/inputs.py | 2 +- luxtronik/shi/interface.py | 13 +- luxtronik/shi/vector.py | 10 +- .../test_cfi_calculations.py} | 2 +- .../test_cfi_parameters.py} | 2 +- .../test_cfi_visibilities.py} | 2 +- .../{test_common.py => test_shi_common.py} | 53 -- ...t_contiguous.py => test_shi_contiguous.py} | 2 +- tests/shi/test_shi_definitions.py | 122 ++++ ...est_interface.py => test_shi_interface.py} | 11 +- .../{test_modbus.py => test_shi_modbus.py} | 0 .../{test_vector.py => test_shi_vector.py} | 5 +- tests/test_common.py | 58 ++ tests/{shi => }/test_definition_list.py | 2 +- tests/{shi => }/test_definitions.py | 120 +--- 29 files changed, 1139 insertions(+), 1075 deletions(-) create mode 100644 luxtronik/cfi/__init__.py rename luxtronik/{ => cfi}/calculations.py (93%) mode change 100755 => 100644 create mode 100644 luxtronik/cfi/constants.py create mode 100644 luxtronik/cfi/interface.py rename luxtronik/{ => cfi}/parameters.py (92%) mode change 100755 => 100644 rename luxtronik/{ => cfi}/visibilities.py (86%) mode change 100755 => 100644 create mode 100644 luxtronik/definitions/__init__.py rename tests/{test_calculations.py => cfi/test_cfi_calculations.py} (88%) rename tests/{test_parameters.py => cfi/test_cfi_parameters.py} (98%) rename tests/{test_visibilities.py => cfi/test_cfi_visibilities.py} (88%) rename tests/shi/{test_common.py => test_shi_common.py} (54%) rename tests/shi/{test_contiguous.py => test_shi_contiguous.py} (99%) create mode 100644 tests/shi/test_shi_definitions.py rename tests/shi/{test_interface.py => test_shi_interface.py} (99%) rename tests/shi/{test_modbus.py => test_shi_modbus.py} (100%) rename tests/shi/{test_vector.py => test_shi_vector.py} (99%) create mode 100644 tests/test_common.py rename tests/{shi => }/test_definition_list.py (99%) rename tests/{shi => }/test_definitions.py (78%) diff --git a/luxtronik/__init__.py b/luxtronik/__init__.py index fb874ac8..caa77545 100755 --- a/luxtronik/__init__.py +++ b/luxtronik/__init__.py @@ -5,269 +5,31 @@ from __future__ import annotations import logging -import socket -import struct -import time from luxtronik.common import get_host_lock -from luxtronik.calculations import Calculations -from luxtronik.parameters import Parameters -from luxtronik.visibilities import Visibilities from luxtronik.discover import discover # noqa: F401 -from luxtronik.constants import ( + +from luxtronik.cfi import ( LUXTRONIK_DEFAULT_PORT, - LUXTRONIK_PARAMETERS_WRITE, - LUXTRONIK_PARAMETERS_READ, - LUXTRONIK_CALCULATIONS_READ, - LUXTRONIK_VISIBILITIES_READ, - LUXTRONIK_SOCKET_READ_SIZE_INTEGER, - LUXTRONIK_SOCKET_READ_SIZE_CHAR, + Calculations, # noqa: F401 + Parameters, # noqa: F401 + Visibilities, # noqa: F401 + LuxtronikData, + LuxtronikSocketInterface, ) -from luxtronik.shi import resolve_version -from luxtronik.shi.modbus import LuxtronikModbusTcpInterface -from luxtronik.shi.holdings import Holdings -from luxtronik.shi.interface import LuxtronikSmartHomeData, LuxtronikSmartHomeInterface -from luxtronik.shi.constants import ( +from luxtronik.shi import ( LUXTRONIK_DEFAULT_MODBUS_PORT, + LuxtronikModbusTcpInterface, + Holdings, # noqa: F401 + Inputs, # noqa: F401 + LuxtronikSmartHomeData, + LuxtronikSmartHomeInterface, + resolve_version, ) - # endregion Imports LOGGER = logging.getLogger("Luxtronik") -# Wait time (in seconds) after writing parameters to give controller -# some time to re-calculate values, etc. -WAIT_TIME_AFTER_PARAMETER_WRITE = 1 - - -class LuxtronikData: - """ - Collection of parameters, calculations and visiblities. - Also provide some high level access functions to their data values. - """ - - def __init__(self, parameters=None, calculations=None, visibilities=None, safe=True): - self.parameters = Parameters(safe) if parameters is None else parameters - self.calculations = Calculations() if calculations is None else calculations - self.visibilities = Visibilities() if visibilities is None else visibilities - - def get_firmware_version(self): - return self.calculations.get_firmware_version() - - -class LuxtronikSocketInterface: - """Luxtronik read/write interface via socket.""" - - def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT): - # Acquire a lock object for this host to ensure thread safety - self._lock = get_host_lock(host) - - self._host = host - self._port = port - self._socket = None - - @property - def lock(self): - return self._lock - - def _with_lock_and_connect(self, func, *args, **kwargs): - """ - Decorator around various read/write functions to connect first. - - This method is essentially a wrapper for the _read() and _write() methods. - Locking is being used to ensure that only a single socket operation is - performed at any point in time. This helps to avoid issues with the - Luxtronik controller, which seems unstable otherwise. - """ - with self.lock: - try: - ret_val = None - with socket.create_connection((self._host, self._port)) as sock: - self._socket = sock - LOGGER.info("Connected to Luxtronik heat pump %s:%s", self._host, self._port) - ret_val = func(*args, **kwargs) - except socket.gaierror as e: - LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", - self._host, self._port, f"Address-related error: {e}") - except socket.timeout as e: - LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", - self._host, self._port, f"Connection timed out: {e}") - except ConnectionRefusedError as e: - LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", - self._host, self._port, f"Connection refused: {e}") - except OSError as e: - LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", - self._host, self._port, f"OS error during connect: {e}") - except Exception as e: - LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", - self._host, self._port, f"Unknown exception: {e}") - self._socket = None - return ret_val - - def read(self, data=None): - """ - All available data will be read from the heat pump - and integrated to the passed data object. - This data object is returned afterwards, mainly for access to a newly created. - """ - if data is None: - data = LuxtronikData() - return self._with_lock_and_connect(self._read, data) - - def read_parameters(self, parameters=None): - """ - Read parameters from heat pump and integrate them to the passed dictionary. - This dictionary is returned afterwards, mainly for access to a newly created. - """ - if parameters is None: - parameters = Parameters() - return self._with_lock_and_connect(self._read_parameters, parameters) - - def read_calculations(self, calculations=None): - """ - Read calculations from heat pump and integrate them to the passed dictionary. - This dictionary is returned afterwards, mainly for access to a newly created. - """ - if calculations is None: - calculations = Calculations() - return self._with_lock_and_connect(self._read_calculations, calculations) - - def read_visibilities(self, visibilities=None): - """ - Read visibilities from heat pump and integrate them to the passed dictionary. - This dictionary is returned afterwards, mainly for access to a newly created. - """ - if visibilities is None: - visibilities = Visibilities() - return self._with_lock_and_connect(self._read_visibilities, visibilities) - - def write(self, parameters): - """ - Write all set parameters to the heat pump. - :param Parameters() parameters Parameter dictionary to be written - to the heatpump before reading all available data - from the heat pump. - """ - self._with_lock_and_connect(self._write, parameters) - - def write_and_read(self, parameters, data=None): - """ - Write all set parameter to the heat pump (see write()) - prior to reading back in all data from the heat pump (see read()) - after a short wait time - """ - if data is None: - data = LuxtronikData() - return self._with_lock_and_connect(self._write_and_read, parameters, data) - - def _read(self, data): - self._read_parameters(data.parameters) - self._read_calculations(data.calculations) - self._read_visibilities(data.visibilities) - return data - - def _write_and_read(self, parameters, data): - self._write(parameters) - return self._read(data) - - def _write(self, parameters): - for index, value in parameters.queue.items(): - if not isinstance(index, int) or not isinstance(value, int): - LOGGER.warning( - "%s: Parameter id '%s' or value '%s' invalid!", - self._host, - index, - value, - ) - continue - LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, index, value) - self._send_ints(LUXTRONIK_PARAMETERS_WRITE, index, value) - cmd = self._read_int() - LOGGER.debug("%s: Command %s", self._host, cmd) - val = self._read_int() - LOGGER.debug("%s: Value %s", self._host, val) - # Flush queue after writing all values - parameters.queue = {} - # Give the heatpump a short time to handle the value changes/calculations: - time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE) - - def _read_parameters(self, parameters): - data = [] - self._send_ints(LUXTRONIK_PARAMETERS_READ, 0) - cmd = self._read_int() - LOGGER.debug("%s: Command %s", self._host, cmd) - length = self._read_int() - LOGGER.debug("%s: Length %s", self._host, length) - for _ in range(0, length): - data.append(self._read_int()) - LOGGER.info("%s: Read %d parameters", self._host, length) - parameters.parse(data) - return parameters - - def _read_calculations(self, calculations): - data = [] - self._send_ints(LUXTRONIK_CALCULATIONS_READ, 0) - cmd = self._read_int() - LOGGER.debug("%s: Command %s", self._host, cmd) - stat = self._read_int() - LOGGER.debug("%s: Stat %s", self._host, stat) - length = self._read_int() - LOGGER.debug("%s: Length %s", self._host, length) - for _ in range(0, length): - data.append(self._read_int()) - LOGGER.info("%s: Read %d calculations", self._host, length) - calculations.parse(data) - return calculations - - def _read_visibilities(self, visibilities): - data = [] - self._send_ints(LUXTRONIK_VISIBILITIES_READ, 0) - cmd = self._read_int() - LOGGER.debug("%s: Command %s", self._host, cmd) - length = self._read_int() - LOGGER.debug("%s: Length %s", self._host, length) - for _ in range(0, length): - data.append(self._read_char()) - LOGGER.info("%s: Read %d visibilities", self._host, length) - visibilities.parse(data) - return visibilities - - def _send_ints(self, *ints): - "Low-level helper to send a tuple of ints" - data = struct.pack(">" + "i" * len(ints), *ints) - LOGGER.debug("%s: sending %s", self._host, data) - self._socket.sendall(data) - - def _read_bytes(self, count): - "Low-level helper to receive a precise number of bytes" - total_reading = b"" - - while len(total_reading) is not count: - missing = count - len(total_reading) - - reading = self._socket.recv( missing ) - - if len(reading) == 0: - LOGGER.error("%s: Connection died.", self._host) - raise ConnectionError("Connection to %s died." % self._host) - - total_reading += reading - - if len(reading) is not missing: - LOGGER.debug("%s: received %s bytes out of %s bytes. Will read again.", self._host, len(reading), missing) - - return total_reading - - def _read_int(self): - "Low-level helper to receive an int" - reading = self._read_bytes(LUXTRONIK_SOCKET_READ_SIZE_INTEGER) - return struct.unpack(">i", reading)[0] - - def _read_char(self): - "Low-level helper to receive a signed int" - reading = self._read_bytes(LUXTRONIK_SOCKET_READ_SIZE_CHAR) - return struct.unpack(">b", reading)[0] - class LuxtronikAllData(LuxtronikData, LuxtronikSmartHomeData): """ diff --git a/luxtronik/cfi/__init__.py b/luxtronik/cfi/__init__.py new file mode 100644 index 00000000..23b5baf3 --- /dev/null +++ b/luxtronik/cfi/__init__.py @@ -0,0 +1,12 @@ +""" +Python module for controlling a Luxtronik heat pump controller +via the config interface. +""" + +from luxtronik.cfi.constants import ( + LUXTRONIK_DEFAULT_PORT, # noqa: F401 +) +from luxtronik.cfi.calculations import CALCULATIONS_DEFINITIONS, Calculations # noqa: F401 +from luxtronik.cfi.parameters import PARAMETERS_DEFINITIONS, Parameters # noqa: F401 +from luxtronik.cfi.visibilities import VISIBILITIES_DEFINITIONS, Visibilities # noqa: F401 +from luxtronik.cfi.interface import LuxtronikData, LuxtronikSocketInterface # noqa: F401 \ No newline at end of file diff --git a/luxtronik/calculations.py b/luxtronik/cfi/calculations.py old mode 100755 new mode 100644 similarity index 93% rename from luxtronik/calculations.py rename to luxtronik/cfi/calculations.py index 2eed64e8..0140ee3d --- a/luxtronik/calculations.py +++ b/luxtronik/cfi/calculations.py @@ -3,10 +3,10 @@ import logging from typing import Final +from luxtronik.definitions import LuxtronikDefinitionsList from luxtronik.definitions.calculations import CALCULATIONS_DEFINITIONS_LIST, CALCULATIONS_OFFSET -from luxtronik.constants import CALCULATIONS_FIELD_NAME -from luxtronik.shi.definitions import LuxtronikDefinitionsList +from luxtronik.cfi.constants import CALCULATIONS_FIELD_NAME from luxtronik.data_vector import DataVector from luxtronik.datatypes import Base diff --git a/luxtronik/cfi/constants.py b/luxtronik/cfi/constants.py new file mode 100644 index 00000000..dc3110f7 --- /dev/null +++ b/luxtronik/cfi/constants.py @@ -0,0 +1,23 @@ +"""Constants used throughout the Luxtronik config interface (CFI) module.""" + +# Default port to be used to connect to Luxtronik controller. +LUXTRONIK_DEFAULT_PORT = 8889 + +LUXTRONIK_PARAMETERS_WRITE = 3002 +LUXTRONIK_PARAMETERS_READ = 3003 +LUXTRONIK_CALCULATIONS_READ = 3004 +LUXTRONIK_VISIBILITIES_READ = 3005 + +LUXTRONIK_SOCKET_READ_SIZE_PEEK = 16 + +LUXTRONIK_SOCKET_READ_SIZE_INTEGER = 4 +LUXTRONIK_SOCKET_READ_SIZE_CHAR = 1 + +# Identifier of calculation data-vectors and partial name for unknown calculation fields +CALCULATIONS_FIELD_NAME = "calculation" + +# Identifier of parameter data-vectors and partial name for unknown parameter fields +PARAMETERS_FIELD_NAME = "parameter" + +# Identifier of visibilities data-vectors and partial name for unknown visibility fields +VISIBILITIES_FIELD_NAME = "visibility" \ No newline at end of file diff --git a/luxtronik/cfi/interface.py b/luxtronik/cfi/interface.py new file mode 100644 index 00000000..6306ec10 --- /dev/null +++ b/luxtronik/cfi/interface.py @@ -0,0 +1,262 @@ +"""Main components of the Luxtronik config interface.""" + +import logging +import socket +import struct +import time + +from luxtronik.common import get_host_lock +from luxtronik.cfi.constants import ( + LUXTRONIK_DEFAULT_PORT, + LUXTRONIK_PARAMETERS_WRITE, + LUXTRONIK_PARAMETERS_READ, + LUXTRONIK_CALCULATIONS_READ, + LUXTRONIK_VISIBILITIES_READ, + LUXTRONIK_SOCKET_READ_SIZE_INTEGER, + LUXTRONIK_SOCKET_READ_SIZE_CHAR, +) +from luxtronik.cfi.calculations import Calculations +from luxtronik.cfi.parameters import Parameters +from luxtronik.cfi.visibilities import Visibilities + + +LOGGER = logging.getLogger("Luxtronik") + +# Wait time (in seconds) after writing parameters to give controller +# some time to re-calculate values, etc. +WAIT_TIME_AFTER_PARAMETER_WRITE = 1 + +############################################################################### +# Config interface data +############################################################################### + +class LuxtronikData: + """ + Collection of parameters, calculations and visiblities. + Also provide some high level access functions to their data values. + """ + + def __init__(self, parameters=None, calculations=None, visibilities=None, safe=True): + self.parameters = Parameters(safe) if parameters is None else parameters + self.calculations = Calculations() if calculations is None else calculations + self.visibilities = Visibilities() if visibilities is None else visibilities + + def get_firmware_version(self): + return self.calculations.get_firmware_version() + +############################################################################### +# Config interface +############################################################################### + +class LuxtronikSocketInterface: + """Luxtronik read/write interface via socket.""" + + def __init__(self, host, port=LUXTRONIK_DEFAULT_PORT): + # Acquire a lock object for this host to ensure thread safety + self._lock = get_host_lock(host) + + self._host = host + self._port = port + self._socket = None + + @property + def lock(self): + return self._lock + + def _with_lock_and_connect(self, func, *args, **kwargs): + """ + Decorator around various read/write functions to connect first. + + This method is essentially a wrapper for the _read() and _write() methods. + Locking is being used to ensure that only a single socket operation is + performed at any point in time. This helps to avoid issues with the + Luxtronik controller, which seems unstable otherwise. + """ + with self.lock: + try: + ret_val = None + with socket.create_connection((self._host, self._port)) as sock: + self._socket = sock + LOGGER.info("Connected to Luxtronik heat pump %s:%s", self._host, self._port) + ret_val = func(*args, **kwargs) + except socket.gaierror as e: + LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", + self._host, self._port, f"Address-related error: {e}") + except socket.timeout as e: + LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", + self._host, self._port, f"Connection timed out: {e}") + except ConnectionRefusedError as e: + LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", + self._host, self._port, f"Connection refused: {e}") + except OSError as e: + LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", + self._host, self._port, f"OS error during connect: {e}") + except Exception as e: + LOGGER.error("Failed to connect to Luxtronik heat pump %s:%s. %s.", + self._host, self._port, f"Unknown exception: {e}") + self._socket = None + return ret_val + + def read(self, data=None): + """ + All available data will be read from the heat pump + and integrated to the passed data object. + This data object is returned afterwards, mainly for access to a newly created. + """ + if data is None: + data = LuxtronikData() + return self._with_lock_and_connect(self._read, data) + + def read_parameters(self, parameters=None): + """ + Read parameters from heat pump and integrate them to the passed dictionary. + This dictionary is returned afterwards, mainly for access to a newly created. + """ + if parameters is None: + parameters = Parameters() + return self._with_lock_and_connect(self._read_parameters, parameters) + + def read_calculations(self, calculations=None): + """ + Read calculations from heat pump and integrate them to the passed dictionary. + This dictionary is returned afterwards, mainly for access to a newly created. + """ + if calculations is None: + calculations = Calculations() + return self._with_lock_and_connect(self._read_calculations, calculations) + + def read_visibilities(self, visibilities=None): + """ + Read visibilities from heat pump and integrate them to the passed dictionary. + This dictionary is returned afterwards, mainly for access to a newly created. + """ + if visibilities is None: + visibilities = Visibilities() + return self._with_lock_and_connect(self._read_visibilities, visibilities) + + def write(self, parameters): + """ + Write all set parameters to the heat pump. + :param Parameters() parameters Parameter dictionary to be written + to the heatpump before reading all available data + from the heat pump. + """ + self._with_lock_and_connect(self._write, parameters) + + def write_and_read(self, parameters, data=None): + """ + Write all set parameter to the heat pump (see write()) + prior to reading back in all data from the heat pump (see read()) + after a short wait time + """ + if data is None: + data = LuxtronikData() + return self._with_lock_and_connect(self._write_and_read, parameters, data) + + def _read(self, data): + self._read_parameters(data.parameters) + self._read_calculations(data.calculations) + self._read_visibilities(data.visibilities) + return data + + def _write_and_read(self, parameters, data): + self._write(parameters) + return self._read(data) + + def _write(self, parameters): + for index, value in parameters.queue.items(): + if not isinstance(index, int) or not isinstance(value, int): + LOGGER.warning( + "%s: Parameter id '%s' or value '%s' invalid!", + self._host, + index, + value, + ) + continue + LOGGER.info("%s: Parameter '%d' set to '%s'", self._host, index, value) + self._send_ints(LUXTRONIK_PARAMETERS_WRITE, index, value) + cmd = self._read_int() + LOGGER.debug("%s: Command %s", self._host, cmd) + val = self._read_int() + LOGGER.debug("%s: Value %s", self._host, val) + # Flush queue after writing all values + parameters.queue = {} + # Give the heatpump a short time to handle the value changes/calculations: + time.sleep(WAIT_TIME_AFTER_PARAMETER_WRITE) + + def _read_parameters(self, parameters): + data = [] + self._send_ints(LUXTRONIK_PARAMETERS_READ, 0) + cmd = self._read_int() + LOGGER.debug("%s: Command %s", self._host, cmd) + length = self._read_int() + LOGGER.debug("%s: Length %s", self._host, length) + for _ in range(0, length): + data.append(self._read_int()) + LOGGER.info("%s: Read %d parameters", self._host, length) + parameters.parse(data) + return parameters + + def _read_calculations(self, calculations): + data = [] + self._send_ints(LUXTRONIK_CALCULATIONS_READ, 0) + cmd = self._read_int() + LOGGER.debug("%s: Command %s", self._host, cmd) + stat = self._read_int() + LOGGER.debug("%s: Stat %s", self._host, stat) + length = self._read_int() + LOGGER.debug("%s: Length %s", self._host, length) + for _ in range(0, length): + data.append(self._read_int()) + LOGGER.info("%s: Read %d calculations", self._host, length) + calculations.parse(data) + return calculations + + def _read_visibilities(self, visibilities): + data = [] + self._send_ints(LUXTRONIK_VISIBILITIES_READ, 0) + cmd = self._read_int() + LOGGER.debug("%s: Command %s", self._host, cmd) + length = self._read_int() + LOGGER.debug("%s: Length %s", self._host, length) + for _ in range(0, length): + data.append(self._read_char()) + LOGGER.info("%s: Read %d visibilities", self._host, length) + visibilities.parse(data) + return visibilities + + def _send_ints(self, *ints): + "Low-level helper to send a tuple of ints" + data = struct.pack(">" + "i" * len(ints), *ints) + LOGGER.debug("%s: sending %s", self._host, data) + self._socket.sendall(data) + + def _read_bytes(self, count): + "Low-level helper to receive a precise number of bytes" + total_reading = b"" + + while len(total_reading) is not count: + missing = count - len(total_reading) + + reading = self._socket.recv( missing ) + + if len(reading) == 0: + LOGGER.error("%s: Connection died.", self._host) + raise ConnectionError("Connection to %s died." % self._host) + + total_reading += reading + + if len(reading) is not missing: + LOGGER.debug("%s: received %s bytes out of %s bytes. Will read again.", self._host, len(reading), missing) + + return total_reading + + def _read_int(self): + "Low-level helper to receive an int" + reading = self._read_bytes(LUXTRONIK_SOCKET_READ_SIZE_INTEGER) + return struct.unpack(">i", reading)[0] + + def _read_char(self): + "Low-level helper to receive a signed int" + reading = self._read_bytes(LUXTRONIK_SOCKET_READ_SIZE_CHAR) + return struct.unpack(">b", reading)[0] \ No newline at end of file diff --git a/luxtronik/parameters.py b/luxtronik/cfi/parameters.py old mode 100755 new mode 100644 similarity index 92% rename from luxtronik/parameters.py rename to luxtronik/cfi/parameters.py index 5872b856..702f1d19 --- a/luxtronik/parameters.py +++ b/luxtronik/cfi/parameters.py @@ -3,10 +3,10 @@ import logging from typing import Final +from luxtronik.definitions import LuxtronikDefinitionsList from luxtronik.definitions.parameters import PARAMETERS_DEFINITIONS_LIST, PARAMETERS_OFFSET -from luxtronik.constants import PARAMETERS_FIELD_NAME -from luxtronik.shi.definitions import LuxtronikDefinitionsList +from luxtronik.cfi.constants import PARAMETERS_FIELD_NAME from luxtronik.data_vector import DataVector diff --git a/luxtronik/visibilities.py b/luxtronik/cfi/visibilities.py old mode 100755 new mode 100644 similarity index 86% rename from luxtronik/visibilities.py rename to luxtronik/cfi/visibilities.py index 1d40a521..04ed36e5 --- a/luxtronik/visibilities.py +++ b/luxtronik/cfi/visibilities.py @@ -3,10 +3,10 @@ import logging from typing import Final +from luxtronik.definitions import LuxtronikDefinitionsList from luxtronik.definitions.visibilities import VISIBILITIES_DEFINITIONS_LIST, VISIBILITIES_OFFSET -from luxtronik.constants import VISIBILITIES_FIELD_NAME -from luxtronik.shi.definitions import LuxtronikDefinitionsList +from luxtronik.cfi.constants import VISIBILITIES_FIELD_NAME from luxtronik.data_vector import DataVector diff --git a/luxtronik/common.py b/luxtronik/common.py index faf5040d..ed28ccdd 100644 --- a/luxtronik/common.py +++ b/luxtronik/common.py @@ -1,5 +1,9 @@ from threading import RLock +############################################################################### +# Multi-threading lock mechanism +############################################################################### + # Global lock to synchronize access to the hosts_locks dictionary _management_lock = RLock() _hosts_locks = {} @@ -23,8 +27,73 @@ def get_host_lock(host): _hosts_locks[host] = RLock() return _hosts_locks[host] +############################################################################### +# Class property +############################################################################### + class classproperty: def __init__(self, fget): self.fget = fget def __get__(self, instance, owner): return self.fget(owner) + +############################################################################### +# Version methods +############################################################################### + +def parse_version(version): + """ + Parse a version string into a tuple with exactly 4 integers. + The individual numbers correspond to `major.minor.patch.build`. + A given tuple of integers is expanded or reduced to 4 integers. + + Examples: + "1" -> (1, 0, 0, 0) + "2.1" -> (2, 1, 0, 0) + "3.2.1" -> (3, 2, 1, 0) + "1.2.3.4" -> (1, 2, 3, 4) + "1.2.3.4.5" -> (1, 2, 3, 4) # extra parts are ignored + "a.b" -> None + + Args: + version (str | tuple[int, ...]): Version string or version as tuple. + + Returns: + tuple[int, int, int, int] | None: Parsed version tuple, or None if invalid. + """ + if isinstance(version, tuple) and all(type(p) is int for p in version): + return (version + (0, 0, 0, 0))[:4] + elif isinstance(version, str): + parts = version.strip().split(".") + if not parts or any(not p.isdigit() for p in parts): + return None + nums = [int(p) for p in parts] + nums = (nums + [0, 0, 0, 0])[:4] + return tuple(nums) + else: + return None + + +def version_in_range(version, since=None, until=None): + """ + Check whether a version is within the specified range of `[since..until]`. + If an argument is None, the corresponding check is skipped. + + Args: + version (tuple[int, ...] | None): The version to check. + If None, returns True. + since (tuple[int, ...] | None): Lower bound (inclusive). + If None, no lower bound is applied. + until (tuple[int, ...] | None): Upper bound (inclusive). + If None, no upper bound is applied. + + Returns: + bool: True if version is within the range, False otherwise. + """ + if version is None: + return True + if since is not None and version < since: + return False + if until is not None and version > until: + return False + return True diff --git a/luxtronik/constants.py b/luxtronik/constants.py index d5336336..cb4e5795 100644 --- a/luxtronik/constants.py +++ b/luxtronik/constants.py @@ -1,18 +1,5 @@ """Constants used throughout the luxtronik module""" -# Default port to be used to connect to Luxtronik controller. -LUXTRONIK_DEFAULT_PORT = 8889 - -LUXTRONIK_PARAMETERS_WRITE = 3002 -LUXTRONIK_PARAMETERS_READ = 3003 -LUXTRONIK_CALCULATIONS_READ = 3004 -LUXTRONIK_VISIBILITIES_READ = 3005 - -LUXTRONIK_SOCKET_READ_SIZE_PEEK = 16 - -LUXTRONIK_SOCKET_READ_SIZE_INTEGER = 4 -LUXTRONIK_SOCKET_READ_SIZE_CHAR = 1 - # List of ports that are known to respond to discovery packets LUXTRONIK_DISCOVERY_PORTS = [4444, 47808] @@ -25,15 +12,6 @@ # Content of response that is contained in responses to discovery broadcast LUXTRONIK_DISCOVERY_RESPONSE_PREFIX = "2500;111;" -# Identifier of calculation data-vectors and partial name for unknown calculation fields -CALCULATIONS_FIELD_NAME = "calculation" - -# Identifier of parameter data-vectors and partial name for unknown parameter fields -PARAMETERS_FIELD_NAME = "parameter" - -# Identifier of visibilities data-vectors and partial name for unknown visibility fields -VISIBILITIES_FIELD_NAME = "visibility" - LUXTRONIK_NAME_CHECK_NONE = "none" LUXTRONIK_NAME_CHECK_PREFERRED = "preferred" LUXTRONIK_NAME_CHECK_OBSOLETE = "obsolete" diff --git a/luxtronik/definitions/__init__.py b/luxtronik/definitions/__init__.py new file mode 100644 index 00000000..d10822d2 --- /dev/null +++ b/luxtronik/definitions/__init__.py @@ -0,0 +1,540 @@ +""" +The metadata (`index`, `count`, ...) for a field (`Base`, `SelectionBase`) +is stored as a definition object. For ease of use, all definitions +of one type (`input`, `holding`, ...) are provided as a sorted list of objects. +This usually contains only predefined definitions (generated out of +`HOLDINGS_DEFINITIONS_LIST`, `INPUTS_DEFINITIONS_LIST`, ...), +but can be expanded by the user. +""" + +from luxtronik.common import ( + parse_version, + version_in_range +) +from luxtronik.datatypes import Unknown + +# TODO: Remove this LOGGER +import logging +LOGGER = logging.getLogger("Luxtronik.SmartHomeInterface") + +############################################################################### +# LuxtronikDefinition +############################################################################### + +class LuxtronikDefinition: + """ + Metadata container for a Luxtronik data field. + + Also provides a method to create a related field object. + """ + + DEFAULT_DATA = { + "index": -1, + "count": 1, + "type": Unknown, + "writeable": False, + "names": [], + "since": "", + "until": "", + "description": "", + } + + def __init__(self, data_dict, type_name, offset): + """ + Initialize a definition from a data-dictionary. + + Args: + data_dict (dict): Definition values. Missing keys are filled with defaults. + type_name (str): The type name e.g. 'parameter', 'holding', 'input', ... . + offset (str): Offset of the address from the specified index. + + Notes: + - Only 'index' is strictly required within the `data_dict`. + - The class may only be created with dictionaries + that have been checked for correctness using pytest. + This eliminates the need for type tests here. + """ + try: + data_dict = self.DEFAULT_DATA | data_dict + index = int(data_dict["index"]) + self._valid = index >= 0 + self._index = index if self._valid else 0 + self._count = int(data_dict["count"]) + self._data_type = data_dict["type"] + self._writeable = bool(data_dict["writeable"]) + names = data_dict["names"] + if not isinstance(names, list): + names = [str(names)] + names = [str(name).strip() for name in names if str(name).strip()] + if not names: + names = ["_invalid_"] + self._names = names + self._aliases = [] + since = str(data_dict["since"]) + self._since = parse_version(since) + until = str(data_dict["until"]) + self._until = parse_version(until) + self._description = str(data_dict["description"]) + self._type_name = type_name.lower() + self._valid &= len(self._type_name) > 0 + self._offset = int(offset) + self._addr = self._offset + self._index + except Exception as e: + self._valid = False + self._index = 0 + LOGGER.error(f"Failed to create LuxtronikDefinition: '{e}' with {data_dict}") + + @classmethod + def unknown(cls, index, type_name, offset): + """ + Create an "unknown" definition. + + Args: + index (int): The register index of the "unknown" definition. + type_name (str): The type name e.g. 'holding', 'input', ... . + offset (str): Offset of the address from the specified index. + + Returns: + LuxtronikDefinition: A definition marked as unknown. + """ + return cls({ + "index": index, + "names": [f"unknown_{type_name.lower()}_{index}"] + }, type_name, offset) + + def __bool__(self): + """Return True if the definition is valid.""" + return self._valid + + def __repr__(self): + return f"(name={self.name}, data_type={self.data_type}," \ + + f" index={self.index}, count={self.count})" + + @property + def valid(self): + return self._valid + + @property + def type_name(self): + "Returns the type name (e.g. 'parameter', 'holding', 'input', ...)." + return self._type_name + + @property + def index(self): + return self._index + + @property + def offset(self): + return self._offset + + @property + def addr(self): + return self._addr + + @property + def count(self): + "Returns the assigned number of used registers." + return self._count + + @property + def data_type(self): + return self._data_type + + @property + def writeable(self): + return self._writeable + + @property + def names(self): + return self._names + + @property + def aliases(self): + return self._aliases + + @property + def name(self): + "Returns the preferred name." + return self._names[0] + + @property + def since(self): + return self._since + + @property + def until(self): + return self._until + + def create_field(self): + """ + Create a data field instance from this definition. + + Returns: + Base | None: Field instance or None if invalid. + """ + return self.data_type(self.names, self.writeable) if self.valid else None + + +############################################################################### +# LuxtronikDefinitionsDictionary +############################################################################### + +class LuxtronikDefinitionsDictionary: + """ + Dictionary of definitions that can be searched by index, name, or aliases. + + To use aliases, they must first be registered here (locally = + only valid for this dictionary) or directly in the `LuxtronikDefinitionsList` + (globally = valid for all newly created dictionaries). + + This class is intended to speed up the lookup of definitions. + Dictionaries are used instead of searching through a list of definitions + one by one to find the one you are looking for. + """ + + def __init__(self): + self._index_dict = {} + self._name_dict = {} + self._alias_dict = {} + + def __getitem__(self, name_or_idx): + return self.get(name_or_idx) + + def __contains__(self, def_name_or_idx): + if isinstance(def_name_or_idx, LuxtronikDefinition): + return any(def_name_or_idx is d for d in self._index_dict.values()) + return self._get(def_name_or_idx) is not None + + def _add_alias(self, definition, alias): + """ + Register a single alias that references the given definition. + + Args: + definition (LuxtronikDefinition): Definition that the alias should map to. + alias (Hashable): Alias to register (str will be normalized). + """ + alias = alias.lower() if isinstance(alias, str) else alias + self._alias_dict[alias] = definition + + def register_alias(self, def_name_or_idx, alias): + """ + Register an alias (locally) that references a definition specified by + name, index, or the definition object. + + Args: + def_name_or_idx (str | int | LuxtronikDefinition): + Name, index, or definition to alias. + alias (Hashable): Alias key to register (str will be normalized). + + Returns: + LuxtronikDefinition | None: The resolved definition + when registration succeeded, otherwise None. + """ + if alias is None: + return None + # look-up definition + if isinstance(def_name_or_idx, LuxtronikDefinition): + definition = self.get(def_name_or_idx.name) + else: + definition = self.get(def_name_or_idx) + if definition is None: + return None + self._add_alias(definition, alias) + return definition + + def add(self, definition, alias=None): + """ + Add a definition to internal lookup tables and register its aliases. + Existing entries will be overwritten. + + Args: + definition (LuxtronikDefinition): Definition to add. + alias (Hashable): Optional additional alias to register for this definition. + """ + # Add to indices-dictionary + self._index_dict[definition.index] = definition + + # Add to name-dictionary + # Unique names has already been ensured by the pytest + for name in definition.names: + self._name_dict[name.lower()] = definition + + # Add to alias-dictionary + for a in definition.aliases: + self._add_alias(definition, a) + if alias is not None: + self._add_alias(definition, alias) + + def get(self, name_or_idx, default=None): + """ + Retrieve a definition by name or index. + + Args: + name_or_idx (str | int): Definition name or register index. + default (LuxtronikDefinition): Definition to return if the searched one is not found. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + + Note: + If multiple definitions added for the same index/name, the last added takes precedence. + """ + d = self._get(name_or_idx) + if d is None: + LOGGER.debug(f"Definition for '{name_or_idx}' not found", ) + return d if d is not None else default + + def _get(self, name_or_idx): + """ + Retrieve a definition by name or index. + + Args: + name_or_idx (str | int): Definition name or register index. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + + Note: + If multiple definitions added for the same index/name, the last added takes precedence. + """ + d = self._get_definition_by_alias(name_or_idx) + if d is None: + if isinstance(name_or_idx, int): + d = self._get_definition_by_idx(name_or_idx) + if d is None: + # search in alias-dict again with the index converted to a string + d = self._get_definition_by_alias(str(name_or_idx)) + if isinstance(name_or_idx, str): + try: + # Numbers are not allowed as names, so it could be an index as string + idx_from_str = int(name_or_idx) + d = self._get_definition_by_idx(idx_from_str) + if d is None: + # search in alias-dict again with the string converted to an index + d = self._get_definition_by_alias(str(name_or_idx)) + except ValueError: + d = self._get_definition_by_name(name_or_idx) + return d + + def _get_definition_by_idx(self, idx): + """ + Retrieve a definition by its index. + + Args: + idx (int): Register index. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + + Note: + If multiple definitions added for the same index, the last added takes precedence. + """ + return self._index_dict.get(idx, None) + + def _get_definition_by_name(self, name): + """ + Retrieve a definition by its name (case-insensitive). + + Args: + name (str): Definition name. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + + Note: + If multiple definitions added for the same name, the last added takes precedence. + """ + definition = self._name_dict.get(name.lower(), None) + if definition is not None and definition.valid and name.lower() != definition.name.lower(): + LOGGER.warning(f"'{name}' is outdated! Use '{definition.name}' instead.") + return definition + + def _get_definition_by_alias(self, alias): + """ + Retrieve a definition by its alias (case-insensitive when using strings). + + Args: + alias (Hashable): Alias for a definition. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + + Note: + If multiple definitions added for the same alias, the last added takes precedence. + """ + alias = alias.lower() if isinstance(alias, str) else alias + return self._alias_dict.get(alias, None) + + +############################################################################### +# LuxtronikDefinitionsList +############################################################################### + +class LuxtronikDefinitionsList: + """ + Container for Luxtronik definitions. + + Provides lookup by index, name or alias. + + To use aliases, they must first be registered here (globally = valid for + all newly created dictionaries) or within the `LuxtronikDefinitionsDictionary` + (locally = only valid for that dictionary). + """ + + def _init_instance(self, name, offset, version): + """Re-usable method to initialize all instance variables.""" + self._name = name + self._offset = offset + self._version = version + # sorted list of all definitions + self._definitions = [] + self._lookup = LuxtronikDefinitionsDictionary() + + def __init__(self, definitions_list, name, offset): + """ + Initialize the (by index sorted) definitions list. + + Args: + definitions_list (list[dict]): Raw definition entries as list of data-dictionaries. + name (str): Name related to this type of definitions (e.g. "calculation", "holding", etc.) + offset (int): Offset applied to register indices. + + Notes on the definitions_list: + - Must be sorted by ascending index + - Each version may contain only one entry per register + - If there exists more than one definition per index, + only the last one can be found using indices/names + - The value of count must always be greater than or equal to 1 + - All names should be unique + """ + self._init_instance(name, offset, None) + + # Add definition objects only for valid items. + # The correct sorting has already been ensured by the pytest + for item in definitions_list: + d = LuxtronikDefinition(item, name, offset) + if d.valid: + self._add(d) + + @classmethod + def filtered(cls, definitions, version): + """ + Filter an existing definitions list by the given version + and return the new (by index sorted) definitions list. + + Args: + definitions (LuxtronikDefinitionsList): List of definitions to filter. + version (tuple[int] | None): + Only definitions that match this version are added to the list. + If None is passed, all available fields are added. + """ + obj = cls.__new__(cls) # this don't call __init__() + obj._init_instance(definitions.name, definitions.offset, version) + + for d in definitions: + if d.valid and version_in_range(obj._version, d.since, d.until): + obj._add(d) + + return obj + + def __getitem__(self, name_or_idx): + return self.get(name_or_idx) + + def __contains__(self, def_name_or_idx): + return def_name_or_idx in self._lookup + + def __len__(self): + return len(self._definitions) + + def __iter__(self): + return iter(self._definitions) + + def __repr__(self): + defs = [repr(d) for d in self._definitions] + return f"({self.name}, {self.offset}, {' ,'.join(defs)})" + + def create_unknown_definition(self, index): + """ + Create an "unknown" definition. + + Args: + index (int): The register index of the "unknown" definition. + + Returns: + LuxtronikDefinition: A definition marked as unknown. + """ + return LuxtronikDefinition.unknown(index, self._name, self._offset) + + def register_alias(self, def_name_or_idx, alias): + """ + Register an alias (globally) that references a definition specified by + name, index, or the definition object. + + Args: + def_name_or_idx (str | int | LuxtronikDefinition): + Name, index, or definition to alias. + alias (any): (Hashable) Alias key to register (str will be normalized). + + Returns: + LuxtronikDefinition | None: The resolved definition + when registration succeeded, otherwise None. + """ + # "local" registration to be able to find the definition again + definition = self._lookup.register_alias(def_name_or_idx, alias) + # "global" registration that is used in newly created definition-dictionaries + if definition is not None: + definition.aliases.append(alias) + return definition + + @property + def name(self): + return self._name + + @property + def offset(self): + return self._offset + + def get(self, name_or_idx, default=None): + """ + Retrieve a definition by name or index. + + Args: + name_or_idx (str | int): Definition name or register index. + + Returns: + LuxtronikDefinition | None: The matching definition, or None if not found. + + Note: + If multiple definitions added for the same index/name, the last added takes precedence. + """ + return self._lookup.get(name_or_idx, default) + + def _add(self, definition): + """ + Add a valid definition to the internal dictionaries + + Args: + definition (LuxtronikDefinition): Definition to add + """ + self._definitions.append(definition) + self._lookup.add(definition) + + def add(self, data_dict): + """ + Add a custom (valid) definition. Existing definitions will not be overwritten. + + Args: + data_dict (dict): Data for the definition to add + + Returns: + LuxtronikDefinition | None: The created definition or None if not valid + + Note: + If multiple definitions added for the same index/name, the last added takes precedence. + """ + definition = LuxtronikDefinition(data_dict, self._name, self._offset) + if not definition.valid: + return None + self._add(definition) + self._definitions.sort(key=lambda item: item.index) + return definition \ No newline at end of file diff --git a/luxtronik/shi/__init__.py b/luxtronik/shi/__init__.py index dea8b0df..ad20fddb 100644 --- a/luxtronik/shi/__init__.py +++ b/luxtronik/shi/__init__.py @@ -3,20 +3,18 @@ via the smart home interface. Powered by Guzz-T. """ +from luxtronik.common import parse_version from luxtronik.datatypes import FullVersion, MajorMinorVersion from luxtronik.shi.constants import ( LUXTRONIK_DEFAULT_MODBUS_PORT, LUXTRONIK_DEFAULT_MODBUS_TIMEOUT, LUXTRONIK_LATEST_SHI_VERSION, ) -from luxtronik.shi.common import LOGGER, parse_version -# Skip ruff unused-import (F401) by using "as" -from luxtronik.shi.inputs import Inputs as Inputs -from luxtronik.shi.inputs import INPUTS_DEFINITIONS as INPUTS_DEFINITIONS -from luxtronik.shi.holdings import Holdings as Holdings -from luxtronik.shi.holdings import HOLDINGS_DEFINITIONS as HOLDINGS_DEFINITIONS +from luxtronik.shi.common import LOGGER +from luxtronik.shi.inputs import INPUTS_DEFINITIONS, Inputs # noqa: F401 +from luxtronik.shi.holdings import HOLDINGS_DEFINITIONS, Holdings # noqa: F401 from luxtronik.shi.modbus import LuxtronikModbusTcpInterface -from luxtronik.shi.interface import LuxtronikSmartHomeInterface +from luxtronik.shi.interface import LuxtronikSmartHomeData, LuxtronikSmartHomeInterface # noqa: F401 VERSION_DETECT = "detect" VERSION_LATEST = "latest" diff --git a/luxtronik/shi/common.py b/luxtronik/shi/common.py index b5acf196..d4695683 100644 --- a/luxtronik/shi/common.py +++ b/luxtronik/shi/common.py @@ -7,67 +7,6 @@ LOGGER = logging.getLogger("Luxtronik.SmartHomeInterface") -############################################################################### -# Version methods -############################################################################### - -def parse_version(version): - """ - Parse a version string into a tuple with exactly 4 integers. - The individual numbers correspond to `major.minor.patch.build`. - A given tuple of integers is expanded or reduced to 4 integers. - - Examples: - "1" -> (1, 0, 0, 0) - "2.1" -> (2, 1, 0, 0) - "3.2.1" -> (3, 2, 1, 0) - "1.2.3.4" -> (1, 2, 3, 4) - "1.2.3.4.5" -> (1, 2, 3, 4) # extra parts are ignored - "a.b" -> None - - Args: - version (str | tuple[int, ...]): Version string or version as tuple. - - Returns: - tuple[int, int, int, int] | None: Parsed version tuple, or None if invalid. - """ - if isinstance(version, tuple) and all(type(p) is int for p in version): - return (version + (0, 0, 0, 0))[:4] - elif isinstance(version, str): - parts = version.strip().split(".") - if not parts or any(not p.isdigit() for p in parts): - return None - nums = [int(p) for p in parts] - nums = (nums + [0, 0, 0, 0])[:4] - return tuple(nums) - else: - return None - - -def version_in_range(version, since=None, until=None): - """ - Check whether a version is within the specified range of `[since..until]`. - If an argument is None, the corresponding check is skipped. - - Args: - version (tuple[int, ...] | None): The version to check. - If None, returns True. - since (tuple[int, ...] | None): Lower bound (inclusive). - If None, no lower bound is applied. - until (tuple[int, ...] | None): Upper bound (inclusive). - If None, no upper bound is applied. - - Returns: - bool: True if version is within the range, False otherwise. - """ - if version is None: - return True - if since is not None and version < since: - return False - if until is not None and version > until: - return False - return True - ############################################################################### # Smart home telegrams ############################################################################### diff --git a/luxtronik/shi/definitions.py b/luxtronik/shi/definitions.py index 56a945cf..2d18cde2 100644 --- a/luxtronik/shi/definitions.py +++ b/luxtronik/shi/definitions.py @@ -7,539 +7,10 @@ but can be expanded by the user. """ -from luxtronik.datatypes import Unknown from luxtronik.shi.constants import ( LUXTRONIK_SHI_REGISTER_BIT_SIZE, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE ) -from luxtronik.shi.common import ( - LOGGER, - parse_version, - version_in_range -) - - -############################################################################### -# LuxtronikDefinition -############################################################################### - -class LuxtronikDefinition: - """ - Metadata container for a Luxtronik data field. - - Also provides a method to create a related field object. - """ - - DEFAULT_DATA = { - "index": -1, - "count": 1, - "type": Unknown, - "writeable": False, - "names": [], - "since": "", - "until": "", - "description": "", - } - - def __init__(self, data_dict, type_name, offset): - """ - Initialize a definition from a data-dictionary. - - Args: - data_dict (dict): Definition values. Missing keys are filled with defaults. - type_name (str): The type name e.g. 'parameter', 'holding', 'input', ... . - offset (str): Offset of the address from the specified index. - - Notes: - - Only 'index' is strictly required within the `data_dict`. - - The class may only be created with dictionaries - that have been checked for correctness using pytest. - This eliminates the need for type tests here. - """ - try: - data_dict = self.DEFAULT_DATA | data_dict - index = int(data_dict["index"]) - self._valid = index >= 0 - self._index = index if self._valid else 0 - self._count = int(data_dict["count"]) - self._data_type = data_dict["type"] - self._writeable = bool(data_dict["writeable"]) - names = data_dict["names"] - if not isinstance(names, list): - names = [str(names)] - names = [str(name).strip() for name in names if str(name).strip()] - if not names: - names = ["_invalid_"] - self._names = names - self._aliases = [] - since = str(data_dict["since"]) - self._since = parse_version(since) - until = str(data_dict["until"]) - self._until = parse_version(until) - self._description = str(data_dict["description"]) - self._type_name = type_name.lower() - self._valid &= len(self._type_name) > 0 - self._offset = int(offset) - self._addr = self._offset + self._index - except Exception as e: - self._valid = False - self._index = 0 - LOGGER.error(f"Failed to create LuxtronikDefinition: '{e}' with {data_dict}") - - @classmethod - def unknown(cls, index, type_name, offset): - """ - Create an "unknown" definition. - - Args: - index (int): The register index of the "unknown" definition. - type_name (str): The type name e.g. 'holding', 'input', ... . - offset (str): Offset of the address from the specified index. - - Returns: - LuxtronikDefinition: A definition marked as unknown. - """ - return cls({ - "index": index, - "names": [f"unknown_{type_name.lower()}_{index}"] - }, type_name, offset) - - def __bool__(self): - """Return True if the definition is valid.""" - return self._valid - - def __repr__(self): - return f"(name={self.name}, data_type={self.data_type}," \ - + f" index={self.index}, count={self.count})" - - @property - def valid(self): - return self._valid - - @property - def type_name(self): - "Returns the type name (e.g. 'parameter', 'holding', 'input', ...)." - return self._type_name - - @property - def index(self): - return self._index - - @property - def offset(self): - return self._offset - - @property - def addr(self): - return self._addr - - @property - def count(self): - "Returns the assigned number of used registers." - return self._count - - @property - def data_type(self): - return self._data_type - - @property - def writeable(self): - return self._writeable - - @property - def names(self): - return self._names - - @property - def aliases(self): - return self._aliases - - @property - def name(self): - "Returns the preferred name." - return self._names[0] - - @property - def since(self): - return self._since - - @property - def until(self): - return self._until - - def create_field(self): - """ - Create a data field instance from this definition. - - Returns: - Base | None: Field instance or None if invalid. - """ - return self.data_type(self.names, self.writeable) if self.valid else None - - -############################################################################### -# LuxtronikDefinitionsDictionary -############################################################################### - -class LuxtronikDefinitionsDictionary: - """ - Dictionary of definitions that can be searched by index, name, or aliases. - - To use aliases, they must first be registered here (locally = - only valid for this dictionary) or directly in the `LuxtronikDefinitionsList` - (globally = valid for all newly created dictionaries). - - This class is intended to speed up the lookup of definitions. - Dictionaries are used instead of searching through a list of definitions - one by one to find the one you are looking for. - """ - - def __init__(self): - self._index_dict = {} - self._name_dict = {} - self._alias_dict = {} - - def __getitem__(self, name_or_idx): - return self.get(name_or_idx) - - def __contains__(self, def_name_or_idx): - if isinstance(def_name_or_idx, LuxtronikDefinition): - return any(def_name_or_idx is d for d in self._index_dict.values()) - return self._get(def_name_or_idx) is not None - - def _add_alias(self, definition, alias): - """ - Register a single alias that references the given definition. - - Args: - definition (LuxtronikDefinition): Definition that the alias should map to. - alias (Hashable): Alias to register (str will be normalized). - """ - alias = alias.lower() if isinstance(alias, str) else alias - self._alias_dict[alias] = definition - - def register_alias(self, def_name_or_idx, alias): - """ - Register an alias (locally) that references a definition specified by - name, index, or the definition object. - - Args: - def_name_or_idx (str | int | LuxtronikDefinition): - Name, index, or definition to alias. - alias (Hashable): Alias key to register (str will be normalized). - - Returns: - LuxtronikDefinition | None: The resolved definition - when registration succeeded, otherwise None. - """ - if alias is None: - return None - # look-up definition - if isinstance(def_name_or_idx, LuxtronikDefinition): - definition = self.get(def_name_or_idx.name) - else: - definition = self.get(def_name_or_idx) - if definition is None: - return None - self._add_alias(definition, alias) - return definition - - def add(self, definition, alias=None): - """ - Add a definition to internal lookup tables and register its aliases. - Existing entries will be overwritten. - - Args: - definition (LuxtronikDefinition): Definition to add. - alias (Hashable): Optional additional alias to register for this definition. - """ - # Add to indices-dictionary - self._index_dict[definition.index] = definition - - # Add to name-dictionary - # Unique names has already been ensured by the pytest - for name in definition.names: - self._name_dict[name.lower()] = definition - - # Add to alias-dictionary - for a in definition.aliases: - self._add_alias(definition, a) - if alias is not None: - self._add_alias(definition, alias) - - def get(self, name_or_idx, default=None): - """ - Retrieve a definition by name or index. - - Args: - name_or_idx (str | int): Definition name or register index. - default (LuxtronikDefinition): Definition to return if the searched one is not found. - - Returns: - LuxtronikDefinition | None: The matching definition, or None if not found. - - Note: - If multiple definitions added for the same index/name, the last added takes precedence. - """ - d = self._get(name_or_idx) - if d is None: - LOGGER.debug(f"Definition for '{name_or_idx}' not found", ) - return d if d is not None else default - - def _get(self, name_or_idx): - """ - Retrieve a definition by name or index. - - Args: - name_or_idx (str | int): Definition name or register index. - - Returns: - LuxtronikDefinition | None: The matching definition, or None if not found. - - Note: - If multiple definitions added for the same index/name, the last added takes precedence. - """ - d = self._get_definition_by_alias(name_or_idx) - if d is None: - if isinstance(name_or_idx, int): - d = self._get_definition_by_idx(name_or_idx) - if d is None: - # search in alias-dict again with the index converted to a string - d = self._get_definition_by_alias(str(name_or_idx)) - if isinstance(name_or_idx, str): - try: - # Numbers are not allowed as names, so it could be an index as string - idx_from_str = int(name_or_idx) - d = self._get_definition_by_idx(idx_from_str) - if d is None: - # search in alias-dict again with the string converted to an index - d = self._get_definition_by_alias(str(name_or_idx)) - except ValueError: - d = self._get_definition_by_name(name_or_idx) - return d - - def _get_definition_by_idx(self, idx): - """ - Retrieve a definition by its index. - - Args: - idx (int): Register index. - - Returns: - LuxtronikDefinition | None: The matching definition, or None if not found. - - Note: - If multiple definitions added for the same index, the last added takes precedence. - """ - return self._index_dict.get(idx, None) - - def _get_definition_by_name(self, name): - """ - Retrieve a definition by its name (case-insensitive). - - Args: - name (str): Definition name. - - Returns: - LuxtronikDefinition | None: The matching definition, or None if not found. - - Note: - If multiple definitions added for the same name, the last added takes precedence. - """ - definition = self._name_dict.get(name.lower(), None) - if definition is not None and definition.valid and name.lower() != definition.name.lower(): - LOGGER.warning(f"'{name}' is outdated! Use '{definition.name}' instead.") - return definition - - def _get_definition_by_alias(self, alias): - """ - Retrieve a definition by its alias (case-insensitive when using strings). - - Args: - alias (Hashable): Alias for a definition. - - Returns: - LuxtronikDefinition | None: The matching definition, or None if not found. - - Note: - If multiple definitions added for the same alias, the last added takes precedence. - """ - alias = alias.lower() if isinstance(alias, str) else alias - return self._alias_dict.get(alias, None) - - -############################################################################### -# LuxtronikDefinitionsList -############################################################################### - -class LuxtronikDefinitionsList: - """ - Container for Luxtronik definitions. - - Provides lookup by index, name or alias. - - To use aliases, they must first be registered here (globally = valid for - all newly created dictionaries) or within the `LuxtronikDefinitionsDictionary` - (locally = only valid for that dictionary). - """ - - def _init_instance(self, name, offset, version): - """Re-usable method to initialize all instance variables.""" - self._name = name - self._offset = offset - self._version = version - # sorted list of all definitions - self._definitions = [] - self._lookup = LuxtronikDefinitionsDictionary() - - def __init__(self, definitions_list, name, offset): - """ - Initialize the (by index sorted) definitions list. - - Args: - definitions_list (list[dict]): Raw definition entries as list of data-dictionaries. - name (str): Name related to this type of definitions (e.g. "calculation", "holding", etc.) - offset (int): Offset applied to register indices. - - Notes on the definitions_list: - - Must be sorted by ascending index - - Each version may contain only one entry per register - - If there exists more than one definition per index, - only the last one can be found using indices/names - - The value of count must always be greater than or equal to 1 - - All names should be unique - """ - self._init_instance(name, offset, None) - - # Add definition objects only for valid items. - # The correct sorting has already been ensured by the pytest - for item in definitions_list: - d = LuxtronikDefinition(item, name, offset) - if d.valid: - self._add(d) - - @classmethod - def filtered(cls, definitions, version): - """ - Filter an existing definitions list by the given version - and return the new (by index sorted) definitions list. - - Args: - definitions (LuxtronikDefinitionsList): List of definitions to filter. - version (tuple[int] | None): - Only definitions that match this version are added to the list. - If None is passed, all available fields are added. - """ - obj = cls.__new__(cls) # this don't call __init__() - obj._init_instance(definitions.name, definitions.offset, version) - - for d in definitions: - if d.valid and version_in_range(obj._version, d.since, d.until): - obj._add(d) - - return obj - - def __getitem__(self, name_or_idx): - return self.get(name_or_idx) - - def __contains__(self, def_name_or_idx): - return def_name_or_idx in self._lookup - - def __len__(self): - return len(self._definitions) - - def __iter__(self): - return iter(self._definitions) - - def __repr__(self): - defs = [repr(d) for d in self._definitions] - return f"({self.name}, {self.offset}, {' ,'.join(defs)})" - - def create_unknown_definition(self, index): - """ - Create an "unknown" definition. - - Args: - index (int): The register index of the "unknown" definition. - - Returns: - LuxtronikDefinition: A definition marked as unknown. - """ - return LuxtronikDefinition.unknown(index, self._name, self._offset) - - def register_alias(self, def_name_or_idx, alias): - """ - Register an alias (globally) that references a definition specified by - name, index, or the definition object. - - Args: - def_name_or_idx (str | int | LuxtronikDefinition): - Name, index, or definition to alias. - alias (any): (Hashable) Alias key to register (str will be normalized). - - Returns: - LuxtronikDefinition | None: The resolved definition - when registration succeeded, otherwise None. - """ - # "local" registration to be able to find the definition again - definition = self._lookup.register_alias(def_name_or_idx, alias) - # "global" registration that is used in newly created definition-dictionaries - if definition is not None: - definition.aliases.append(alias) - return definition - - @property - def name(self): - return self._name - - @property - def offset(self): - return self._offset - - def get(self, name_or_idx, default=None): - """ - Retrieve a definition by name or index. - - Args: - name_or_idx (str | int): Definition name or register index. - - Returns: - LuxtronikDefinition | None: The matching definition, or None if not found. - - Note: - If multiple definitions added for the same index/name, the last added takes precedence. - """ - return self._lookup.get(name_or_idx, default) - - def _add(self, definition): - """ - Add a valid definition to the internal dictionaries - - Args: - definition (LuxtronikDefinition): Definition to add - """ - self._definitions.append(definition) - self._lookup.add(definition) - - def add(self, data_dict): - """ - Add a custom (valid) definition. Existing definitions will not be overwritten. - - Args: - data_dict (dict): Data for the definition to add - - Returns: - LuxtronikDefinition | None: The created definition or None if not valid - - Note: - If multiple definitions added for the same index/name, the last added takes precedence. - """ - definition = LuxtronikDefinition(data_dict, self._name, self._offset) - if not definition.valid: - return None - self._add(definition) - self._definitions.sort(key=lambda item: item.index) - return definition ############################################################################### diff --git a/luxtronik/shi/holdings.py b/luxtronik/shi/holdings.py index 46e7b35e..768c1a66 100644 --- a/luxtronik/shi/holdings.py +++ b/luxtronik/shi/holdings.py @@ -3,10 +3,10 @@ import logging from typing import Final +from luxtronik.definitions import LuxtronikDefinitionsList from luxtronik.definitions.holdings import HOLDINGS_DEFINITIONS_LIST, HOLDINGS_OFFSET from luxtronik.shi.constants import HOLDINGS_FIELD_NAME -from luxtronik.shi.definitions import LuxtronikDefinitionsList from luxtronik.shi.vector import DataVectorSmartHome diff --git a/luxtronik/shi/inputs.py b/luxtronik/shi/inputs.py index 4ebdba89..18730c1b 100644 --- a/luxtronik/shi/inputs.py +++ b/luxtronik/shi/inputs.py @@ -3,10 +3,10 @@ import logging from typing import Final +from luxtronik.definitions import LuxtronikDefinitionsList from luxtronik.definitions.inputs import INPUTS_DEFINITIONS_LIST, INPUTS_OFFSET from luxtronik.shi.constants import INPUTS_FIELD_NAME -from luxtronik.shi.definitions import LuxtronikDefinitionsList from luxtronik.shi.vector import DataVectorSmartHome diff --git a/luxtronik/shi/interface.py b/luxtronik/shi/interface.py index b7b6b451..b8fee718 100644 --- a/luxtronik/shi/interface.py +++ b/luxtronik/shi/interface.py @@ -1,20 +1,19 @@ """Main components of the Luxtronik smart home interface.""" -from luxtronik.common import classproperty +from luxtronik.common import classproperty, version_in_range from luxtronik.datatypes import Base +from luxtronik.definitions import ( + LuxtronikDefinition, + LuxtronikDefinitionsList, +) from luxtronik.shi.constants import LUXTRONIK_LATEST_SHI_VERSION from luxtronik.shi.common import ( LOGGER, - version_in_range, LuxtronikSmartHomeReadHoldingsTelegram, LuxtronikSmartHomeReadInputsTelegram, LuxtronikSmartHomeWriteHoldingsTelegram, ) -from luxtronik.shi.definitions import ( - LuxtronikDefinition, - LuxtronikDefinitionsList, - check_data -) +from luxtronik.shi.definitions import check_data from luxtronik.shi.vector import DataVectorSmartHome from luxtronik.shi.holdings import Holdings, HOLDINGS_DEFINITIONS from luxtronik.shi.inputs import Inputs, INPUTS_DEFINITIONS diff --git a/luxtronik/shi/vector.py b/luxtronik/shi/vector.py index 510f0df2..705d9b14 100644 --- a/luxtronik/shi/vector.py +++ b/luxtronik/shi/vector.py @@ -1,13 +1,13 @@ +from luxtronik.common import version_in_range from luxtronik.data_vector import DataVector from luxtronik.datatypes import Base, Unknown - -from luxtronik.shi.constants import LUXTRONIK_LATEST_SHI_VERSION -from luxtronik.shi.common import version_in_range -from luxtronik.shi.definitions import ( - integrate_data, +from luxtronik.definitions import ( LuxtronikDefinition, LuxtronikDefinitionsDictionary, ) + +from luxtronik.shi.constants import LUXTRONIK_LATEST_SHI_VERSION +from luxtronik.shi.definitions import integrate_data from luxtronik.shi.contiguous import ContiguousDataBlockList ############################################################################### diff --git a/tests/test_calculations.py b/tests/cfi/test_cfi_calculations.py similarity index 88% rename from tests/test_calculations.py rename to tests/cfi/test_cfi_calculations.py index 3c06d351..f246cb36 100644 --- a/tests/test_calculations.py +++ b/tests/cfi/test_cfi_calculations.py @@ -2,7 +2,7 @@ # pylint: disable=too-few-public-methods -from luxtronik.calculations import Calculations +from luxtronik import Calculations class TestCalculations: diff --git a/tests/test_parameters.py b/tests/cfi/test_cfi_parameters.py similarity index 98% rename from tests/test_parameters.py rename to tests/cfi/test_cfi_parameters.py index 3cce3943..3d8683de 100644 --- a/tests/test_parameters.py +++ b/tests/cfi/test_cfi_parameters.py @@ -2,7 +2,7 @@ # pylint: disable=too-few-public-methods,invalid-name,protected-access -from luxtronik.parameters import Parameters +from luxtronik import Parameters class TestParameters: diff --git a/tests/test_visibilities.py b/tests/cfi/test_cfi_visibilities.py similarity index 88% rename from tests/test_visibilities.py rename to tests/cfi/test_cfi_visibilities.py index 5c7090bd..460d04ab 100644 --- a/tests/test_visibilities.py +++ b/tests/cfi/test_cfi_visibilities.py @@ -2,7 +2,7 @@ # pylint: disable=too-few-public-methods -from luxtronik.visibilities import Visibilities +from luxtronik import Visibilities class TestVisibilities: diff --git a/tests/shi/test_common.py b/tests/shi/test_shi_common.py similarity index 54% rename from tests/shi/test_common.py rename to tests/shi/test_shi_common.py index 65adbd09..9030a23d 100644 --- a/tests/shi/test_common.py +++ b/tests/shi/test_shi_common.py @@ -1,8 +1,4 @@ -import pytest - from luxtronik.shi.common import ( - parse_version, - version_in_range, LuxtronikSmartHomeReadTelegram, LuxtronikSmartHomeWriteTelegram, ) @@ -11,55 +7,6 @@ # Tests ############################################################################### -class TestVersion: - - @pytest.mark.parametrize( - "string, version", - [ - ("1", (1, 0, 0, 0)), - ("2.1", (2, 1, 0, 0)), - ("3.2.1", (3, 2, 1, 0)), - ("1.2.3.4", (1, 2, 3, 4)), - ("1.2.3.4.5", (1, 2, 3, 4)), - ("a.b", None), - ("hello", None), - ("foo.bar", None), - ("1_2", None), - ("3 4", None), - (None, None), - ((1, 0, 0, 4), (1, 0, 0, 4)), - ((2, 1, 3), (2, 1, 3, 0)), - ((3, 2), (3, 2, 0, 0)), - ((5,), (5, 0, 0, 0)), - ((), (0, 0, 0, 0)), - ((3, "foo", 2), None), - ] - ) - def test_parse(self, string, version): - parsed = parse_version(string) - assert parsed == version - - @pytest.mark.parametrize( - "version, since, until, in_range", - [ - (None, None, None, True), - ((1, 2), None, None, True), - (None, (5, 4), (2, 1), True), - ((2, 4), (1, 3), None, True), - ((2, 4), (5, 1), None, False), - ((2, 4), None, (2, 3, 9), False), - ((2, 4), None, (2, 4, 0, 1), True), - ((5, 6), (5, 6), (5, 6), True), - ((5, 6), (5, 7), (5, 6), False), - ((5, 6), (5, 6), (5, 5), False), - ((3, 7), (2, 8), (4, 6), True), - ] - ) - def test_in_range(self, version, since, until, in_range): - result = version_in_range(version, since, until) - assert result == in_range - - class TestReadTelegram: def test_init(self): diff --git a/tests/shi/test_contiguous.py b/tests/shi/test_shi_contiguous.py similarity index 99% rename from tests/shi/test_contiguous.py rename to tests/shi/test_shi_contiguous.py index 2153d256..cc655da4 100644 --- a/tests/shi/test_contiguous.py +++ b/tests/shi/test_shi_contiguous.py @@ -1,7 +1,7 @@ from luxtronik.datatypes import Base +from luxtronik.definitions import LuxtronikDefinition from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE -from luxtronik.shi.definitions import LuxtronikDefinition from luxtronik.shi.contiguous import ( ContiguousDataPart, ContiguousDataBlock, diff --git a/tests/shi/test_shi_definitions.py b/tests/shi/test_shi_definitions.py new file mode 100644 index 00000000..acb29813 --- /dev/null +++ b/tests/shi/test_shi_definitions.py @@ -0,0 +1,122 @@ +from luxtronik.definitions import LuxtronikDefinition +from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE +from luxtronik.shi.definitions import ( + get_data_arr, + check_data, + integrate_data, +) + +############################################################################### +# Tests +############################################################################### + +class TestDefinitionFieldPair: + + def test_data_arr(self): + definition = LuxtronikDefinition.unknown(2, 'Foo', 30) + field = definition.create_field() + field.concatenate_multiple_data_chunks = False + + # get from value + definition._count = 1 + field.raw = 5 + arr = get_data_arr(definition, field) + assert arr == [5] + assert check_data(definition, field) + + # get from array + definition._count = 2 + field.raw = [7, 3] + arr = get_data_arr(definition, field) + assert arr == [7, 3] + assert check_data(definition, field) + + # too much data + definition._count = 2 + field.raw = [4, 8, 1] + arr = get_data_arr(definition, field) + assert arr is None + assert not check_data(definition, field) + + # insufficient data + definition._count = 2 + field.raw = [9] + arr = get_data_arr(definition, field) + assert arr is None + assert not check_data(definition, field) + + field.concatenate_multiple_data_chunks = True + + # get from array + definition._count = 2 + field.raw = 0x0007_0003 + arr = get_data_arr(definition, field) + assert arr == [7, 3] + assert check_data(definition, field) + + # too much data + definition._count = 2 + field.raw = 0x0004_0008_0001 + arr = get_data_arr(definition, field) + assert arr == [8, 1] + assert check_data(definition, field) + + # insufficient data + definition._count = 2 + field.raw = 0x0009 + arr = get_data_arr(definition, field) + assert arr == [0, 9] + assert check_data(definition, field) + + def test_integrate(self): + definition = LuxtronikDefinition.unknown(2, 'Foo', 30) + field = definition.create_field() + field.concatenate_multiple_data_chunks = False + + data = [1, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, 3, 4, 5, 6, 7] + + # set array + definition._count = 2 + integrate_data(definition, field, data) + assert field.raw == [3, 4] + integrate_data(definition, field, data, 4) + assert field.raw == [5, 6] + integrate_data(definition, field, data, 7) + assert field.raw is None + integrate_data(definition, field, data, 0) + assert field.raw is None + + # set value + definition._count = 1 + integrate_data(definition, field, data) + assert field.raw == 3 + integrate_data(definition, field, data, 5) + assert field.raw == 6 + integrate_data(definition, field, data, 9) + assert field.raw is None + integrate_data(definition, field, data, 1) + assert field.raw is None + + field.concatenate_multiple_data_chunks = True + + # set array + definition._count = 2 + integrate_data(definition, field, data) + assert field.raw == 0x0003_0004 + integrate_data(definition, field, data, 4) + assert field.raw == 0x0005_0006 + integrate_data(definition, field, data, 7) + assert field.raw is None + integrate_data(definition, field, data, 0) + assert field.raw is None + + # set value + definition._count = 1 + integrate_data(definition, field, data) + assert field.raw == 0x0003 + integrate_data(definition, field, data, 5) + assert field.raw == 0x0006 + integrate_data(definition, field, data, 9) + assert field.raw is None + integrate_data(definition, field, data, 1) + assert field.raw is None \ No newline at end of file diff --git a/tests/shi/test_interface.py b/tests/shi/test_shi_interface.py similarity index 99% rename from tests/shi/test_interface.py rename to tests/shi/test_shi_interface.py index ee9172ec..f474bb41 100644 --- a/tests/shi/test_interface.py +++ b/tests/shi/test_shi_interface.py @@ -2,6 +2,7 @@ from unittest.mock import patch from luxtronik.datatypes import Base, Unknown +from luxtronik.definitions import LuxtronikDefinition from luxtronik.shi.constants import ( LUXTRONIK_LATEST_SHI_VERSION, @@ -14,18 +15,18 @@ LuxtronikSmartHomeReadInputsTelegram, LuxtronikSmartHomeWriteHoldingsTelegram, ) -from luxtronik.shi.definitions import LuxtronikDefinition -from luxtronik.shi.holdings import HOLDINGS_DEFINITIONS, Holdings -from luxtronik.shi.inputs import INPUTS_DEFINITIONS from luxtronik.shi.contiguous import ( ContiguousDataBlock, ContiguousDataBlockList, ) -from luxtronik.shi.interface import ( +from luxtronik.shi import ( + HOLDINGS_DEFINITIONS, + Holdings, + INPUTS_DEFINITIONS, LuxtronikSmartHomeData, LuxtronikSmartHomeInterface, + create_modbus_tcp, ) -from luxtronik.shi import create_modbus_tcp ############################################################################### # Fake modbus client diff --git a/tests/shi/test_modbus.py b/tests/shi/test_shi_modbus.py similarity index 100% rename from tests/shi/test_modbus.py rename to tests/shi/test_shi_modbus.py diff --git a/tests/shi/test_vector.py b/tests/shi/test_shi_vector.py similarity index 99% rename from tests/shi/test_vector.py rename to tests/shi/test_shi_vector.py index 74e429dc..21e3a125 100644 --- a/tests/shi/test_vector.py +++ b/tests/shi/test_shi_vector.py @@ -1,8 +1,7 @@ - +from luxtronik.common import parse_version from luxtronik.datatypes import Base, Unknown -from luxtronik.shi.common import parse_version +from luxtronik.definitions import LuxtronikDefinitionsList from luxtronik.shi.vector import DataVectorSmartHome -from luxtronik.shi.definitions import LuxtronikDefinitionsList """ The test was originally written for "False". diff --git a/tests/test_common.py b/tests/test_common.py new file mode 100644 index 00000000..4d4f6d52 --- /dev/null +++ b/tests/test_common.py @@ -0,0 +1,58 @@ +import pytest + +from luxtronik.common import ( + parse_version, + version_in_range +) + +############################################################################### +# Tests +############################################################################### + +class TestVersion: + + @pytest.mark.parametrize( + "string, version", + [ + ("1", (1, 0, 0, 0)), + ("2.1", (2, 1, 0, 0)), + ("3.2.1", (3, 2, 1, 0)), + ("1.2.3.4", (1, 2, 3, 4)), + ("1.2.3.4.5", (1, 2, 3, 4)), + ("a.b", None), + ("hello", None), + ("foo.bar", None), + ("1_2", None), + ("3 4", None), + (None, None), + ((1, 0, 0, 4), (1, 0, 0, 4)), + ((2, 1, 3), (2, 1, 3, 0)), + ((3, 2), (3, 2, 0, 0)), + ((5,), (5, 0, 0, 0)), + ((), (0, 0, 0, 0)), + ((3, "foo", 2), None), + ] + ) + def test_parse(self, string, version): + parsed = parse_version(string) + assert parsed == version + + @pytest.mark.parametrize( + "version, since, until, in_range", + [ + (None, None, None, True), + ((1, 2), None, None, True), + (None, (5, 4), (2, 1), True), + ((2, 4), (1, 3), None, True), + ((2, 4), (5, 1), None, False), + ((2, 4), None, (2, 3, 9), False), + ((2, 4), None, (2, 4, 0, 1), True), + ((5, 6), (5, 6), (5, 6), True), + ((5, 6), (5, 7), (5, 6), False), + ((5, 6), (5, 6), (5, 5), False), + ((3, 7), (2, 8), (4, 6), True), + ] + ) + def test_in_range(self, version, since, until, in_range): + result = version_in_range(version, since, until) + assert result == in_range \ No newline at end of file diff --git a/tests/shi/test_definition_list.py b/tests/test_definition_list.py similarity index 99% rename from tests/shi/test_definition_list.py rename to tests/test_definition_list.py index 29686dd7..e749d89a 100644 --- a/tests/shi/test_definition_list.py +++ b/tests/test_definition_list.py @@ -1,5 +1,6 @@ import re +from luxtronik.common import parse_version from luxtronik.datatypes import Base from luxtronik.definitions.calculations import CALCULATIONS_DEFINITIONS_LIST from luxtronik.definitions.holdings import HOLDINGS_DEFINITIONS_LIST @@ -7,7 +8,6 @@ from luxtronik.definitions.parameters import PARAMETERS_DEFINITIONS_LIST from luxtronik.definitions.visibilities import VISIBILITIES_DEFINITIONS_LIST -from luxtronik.shi.common import parse_version KEY_IDX = "index" KEY_COUNT = "count" diff --git a/tests/shi/test_definitions.py b/tests/test_definitions.py similarity index 78% rename from tests/shi/test_definitions.py rename to tests/test_definitions.py index aaa1752b..b0c36293 100644 --- a/tests/shi/test_definitions.py +++ b/tests/test_definitions.py @@ -1,12 +1,8 @@ from luxtronik.datatypes import Base, Unknown -from luxtronik.shi.constants import LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE -from luxtronik.shi.definitions import ( +from luxtronik.definitions import ( LuxtronikDefinition, LuxtronikDefinitionsDictionary, LuxtronikDefinitionsList, - get_data_arr, - check_data, - integrate_data, ) ############################################################################### @@ -457,116 +453,4 @@ def test_add(self): def test_repr(self): definitions = LuxtronikDefinitionsList(self.def_list, 'foo', 100) text = repr(definitions) - assert text - - -class TestDefinitionFieldPair: - - def test_data_arr(self): - definition = LuxtronikDefinition.unknown(2, 'Foo', 30) - field = definition.create_field() - field.concatenate_multiple_data_chunks = False - - # get from value - definition._count = 1 - field.raw = 5 - arr = get_data_arr(definition, field) - assert arr == [5] - assert check_data(definition, field) - - # get from array - definition._count = 2 - field.raw = [7, 3] - arr = get_data_arr(definition, field) - assert arr == [7, 3] - assert check_data(definition, field) - - # too much data - definition._count = 2 - field.raw = [4, 8, 1] - arr = get_data_arr(definition, field) - assert arr is None - assert not check_data(definition, field) - - # insufficient data - definition._count = 2 - field.raw = [9] - arr = get_data_arr(definition, field) - assert arr is None - assert not check_data(definition, field) - - field.concatenate_multiple_data_chunks = True - - # get from array - definition._count = 2 - field.raw = 0x0007_0003 - arr = get_data_arr(definition, field) - assert arr == [7, 3] - assert check_data(definition, field) - - # too much data - definition._count = 2 - field.raw = 0x0004_0008_0001 - arr = get_data_arr(definition, field) - assert arr == [8, 1] - assert check_data(definition, field) - - # insufficient data - definition._count = 2 - field.raw = 0x0009 - arr = get_data_arr(definition, field) - assert arr == [0, 9] - assert check_data(definition, field) - - def test_integrate(self): - definition = LuxtronikDefinition.unknown(2, 'Foo', 30) - field = definition.create_field() - field.concatenate_multiple_data_chunks = False - - data = [1, LUXTRONIK_VALUE_FUNCTION_NOT_AVAILABLE, 3, 4, 5, 6, 7] - - # set array - definition._count = 2 - integrate_data(definition, field, data) - assert field.raw == [3, 4] - integrate_data(definition, field, data, 4) - assert field.raw == [5, 6] - integrate_data(definition, field, data, 7) - assert field.raw is None - integrate_data(definition, field, data, 0) - assert field.raw is None - - # set value - definition._count = 1 - integrate_data(definition, field, data) - assert field.raw == 3 - integrate_data(definition, field, data, 5) - assert field.raw == 6 - integrate_data(definition, field, data, 9) - assert field.raw is None - integrate_data(definition, field, data, 1) - assert field.raw is None - - field.concatenate_multiple_data_chunks = True - - # set array - definition._count = 2 - integrate_data(definition, field, data) - assert field.raw == 0x0003_0004 - integrate_data(definition, field, data, 4) - assert field.raw == 0x0005_0006 - integrate_data(definition, field, data, 7) - assert field.raw is None - integrate_data(definition, field, data, 0) - assert field.raw is None - - # set value - definition._count = 1 - integrate_data(definition, field, data) - assert field.raw == 0x0003 - integrate_data(definition, field, data, 5) - assert field.raw == 0x0006 - integrate_data(definition, field, data, 9) - assert field.raw is None - integrate_data(definition, field, data, 1) - assert field.raw is None \ No newline at end of file + assert text \ No newline at end of file