Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .cspell/custom-dictionary-workspace.txt
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ batteryb
beforeunload
bierner
brickatius
byok
calib
Cantarell
cexxxx
Expand All @@ -53,6 +54,7 @@ compareform
configform
Consolas
coro
creds
crosscharge
customisation
Customise
Expand Down
263 changes: 245 additions & 18 deletions apps/predbat/axle.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,13 @@
Integrates with the Axle Energy API to receive and process VPP events
(import/export commands) with event history tracking and binary sensor
publishing for demand response participation.

Supports two modes:
- BYOK (Bring Your Own Key): User's own API key with /vpp/home-assistant/event endpoint.
- Managed VPP: Partner credentials with /entities/site/{site_id}/price-curve endpoint.
"""

from datetime import datetime, timedelta
from datetime import datetime, timedelta, timezone
import asyncio
import aiohttp
from component_base import ComponentBase
Expand All @@ -25,11 +29,23 @@


class AxleAPI(ComponentBase):
"""Axle Energy VPP client."""
"""Axle Energy VPP client with managed mode support."""

def initialize(self, api_key, pence_per_kwh, automatic, managed_mode=False, site_id=None, partner_username=None, partner_password=None, api_base_url="https://api.axle.energy"):
"""Initialize the AxleAPI component.

def initialize(self, api_key, pence_per_kwh, automatic):
"""Initialize the AxleAPI component"""
if not isinstance(api_key, str) or not api_key:
Args:
api_key: BYOK API key (used in BYOK mode only)
pence_per_kwh: VPP compensation rate in pence per kWh
automatic: Whether to auto-configure entity mappings
managed_mode: If True, use partner API price curve instead of BYOK event endpoint
site_id: Axle site ID (required for managed mode)
partner_username: Partner API username (required for managed mode)
partner_password: Partner API password (required for managed mode)
api_base_url: Axle API base URL
"""
# Validate api_key type (common YAML misconfiguration: list instead of string)
if api_key is not None and (not isinstance(api_key, str) or not api_key):
self.log("Error: AxleAPI: axle_api_key is missing or invalid, you must set it to a string (not a list or number). Axle Energy integration will not function correctly.")
api_key = None
self.api_key = api_key
Expand All @@ -46,6 +62,29 @@ def initialize(self, api_key, pence_per_kwh, automatic):
}
self.updated_at = None # Last updated moved out to separate attribute to not pollute triggering on change of current_event

# Managed mode config
self.managed_mode = managed_mode
self.site_id = site_id
self.partner_username = partner_username
self.partner_password = partner_password
self.api_base_url = api_base_url.rstrip("/") if api_base_url else "https://api.axle.energy"
self.partner_token = None
self.partner_token_expiry = None

if self.managed_mode:
missing = []
if not self.site_id:
missing.append("site_id")
if not self.partner_username:
missing.append("partner_username")
if not self.partner_password:
missing.append("partner_password")
if missing:
self.log(f"Error: AxleAPI: Managed mode missing required params: {', '.join(missing)}")
self.managed_mode = False # Fall back to disabled
elif not self.api_key:
self.log("Warn: AxleAPI: BYOK mode requires api_key — Axle integration disabled")

def load_event_history(self):
"""
Load event history from the sensor on startup.
Expand Down Expand Up @@ -109,10 +148,13 @@ def cleanup_event_history(self):
if removed > 0:
self.log(f"AxleAPI: Cleaned up {removed} events older than 7 days")

def add_event_to_history(self, event_data):
"""
Add an event to history as soon as it starts (becomes active).
Once an event starts, it won't change and should be recorded.
def add_event_to_history(self, event_data, allow_future=False):
"""Add an event to history.

Args:
event_data: Event dict with start_time, end_time, import_export, pence_per_kwh.
allow_future: If True, accept future events (used by managed mode price curves).
BYOK mode only adds events once they become active.
"""
start_time_str = event_data.get("start_time")
end_time_str = event_data.get("end_time")
Expand All @@ -127,14 +169,14 @@ def add_event_to_history(self, event_data):
self.log(f"Warn: Unable to parse start_time for history: {start_time_str}")
return

# Only add if event has started (not future events)
# Once event starts, add it to history even if still active
if start_time > self.now_utc:
# Only add if event has started (not future events), unless allow_future
if not allow_future and start_time > self.now_utc:
return

# Check if event already exists in history (by start_time string)
# Check if event already exists in history (by start_time and import_export direction)
# Managed mode creates both export and import per timeslot, so dedup must check direction
for existing_event in self.event_history:
if existing_event.get("start_time") == start_time_str:
if existing_event.get("start_time") == start_time_str and existing_event.get("import_export") == event_data.get("import_export"):
# Update existing event in case any details changed
existing_event.update(event_data)
return
Expand All @@ -147,6 +189,9 @@ async def _request_with_retry(self, url, headers, max_retries=3):
"""
Perform HTTP GET request with retry logic, check status code, and decode JSON

Retries on connection errors, timeouts, and 5xx server errors.
Returns None immediately on 4xx client errors (no retry).

Args:
url: URL to request
headers: Request headers
Expand All @@ -170,9 +215,20 @@ async def _request_with_retry(self, url, headers, max_retries=3):
self.log(f"Warn: AxleAPI: Failed to parse JSON response: {e}")
record_api_call("axle", False, "decode_error")
return None
elif response.status >= 500:
# Server error — retry with backoff
record_api_call("axle", False, "server_error")
if attempt < max_retries - 1:
sleep_time = 2**attempt
self.log(f"Warn: AxleAPI: Server error {response.status}, retrying in {sleep_time}s...")
await asyncio.sleep(sleep_time)
else:
self.log(f"Warn: AxleAPI: Server error {response.status} after {max_retries} attempts")
return None
else:
# Client error (4xx) — no retry
self.log(f"Warn: AxleAPI: Failed to fetch data, status code {response.status}")
record_api_call("axle", False, "server_error")
record_api_call("axle", False, "client_error")
return None
except (aiohttp.ClientError, asyncio.TimeoutError) as e:
if attempt < max_retries - 1:
Expand All @@ -185,18 +241,68 @@ async def _request_with_retry(self, url, headers, max_retries=3):
return None
return None

async def _get_partner_token(self):
"""Authenticate with Axle partner API and cache the token."""
if self.partner_token and self.partner_token_expiry and self.partner_token_expiry > datetime.now(timezone.utc):
return self.partner_token

url = f"{self.api_base_url}/auth/token-form"
timeout = aiohttp.ClientTimeout(total=30)

try:
async with aiohttp.ClientSession(timeout=timeout) as session:
async with session.post(
url,
data={
"grant_type": "password",
"username": self.partner_username,
"password": self.partner_password,
},
headers={"Content-Type": "application/x-www-form-urlencoded"},
) as response:
if response.status == 200:
data = await response.json()
token = data.get("access_token")
if not token:
self.log("Warn: AxleAPI: Auth response missing access_token")
record_api_call("axle", False, "auth_failed")
return None
self.partner_token = token
# Cache for 50 minutes (tokens typically last 60 min)
self.partner_token_expiry = datetime.now(timezone.utc) + timedelta(minutes=50)
self.log("AxleAPI: Partner token obtained successfully")
record_api_call("axle")
return self.partner_token
else:
self.log(f"Warn: AxleAPI: Partner auth failed (status {response.status})")
record_api_call("axle", False, "auth_failed")
return None
except Exception as e:
self.log(f"Warn: AxleAPI: Partner auth exception: {e}")
record_api_call("axle", False, "auth_error")
return None

async def fetch_axle_event(self):
"""Fetch the latest VPP event from Axle Energy API.

In BYOK mode: GET /vpp/home-assistant/event with user's API key.
In managed mode: GET /entities/site/{site_id}/price-curve with partner token.
"""
Fetch the latest VPP event from Axle Energy API
"""
if self.managed_mode:
await self._fetch_managed_price_curve()
else:
await self._fetch_byok_event()

async def _fetch_byok_event(self):
"""Original BYOK event fetch logic."""
if not self.api_key:
self.log("Error: AxleAPI: Cannot fetch event - axle_api_key is not set or invalid. Please check your apps.yaml configuration.")
self.failures_total += 1
return

self.log("AxleAPI: Fetching latest VPP event data")

url = "https://api.axle.energy/vpp/home-assistant/event"
url = f"{self.api_base_url}/vpp/home-assistant/event"
headers = {"Authorization": f"Bearer {self.api_key}"}

data = await self._request_with_retry(url, headers)
Expand Down Expand Up @@ -267,6 +373,126 @@ async def fetch_axle_event(self):
self.log(f"AxleAPI: Successfully fetched event data - {import_export} event from {start_time} to {end_time}" if start_time else "AxleAPI: No scheduled event")
self.update_success_timestamp()

async def _fetch_managed_price_curve(self):
"""Fetch price curve from Axle partner API and convert to sessions."""
if not self.site_id:
self.log("Warn: AxleAPI: Managed mode requires site_id")
return

token = await self._get_partner_token()
if not token:
self.log("Warn: AxleAPI: Could not obtain partner token")
self.failures_total += 1
return

self.log(f"AxleAPI: Fetching price curve for site {self.site_id} (managed)")
url = f"{self.api_base_url}/entities/site/{self.site_id}/price-curve"
headers = {"Authorization": f"Bearer {token}"}

data = await self._request_with_retry(url, headers)
if data is None:
# Could be an expired/revoked token — invalidate and retry once
self.partner_token = None
self.partner_token_expiry = None
token = await self._get_partner_token()
if token:
headers = {"Authorization": f"Bearer {token}"}
data = await self._request_with_retry(url, headers)

if data is None:
self.log("Warn: AxleAPI: No price curve data after retry")
self.failures_total += 1
return

self._process_price_curve(data)
self.cleanup_event_history()
self.publish_axle_event()
self.update_success_timestamp()
self.log("AxleAPI: Price curve processed successfully (managed mode)")

def _process_price_curve(self, data):
"""Convert Axle price curve to session format for load_axle_slot().

The price curve provides half-hourly wholesale market prices (GBP/MWh).
These are overlaid onto existing tariff rates:
- Export: wholesale price added to rate_export (high price = more export)
- Import: wholesale price added to rate_import (high price = less import,
negative price = cheap import encourages charging)
- Null prices are skipped (no modification, normal tariff applies)

Note: load_axle_slot subtracts import pence_per_kwh, so we negate it here
to achieve addition of the wholesale price to the import rate.
"""
prices = data.get("half_hourly_traded_prices", [])
session_count = 0

for slot in prices:
price_gbp_mwh = slot.get("price_gbp_per_mwh")
if price_gbp_mwh is None:
continue

start_str = slot.get("start_timestamp")
if not start_str:
continue

try:
start_dt = str2time(start_str)
except (ValueError, TypeError):
self.log(f"Warn: AxleAPI: Failed to parse price curve timestamp: {start_str}")
continue

end_dt = start_dt + timedelta(minutes=30)
pence_per_kwh = price_gbp_mwh / 10 # GBP/MWh -> p/kWh

start_formatted = start_dt.strftime(TIME_FORMAT)
end_formatted = end_dt.strftime(TIME_FORMAT)

# Create export session: adds wholesale price as export bonus
# load_axle_slot does: rate_export + pence_per_kwh
export_session = {
"start_time": start_formatted,
"end_time": end_formatted,
"import_export": "export",
"pence_per_kwh": pence_per_kwh,
}
self.add_event_to_history(export_session, allow_future=True)

# Create import session: adds wholesale price to import cost
# load_axle_slot does: rate_import - pence_per_kwh, so we negate
# to get rate_import + wholesale_price (high price = expensive import,
# negative price = cheap import)
import_session = {
"start_time": start_formatted,
"end_time": end_formatted,
"import_export": "import",
"pence_per_kwh": -pence_per_kwh,
}
self.add_event_to_history(import_session, allow_future=True)
session_count += 1

self.log(f"AxleAPI: Processed {session_count} price curve slots into sessions")

# Update current event to the nearest future/active slot for sensor state
now = self.now_utc
self.current_event = {
"start_time": None,
"end_time": None,
"import_export": None,
"pence_per_kwh": None,
}
self.updated_at = now.strftime(TIME_FORMAT)

# Find the active or nearest future session for display
for event in self.event_history:
try:
start = str2time(event["start_time"])
end = str2time(event["end_time"])
if start <= now < end and event.get("import_export") == "export":
self.current_event = event.copy()
break
except (ValueError, TypeError, KeyError):
continue

def publish_axle_event(self):
"""
Publish the latest Axle VPP event to the system as a binary sensor
Expand Down Expand Up @@ -304,6 +530,7 @@ def publish_axle_event(self):
"event_current": event_current,
"event_history": self.event_history,
"updated_at": self.updated_at,
"managed_mode": self.managed_mode,
},
)

Expand Down
8 changes: 7 additions & 1 deletion apps/predbat/components.py
Original file line number Diff line number Diff line change
Expand Up @@ -254,10 +254,16 @@
"name": "Axle Energy",
"event_filter": "predbat_axle_",
"args": {
"api_key": {"required": True, "config": "axle_api_key"},
"api_key": {"required": False, "config": "axle_api_key"},
"pence_per_kwh": {"required": False, "config": "axle_pence_per_kwh", "default": 100},
"automatic": {"required": False, "config": "axle_automatic", "default": True},
"managed_mode": {"required": False, "config": "axle_managed_mode", "default": False},
"site_id": {"required": False, "config": "axle_site_id"},
"partner_username": {"required": False, "config": "axle_partner_username"},
"partner_password": {"required": False, "config": "axle_partner_password"},
"api_base_url": {"required": False, "config": "axle_api_base_url", "default": "https://api.axle.energy"},
},
"required_or": ["api_key", "managed_mode"],
"phase": 1,
},
"solax": {
Expand Down
Loading
Loading