Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 25 additions & 2 deletions src/flexmeasures_client/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
import re
import socket
import warnings
from dataclasses import dataclass
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from logging import Logger
from typing import Any, cast
Expand Down Expand Up @@ -120,6 +120,9 @@ class FlexMeasuresClient:
session: ClientSession | None = None
server_version: str | None = None
logger: Logger = LOGGER
_sensor_asset_id_cache: dict[int, int] = field(
default_factory=dict, init=False, repr=False
)

def __post_init__(self):
if self.session is None:
Expand Down Expand Up @@ -1319,6 +1322,26 @@ async def trigger_schedule(

if prior is not None:
message["prior"] = pd.Timestamp(prior).isoformat()

# For a sensor_id, try to resolve to the sensor's asset_id so we can use
# the asset scheduling endpoint (preferred over the sensor endpoint).
# Fall back to the sensor endpoint only when the server is known to be
# older than v0.27.0, which is when the asset endpoint was introduced.
use_sensor_endpoint = False
if sensor_id is not None:
if self.server_version is not None and Version(
self.server_version
) < Version("0.27.0"):
use_sensor_endpoint = True
else:
# Look up asset_id from sensor (cached after first lookup)
if sensor_id not in self._sensor_asset_id_cache:
sensor = await self.get_sensor(
sensor_id=sensor_id, parse_json_fields=False
)
self._sensor_asset_id_cache[sensor_id] = sensor["generic_asset_id"]
asset_id = self._sensor_asset_id_cache[sensor_id]

if scheduler is not None:
if asset_id is None:
raise ValueError(
Expand All @@ -1341,7 +1364,7 @@ async def trigger_schedule(
asset_id=asset_id, updates=dict(attributes=asset_attributes)
)

if sensor_id is not None:
if use_sensor_endpoint:
response, status = await self.request(
uri=f"sensors/{sensor_id}/schedules/trigger",
json_payload=message,
Expand Down
1 change: 1 addition & 0 deletions tests/client/test_init.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ async def test__init__(
init_dict = flexmeasures_client.__dict__
init_dict.pop("session")
init_dict.pop("logger")
init_dict.pop("_sensor_asset_id_cache")
assert init_dict == assert_dict


Expand Down
184 changes: 174 additions & 10 deletions tests/client/test_schedule.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,13 @@ async def test_trigger_schedule() -> None:
email="test@test.test", password="test"
)
flexmeasures_client.access_token = "test-token"
m.get(
"http://localhost:5000/api/v3_0/sensors/3",
status=200,
payload={"id": 3, "generic_asset_id": 5, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/3/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/5/schedules/trigger",
status=200,
payload={"schedule": "test_schedule_id"},
)
Expand Down Expand Up @@ -48,7 +53,7 @@ async def test_trigger_schedule() -> None:

assert schedule_id == "test_schedule_id"

m.assert_called_once_with(
m.assert_called_with(
method="POST",
headers={"Content-Type": "application/json", "Authorization": "test-token"},
json={
Expand All @@ -65,7 +70,7 @@ async def test_trigger_schedule() -> None:
},
"flex-context": {"consumption-price-sensor": 3},
},
url="http://localhost:5000/api/v3_0/sensors/3/schedules/trigger",
url="http://localhost:5000/api/v3_0/assets/5/schedules/trigger",
params=None,
ssl=False,
allow_redirects=False,
Expand Down Expand Up @@ -162,8 +167,13 @@ async def test_trigger_and_get_schedule() -> None:
m.get(
url=url, status=400, payload={"message": "Scheduling job waiting"}, repeat=3
)
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "schedule-uuid"},
)
Expand Down Expand Up @@ -207,8 +217,13 @@ async def test_get_fallback_schedule():
payload={"message": "Scheduling job waiting"},
repeat=3,
)
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "schedule-uuid"},
)
Expand Down Expand Up @@ -485,8 +500,13 @@ async def test_trigger_schedule_with_prior():
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "sched-uuid"},
)
Expand All @@ -502,8 +522,11 @@ async def test_trigger_schedule_with_prior():

@pytest.mark.asyncio
async def test_trigger_schedule_scheduler_with_sensor_id_error():
"""scheduler set but asset_id is None raises ValueError."""
"""scheduler set with sensor_id on a server older than v0.27.0 raises ValueError."""
client = FlexMeasuresClient(email="test@test.test", password="test")
# Simulate a server older than v0.27.0 so the sensor endpoint is used and
# asset_id cannot be resolved automatically.
client.server_version = "0.26.0"
with pytest.raises(
ValueError,
match="Pass an asset_id instead of a sensor_id if selecting a custom scheduler\\.",
Expand Down Expand Up @@ -552,14 +575,145 @@ async def test_trigger_schedule_scheduler_with_str_attributes():
await client.close()


@pytest.mark.asyncio
async def test_trigger_schedule_sensor_id_uses_asset_endpoint():
"""sensor_id resolves the asset and uses the asset scheduling endpoint."""
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "sched-uuid"},
)
schedule_id = await client.trigger_schedule(
sensor_id=1,
start="2023-01-01T00:00+00:00",
duration="PT1H",
)
assert schedule_id == "sched-uuid"
await client.close()


@pytest.mark.asyncio
async def test_trigger_schedule_sensor_id_caches_asset_id():
"""asset_id for a sensor is only looked up once and then cached."""
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
# Only register one GET sensors/1 response - a second call would raise ConnectionError
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "sched-uuid-1"},
)
m.post(
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "sched-uuid-2"},
)
# First call: looks up sensor to get asset_id
schedule_id_1 = await client.trigger_schedule(
sensor_id=1,
start="2023-01-01T00:00+00:00",
duration="PT1H",
)
# Second call: uses cached asset_id (no additional GET sensors/1)
schedule_id_2 = await client.trigger_schedule(
sensor_id=1,
start="2023-01-02T00:00+00:00",
duration="PT1H",
)
assert schedule_id_1 == "sched-uuid-1"
assert schedule_id_2 == "sched-uuid-2"
assert client._sensor_asset_id_cache == {1: 10}
await client.close()


@pytest.mark.asyncio
async def test_trigger_schedule_old_server_uses_sensor_endpoint():
"""For server versions below v0.27.0, the sensor scheduling endpoint is used."""
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
client.server_version = "0.26.0"
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
status=200,
payload={"schedule": "sched-uuid"},
)
schedule_id = await client.trigger_schedule(
sensor_id=1,
start="2023-01-01T00:00+00:00",
duration="PT1H",
)
assert schedule_id == "sched-uuid"
await client.close()


@pytest.mark.asyncio
async def test_trigger_schedule_sensor_id_with_scheduler():
"""sensor_id + scheduler resolves the asset and patches the custom-scheduler attribute."""
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.get(
"http://localhost:5000/api/v3_0/assets/10",
status=200,
payload={
"id": 10,
"name": "test-asset",
"attributes": {"existing-key": "existing-value"},
},
)
m.patch(
"http://localhost:5000/api/v3_0/assets/10",
status=200,
payload={"message": "Asset updated"},
)
m.post(
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "sched-uuid"},
)
schedule_id = await client.trigger_schedule(
sensor_id=1,
start="2023-01-01T00:00+00:00",
duration="PT1H",
scheduler="my-scheduler",
)
assert schedule_id == "sched-uuid"
await client.close()


@pytest.mark.asyncio
async def test_trigger_schedule_response_not_dict():
"""trigger response is a list, not dict raises ContentTypeError."""
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload=[{"schedule": "sched-uuid"}],
)
Expand All @@ -578,8 +732,13 @@ async def test_trigger_schedule_response_no_schedule_string():
with aioresponses() as m:
client = FlexMeasuresClient(email="test@test.test", password="test")
client.access_token = "test-token"
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": 123},
)
Expand Down Expand Up @@ -725,8 +884,13 @@ async def test_trigger_and_get_schedule_with_unit():
"""unit parameter is passed through trigger_and_get_schedule to get_schedule."""
url = "http://localhost:5000/api/v3_0/sensors/1/schedules/schedule-uuid?duration=P0DT0H45M0S"
with aioresponses() as m:
m.get(
"http://localhost:5000/api/v3_0/sensors/1",
status=200,
payload={"id": 1, "generic_asset_id": 10, "name": "test-sensor"},
)
m.post(
"http://localhost:5000/api/v3_0/sensors/1/schedules/trigger",
"http://localhost:5000/api/v3_0/assets/10/schedules/trigger",
status=200,
payload={"schedule": "schedule-uuid"},
)
Expand Down
Loading