diff --git a/.cspell.json b/.cspell.json index 314e8837e..1d34f3f5a 100644 --- a/.cspell.json +++ b/.cspell.json @@ -14,6 +14,8 @@ "ignorePaths": [ "**/*.json", "**/*.yaml", + "**/*_pb2.py", + "**/*.proto", ".gitignore", ], "import": [ diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 217577ac0..6ff2148c3 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -6,6 +6,7 @@ afci AIO AIO's aiohttp +aiomqtt Alertfeed allclose Anson @@ -134,6 +135,7 @@ givtcp gridconsumption gridconsumptionpower growatt +GWMQTT HACS hadashboard hahistory @@ -169,6 +171,7 @@ ivtime jedlix jsyaml kaiming +keepalive killall kopt Kostal @@ -249,6 +252,7 @@ onmouseover openweathermap overfitting ownerapi +pbgw pdata pdetails perc diff --git a/apps/predbat/components.py b/apps/predbat/components.py index 8a6afc2ea..77241fef0 100644 --- a/apps/predbat/components.py +++ b/apps/predbat/components.py @@ -32,6 +32,14 @@ from db_manager import DatabaseManager from fox import FoxAPI from web_mcp import PredbatMCPServer + +try: + from gateway import GatewayMQTT + + HAS_GATEWAY = True +except (ImportError, Exception): + HAS_GATEWAY = False + GatewayMQTT = None from load_ml_component import LoadMLComponent from datetime import datetime, timezone, timedelta import asyncio @@ -309,6 +317,21 @@ }, } +if HAS_GATEWAY: + COMPONENT_LIST["gateway"] = { + "class": GatewayMQTT, + "name": "PredBat Gateway", + "event_filter": "predbat_gateway_", + "args": { + "gateway_device_id": {"required": True, "config": "gateway_device_id"}, + "mqtt_host": {"required": True, "config": "gateway_mqtt_host"}, + "mqtt_port": {"required": False, "config": "gateway_mqtt_port", "default": 8883}, + "mqtt_token": {"required": True, "config": "gateway_mqtt_token"}, + }, + "phase": 1, + "can_restart": True, + } + class Components: """Central component registry and lifecycle manager. diff --git a/apps/predbat/config.py b/apps/predbat/config.py index b18719d0b..22dbda7b9 100644 --- a/apps/predbat/config.py +++ b/apps/predbat/config.py @@ -1933,6 +1933,33 @@ "charge_discharge_with_rate": False, "target_soc_used_for_discharge": True, }, + "GWMQTT": { + "name": "ESP32 Gateway MQTT", + "has_rest_api": False, + "has_mqtt_api": False, + "output_charge_control": "power", + "charge_control_immediate": True, + "has_charge_enable_time": True, + "has_discharge_enable_time": True, + "has_target_soc": True, + "has_reserve_soc": True, + "has_timed_pause": False, + "charge_time_format": "HH:MM:SS", + "charge_time_entity_is_option": True, + "soc_units": "%", + "num_load_entities": 1, + "has_ge_inverter_mode": False, + "time_button_press": False, + "clock_time_format": "%H:%M:%S", + "write_and_poll_sleep": 2, + "has_time_window": True, + "support_charge_freeze": False, + "support_discharge_freeze": False, + "has_idle_time": False, + "can_span_midnight": True, + "charge_discharge_with_rate": False, + "target_soc_used_for_discharge": False, + }, } # Control modes for Solax inverters diff --git a/apps/predbat/gateway.py b/apps/predbat/gateway.py new file mode 100644 index 000000000..2d7bfce33 --- /dev/null +++ b/apps/predbat/gateway.py @@ -0,0 +1,934 @@ +"""ESP32 Gateway MQTT component. + +Provides full inverter telemetry and control via the ESP32 gateway's +MQTT interface. Registered in COMPONENT_LIST as 'gateway'. This is +the sole data source and control interface for SaaS users with a +gateway — no Home Assistant in the loop. +""" + +import asyncio +import json +import os +import ssl +import time +import uuid +import traceback + +from datetime import datetime +from component_base import ComponentBase + +try: + from proto import gateway_status_pb2 as pb + + HAS_PROTOBUF = True +except (ImportError, Exception): + pb = None + HAS_PROTOBUF = False + +try: + import aiohttp + + HAS_AIOHTTP = True +except ImportError: + aiohttp = None + HAS_AIOHTTP = False + +try: + import aiomqtt + + HAS_AIOMQTT = True +except ImportError: + HAS_AIOMQTT = False + +if not HAS_PROTOBUF: + raise ImportError("GatewayMQTT requires the 'protobuf' package: pip install protobuf") + + +# Entity mapping: protobuf field path → entity name +ENTITY_MAP = { + # Battery + "battery.soc_percent": "predbat_gateway_soc", + "battery.power_w": "predbat_gateway_battery_power", + "battery.voltage_v": "predbat_gateway_battery_voltage", + "battery.current_a": "predbat_gateway_battery_current", + "battery.temperature_c": "predbat_gateway_battery_temp", + "battery.soh_percent": "predbat_gateway_battery_soh", + "battery.cycle_count": "predbat_gateway_battery_cycles", + "battery.capacity_wh": "predbat_gateway_battery_capacity", + "battery.rate_max_w": "predbat_gateway_battery_rate_max", + # Power flows + "pv.power_w": "predbat_gateway_pv_power", + "grid.power_w": "predbat_gateway_grid_power", + "grid.voltage_v": "predbat_gateway_grid_voltage", + "grid.frequency_hz": "predbat_gateway_grid_frequency", + "load.power_w": "predbat_gateway_load_power", + "inverter.active_power_w": "predbat_gateway_inverter_power", + "inverter.temperature_c": "predbat_gateway_inverter_temp", + # Control + "control.mode": "predbat_gateway_mode", + "control.charge_enabled": "predbat_gateway_charge_enabled", + "control.discharge_enabled": "predbat_gateway_discharge_enabled", + "control.charge_rate_w": "predbat_gateway_charge_rate", + "control.discharge_rate_w": "predbat_gateway_discharge_rate", + "control.reserve_soc": "predbat_gateway_reserve", + "control.target_soc": "predbat_gateway_target_soc", + "control.force_power_w": "predbat_gateway_force_power", + "control.command_expires": "predbat_gateway_command_expires", + # Schedule + "schedule.charge_start": "predbat_gateway_charge_start", + "schedule.charge_end": "predbat_gateway_charge_end", + "schedule.discharge_start": "predbat_gateway_discharge_start", + "schedule.discharge_end": "predbat_gateway_discharge_end", +} + +# Plan re-publish interval (seconds) +_PLAN_REPUBLISH_INTERVAL = 5 * 60 + +# Telemetry staleness threshold (seconds) +_TELEMETRY_STALE_THRESHOLD = 120 + + +class GatewayMQTT(ComponentBase): + """ESP32 Gateway MQTT component for PredBat. + + Static methods handle data transformation (protobuf <-> entities/commands). + Instance methods handle MQTT lifecycle and ComponentBase integration. + """ + + def initialize(self, gateway_device_id=None, mqtt_host=None, mqtt_port=8883, mqtt_token=None, **kwargs): + """Initialize gateway configuration and build MQTT topic strings. + + Args: + gateway_device_id: The gateway's device ID (e.g. "pbgw_abc123"). + mqtt_host: MQTT broker hostname. + mqtt_port: MQTT broker port (default 8883 for TLS). + mqtt_token: JWT access token for MQTT authentication. + **kwargs: Additional keyword arguments (ignored). + """ + self.gateway_device_id = gateway_device_id + self.mqtt_host = mqtt_host + self.mqtt_port = mqtt_port + self.mqtt_token = mqtt_token + self.mqtt_token_expires_at = 0 + + # MQTT topic strings + self._topic_base = f"predbat/devices/{gateway_device_id}" if gateway_device_id else "predbat/devices/unknown" + self.topic_status = f"{self._topic_base}/status" + self.topic_online = f"{self._topic_base}/online" + self.topic_schedule = f"{self._topic_base}/schedule" + self.topic_command = f"{self._topic_base}/command" + + # Runtime state + self._mqtt_client = None + self._mqtt_task = None + self._mqtt_connected = False + self._gateway_online = False + self._last_telemetry_time = 0 + self._last_plan_data = None + self._last_plan_publish_time = 0 + self._plan_version = 0 + self._refresh_in_progress = False + self._error_count = 0 + + # Entity naming prefix + self.prefix = "predbat" + + # Auto-config state + self._last_status = None + self._auto_configured = False + self._last_published_plan = None + + async def run(self, seconds, first): + """Component run loop — called every 60 seconds by ComponentBase.start(). + + On the first call, starts the background MQTT listener task. + Subsequent calls perform housekeeping: token refresh checks and + plan re-publishing if stale. + + Args: + seconds: Elapsed seconds since component start. + first: True on the first invocation. + + Returns: + True on success, False on failure. + """ + if not HAS_AIOMQTT: + self.log("Error: GatewayMQTT: aiomqtt not installed — cannot start") + return False + + if not self.gateway_device_id or not self.mqtt_host: + self.log("Error: GatewayMQTT: gateway_device_id and mqtt_host are required") + return False + + if first: + # Start MQTT listener as a background task + self._mqtt_task = asyncio.ensure_future(self._mqtt_loop()) + self.log("Info: GatewayMQTT: MQTT listener task started") + return True + + # Housekeeping on subsequent runs + try: + # Check if MQTT task died unexpectedly + if self._mqtt_task and self._mqtt_task.done(): + exc = self._mqtt_task.exception() if not self._mqtt_task.cancelled() else None + if exc: + self.log(f"Warn: GatewayMQTT: MQTT task died with: {exc}") + self.log("Info: GatewayMQTT: Restarting MQTT listener task") + self._mqtt_task = asyncio.ensure_future(self._mqtt_loop()) + + # Token refresh check + await self._check_token_refresh() + + # Re-publish plan if stale + if self._last_plan_data and self._mqtt_connected: + elapsed = time.time() - self._last_plan_publish_time + if elapsed > _PLAN_REPUBLISH_INTERVAL: + await self._publish_raw(self.topic_schedule, self._last_plan_data, retain=True) + self._last_plan_publish_time = time.time() + self.log("Info: GatewayMQTT: Re-published execution plan (stale)") + + except Exception as e: + self.log(f"Warn: GatewayMQTT: housekeeping error: {e}") + + return True + + async def _mqtt_loop(self): + """Continuous MQTT listener with automatic reconnection. + + Connects to the broker with TLS, subscribes to status and online + topics, and dispatches incoming messages. Reconnects on failure + with exponential backoff. + """ + backoff = 5 + max_backoff = 60 + + while not self.api_stop: + try: + tls_context = ssl.create_default_context() + + client_id = f"predbat-{self.gateway_device_id}-{uuid.uuid4().hex[:8]}" + + async with aiomqtt.Client( + hostname=self.mqtt_host, + port=self.mqtt_port, + username=self.gateway_device_id, + password=self.mqtt_token, + tls_context=tls_context, + identifier=client_id, + keepalive=60, + ) as client: + self._mqtt_client = client + self._mqtt_connected = True + backoff = 5 # Reset backoff on successful connection + self.log(f"Info: GatewayMQTT: Connected to {self.mqtt_host}:{self.mqtt_port}") + + # Subscribe to status and LWT topics + await client.subscribe(self.topic_status, qos=1) + await client.subscribe(self.topic_online, qos=1) + self.log(f"Info: GatewayMQTT: Subscribed to {self.topic_status} and {self.topic_online}") + + async for message in client.messages: + if self.api_stop: + break + await self._handle_message(message) + + except asyncio.CancelledError: + self.log("Info: GatewayMQTT: MQTT loop cancelled") + break + except Exception as e: + self._error_count += 1 + self.log(f"Warn: GatewayMQTT: MQTT connection error: {e}") + self._mqtt_connected = False + self._mqtt_client = None + + if self.api_stop: + break + + self.log(f"Info: GatewayMQTT: Reconnecting in {backoff}s") + await asyncio.sleep(backoff) + backoff = min(backoff * 2, max_backoff) + + self._mqtt_connected = False + self._mqtt_client = None + + async def _handle_message(self, message): + """Dispatch an incoming MQTT message to the appropriate handler. + + Args: + message: An aiomqtt.Message with topic and payload. + """ + topic = str(message.topic) + + try: + if topic == self.topic_status: + self._process_telemetry(message.payload) + elif topic == self.topic_online: + payload = message.payload.decode("utf-8", errors="replace").strip() + was_online = self._gateway_online + self._gateway_online = payload == "1" + if self._gateway_online != was_online: + self.log(f"Info: GatewayMQTT: Gateway is {'online' if self._gateway_online else 'offline'}") + self.set_state_wrapper( + f"binary_sensor.{self.prefix}_gateway_online", + self._gateway_online, + attributes={"friendly_name": "Gateway Online"}, + ) + except Exception as e: + self._error_count += 1 + self.log(f"Warn: GatewayMQTT: Error handling message on {topic}: {e}") + self.log(f"Warn: {traceback.format_exc()}") + + def _process_telemetry(self, data): + """Decode telemetry protobuf and inject per-inverter entities. + + Args: + data: Raw protobuf bytes from the /status topic. + """ + try: + status = pb.GatewayStatus() + status.ParseFromString(data) + except Exception as e: + self._error_count += 1 + self.log(f"Warn: GatewayMQTT: Failed to decode telemetry: {e}") + return + + if len(status.inverters) == 0: + return + + self._last_status = status + self._last_telemetry_time = time.time() + self.update_success_timestamp() + + if not self.api_started: + self.api_started = True + self.log("Info: GatewayMQTT: First telemetry received, API started") + + self._inject_entities(status) + + if not self._auto_configured: + self.automatic_config() + + def _inject_entities(self, status): + """Inject inverter entities into PredBat state cache. + + Maps GatewayStatus fields to PredBat entity format using HA-style + entity naming: {type}.{prefix}_gateway_{suffix}_{attribute} + """ + device_id = status.device_id + firmware = status.firmware + + self.set_state_wrapper( + f"binary_sensor.{self.prefix}_gateway_online", + True, + attributes={"device_id": device_id, "firmware": firmware}, + ) + + # Inverter time from gateway timestamp + if status.timestamp > 0 and len(status.inverters) > 0: + dt = datetime.fromtimestamp(status.timestamp) + ts_suffix = status.inverters[0].serial[-6:].lower() + self.set_state_wrapper( + f"sensor.{self.prefix}_gateway_{ts_suffix}_inverter_time", + dt.strftime("%Y-%m-%d %H:%M:%S"), + ) + + for inv in status.inverters: + suffix = inv.serial[-6:].lower() if len(inv.serial) > 6 else inv.serial.lower() + self._inject_inverter_entities(inv, suffix) + + # EMS aggregate entities (when type is GIVENERGY_EMS) + inv0 = status.inverters[0] + if inv0.type == pb.INVERTER_TYPE_GIVENERGY_EMS and inv0.ems.num_inverters > 0: + pfx = f"{self.prefix}_gateway" + self.set_state_wrapper(f"sensor.{pfx}_ems_total_soc", inv0.ems.total_soc) + self.set_state_wrapper(f"sensor.{pfx}_ems_total_charge", inv0.ems.total_charge_w) + self.set_state_wrapper(f"sensor.{pfx}_ems_total_discharge", inv0.ems.total_discharge_w) + self.set_state_wrapper(f"sensor.{pfx}_ems_total_grid", inv0.ems.total_grid_w) + self.set_state_wrapper(f"sensor.{pfx}_ems_total_pv", inv0.ems.total_pv_w) + self.set_state_wrapper(f"sensor.{pfx}_ems_total_load", inv0.ems.total_load_w) + + for idx, sub in enumerate(inv0.ems.sub_inverters): + sp = f"sensor.{pfx}_sub{idx}" + self.set_state_wrapper(f"{sp}_soc", sub.soc) + self.set_state_wrapper(f"{sp}_battery_power", sub.battery_w) + self.set_state_wrapper(f"{sp}_pv_power", sub.pv_w) + self.set_state_wrapper(f"{sp}_grid_power", sub.grid_w) + self.set_state_wrapper(f"{sp}_temp", sub.temp_c) + + def _inject_inverter_entities(self, inv, suffix): + """Inject entities for a single inverter using HA-style naming. + + Entity naming pattern: {type}.{prefix}_gateway_{suffix}_{attribute} + """ + pfx = f"{self.prefix}_gateway_{suffix}" + + bat = inv.battery + self.set_state_wrapper(f"sensor.{pfx}_soc", bat.soc_percent) + self.set_state_wrapper(f"sensor.{pfx}_battery_power", bat.power_w) + self.set_state_wrapper(f"sensor.{pfx}_battery_voltage", bat.voltage_v) + self.set_state_wrapper(f"sensor.{pfx}_battery_current", bat.current_a) + self.set_state_wrapper(f"sensor.{pfx}_battery_temperature", bat.temperature_c) + if bat.capacity_wh: + self.set_state_wrapper(f"sensor.{pfx}_battery_capacity", round(bat.capacity_wh / 1000.0, 2)) + if bat.soh_percent > 0: + self.set_state_wrapper(f"sensor.{pfx}_battery_soh", bat.soh_percent) + if bat.rate_max_w > 0: + self.set_state_wrapper(f"sensor.{pfx}_battery_rate_max", bat.rate_max_w) + + self.set_state_wrapper(f"sensor.{pfx}_pv_power", inv.pv.power_w) + + grid = inv.grid + self.set_state_wrapper(f"sensor.{pfx}_grid_power", grid.power_w) + if grid.voltage_v: + self.set_state_wrapper(f"sensor.{pfx}_grid_voltage", grid.voltage_v) + if grid.frequency_hz: + self.set_state_wrapper(f"sensor.{pfx}_grid_frequency", grid.frequency_hz) + + self.set_state_wrapper(f"sensor.{pfx}_load_power", inv.load.power_w) + + self.set_state_wrapper(f"sensor.{pfx}_inverter_power", inv.inverter.active_power_w) + if inv.inverter.temperature_c: + self.set_state_wrapper(f"sensor.{pfx}_inverter_temperature", inv.inverter.temperature_c) + + control = inv.control + self.set_state_wrapper(f"switch.{pfx}_charge_enabled", control.charge_enabled) + self.set_state_wrapper(f"switch.{pfx}_discharge_enabled", control.discharge_enabled) + self.set_state_wrapper(f"number.{pfx}_charge_rate", control.charge_rate_w) + self.set_state_wrapper(f"number.{pfx}_discharge_rate", control.discharge_rate_w) + self.set_state_wrapper(f"number.{pfx}_reserve_soc", control.reserve_soc) + self.set_state_wrapper(f"number.{pfx}_target_soc", control.target_soc) + + # Schedule times (convert HHMM uint32 → HH:MM:SS string) + if inv.HasField("schedule"): + sched = inv.schedule + for field, name in [ + ("charge_start", "charge_slot1_start"), + ("charge_end", "charge_slot1_end"), + ("discharge_start", "discharge_slot1_start"), + ("discharge_end", "discharge_slot1_end"), + ]: + hhmm = getattr(sched, field, 0) + hours = hhmm // 100 + minutes = hhmm % 100 + time_str = f"{hours:02d}:{minutes:02d}:00" + self.set_state_wrapper(f"select.{pfx}_{name}", time_str) + + # Energy counters (Wh → kWh) + if inv.HasField("energy"): + energy = inv.energy + self.set_state_wrapper(f"sensor.{pfx}_pv_today", round(energy.pv_today_wh / 1000.0, 2)) + self.set_state_wrapper(f"sensor.{pfx}_import_today", round(energy.grid_import_today_wh / 1000.0, 2)) + self.set_state_wrapper(f"sensor.{pfx}_export_today", round(energy.grid_export_today_wh / 1000.0, 2)) + self.set_state_wrapper(f"sensor.{pfx}_load_today", round(energy.consumption_today_wh / 1000.0, 2)) + + def automatic_config(self): + """Register gateway entities with PredBat's inverter model. + + Called once after first telemetry is received. Maps proto fields + to PredBat args so the plan engine has data to work with. + """ + if not self._last_status: + self.log("Error: GatewayMQTT: automatic_config called with no status data") + return + + status = self._last_status + inverters = status.inverters + + if not inverters: + self.log("Error: GatewayMQTT: no inverters in gateway status") + return + + num_inverters = len(inverters) + self.log(f"Info: GatewayMQTT: auto-config: {num_inverters} inverter(s)") + + # Set inverter type + self.set_arg("inverter_type", ["GWMQTT"] * num_inverters) + self.set_arg("num_inverters", num_inverters) + + # Per-inverter entity mappings + soc_entities = [] + soc_max_entities = [] + battery_power_entities = [] + pv_power_entities = [] + grid_power_entities = [] + load_power_entities = [] + charge_rate_entities = [] + discharge_rate_entities = [] + reserve_entities = [] + charge_limit_entities = [] + battery_temp_entities = [] + charge_start_entities = [] + charge_end_entities = [] + discharge_start_entities = [] + discharge_end_entities = [] + charge_enable_entities = [] + discharge_enable_entities = [] + + for inv in inverters: + suffix = inv.serial[-6:].lower() + base = f"{self.prefix}_gateway_{suffix}" + + soc_entities.append(f"sensor.{base}_soc") + battery_power_entities.append(f"sensor.{base}_battery_power") + pv_power_entities.append(f"sensor.{base}_pv_power") + grid_power_entities.append(f"sensor.{base}_grid_power") + load_power_entities.append(f"sensor.{base}_load_power") + charge_rate_entities.append(f"number.{base}_charge_rate") + discharge_rate_entities.append(f"number.{base}_discharge_rate") + reserve_entities.append(f"number.{base}_reserve_soc") + charge_limit_entities.append(f"number.{base}_target_soc") + battery_temp_entities.append(f"sensor.{base}_battery_temperature") + charge_start_entities.append(f"select.{base}_charge_slot1_start") + charge_end_entities.append(f"select.{base}_charge_slot1_end") + discharge_start_entities.append(f"select.{base}_discharge_slot1_start") + discharge_end_entities.append(f"select.{base}_discharge_slot1_end") + charge_enable_entities.append(f"switch.{base}_charge_enabled") + discharge_enable_entities.append(f"switch.{base}_discharge_enabled") + + # soc_max: from battery capacity entity + if inv.battery.capacity_wh > 0: + soc_max_entities.append(f"sensor.{base}_battery_capacity") + else: + self.log(f"Warn: GatewayMQTT: inverter {inv.serial} has no battery capacity, using fallback") + soc_max_entities.append(None) + + # Map entity lists to PredBat args + self.set_arg("soc_percent", soc_entities) + self.set_arg("soc_max", soc_max_entities) + self.set_arg("battery_power", battery_power_entities) + self.set_arg("pv_power", pv_power_entities) + self.set_arg("grid_power", grid_power_entities) + self.set_arg("load_power", load_power_entities) + self.set_arg("charge_rate", charge_rate_entities) + self.set_arg("discharge_rate", discharge_rate_entities) + self.set_arg("reserve", reserve_entities) + self.set_arg("charge_limit", charge_limit_entities) + self.set_arg("battery_temperature", battery_temp_entities) + self.set_arg("charge_start_time", charge_start_entities) + self.set_arg("charge_end_time", charge_end_entities) + self.set_arg("discharge_start_time", discharge_start_entities) + self.set_arg("discharge_end_time", discharge_end_entities) + self.set_arg("scheduled_charge_enable", charge_enable_entities) + self.set_arg("scheduled_discharge_enable", discharge_enable_entities) + + # Energy counters (first inverter) + suffix0 = inverters[0].serial[-6:].lower() + base0 = f"{self.prefix}_gateway_{suffix0}" + self.set_arg("pv_today", [f"sensor.{base0}_pv_today"]) + self.set_arg("import_today", [f"sensor.{base0}_import_today"]) + self.set_arg("export_today", [f"sensor.{base0}_export_today"]) + self.set_arg("load_today", [f"sensor.{base0}_load_today"]) + + # Battery health (first inverter) + self.set_arg("battery_temperature_history", f"sensor.{base0}_battery_temperature") + + # Battery rate max + rate_max = inverters[0].battery.rate_max_w + if rate_max > 0: + self.set_arg("battery_rate_max", [f"sensor.{base0}_battery_rate_max"]) + else: + self.log("Warn: GatewayMQTT: no battery_rate_max from gateway, using default 6000W") + self.set_arg("battery_rate_max", [6000]) + + # Inverter time (clock drift detection) + self.set_arg("inverter_time", [f"sensor.{base0}_inverter_time"]) + + # EMS aggregate entities (GivEnergy EMS only) + inv0 = inverters[0] + if inv0.type == pb.INVERTER_TYPE_GIVENERGY_EMS and inv0.ems.num_inverters > 0: + pfx = f"{self.prefix}_gateway" + self.set_arg("ems_total_soc", f"sensor.{pfx}_ems_total_soc") + self.set_arg("ems_total_charge", f"sensor.{pfx}_ems_total_charge") + self.set_arg("ems_total_discharge", f"sensor.{pfx}_ems_total_discharge") + self.set_arg("ems_total_grid", f"sensor.{pfx}_ems_total_grid") + self.set_arg("ems_total_pv", f"sensor.{pfx}_ems_total_pv") + self.set_arg("ems_total_load", f"sensor.{pfx}_ems_total_load") + self.log(f"Info: GatewayMQTT: EMS mode with {inv0.ems.num_inverters} sub-inverters") + + # Explicitly set unsupported features to None + self.set_arg("givtcp_rest", None) + self.set_arg("charge_rate_percent", None) + self.set_arg("discharge_rate_percent", None) + self.set_arg("pause_mode", None) + self.set_arg("pause_start_time", None) + self.set_arg("pause_end_time", None) + self.set_arg("discharge_target_soc", None) + + self._auto_configured = True + self.log(f"Info: GatewayMQTT: auto-config complete: {num_inverters} inverter(s) registered") + + def _plan_changed(self, plan_entries): + """Check if the plan differs from the last published plan.""" + if self._last_published_plan is None: + return True + return plan_entries != self._last_published_plan + + async def publish_plan(self, plan_entries, timezone_str): + """Build and publish an ExecutionPlan protobuf to the gateway. + + Args: + plan_entries: List of plan entry dicts. + timezone_str: IANA timezone string (e.g. "Europe/London"). + """ + if not self._plan_changed(plan_entries): + return # No change, skip publish + + self._plan_version += 1 + data = self.build_execution_plan(plan_entries, plan_version=self._plan_version, timezone=timezone_str) + self._last_plan_data = data + self._last_plan_publish_time = time.time() + + if self._mqtt_connected: + await self._publish_raw(self.topic_schedule, data, retain=True) + self._last_published_plan = plan_entries + self.log(f"Info: GatewayMQTT: Published execution plan v{self._plan_version} ({len(plan_entries)} entries)") + else: + self.log("Warn: GatewayMQTT: Not connected — plan queued for next publish") + + async def publish_command(self, command, **kwargs): + """Build and publish a JSON command to the gateway. + + Args: + command: Command name (set_mode, set_charge_rate, etc.) + **kwargs: Command-specific fields (mode, power_w, target_soc). + """ + cmd_json = self.build_command(command, **kwargs) + + if self._mqtt_connected: + await self._publish_raw(self.topic_command, cmd_json.encode("utf-8")) + self.log(f"Info: GatewayMQTT: Published command: {command}") + else: + self.log(f"Warn: GatewayMQTT: Not connected — cannot publish command: {command}") + + async def _publish_raw(self, topic, payload, retain=False): + """Publish raw bytes to an MQTT topic. + + Args: + topic: MQTT topic string. + payload: Bytes to publish. + retain: Whether to set the retain flag. + """ + if self._mqtt_client and self._mqtt_connected: + await self._mqtt_client.publish(topic, payload, qos=1, retain=retain) + + def is_alive(self): + """Check if the gateway component is alive and receiving data. + + Returns True when MQTT is connected AND either the gateway is + offline (LWT says so — we're still connected, just no data) OR + we've received telemetry within the last 2 minutes. + + Returns: + bool: True if healthy, False otherwise. + """ + if not self._mqtt_connected: + return False + + if not self._gateway_online: + # Gateway is offline but we're connected to broker — that's OK + return True + + # Gateway is online — check telemetry freshness + if self._last_telemetry_time == 0: + return False + + return (time.time() - self._last_telemetry_time) < _TELEMETRY_STALE_THRESHOLD + + def get_error_count(self): + """Return the cumulative error count (decode failures, MQTT disconnects, publish failures).""" + return self._error_count + + async def select_event(self, entity_id, value): + """Handle select entity changes (e.g. mode selection). + + Args: + entity_id: The entity ID that changed. + value: The new selected value. + """ + if "gateway_mode" in entity_id: + mode_map = {"auto": 0, "charge": 1, "discharge": 2, "idle": 3} + mode_val = mode_map.get(str(value).lower()) + if mode_val is not None: + await self.publish_command("set_mode", mode=mode_val) + self.log(f"Info: GatewayMQTT: Mode set to {value} ({mode_val})") + + async def number_event(self, entity_id, value): + """Handle number entity changes (e.g. charge rate, target SOC). + + Args: + entity_id: The entity ID that changed. + value: The new numeric value. + """ + try: + val = int(float(value)) + except (ValueError, TypeError): + self.log(f"Warn: GatewayMQTT: Invalid number value: {value}") + return + + if "charge_rate" in entity_id: + await self.publish_command("set_charge_rate", power_w=val) + elif "discharge_rate" in entity_id: + await self.publish_command("set_discharge_rate", power_w=val) + elif "reserve" in entity_id: + await self.publish_command("set_reserve", target_soc=val) + elif "target_soc" in entity_id: + await self.publish_command("set_target_soc", target_soc=val) + + async def switch_event(self, entity_id, service): + """Handle switch entity service calls. Stub for v1. + + Args: + entity_id: The entity ID being controlled. + service: The service being called (turn_on/turn_off). + """ + pass + + async def final(self): + """Cleanup: send AUTO mode, cancel listener task, disconnect.""" + try: + # Send AUTO mode before disconnecting + if self._mqtt_connected: + await self.publish_command("set_mode", mode=0) + self.log("Info: GatewayMQTT: Sent AUTO mode on shutdown") + except Exception as e: + self.log(f"Warn: GatewayMQTT: Error sending final AUTO mode: {e}") + + # Cancel the MQTT listener task + if self._mqtt_task and not self._mqtt_task.done(): + self._mqtt_task.cancel() + try: + await self._mqtt_task + except (asyncio.CancelledError, Exception): + pass + + self._mqtt_connected = False + self._mqtt_client = None + self.log("Info: GatewayMQTT: Finalized") + + async def _check_token_refresh(self): + """Check if the MQTT JWT token needs refreshing and refresh if needed. + + Uses the oauth-refresh edge function (same pattern as OAuthMixin) to + obtain a new access token before the current one expires. The refresh + token is held server-side in instance secrets. + """ + if not HAS_AIOHTTP: + return + + # Extract expiry from JWT if not yet known + if not self.mqtt_token_expires_at and self.mqtt_token: + exp = self.extract_jwt_expiry(self.mqtt_token) + if exp: + self.mqtt_token_expires_at = exp + else: + # Parse failed — set sentinel to avoid retrying every cycle + self.mqtt_token_expires_at = -1 + self.log("Warn: GatewayMQTT: could not extract JWT expiry, token refresh disabled") + return + + if self.mqtt_token_expires_at and self.mqtt_token_expires_at > 0 and not self.token_needs_refresh(self.mqtt_token_expires_at): + return + + if self.mqtt_token_expires_at == -1: + return + + if self._refresh_in_progress: + return + + self._refresh_in_progress = True + try: + supabase_url = os.environ.get("SUPABASE_URL", "") + supabase_key = os.environ.get("SUPABASE_KEY", "") + instance_id = self.args.get("user_id", "") if isinstance(self.args, dict) else "" + + if not supabase_url or not supabase_key or not instance_id: + self.log("Warn: GatewayMQTT: Token refresh skipped — missing env vars or instance_id") + return + + url = f"{supabase_url}/functions/v1/oauth-refresh" + headers = { + "Authorization": f"Bearer {supabase_key}", + "Content-Type": "application/json", + } + payload = { + "instance_id": instance_id, + "provider": "predbat_gateway", + } + + self.log("Info: GatewayMQTT: Refreshing MQTT token") + + timeout = aiohttp.ClientTimeout(total=30) + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post(url, headers=headers, json=payload) as response: + if response.status != 200: + self.log(f"Warn: GatewayMQTT: Token refresh HTTP {response.status}") + return + + data = await response.json() + + if data.get("success"): + self.mqtt_token = data["access_token"] + if data.get("expires_at"): + try: + if isinstance(data["expires_at"], (int, float)): + self.mqtt_token_expires_at = float(data["expires_at"]) + else: + dt = datetime.fromisoformat(data["expires_at"].replace("Z", "+00:00")) + self.mqtt_token_expires_at = dt.timestamp() + except (ValueError, AttributeError): + self.mqtt_token_expires_at = 0 + self.log("Info: GatewayMQTT: MQTT token refreshed") + else: + self.log(f"Warn: GatewayMQTT: Token refresh failed: {data.get('error', 'unknown')}") + + except (aiohttp.ClientError, asyncio.TimeoutError) as e: + self.log(f"Warn: GatewayMQTT: Token refresh network error: {e}") + except Exception as e: + self.log(f"Warn: GatewayMQTT: Token refresh error: {e}") + finally: + self._refresh_in_progress = False + + @staticmethod + def decode_telemetry(data): + """Decode protobuf GatewayStatus -> dict of entity_name: value. + + Args: + data: Raw protobuf bytes from /status topic. + + Returns: + Dict mapping entity names to values. Uses first inverter entry. + """ + status = pb.GatewayStatus() + status.ParseFromString(data) + + if len(status.inverters) == 0: + return {} + + inv = status.inverters[0] + entities = {} + + for field_path, entity_name in ENTITY_MAP.items(): + parts = field_path.split(".") + obj = inv + for part in parts: + obj = getattr(obj, part, None) + if obj is None: + break + if obj is not None: + entities[entity_name] = obj + + # EMS aggregate entities (when type is GIVENERGY_EMS) + if inv.type == pb.INVERTER_TYPE_GIVENERGY_EMS and inv.ems.num_inverters > 0: + entities["predbat_gateway_ems_total_soc"] = inv.ems.total_soc + entities["predbat_gateway_ems_total_charge"] = inv.ems.total_charge_w + entities["predbat_gateway_ems_total_discharge"] = inv.ems.total_discharge_w + entities["predbat_gateway_ems_total_grid"] = inv.ems.total_grid_w + entities["predbat_gateway_ems_total_pv"] = inv.ems.total_pv_w + entities["predbat_gateway_ems_total_load"] = inv.ems.total_load_w + + # Per-sub-inverter entities + for idx, sub in enumerate(inv.ems.sub_inverters): + prefix = f"predbat_gateway_sub{idx}" + entities[f"{prefix}_soc"] = sub.soc + entities[f"{prefix}_battery_power"] = sub.battery_w + entities[f"{prefix}_pv_power"] = sub.pv_w + entities[f"{prefix}_grid_power"] = sub.grid_w + entities[f"{prefix}_temp"] = sub.temp_c + + return entities + + @staticmethod + def build_execution_plan(entries, plan_version, timezone): + """Build protobuf ExecutionPlan from a list of plan entry dicts. + + Args: + entries: List of dicts with keys matching PlanEntry fields. + plan_version: Monotonic version number. + timezone: IANA timezone string (e.g. "Europe/London"). + + Returns: + Serialized protobuf bytes. + """ + plan = pb.ExecutionPlan() + plan.timestamp = int(time.time()) + plan.plan_version = plan_version + plan.timezone = timezone + + for entry_dict in entries: + pe = plan.entries.add() + pe.enabled = entry_dict.get("enabled", True) + pe.start_hour = entry_dict.get("start_hour", 0) + pe.start_minute = entry_dict.get("start_minute", 0) + pe.end_hour = entry_dict.get("end_hour", 0) + pe.end_minute = entry_dict.get("end_minute", 0) + pe.mode = entry_dict.get("mode", 0) + pe.power_w = entry_dict.get("power_w", 0) + pe.target_soc = entry_dict.get("target_soc", 0) + pe.days_of_week = entry_dict.get("days_of_week", 0x7F) + pe.use_native = entry_dict.get("use_native", False) + + return plan.SerializeToString() + + @staticmethod + def extract_jwt_expiry(jwt_token): + """Extract the exp claim from a JWT without verifying signature. + + Args: + jwt_token: JWT string (header.payload.signature). + + Returns: + Unix timestamp of expiry, or 0 if parsing fails. + """ + import base64 + + try: + parts = jwt_token.split(".") + if len(parts) != 3: + return 0 + # Add padding + payload_b64 = parts[1] + "=" * (4 - len(parts[1]) % 4) + payload = json.loads(base64.urlsafe_b64decode(payload_b64)) + return payload.get("exp", 0) + except Exception: + return 0 + + @staticmethod + def token_needs_refresh(exp_epoch): + """Check if token should be refreshed (1 hour before expiry). + + Args: + exp_epoch: Unix timestamp of token expiry. + + Returns: + True if token expires within 1 hour. + """ + return (exp_epoch - int(time.time())) < 3600 + + @staticmethod + def build_command(command, **kwargs): + """Build JSON command string for ad-hoc control. + + Args: + command: Command name (set_mode, set_charge_rate, etc.) + **kwargs: Command-specific fields (mode, power_w, target_soc). + + Returns: + JSON string ready to publish to /command topic. + """ + cmd = { + "command": command, + "command_id": str(uuid.uuid4()), + } + + if "mode" in kwargs: + cmd["mode"] = kwargs["mode"] + if "power_w" in kwargs: + cmd["power_w"] = kwargs["power_w"] + if "target_soc" in kwargs: + cmd["target_soc"] = kwargs["target_soc"] + + # Mode commands need expires_at (5-minute deadman) + if command == "set_mode": + cmd["expires_at"] = int(time.time()) + 300 + + return json.dumps(cmd) diff --git a/apps/predbat/proto/__init__.py b/apps/predbat/proto/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apps/predbat/proto/gateway_status.proto b/apps/predbat/proto/gateway_status.proto new file mode 100644 index 000000000..538147812 --- /dev/null +++ b/apps/predbat/proto/gateway_status.proto @@ -0,0 +1,151 @@ +syntax = "proto3"; +package predbat; + +// Gateway Status Protocol Buffer Schema +// +// Schema version: 1 +// This is the canonical contract for ESP32 gateway → cloud MQTT status payloads. +// Proto3 zero-value omission means unpopulated future fields cost nothing on the wire. + +// Mirrors InverterType enum in types.h (values must match 1:1 for safe casting) +enum InverterType { + INVERTER_TYPE_UNKNOWN = 0; + INVERTER_TYPE_SOLIS_HYBRID = 1; + INVERTER_TYPE_SOLIS_AC = 2; + INVERTER_TYPE_SOFAR_G3 = 3; + INVERTER_TYPE_GROWATT_SPH = 4; + INVERTER_TYPE_DEYE_SUNSYNK = 5; + INVERTER_TYPE_GIVENERGY = 6; + INVERTER_TYPE_GIVENERGY_EMS = 7; + INVERTER_TYPE_CUSTOM = 8; +} + +message BatteryStatus { + uint32 soc_percent = 1; // 0-100 + float voltage_v = 2; + float current_a = 3; // positive = charging + int32 power_w = 4; // positive = charging + float temperature_c = 5; + // Future: SunSpec 802-inspired fields (populated when available) + uint32 soh_percent = 6; // state of health, 0 = not available + uint32 cycle_count = 7; + int32 capacity_wh = 8; // rated capacity + uint32 rate_max_w = 9; // max charge/discharge rate in W +} + +message PvStatus { + int32 power_w = 1; +} + +message GridStatus { + int32 power_w = 1; // positive = importing from grid + // Future: grid quality fields + float voltage_v = 2; // 0 = not available + float frequency_hz = 3; +} + +message LoadStatus { + int32 power_w = 1; // house consumption +} + +message InverterData { + int32 active_power_w = 1; + float temperature_c = 2; +} + +message ControlStatus { + uint32 mode = 1; // OperatingMode enum (0=auto,1=charge,2=discharge,3=idle) + bool charge_enabled = 2; + bool discharge_enabled = 3; + uint32 charge_rate_w = 4; + uint32 discharge_rate_w = 5; + uint32 reserve_soc = 6; // min SOC % + uint32 target_soc = 7; // charge target SOC % + uint32 force_power_w = 8; + uint32 command_expires = 9; // unix timestamp +} + +message ScheduleStatus { + uint32 charge_start = 1; // HHMM format + uint32 charge_end = 2; + uint32 discharge_start = 3; + uint32 discharge_end = 4; +} + +message EmsSubInverter { + uint32 soc = 1; + int32 battery_w = 2; + int32 pv_w = 3; + int32 grid_w = 4; + float temp_c = 5; +} + +message EmsStatus { + uint32 num_inverters = 1; + uint32 total_soc = 2; + int32 total_charge_w = 3; + int32 total_discharge_w = 4; + int32 total_grid_w = 5; + int32 total_pv_w = 6; + int32 total_load_w = 7; + repeated EmsSubInverter sub_inverters = 8; +} + +// Daily energy counters (Wh, reset at midnight, populated from extended poll tier) +message EnergyCounters { + uint32 pv_today_wh = 1; + uint32 grid_import_today_wh = 2; + uint32 grid_export_today_wh = 3; + uint32 battery_charge_today_wh = 4; + uint32 battery_discharge_today_wh = 5; + uint32 consumption_today_wh = 6; +} + +message InverterEntry { + InverterType type = 1; + string serial = 2; + string ip = 3; + bool connected = 4; + bool active = 5; + BatteryStatus battery = 6; + PvStatus pv = 7; + GridStatus grid = 8; + LoadStatus load = 9; + InverterData inverter = 10; + ControlStatus control = 11; + ScheduleStatus schedule = 12; + EmsStatus ems = 13; + EnergyCounters energy = 14; // daily energy counters (populated from extended poll tier) +} + +message GatewayStatus { + string device_id = 1; + uint32 dongle_count = 2; + string firmware = 3; + uint32 timestamp = 4; // unix timestamp + uint32 schema_version = 5; // currently 1 + repeated InverterEntry inverters = 6; +} + +// Execution plan sent from PredBat cloud to gateway +// Published to predbat/devices/{id}/schedule as protobuf + +message PlanEntry { + bool enabled = 1; + uint32 start_hour = 2; // 0-23 (local time, per timezone field) + uint32 start_minute = 3; // 0-59 + uint32 end_hour = 4; // 0-23 + uint32 end_minute = 5; // 0-59 + uint32 mode = 6; // OperatingMode: 0=auto, 1=charge, 2=discharge, 3=idle + uint32 power_w = 7; // target power + uint32 target_soc = 8; // target SOC for charge, min SOC for discharge + uint32 days_of_week = 9; // bitmask: bit 0 = Sunday, bit 6 = Saturday + bool use_native = 10; // true = write to inverter schedule registers +} + +message ExecutionPlan { + uint32 timestamp = 1; // when plan was generated (unix epoch) + uint32 plan_version = 2; // monotonic, gateway skips stale plans + string timezone = 3; // IANA timezone (e.g. "Europe/London") + repeated PlanEntry entries = 4; +} diff --git a/apps/predbat/proto/gateway_status_pb2.py b/apps/predbat/proto/gateway_status_pb2.py new file mode 100644 index 000000000..5c6cdb53d --- /dev/null +++ b/apps/predbat/proto/gateway_status_pb2.py @@ -0,0 +1,58 @@ +# -*- coding: utf-8 -*- +# Generated by the protocol buffer compiler. DO NOT EDIT! +# NO CHECKED-IN PROTOBUF GENCODE +# source: gateway_status.proto +# Protobuf Python Version: 6.33.4 +"""Generated protocol buffer code.""" +from google.protobuf import descriptor as _descriptor +from google.protobuf import descriptor_pool as _descriptor_pool +from google.protobuf import runtime_version as _runtime_version +from google.protobuf import symbol_database as _symbol_database +from google.protobuf.internal import builder as _builder + +_runtime_version.ValidateProtobufRuntimeVersion(_runtime_version.Domain.PUBLIC, 6, 33, 4, "", "gateway_status.proto") +# @@protoc_insertion_point(imports) + +_sym_db = _symbol_database.Default() + + +DESCRIPTOR = _descriptor_pool.Default().AddSerializedFile( + b'\n\x14gateway_status.proto\x12\x07predbat"\xc5\x01\n\rBatteryStatus\x12\x13\n\x0bsoc_percent\x18\x01 \x01(\r\x12\x11\n\tvoltage_v\x18\x02 \x01(\x02\x12\x11\n\tcurrent_a\x18\x03 \x01(\x02\x12\x0f\n\x07power_w\x18\x04 \x01(\x05\x12\x15\n\rtemperature_c\x18\x05 \x01(\x02\x12\x13\n\x0bsoh_percent\x18\x06 \x01(\r\x12\x13\n\x0b\x63ycle_count\x18\x07 \x01(\r\x12\x13\n\x0b\x63\x61pacity_wh\x18\x08 \x01(\x05\x12\x12\n\nrate_max_w\x18\t \x01(\r"\x1b\n\x08PvStatus\x12\x0f\n\x07power_w\x18\x01 \x01(\x05"F\n\nGridStatus\x12\x0f\n\x07power_w\x18\x01 \x01(\x05\x12\x11\n\tvoltage_v\x18\x02 \x01(\x02\x12\x14\n\x0c\x66requency_hz\x18\x03 \x01(\x02"\x1d\n\nLoadStatus\x12\x0f\n\x07power_w\x18\x01 \x01(\x05"=\n\x0cInverterData\x12\x16\n\x0e\x61\x63tive_power_w\x18\x01 \x01(\x05\x12\x15\n\rtemperature_c\x18\x02 \x01(\x02"\xda\x01\n\rControlStatus\x12\x0c\n\x04mode\x18\x01 \x01(\r\x12\x16\n\x0e\x63harge_enabled\x18\x02 \x01(\x08\x12\x19\n\x11\x64ischarge_enabled\x18\x03 \x01(\x08\x12\x15\n\rcharge_rate_w\x18\x04 \x01(\r\x12\x18\n\x10\x64ischarge_rate_w\x18\x05 \x01(\r\x12\x13\n\x0breserve_soc\x18\x06 \x01(\r\x12\x12\n\ntarget_soc\x18\x07 \x01(\r\x12\x15\n\rforce_power_w\x18\x08 \x01(\r\x12\x17\n\x0f\x63ommand_expires\x18\t \x01(\r"j\n\x0eScheduleStatus\x12\x14\n\x0c\x63harge_start\x18\x01 \x01(\r\x12\x12\n\ncharge_end\x18\x02 \x01(\r\x12\x17\n\x0f\x64ischarge_start\x18\x03 \x01(\r\x12\x15\n\rdischarge_end\x18\x04 \x01(\r"^\n\x0e\x45msSubInverter\x12\x0b\n\x03soc\x18\x01 \x01(\r\x12\x11\n\tbattery_w\x18\x02 \x01(\x05\x12\x0c\n\x04pv_w\x18\x03 \x01(\x05\x12\x0e\n\x06grid_w\x18\x04 \x01(\x05\x12\x0e\n\x06temp_c\x18\x05 \x01(\x02"\xd8\x01\n\tEmsStatus\x12\x15\n\rnum_inverters\x18\x01 \x01(\r\x12\x11\n\ttotal_soc\x18\x02 \x01(\r\x12\x16\n\x0etotal_charge_w\x18\x03 \x01(\x05\x12\x19\n\x11total_discharge_w\x18\x04 \x01(\x05\x12\x14\n\x0ctotal_grid_w\x18\x05 \x01(\x05\x12\x12\n\ntotal_pv_w\x18\x06 \x01(\x05\x12\x14\n\x0ctotal_load_w\x18\x07 \x01(\x05\x12.\n\rsub_inverters\x18\x08 \x03(\x0b\x32\x17.predbat.EmsSubInverter"\xc4\x01\n\x0e\x45nergyCounters\x12\x13\n\x0bpv_today_wh\x18\x01 \x01(\r\x12\x1c\n\x14grid_import_today_wh\x18\x02 \x01(\r\x12\x1c\n\x14grid_export_today_wh\x18\x03 \x01(\r\x12\x1f\n\x17\x62\x61ttery_charge_today_wh\x18\x04 \x01(\r\x12"\n\x1a\x62\x61ttery_discharge_today_wh\x18\x05 \x01(\r\x12\x1c\n\x14\x63onsumption_today_wh\x18\x06 \x01(\r"\xc8\x03\n\rInverterEntry\x12#\n\x04type\x18\x01 \x01(\x0e\x32\x15.predbat.InverterType\x12\x0e\n\x06serial\x18\x02 \x01(\t\x12\n\n\x02ip\x18\x03 \x01(\t\x12\x11\n\tconnected\x18\x04 \x01(\x08\x12\x0e\n\x06\x61\x63tive\x18\x05 \x01(\x08\x12\'\n\x07\x62\x61ttery\x18\x06 \x01(\x0b\x32\x16.predbat.BatteryStatus\x12\x1d\n\x02pv\x18\x07 \x01(\x0b\x32\x11.predbat.PvStatus\x12!\n\x04grid\x18\x08 \x01(\x0b\x32\x13.predbat.GridStatus\x12!\n\x04load\x18\t \x01(\x0b\x32\x13.predbat.LoadStatus\x12\'\n\x08inverter\x18\n \x01(\x0b\x32\x15.predbat.InverterData\x12\'\n\x07\x63ontrol\x18\x0b \x01(\x0b\x32\x16.predbat.ControlStatus\x12)\n\x08schedule\x18\x0c \x01(\x0b\x32\x17.predbat.ScheduleStatus\x12\x1f\n\x03\x65ms\x18\r \x01(\x0b\x32\x12.predbat.EmsStatus\x12\'\n\x06\x65nergy\x18\x0e \x01(\x0b\x32\x17.predbat.EnergyCounters"\xa0\x01\n\rGatewayStatus\x12\x11\n\tdevice_id\x18\x01 \x01(\t\x12\x14\n\x0c\x64ongle_count\x18\x02 \x01(\r\x12\x10\n\x08\x66irmware\x18\x03 \x01(\t\x12\x11\n\ttimestamp\x18\x04 \x01(\r\x12\x16\n\x0eschema_version\x18\x05 \x01(\r\x12)\n\tinverters\x18\x06 \x03(\x0b\x32\x16.predbat.InverterEntry"\xc9\x01\n\tPlanEntry\x12\x0f\n\x07\x65nabled\x18\x01 \x01(\x08\x12\x12\n\nstart_hour\x18\x02 \x01(\r\x12\x14\n\x0cstart_minute\x18\x03 \x01(\r\x12\x10\n\x08\x65nd_hour\x18\x04 \x01(\r\x12\x12\n\nend_minute\x18\x05 \x01(\r\x12\x0c\n\x04mode\x18\x06 \x01(\r\x12\x0f\n\x07power_w\x18\x07 \x01(\r\x12\x12\n\ntarget_soc\x18\x08 \x01(\r\x12\x14\n\x0c\x64\x61ys_of_week\x18\t \x01(\r\x12\x12\n\nuse_native\x18\n \x01(\x08"o\n\rExecutionPlan\x12\x11\n\ttimestamp\x18\x01 \x01(\r\x12\x14\n\x0cplan_version\x18\x02 \x01(\r\x12\x10\n\x08timezone\x18\x03 \x01(\t\x12#\n\x07\x65ntries\x18\x04 \x03(\x0b\x32\x12.predbat.PlanEntry*\x98\x02\n\x0cInverterType\x12\x19\n\x15INVERTER_TYPE_UNKNOWN\x10\x00\x12\x1e\n\x1aINVERTER_TYPE_SOLIS_HYBRID\x10\x01\x12\x1a\n\x16INVERTER_TYPE_SOLIS_AC\x10\x02\x12\x1a\n\x16INVERTER_TYPE_SOFAR_G3\x10\x03\x12\x1d\n\x19INVERTER_TYPE_GROWATT_SPH\x10\x04\x12\x1e\n\x1aINVERTER_TYPE_DEYE_SUNSYNK\x10\x05\x12\x1b\n\x17INVERTER_TYPE_GIVENERGY\x10\x06\x12\x1f\n\x1bINVERTER_TYPE_GIVENERGY_EMS\x10\x07\x12\x18\n\x14INVERTER_TYPE_CUSTOM\x10\x08\x62\x06proto3' +) + +_globals = globals() +_builder.BuildMessageAndEnumDescriptors(DESCRIPTOR, _globals) +_builder.BuildTopDescriptorsAndMessages(DESCRIPTOR, "gateway_status_pb2", _globals) +if not _descriptor._USE_C_DESCRIPTORS: + DESCRIPTOR._loaded_options = None + _globals["_INVERTERTYPE"]._serialized_start = 2211 + _globals["_INVERTERTYPE"]._serialized_end = 2491 + _globals["_BATTERYSTATUS"]._serialized_start = 34 + _globals["_BATTERYSTATUS"]._serialized_end = 231 + _globals["_PVSTATUS"]._serialized_start = 233 + _globals["_PVSTATUS"]._serialized_end = 260 + _globals["_GRIDSTATUS"]._serialized_start = 262 + _globals["_GRIDSTATUS"]._serialized_end = 332 + _globals["_LOADSTATUS"]._serialized_start = 334 + _globals["_LOADSTATUS"]._serialized_end = 363 + _globals["_INVERTERDATA"]._serialized_start = 365 + _globals["_INVERTERDATA"]._serialized_end = 426 + _globals["_CONTROLSTATUS"]._serialized_start = 429 + _globals["_CONTROLSTATUS"]._serialized_end = 647 + _globals["_SCHEDULESTATUS"]._serialized_start = 649 + _globals["_SCHEDULESTATUS"]._serialized_end = 755 + _globals["_EMSSUBINVERTER"]._serialized_start = 757 + _globals["_EMSSUBINVERTER"]._serialized_end = 851 + _globals["_EMSSTATUS"]._serialized_start = 854 + _globals["_EMSSTATUS"]._serialized_end = 1070 + _globals["_ENERGYCOUNTERS"]._serialized_start = 1073 + _globals["_ENERGYCOUNTERS"]._serialized_end = 1269 + _globals["_INVERTERENTRY"]._serialized_start = 1272 + _globals["_INVERTERENTRY"]._serialized_end = 1728 + _globals["_GATEWAYSTATUS"]._serialized_start = 1731 + _globals["_GATEWAYSTATUS"]._serialized_end = 1891 + _globals["_PLANENTRY"]._serialized_start = 1894 + _globals["_PLANENTRY"]._serialized_end = 2095 + _globals["_EXECUTIONPLAN"]._serialized_start = 2097 + _globals["_EXECUTIONPLAN"]._serialized_end = 2208 +# @@protoc_insertion_point(module_scope) diff --git a/apps/predbat/tests/test_gateway.py b/apps/predbat/tests/test_gateway.py new file mode 100644 index 000000000..94285abdb --- /dev/null +++ b/apps/predbat/tests/test_gateway.py @@ -0,0 +1,329 @@ +"""Tests for GatewayMQTT component.""" +import pytest +import sys +import os + +sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..")) + +from proto import gateway_status_pb2 as pb + +import importlib.util + +HAS_AIOMQTT = importlib.util.find_spec("aiomqtt") is not None + + +class TestProtobufDecode: + """Test protobuf telemetry → entity mapping.""" + + def _make_status(self, soc=50, battery_power=1000, pv_power=2000, grid_power=-500, load_power=1500, mode=0): + status = pb.GatewayStatus() + status.device_id = "pbgw_test123" + status.firmware = "0.4.5" + status.timestamp = 1741789200 + status.schema_version = 1 + status.dongle_count = 1 + + inv = status.inverters.add() + inv.type = pb.INVERTER_TYPE_GIVENERGY + inv.serial = "CE1234G567" + inv.ip = "192.168.1.100" + inv.connected = True + inv.active = True + + inv.battery.soc_percent = soc + inv.battery.power_w = battery_power + inv.battery.voltage_v = 51.2 + inv.battery.current_a = 19.5 + inv.battery.temperature_c = 22.5 + inv.battery.soh_percent = 98 + inv.battery.cycle_count = 150 + inv.battery.capacity_wh = 9500 + + inv.pv.power_w = pv_power + inv.grid.power_w = grid_power + inv.grid.voltage_v = 242.5 + inv.grid.frequency_hz = 50.01 + inv.load.power_w = load_power + + inv.inverter.active_power_w = 1800 + inv.inverter.temperature_c = 35.0 + + inv.control.mode = mode + inv.control.charge_enabled = True + inv.control.discharge_enabled = True + inv.control.charge_rate_w = 3000 + inv.control.discharge_rate_w = 3000 + inv.control.reserve_soc = 4 + inv.control.target_soc = 100 + + inv.schedule.charge_start = 130 + inv.schedule.charge_end = 430 + inv.schedule.discharge_start = 1600 + inv.schedule.discharge_end = 1900 + + return status + + def test_serialize_deserialize_roundtrip(self): + original = self._make_status(soc=75, battery_power=2000) + data = original.SerializeToString() + decoded = pb.GatewayStatus() + decoded.ParseFromString(data) + + assert decoded.device_id == "pbgw_test123" + assert decoded.inverters[0].battery.soc_percent == 75 + assert decoded.inverters[0].battery.power_w == 2000 + assert decoded.inverters[0].pv.power_w == 2000 + assert decoded.inverters[0].grid.power_w == -500 + assert decoded.inverters[0].grid.voltage_v == pytest.approx(242.5, abs=0.1) + assert decoded.inverters[0].control.charge_enabled is True + assert decoded.inverters[0].battery.soh_percent == 98 + + def test_entity_mapping(self): + from gateway import GatewayMQTT + + status = self._make_status() + data = status.SerializeToString() + + entities = GatewayMQTT.decode_telemetry(data) + + assert entities["predbat_gateway_soc"] == 50 + assert entities["predbat_gateway_battery_power"] == 1000 + assert entities["predbat_gateway_pv_power"] == 2000 + assert entities["predbat_gateway_grid_power"] == -500 + assert entities["predbat_gateway_load_power"] == 1500 + assert entities["predbat_gateway_battery_voltage"] == pytest.approx(51.2, abs=0.1) + assert entities["predbat_gateway_battery_current"] == pytest.approx(19.5, abs=0.1) + assert entities["predbat_gateway_battery_temp"] == pytest.approx(22.5, abs=0.1) + assert entities["predbat_gateway_battery_soh"] == 98 + assert entities["predbat_gateway_battery_cycles"] == 150 + assert entities["predbat_gateway_battery_capacity"] == 9500 + assert entities["predbat_gateway_grid_voltage"] == pytest.approx(242.5, abs=0.1) + assert entities["predbat_gateway_grid_frequency"] == pytest.approx(50.01, abs=0.01) + assert entities["predbat_gateway_inverter_power"] == 1800 + assert entities["predbat_gateway_inverter_temp"] == pytest.approx(35.0, abs=0.1) + assert entities["predbat_gateway_mode"] == 0 + assert entities["predbat_gateway_charge_enabled"] is True + assert entities["predbat_gateway_discharge_enabled"] is True + assert entities["predbat_gateway_charge_rate"] == 3000 + assert entities["predbat_gateway_discharge_rate"] == 3000 + assert entities["predbat_gateway_reserve"] == 4 + assert entities["predbat_gateway_target_soc"] == 100 + assert entities["predbat_gateway_charge_start"] == 130 + assert entities["predbat_gateway_charge_end"] == 430 + assert entities["predbat_gateway_discharge_start"] == 1600 + assert entities["predbat_gateway_discharge_end"] == 1900 + + +class TestPlanSerialization: + def test_plan_roundtrip(self): + from gateway import GatewayMQTT + + plan_entries = [ + { + "enabled": True, + "start_hour": 1, + "start_minute": 30, + "end_hour": 4, + "end_minute": 30, + "mode": 1, + "power_w": 3000, + "target_soc": 100, + "days_of_week": 0x7F, + "use_native": True, + }, + { + "enabled": True, + "start_hour": 16, + "start_minute": 0, + "end_hour": 19, + "end_minute": 0, + "mode": 2, + "power_w": 2500, + "target_soc": 10, + "days_of_week": 0x7F, + "use_native": False, + }, + ] + + data = GatewayMQTT.build_execution_plan(plan_entries, plan_version=42, timezone="Europe/London") + + plan = pb.ExecutionPlan() + plan.ParseFromString(data) + + assert plan.plan_version == 42 + assert plan.timezone == "Europe/London" + assert len(plan.entries) == 2 + assert plan.entries[0].start_hour == 1 + assert plan.entries[0].start_minute == 30 + assert plan.entries[0].mode == 1 + assert plan.entries[0].use_native is True + assert plan.entries[1].mode == 2 + assert plan.entries[1].use_native is False + + def test_empty_plan(self): + from gateway import GatewayMQTT + + data = GatewayMQTT.build_execution_plan([], plan_version=1, timezone="UTC") + plan = pb.ExecutionPlan() + plan.ParseFromString(data) + assert len(plan.entries) == 0 + assert plan.plan_version == 1 + + +class TestCommandFormat: + def test_set_mode_command(self): + from gateway import GatewayMQTT + + cmd = GatewayMQTT.build_command("set_mode", mode=1, power_w=3000, target_soc=100) + import json + + parsed = json.loads(cmd) + assert parsed["command"] == "set_mode" + assert parsed["mode"] == 1 + assert parsed["power_w"] == 3000 + assert parsed["target_soc"] == 100 + assert "command_id" in parsed + assert "expires_at" in parsed + import time + + assert abs(parsed["expires_at"] - int(time.time())) < 310 + + def test_set_charge_rate_command(self): + from gateway import GatewayMQTT + + cmd = GatewayMQTT.build_command("set_charge_rate", power_w=2500) + import json + + parsed = json.loads(cmd) + assert parsed["command"] == "set_charge_rate" + assert parsed["power_w"] == 2500 + + def test_set_reserve_command(self): + from gateway import GatewayMQTT + + cmd = GatewayMQTT.build_command("set_reserve", target_soc=10) + import json + + parsed = json.loads(cmd) + assert parsed["command"] == "set_reserve" + assert parsed["target_soc"] == 10 + + +class TestEMSEntities: + def test_ems_aggregate_entities(self): + """EMS type produces aggregate entities.""" + status = pb.GatewayStatus() + status.device_id = "pbgw_ems_test" + status.timestamp = 1741789200 + status.schema_version = 1 + status.dongle_count = 1 + + inv = status.inverters.add() + inv.type = pb.INVERTER_TYPE_GIVENERGY_EMS + inv.serial = "EM1234" + inv.connected = True + inv.active = True + + inv.ems.num_inverters = 2 + inv.ems.total_soc = 60 + inv.ems.total_charge_w = 3000 + inv.ems.total_pv_w = 5000 + inv.ems.total_grid_w = -1000 + inv.ems.total_load_w = 4000 + + sub0 = inv.ems.sub_inverters.add() + sub0.soc = 55 + sub0.battery_w = 1500 + sub0.pv_w = 2500 + sub1 = inv.ems.sub_inverters.add() + sub1.soc = 65 + sub1.battery_w = 1500 + sub1.pv_w = 2500 + + from gateway import GatewayMQTT + + entities = GatewayMQTT.decode_telemetry(status.SerializeToString()) + + # EMS aggregate entities + assert entities.get("predbat_gateway_ems_total_soc") == 60 + assert entities.get("predbat_gateway_ems_total_pv") == 5000 + assert entities.get("predbat_gateway_ems_total_load") == 4000 + # Per-sub-inverter + assert entities.get("predbat_gateway_sub0_soc") == 55 + assert entities.get("predbat_gateway_sub1_soc") == 65 + assert entities.get("predbat_gateway_sub0_battery_power") == 1500 + + +class TestTokenRefresh: + def test_jwt_expiry_extraction(self): + """Extract exp claim from a JWT without verification.""" + from gateway import GatewayMQTT + import base64 + import json as json_mod + + # Build a fake JWT with exp claim + header = base64.urlsafe_b64encode(json_mod.dumps({"alg": "RS256"}).encode()).rstrip(b"=") + payload = base64.urlsafe_b64encode(json_mod.dumps({"exp": 1741789200, "sub": "test"}).encode()).rstrip(b"=") + fake_jwt = f"{header.decode()}.{payload.decode()}.fake_signature" + + exp = GatewayMQTT.extract_jwt_expiry(fake_jwt) + assert exp == 1741789200 + + def test_jwt_expiry_invalid_token(self): + """Invalid JWT returns 0.""" + from gateway import GatewayMQTT + + assert GatewayMQTT.extract_jwt_expiry("not-a-jwt") == 0 + assert GatewayMQTT.extract_jwt_expiry("") == 0 + + def test_token_needs_refresh(self): + """Token should be refreshed 1 hour before expiry.""" + from gateway import GatewayMQTT + import time as time_mod + + # Token expiring in 30 minutes — needs refresh + exp_soon = int(time_mod.time()) + 1800 + assert GatewayMQTT.token_needs_refresh(exp_soon) is True + + # Token expiring in 2 hours — does not need refresh + exp_later = int(time_mod.time()) + 7200 + assert GatewayMQTT.token_needs_refresh(exp_later) is False + + +class TestMQTTIntegration: + """Integration tests for MQTT plan publishing format.""" + + @pytest.mark.skipif(not HAS_AIOMQTT, reason="aiomqtt not installed") + def test_plan_publish_format(self): + """Plan published to /schedule topic is valid protobuf.""" + from gateway import GatewayMQTT + + entries = [ + { + "enabled": True, + "start_hour": 1, + "start_minute": 30, + "end_hour": 4, + "end_minute": 30, + "mode": 1, + "power_w": 3000, + "target_soc": 100, + "days_of_week": 0x7F, + "use_native": True, + } + ] + + data = GatewayMQTT.build_execution_plan(entries, plan_version=1, timezone="Europe/London") + + # Verify the protobuf is valid and can be decoded + plan = pb.ExecutionPlan() + plan.ParseFromString(data) + assert plan.entries[0].start_hour == 1 + assert plan.entries[0].use_native is True + assert plan.timezone == "Europe/London" + + # Verify plan_version is monotonically increasing + data2 = GatewayMQTT.build_execution_plan(entries, plan_version=2, timezone="Europe/London") + plan2 = pb.ExecutionPlan() + plan2.ParseFromString(data2) + assert plan2.plan_version > plan.plan_version