From 7fc23e459685176e37c7b3697c28b948ae692cb7 Mon Sep 17 00:00:00 2001 From: Mark Gascoyne Date: Wed, 11 Mar 2026 12:21:24 +0000 Subject: [PATCH 1/5] feat: add managed VPP mode to Axle Energy integration Extend the Axle Energy component to support two modes: - BYOK (existing): User's own API key with /vpp/home-assistant/event - Managed VPP (new): Partner credentials with /entities/site/{id}/price-curve Managed mode authenticates via partner API (OAuth token cached 50min), fetches half-hourly wholesale prices (GBP/MWh), converts to p/kWh, and creates both export and import sessions per timeslot. Key changes: - axle.py: Add managed mode init params, _get_partner_token(), _fetch_managed_price_curve(), _process_price_curve() methods. Split fetch_axle_event() into dispatcher with _fetch_byok_event(). Add allow_future param to add_event_to_history() for price curves. Dedup events by start_time AND import_export direction. Add 5xx retry with backoff, fail immediately on 4xx. - components.py: Change api_key to required=False, add managed mode args, add required_or=["api_key", "managed_mode"] to prevent activation without config. - test_axle.py: Add 6 managed mode tests (init, price curve, dedup, future events, publish attrs, disabled without creds). Co-Authored-By: Claude Opus 4.6 --- .cspell/custom-dictionary-workspace.txt | 3 + apps/predbat/axle.py | 258 ++++++++++++++++++++++-- apps/predbat/components.py | 8 +- apps/predbat/tests/test_axle.py | 237 ++++++++++++++++++++++ 4 files changed, 487 insertions(+), 19 deletions(-) diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 217577ac0..6742bdf0c 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -36,6 +36,8 @@ batteryb beforeunload bierner brickatius +byok +BYOK calib Cantarell cexxxx @@ -53,6 +55,7 @@ compareform configform Consolas coro +creds crosscharge customisation Customise diff --git a/apps/predbat/axle.py b/apps/predbat/axle.py index 2d93dda03..50d6ad026 100644 --- a/apps/predbat/axle.py +++ b/apps/predbat/axle.py @@ -14,9 +14,13 @@ Integrates with the Axle Energy API to receive and process VPP events (import/export commands) with event history tracking and binary sensor publishing for demand response participation. + +Supports two modes: +- BYOK (Bring Your Own Key): User's own API key with /vpp/home-assistant/event endpoint. +- Managed VPP: Partner credentials with /entities/site/{site_id}/price-curve endpoint. """ -from datetime import datetime, timedelta +from datetime import datetime, timedelta, timezone import asyncio import aiohttp from component_base import ComponentBase @@ -25,11 +29,23 @@ class AxleAPI(ComponentBase): - """Axle Energy VPP client.""" + """Axle Energy VPP client with managed mode support.""" + + def initialize(self, api_key, pence_per_kwh, automatic, managed_mode=False, site_id=None, partner_username=None, partner_password=None, api_base_url="https://api.axle.energy"): + """Initialize the AxleAPI component. - def initialize(self, api_key, pence_per_kwh, automatic): - """Initialize the AxleAPI component""" - if not isinstance(api_key, str) or not api_key: + Args: + api_key: BYOK API key (used in BYOK mode only) + pence_per_kwh: VPP compensation rate in pence per kWh + automatic: Whether to auto-configure entity mappings + managed_mode: If True, use partner API price curve instead of BYOK event endpoint + site_id: Axle site ID (required for managed mode) + partner_username: Partner API username (required for managed mode) + partner_password: Partner API password (required for managed mode) + api_base_url: Axle API base URL + """ + # Validate api_key type (common YAML misconfiguration: list instead of string) + if api_key is not None and (not isinstance(api_key, str) or not api_key): self.log("Error: AxleAPI: axle_api_key is missing or invalid, you must set it to a string (not a list or number). Axle Energy integration will not function correctly.") api_key = None self.api_key = api_key @@ -46,6 +62,29 @@ def initialize(self, api_key, pence_per_kwh, automatic): } self.updated_at = None # Last updated moved out to separate attribute to not pollute triggering on change of current_event + # Managed mode config + self.managed_mode = managed_mode + self.site_id = site_id + self.partner_username = partner_username + self.partner_password = partner_password + self.api_base_url = api_base_url.rstrip("/") if api_base_url else "https://api.axle.energy" + self.partner_token = None + self.partner_token_expiry = None + + if self.managed_mode: + missing = [] + if not self.site_id: + missing.append("site_id") + if not self.partner_username: + missing.append("partner_username") + if not self.partner_password: + missing.append("partner_password") + if missing: + self.log(f"Error: AxleAPI: Managed mode missing required params: {', '.join(missing)}") + self.managed_mode = False # Fall back to disabled + elif not self.api_key: + self.log("Warn: AxleAPI: BYOK mode requires api_key — Axle integration disabled") + def load_event_history(self): """ Load event history from the sensor on startup. @@ -109,10 +148,13 @@ def cleanup_event_history(self): if removed > 0: self.log(f"AxleAPI: Cleaned up {removed} events older than 7 days") - def add_event_to_history(self, event_data): - """ - Add an event to history as soon as it starts (becomes active). - Once an event starts, it won't change and should be recorded. + def add_event_to_history(self, event_data, allow_future=False): + """Add an event to history. + + Args: + event_data: Event dict with start_time, end_time, import_export, pence_per_kwh. + allow_future: If True, accept future events (used by managed mode price curves). + BYOK mode only adds events once they become active. """ start_time_str = event_data.get("start_time") end_time_str = event_data.get("end_time") @@ -127,14 +169,14 @@ def add_event_to_history(self, event_data): self.log(f"Warn: Unable to parse start_time for history: {start_time_str}") return - # Only add if event has started (not future events) - # Once event starts, add it to history even if still active - if start_time > self.now_utc: + # Only add if event has started (not future events), unless allow_future + if not allow_future and start_time > self.now_utc: return - # Check if event already exists in history (by start_time string) + # Check if event already exists in history (by start_time and import_export direction) + # Managed mode creates both export and import per timeslot, so dedup must check direction for existing_event in self.event_history: - if existing_event.get("start_time") == start_time_str: + if existing_event.get("start_time") == start_time_str and existing_event.get("import_export") == event_data.get("import_export"): # Update existing event in case any details changed existing_event.update(event_data) return @@ -147,6 +189,9 @@ async def _request_with_retry(self, url, headers, max_retries=3): """ Perform HTTP GET request with retry logic, check status code, and decode JSON + Retries on connection errors, timeouts, and 5xx server errors. + Returns None immediately on 4xx client errors (no retry). + Args: url: URL to request headers: Request headers @@ -170,9 +215,20 @@ async def _request_with_retry(self, url, headers, max_retries=3): self.log(f"Warn: AxleAPI: Failed to parse JSON response: {e}") record_api_call("axle", False, "decode_error") return None + elif response.status >= 500: + # Server error — retry with backoff + record_api_call("axle", False, "server_error") + if attempt < max_retries - 1: + sleep_time = 2**attempt + self.log(f"Warn: AxleAPI: Server error {response.status}, retrying in {sleep_time}s...") + await asyncio.sleep(sleep_time) + else: + self.log(f"Warn: AxleAPI: Server error {response.status} after {max_retries} attempts") + return None else: + # Client error (4xx) — no retry self.log(f"Warn: AxleAPI: Failed to fetch data, status code {response.status}") - record_api_call("axle", False, "server_error") + record_api_call("axle", False, "client_error") return None except (aiohttp.ClientError, asyncio.TimeoutError) as e: if attempt < max_retries - 1: @@ -185,10 +241,55 @@ async def _request_with_retry(self, url, headers, max_retries=3): return None return None + async def _get_partner_token(self): + """Authenticate with Axle partner API and cache the token.""" + if self.partner_token and self.partner_token_expiry and self.partner_token_expiry > datetime.now(timezone.utc): + return self.partner_token + + url = f"{self.api_base_url}/auth/token-form" + timeout = aiohttp.ClientTimeout(total=30) + + try: + async with aiohttp.ClientSession(timeout=timeout) as session: + async with session.post( + url, + data={ + "grant_type": "password", + "username": self.partner_username, + "password": self.partner_password, + }, + headers={"Content-Type": "application/x-www-form-urlencoded"}, + ) as response: + if response.status == 200: + data = await response.json() + self.partner_token = data.get("access_token") + # Cache for 50 minutes (tokens typically last 60 min) + self.partner_token_expiry = datetime.now(timezone.utc) + timedelta(minutes=50) + self.log("AxleAPI: Partner token obtained successfully") + record_api_call("axle") + return self.partner_token + else: + self.log(f"Warn: AxleAPI: Partner auth failed (status {response.status})") + record_api_call("axle", False, "auth_failed") + return None + except Exception as e: + self.log(f"Warn: AxleAPI: Partner auth exception: {e}") + record_api_call("axle", False, "auth_error") + return None + async def fetch_axle_event(self): + """Fetch the latest VPP event from Axle Energy API. + + In BYOK mode: GET /vpp/home-assistant/event with user's API key. + In managed mode: GET /entities/site/{site_id}/price-curve with partner token. """ - Fetch the latest VPP event from Axle Energy API - """ + if self.managed_mode: + await self._fetch_managed_price_curve() + else: + await self._fetch_byok_event() + + async def _fetch_byok_event(self): + """Original BYOK event fetch logic.""" if not self.api_key: self.log("Error: AxleAPI: Cannot fetch event - axle_api_key is not set or invalid. Please check your apps.yaml configuration.") self.failures_total += 1 @@ -196,7 +297,7 @@ async def fetch_axle_event(self): self.log("AxleAPI: Fetching latest VPP event data") - url = "https://api.axle.energy/vpp/home-assistant/event" + url = f"{self.api_base_url}/vpp/home-assistant/event" headers = {"Authorization": f"Bearer {self.api_key}"} data = await self._request_with_retry(url, headers) @@ -267,6 +368,126 @@ async def fetch_axle_event(self): self.log(f"AxleAPI: Successfully fetched event data - {import_export} event from {start_time} to {end_time}" if start_time else "AxleAPI: No scheduled event") self.update_success_timestamp() + async def _fetch_managed_price_curve(self): + """Fetch price curve from Axle partner API and convert to sessions.""" + if not self.site_id: + self.log("Warn: AxleAPI: Managed mode requires site_id") + return + + token = await self._get_partner_token() + if not token: + self.log("Warn: AxleAPI: Could not obtain partner token") + self.failures_total += 1 + return + + self.log(f"AxleAPI: Fetching price curve for site {self.site_id} (managed)") + url = f"{self.api_base_url}/entities/site/{self.site_id}/price-curve" + headers = {"Authorization": f"Bearer {token}"} + + data = await self._request_with_retry(url, headers) + if data is None: + # Could be an expired/revoked token — invalidate and retry once + self.partner_token = None + self.partner_token_expiry = None + token = await self._get_partner_token() + if token: + headers = {"Authorization": f"Bearer {token}"} + data = await self._request_with_retry(url, headers) + + if data is None: + self.log("Warn: AxleAPI: No price curve data after retry") + self.failures_total += 1 + return + + self._process_price_curve(data) + self.cleanup_event_history() + self.publish_axle_event() + self.update_success_timestamp() + self.log("AxleAPI: Price curve processed successfully (managed mode)") + + def _process_price_curve(self, data): + """Convert Axle price curve to session format for load_axle_slot(). + + The price curve provides half-hourly wholesale market prices (GBP/MWh). + These are overlaid onto existing tariff rates: + - Export: wholesale price added to rate_export (high price = more export) + - Import: wholesale price added to rate_import (high price = less import, + negative price = cheap import encourages charging) + - Null prices are skipped (no modification, normal tariff applies) + + Note: load_axle_slot subtracts import pence_per_kwh, so we negate it here + to achieve addition of the wholesale price to the import rate. + """ + prices = data.get("half_hourly_traded_prices", []) + session_count = 0 + + for slot in prices: + price_gbp_mwh = slot.get("price_gbp_per_mwh") + if price_gbp_mwh is None: + continue + + start_str = slot.get("start_timestamp") + if not start_str: + continue + + try: + start_dt = str2time(start_str) + except (ValueError, TypeError): + self.log(f"Warn: AxleAPI: Failed to parse price curve timestamp: {start_str}") + continue + + end_dt = start_dt + timedelta(minutes=30) + pence_per_kwh = price_gbp_mwh / 10 # GBP/MWh -> p/kWh + + start_formatted = start_dt.strftime(TIME_FORMAT) + end_formatted = end_dt.strftime(TIME_FORMAT) + + # Create export session: adds wholesale price as export bonus + # load_axle_slot does: rate_export + pence_per_kwh + export_session = { + "start_time": start_formatted, + "end_time": end_formatted, + "import_export": "export", + "pence_per_kwh": pence_per_kwh, + } + self.add_event_to_history(export_session, allow_future=True) + + # Create import session: adds wholesale price to import cost + # load_axle_slot does: rate_import - pence_per_kwh, so we negate + # to get rate_import + wholesale_price (high price = expensive import, + # negative price = cheap import) + import_session = { + "start_time": start_formatted, + "end_time": end_formatted, + "import_export": "import", + "pence_per_kwh": -pence_per_kwh, + } + self.add_event_to_history(import_session, allow_future=True) + session_count += 1 + + self.log(f"AxleAPI: Processed {session_count} price curve slots into sessions") + + # Update current event to the nearest future/active slot for sensor state + now = self.now_utc + self.current_event = { + "start_time": None, + "end_time": None, + "import_export": None, + "pence_per_kwh": None, + } + self.updated_at = now.strftime(TIME_FORMAT) + + # Find the active or nearest future session for display + for event in self.event_history: + try: + start = str2time(event["start_time"]) + end = str2time(event["end_time"]) + if start <= now < end and event.get("import_export") == "export": + self.current_event = event.copy() + break + except (ValueError, TypeError, KeyError): + continue + def publish_axle_event(self): """ Publish the latest Axle VPP event to the system as a binary sensor @@ -304,6 +525,7 @@ def publish_axle_event(self): "event_current": event_current, "event_history": self.event_history, "updated_at": self.updated_at, + "managed_mode": self.managed_mode, }, ) diff --git a/apps/predbat/components.py b/apps/predbat/components.py index 8a6afc2ea..321705e49 100644 --- a/apps/predbat/components.py +++ b/apps/predbat/components.py @@ -254,10 +254,16 @@ "name": "Axle Energy", "event_filter": "predbat_axle_", "args": { - "api_key": {"required": True, "config": "axle_api_key"}, + "api_key": {"required": False, "config": "axle_api_key"}, "pence_per_kwh": {"required": False, "config": "axle_pence_per_kwh", "default": 100}, "automatic": {"required": False, "config": "axle_automatic", "default": True}, + "managed_mode": {"required": False, "config": "axle_managed_mode", "default": False}, + "site_id": {"required": False, "config": "axle_site_id"}, + "partner_username": {"required": False, "config": "axle_partner_username"}, + "partner_password": {"required": False, "config": "axle_partner_password"}, + "api_base_url": {"required": False, "config": "axle_api_base_url", "default": "https://api.axle.energy"}, }, + "required_or": ["api_key", "managed_mode"], "phase": 1, }, "solax": { diff --git a/apps/predbat/tests/test_axle.py b/apps/predbat/tests/test_axle.py index 4ee595730..ae191963f 100644 --- a/apps/predbat/tests/test_axle.py +++ b/apps/predbat/tests/test_axle.py @@ -45,6 +45,16 @@ def __init__(self): self._now_utc = datetime.now(timezone.utc) self._state_store = {} # Mock state storage + # Managed mode defaults + self.managed_mode = False + self.site_id = None + self.partner_username = None + self.partner_password = None + self.api_base_url = "https://api.axle.energy" + self.partner_token = None + self.partner_token_expiry = None + self.automatic = False + @property def now_utc(self): """Mock now_utc property""" @@ -133,6 +143,12 @@ def test_axle(my_predbat=None): ("load_slot_export", _test_axle_load_slot_export, "Load slot export integration"), ("load_slot_import", _test_axle_load_slot_import, "Load slot import integration"), ("active_function", _test_axle_active_function, "Active status checking"), + ("managed_init", _test_axle_managed_initialization, "Managed mode initialization"), + ("managed_price_curve", _test_axle_managed_price_curve_processing, "Managed mode price curve processing"), + ("managed_dedup", _test_axle_managed_event_dedup, "Managed mode event deduplication"), + ("managed_future", _test_axle_managed_future_events, "Managed mode future events"), + ("managed_publish", _test_axle_managed_publish, "Managed mode publish attributes"), + ("managed_no_creds", _test_axle_managed_disabled_without_creds, "Managed mode disabled without credentials"), ] # Run all sub-tests @@ -1103,3 +1119,224 @@ def _test_axle_active_function(my_predbat=None): print("✓ fetch_axle_active function test passed") return False + + +def _test_axle_managed_initialization(my_predbat=None): + """Test managed mode initialization with valid and missing params""" + print("Test: Managed mode initialization") + + # Valid managed mode config + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret", api_base_url="https://api-sandbox.axle.energy") + assert axle.managed_mode is True, "managed_mode should be True" + assert axle.site_id == "site_123" + assert axle.partner_username == "user@test.com" + assert axle.partner_password == "secret" + assert axle.api_base_url == "https://api-sandbox.axle.energy" + assert axle.api_key is None, "api_key should be None in managed mode" + print(" ✓ Valid managed mode initialization") + + # Missing site_id should disable managed mode + axle2 = MockAxleAPI() + axle2.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id=None, partner_username="user@test.com", partner_password="secret") + assert axle2.managed_mode is False, "managed_mode should be False when site_id missing" + assert any("site_id" in msg for msg in axle2.log_messages), "Should log missing site_id" + print(" ✓ Missing site_id disables managed mode") + + # Missing all managed params + axle3 = MockAxleAPI() + axle3.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True) + assert axle3.managed_mode is False, "managed_mode should be False when all params missing" + assert any("partner_username" in msg for msg in axle3.log_messages) + assert any("partner_password" in msg for msg in axle3.log_messages) + print(" ✓ Missing all params disables managed mode") + + return False + + +def _test_axle_managed_price_curve_processing(my_predbat=None): + """Test price curve GBP/MWh to p/kWh conversion and session creation""" + print("Test: Managed mode price curve processing") + + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + now = datetime(2025, 12, 20, 14, 0, 0, tzinfo=timezone.utc) + axle._now_utc = now + + # Mock price curve data (GBP/MWh) + price_curve = { + "half_hourly_traded_prices": [ + {"start_timestamp": "2025-12-20T14:00:00Z", "price_gbp_per_mwh": 50.0}, + {"start_timestamp": "2025-12-20T14:30:00Z", "price_gbp_per_mwh": -20.0}, + {"start_timestamp": "2025-12-20T15:00:00Z", "price_gbp_per_mwh": None}, # Should be skipped + ] + } + + axle._process_price_curve(price_curve) + + # 2 valid slots × 2 directions = 4 events + assert len(axle.event_history) == 4, f"Expected 4 events, got {len(axle.event_history)}" + + # Check first slot: 50 GBP/MWh = 5.0 p/kWh + export_events = [e for e in axle.event_history if e["import_export"] == "export"] + import_events = [e for e in axle.event_history if e["import_export"] == "import"] + assert len(export_events) == 2 + assert len(import_events) == 2 + + # First export: 50 GBP/MWh → 5.0 p/kWh + first_export = [e for e in export_events if "14:00:00" in e["start_time"]][0] + assert first_export["pence_per_kwh"] == 5.0, f"Expected 5.0, got {first_export['pence_per_kwh']}" + + # First import: negated → -5.0 p/kWh + first_import = [e for e in import_events if "14:00:00" in e["start_time"]][0] + assert first_import["pence_per_kwh"] == -5.0, f"Expected -5.0, got {first_import['pence_per_kwh']}" + + # Second slot: -20 GBP/MWh = -2.0 p/kWh (negative wholesale price) + second_export = [e for e in export_events if "14:30:00" in e["start_time"]][0] + assert second_export["pence_per_kwh"] == -2.0, f"Expected -2.0, got {second_export['pence_per_kwh']}" + + second_import = [e for e in import_events if "14:30:00" in e["start_time"]][0] + assert second_import["pence_per_kwh"] == 2.0, f"Expected 2.0, got {second_import['pence_per_kwh']}" + + # Null price slot should be skipped + null_events = [e for e in axle.event_history if "15:00:00" in e.get("start_time", "")] + assert len(null_events) == 0, "Null price slots should be skipped" + + print(" ✓ Price curve conversion correct (GBP/MWh → p/kWh)") + print(" ✓ Export and import sessions created per slot") + print(" ✓ Null prices skipped") + return False + + +def _test_axle_managed_event_dedup(my_predbat=None): + """Test event deduplication by start_time and import_export direction""" + print("Test: Managed mode event deduplication") + + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + now = datetime(2025, 12, 20, 14, 0, 0, tzinfo=timezone.utc) + axle._now_utc = now + + # Add an export event + event1 = { + "start_time": "2025-12-20T14:00:00+0000", + "end_time": "2025-12-20T14:30:00+0000", + "import_export": "export", + "pence_per_kwh": 5.0, + } + axle.add_event_to_history(event1, allow_future=True) + assert len(axle.event_history) == 1 + + # Add import at same time — should NOT be deduped (different direction) + event2 = { + "start_time": "2025-12-20T14:00:00+0000", + "end_time": "2025-12-20T14:30:00+0000", + "import_export": "import", + "pence_per_kwh": -5.0, + } + axle.add_event_to_history(event2, allow_future=True) + assert len(axle.event_history) == 2, "Different directions should not dedup" + + # Add duplicate export at same time — SHOULD be deduped (same start + direction) + event3 = { + "start_time": "2025-12-20T14:00:00+0000", + "end_time": "2025-12-20T14:30:00+0000", + "import_export": "export", + "pence_per_kwh": 6.0, # Updated price + } + axle.add_event_to_history(event3, allow_future=True) + assert len(axle.event_history) == 2, "Same start_time + direction should dedup" + # Price should be updated + export_event = [e for e in axle.event_history if e["import_export"] == "export"][0] + assert export_event["pence_per_kwh"] == 6.0, "Deduped event should have updated price" + + print(" ✓ Different directions at same time are kept separately") + print(" ✓ Same direction at same time is deduped with update") + return False + + +def _test_axle_managed_future_events(my_predbat=None): + """Test that managed mode allows future events via allow_future=True""" + print("Test: Managed mode future events") + + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + now = datetime(2025, 12, 20, 14, 0, 0, tzinfo=timezone.utc) + axle._now_utc = now + + # Future event (tomorrow) + future_event = { + "start_time": "2025-12-21T14:00:00+0000", + "end_time": "2025-12-21T14:30:00+0000", + "import_export": "export", + "pence_per_kwh": 5.0, + } + + # Without allow_future, should be rejected + axle.add_event_to_history(future_event, allow_future=False) + assert len(axle.event_history) == 0, "Future event should be rejected without allow_future" + + # With allow_future, should be accepted + axle.add_event_to_history(future_event, allow_future=True) + assert len(axle.event_history) == 1, "Future event should be accepted with allow_future" + + print(" ✓ Future events rejected without allow_future") + print(" ✓ Future events accepted with allow_future") + return False + + +def _test_axle_managed_publish(my_predbat=None): + """Test that managed_mode attribute appears in published sensor""" + print("Test: Managed mode publish attributes") + + # Test BYOK mode + axle_byok = MockAxleAPI() + axle_byok.initialize(api_key="test_key", pence_per_kwh=100, automatic=False) + axle_byok.publish_axle_event() + + sensor = axle_byok.dashboard_items["binary_sensor.predbat_axle_event"] + assert sensor["attributes"]["managed_mode"] is False, "BYOK should have managed_mode=False" + + # Test managed mode + axle_managed = MockAxleAPI() + axle_managed.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + axle_managed.publish_axle_event() + + sensor = axle_managed.dashboard_items["binary_sensor.predbat_axle_event"] + assert sensor["attributes"]["managed_mode"] is True, "Managed should have managed_mode=True" + + print(" ✓ managed_mode attribute correctly set in sensor") + return False + + +def _test_axle_managed_disabled_without_creds(my_predbat=None): + """Test managed mode falls back to disabled when credentials are missing""" + print("Test: Managed mode disabled without credentials") + + axle = MockAxleAPI() + axle.initialize( + api_key=None, + pence_per_kwh=100, + automatic=False, + managed_mode=True, + site_id="site_123", + partner_username=None, + partner_password=None, + ) + + assert axle.managed_mode is False, "Should disable managed mode without credentials" + assert any("partner_username" in msg for msg in axle.log_messages) + assert any("partner_password" in msg for msg in axle.log_messages) + + # BYOK mode warning only appears when managed_mode was False from the start + # (elif branch). When managed was True but failed, only the managed error is logged. + axle2 = MockAxleAPI() + axle2.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=False) + assert any("BYOK" in msg for msg in axle2.log_messages), "Should warn about BYOK needing api_key" + + print(" ✓ Managed mode disabled when credentials missing") + print(" ✓ BYOK warning logged when managed_mode=False and no api_key") + return False From 60066927769a0dda8a135d84d8c9ab267a021c07 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci-lite[bot]" <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 12:24:08 +0000 Subject: [PATCH 2/5] [pre-commit.ci lite] apply automatic fixes --- .cspell/custom-dictionary-workspace.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 6742bdf0c..041de7a50 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -36,8 +36,8 @@ batteryb beforeunload bierner brickatius -byok BYOK +byok calib Cantarell cexxxx From 5d1e11941362aaa6ce171da98a5f328d829cf0f8 Mon Sep 17 00:00:00 2001 From: Mark Gascoyne Date: Wed, 11 Mar 2026 12:42:07 +0000 Subject: [PATCH 3/5] fix: validate partner token and add managed mode HTTP tests - Validate access_token is truthy before caching in _get_partner_token() to prevent caching None with a 50-minute expiry when the auth response is 200 but missing the token field. - Add 3 async HTTP tests for managed mode: - _test_axle_managed_get_partner_token: token fetch, caching, missing access_token, and auth failure - _test_axle_managed_fetch_end_to_end: full auth -> price curve -> session creation flow - _test_axle_managed_token_retry: token invalidation, re-auth, retry on price curve failure, and complete failure path Co-Authored-By: Claude Opus 4.6 --- apps/predbat/axle.py | 7 +- apps/predbat/tests/test_axle.py | 216 ++++++++++++++++++++++++++++++++ 2 files changed, 222 insertions(+), 1 deletion(-) diff --git a/apps/predbat/axle.py b/apps/predbat/axle.py index 50d6ad026..e8728d632 100644 --- a/apps/predbat/axle.py +++ b/apps/predbat/axle.py @@ -262,7 +262,12 @@ async def _get_partner_token(self): ) as response: if response.status == 200: data = await response.json() - self.partner_token = data.get("access_token") + token = data.get("access_token") + if not token: + self.log("Warn: AxleAPI: Auth response missing access_token") + record_api_call("axle", False, "auth_failed") + return None + self.partner_token = token # Cache for 50 minutes (tokens typically last 60 min) self.partner_token_expiry = datetime.now(timezone.utc) + timedelta(minutes=50) self.log("AxleAPI: Partner token obtained successfully") diff --git a/apps/predbat/tests/test_axle.py b/apps/predbat/tests/test_axle.py index ae191963f..75c6f276d 100644 --- a/apps/predbat/tests/test_axle.py +++ b/apps/predbat/tests/test_axle.py @@ -149,6 +149,9 @@ def test_axle(my_predbat=None): ("managed_future", _test_axle_managed_future_events, "Managed mode future events"), ("managed_publish", _test_axle_managed_publish, "Managed mode publish attributes"), ("managed_no_creds", _test_axle_managed_disabled_without_creds, "Managed mode disabled without credentials"), + ("managed_token", _test_axle_managed_get_partner_token, "Managed mode partner token auth"), + ("managed_fetch_e2e", _test_axle_managed_fetch_end_to_end, "Managed mode end-to-end fetch"), + ("managed_token_retry", _test_axle_managed_token_retry, "Managed mode token invalidation and retry"), ] # Run all sub-tests @@ -1340,3 +1343,216 @@ def _test_axle_managed_disabled_without_creds(my_predbat=None): print(" ✓ Managed mode disabled when credentials missing") print(" ✓ BYOK warning logged when managed_mode=False and no api_key") return False + + +def _test_axle_managed_get_partner_token(my_predbat=None): + """Test _get_partner_token: success, caching, missing token, and auth failure""" + print("Test: Managed mode partner token auth") + + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + # Test 1: Successful token fetch + auth_response = create_aiohttp_mock_response(status=200, json_data={"access_token": "tok_abc123"}) + auth_session = create_aiohttp_mock_session(mock_response=auth_response) + + with patch("aiohttp.ClientSession", return_value=auth_session): + token = run_async(axle._get_partner_token()) + + assert token == "tok_abc123", f"Expected tok_abc123, got {token}" + assert axle.partner_token == "tok_abc123" + assert axle.partner_token_expiry is not None + print(" ✓ Token fetched successfully") + + # Test 2: Cached token returned without HTTP call + call_count = [0] + original_return = auth_session.__aenter__ + + async def counting_aenter(*args): + call_count[0] += 1 + return await original_return(*args) + + auth_session.__aenter__ = counting_aenter + + with patch("aiohttp.ClientSession", return_value=auth_session): + token2 = run_async(axle._get_partner_token()) + + assert token2 == "tok_abc123", "Should return cached token" + assert call_count[0] == 0, "Should not make HTTP call for cached token" + print(" ✓ Cached token returned without HTTP call") + + # Test 3: Auth response missing access_token + axle2 = MockAxleAPI() + axle2.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + empty_token_response = create_aiohttp_mock_response(status=200, json_data={"token_type": "bearer"}) + empty_token_session = create_aiohttp_mock_session(mock_response=empty_token_response) + + with patch("aiohttp.ClientSession", return_value=empty_token_session): + token3 = run_async(axle2._get_partner_token()) + + assert token3 is None, "Should return None when access_token missing from response" + assert axle2.partner_token is None, "Should not cache empty token" + assert any("missing access_token" in msg for msg in axle2.log_messages), "Should log missing token warning" + print(" ✓ Missing access_token handled correctly") + + # Test 4: Auth failure (401) + axle3 = MockAxleAPI() + axle3.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="bad_pass") + + fail_response = create_aiohttp_mock_response(status=401) + fail_session = create_aiohttp_mock_session(mock_response=fail_response) + + with patch("aiohttp.ClientSession", return_value=fail_session): + token4 = run_async(axle3._get_partner_token()) + + assert token4 is None, "Should return None on auth failure" + assert any("auth failed" in msg for msg in axle3.log_messages), "Should log auth failure" + print(" ✓ Auth failure returns None") + + return False + + +def _test_axle_managed_fetch_end_to_end(my_predbat=None): + """Test full managed fetch: auth -> price curve -> sessions created""" + print("Test: Managed mode end-to-end fetch") + + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + now = datetime(2025, 12, 20, 14, 15, 0, tzinfo=timezone.utc) + axle._now_utc = now + + # Auth response (POST) + auth_json = {"access_token": "tok_managed_123"} + # Price curve response (GET) + price_curve_json = { + "half_hourly_traded_prices": [ + {"start_timestamp": "2025-12-20T14:00:00Z", "price_gbp_per_mwh": 80.0}, + {"start_timestamp": "2025-12-20T14:30:00Z", "price_gbp_per_mwh": 60.0}, + ] + } + + # We need POST to return auth, GET to return price curve + # The mock session uses the same response for both, so we use side_effect on ClientSession + call_order = [0] + + def session_factory(*args, **kwargs): + call_order[0] += 1 + if call_order[0] == 1: + # First session: auth (POST) + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=200, json_data=auth_json)) + else: + # Second session: price curve (GET) + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=200, json_data=price_curve_json)) + + with patch("aiohttp.ClientSession", side_effect=session_factory): + run_async(axle.fetch_axle_event()) + + # Should have 4 events: 2 slots × 2 directions + assert len(axle.event_history) == 4, f"Expected 4 events, got {len(axle.event_history)}" + + # Check conversion: 80 GBP/MWh = 8.0 p/kWh + export_events = [e for e in axle.event_history if e["import_export"] == "export"] + import_events = [e for e in axle.event_history if e["import_export"] == "import"] + assert len(export_events) == 2 + assert len(import_events) == 2 + + first_export = [e for e in export_events if "14:00:00" in e["start_time"]][0] + assert first_export["pence_per_kwh"] == 8.0 + + # Sensor should be published + sensor_id = "binary_sensor.predbat_axle_event" + assert sensor_id in axle.dashboard_items, "Sensor should be published" + sensor = axle.dashboard_items[sensor_id] + assert sensor["attributes"]["managed_mode"] is True + + # Current event should be the active export slot (14:00 is active at 14:15) + assert axle.current_event["import_export"] == "export" + assert "14:00:00" in axle.current_event["start_time"] + + assert axle.failures_total == 0 + assert any("Price curve processed successfully" in msg for msg in axle.log_messages) + + print(" ✓ Auth + price curve fetch works end-to-end") + print(" ✓ Sessions created with correct prices") + print(" ✓ Current event set to active export slot") + return False + + +def _test_axle_managed_token_retry(my_predbat=None): + """Test token invalidation and retry when price curve fetch fails""" + print("Test: Managed mode token invalidation and retry") + + axle = MockAxleAPI() + axle.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + + now = datetime(2025, 12, 20, 14, 0, 0, tzinfo=timezone.utc) + axle._now_utc = now + + auth_json = {"access_token": "tok_fresh_456"} + price_curve_json = { + "half_hourly_traded_prices": [ + {"start_timestamp": "2025-12-20T14:00:00Z", "price_gbp_per_mwh": 50.0}, + ] + } + + # Sequence: auth OK -> price curve 401 (fail) -> auth OK (re-auth) -> price curve OK + call_order = [0] + + def session_factory(*args, **kwargs): + call_order[0] += 1 + if call_order[0] == 1: + # First auth + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=200, json_data=auth_json)) + elif call_order[0] == 2: + # Price curve fails with 401 (client error, no retry in _request_with_retry) + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=401)) + elif call_order[0] == 3: + # Re-auth after token invalidation + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=200, json_data=auth_json)) + else: + # Retry price curve succeeds + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=200, json_data=price_curve_json)) + + with patch("aiohttp.ClientSession", side_effect=session_factory): + with patch("asyncio.sleep"): + run_async(axle.fetch_axle_event()) + + # Should succeed after retry: 1 slot × 2 directions = 2 events + assert len(axle.event_history) == 2, f"Expected 2 events after token retry, got {len(axle.event_history)}" + assert axle.partner_token == "tok_fresh_456" + + # Verify token was invalidated and re-fetched + assert call_order[0] == 4, f"Expected 4 HTTP calls (auth, fail, re-auth, success), got {call_order[0]}" + assert any("Price curve processed successfully" in msg for msg in axle.log_messages) + + print(" ✓ Token invalidated after price curve failure") + print(" ✓ Re-auth and retry succeeds") + + # Test 2: Both attempts fail — should record failure + axle2 = MockAxleAPI() + axle2.initialize(api_key=None, pence_per_kwh=100, automatic=False, managed_mode=True, site_id="site_123", partner_username="user@test.com", partner_password="secret") + axle2._now_utc = now + + call_order2 = [0] + + def session_factory_all_fail(*args, **kwargs): + call_order2[0] += 1 + if call_order2[0] in (1, 3): + # Auth succeeds + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=200, json_data=auth_json)) + else: + # Price curve always fails + return create_aiohttp_mock_session(mock_response=create_aiohttp_mock_response(status=401)) + + with patch("aiohttp.ClientSession", side_effect=session_factory_all_fail): + with patch("asyncio.sleep"): + run_async(axle2.fetch_axle_event()) + + assert axle2.failures_total == 1, f"Expected 1 failure, got {axle2.failures_total}" + assert len(axle2.event_history) == 0, "No events should be created on complete failure" + assert any("No price curve data after retry" in msg for msg in axle2.log_messages) + print(" ✓ Complete failure after retry records failure correctly") + + return False From d61e1ea4d9661adf694f40b70dafb60961194d49 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci-lite[bot]" <117423508+pre-commit-ci-lite[bot]@users.noreply.github.com> Date: Wed, 11 Mar 2026 12:43:57 +0000 Subject: [PATCH 4/5] [pre-commit.ci lite] apply automatic fixes --- .cspell/custom-dictionary-workspace.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 041de7a50..6742bdf0c 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -36,8 +36,8 @@ batteryb beforeunload bierner brickatius -BYOK byok +BYOK calib Cantarell cexxxx From 65d7e2636ab3ca34ef31b434ccf5843276c22015 Mon Sep 17 00:00:00 2001 From: Trefor Southwell <48591903+springfall2008@users.noreply.github.com> Date: Sat, 14 Mar 2026 18:43:39 +0000 Subject: [PATCH 5/5] Remove duplicate BYOK entry from custom dictionary --- .cspell/custom-dictionary-workspace.txt | 1 - 1 file changed, 1 deletion(-) diff --git a/.cspell/custom-dictionary-workspace.txt b/.cspell/custom-dictionary-workspace.txt index 6742bdf0c..c27802e5d 100644 --- a/.cspell/custom-dictionary-workspace.txt +++ b/.cspell/custom-dictionary-workspace.txt @@ -37,7 +37,6 @@ beforeunload bierner brickatius byok -BYOK calib Cantarell cexxxx