Skip to content

Add initial version of flexmeasures-openadr3 plugin#1

Draft
nicburgt wants to merge 1 commit into
mainfrom
initial-version
Draft

Add initial version of flexmeasures-openadr3 plugin#1
nicburgt wants to merge 1 commit into
mainfrom
initial-version

Conversation

@nicburgt
Copy link
Copy Markdown
Collaborator

No description provided.

@nicburgt
Copy link
Copy Markdown
Collaborator Author

Migrate to using ingestion redis queue

@socket-security
Copy link
Copy Markdown

Review the following changes in direct dependencies. Learn more about Socket for GitHub.

Diff Package Supply Chain
Security
Vulnerability Quality Maintenance License
Addedflexmeasures@​0.32.296100100100100
Addedopenadr3-client@​2.0.4100100100100100

View full report


OPENADR_EVENT_SOURCE_NAME = "OpenADR 3 VTN"
OPENADR_EVENT_SOURCE_TYPE = "gateway"
FETCH_EVENTS_QUEUE_NAME = "forecasting" # "ingestion"
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

update to ingestion redis queue name

Comment thread Dockerfile
@@ -0,0 +1,10 @@
FROM lfenergy/flexmeasures:v0.33.0
Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wait for flexmeasures v0.33.0 release before merging

runs-on: ubuntu-latest
name: "Check (on Python3.11)"
steps:
- uses: actions/setup-python@v4
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Use hashes for action versions.

on: push


jobs:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Optional: add zizmor for GH actions checking. See https://github.com/ElaadNL/openadr3-client/blob/main/.github/workflows/ci.yml

python-version: 3.11
- name: Check out src from Git
uses: actions/checkout@v3
- run: make test
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this work?

Comment thread pyproject.toml
version = "0.1"
dependencies = [
"flexmeasures>=0.32.0",
"openadr3-client>=2.0.4",
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Update openadr3-client version before release

Comment thread pyproject.toml
]

# Existing tool configurations (keep these as-is)
[tool.pytest.ini_options]
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add

addopts = ["--import-mode=importlib"]

As per https://github.com/zupo/awesome-pytest-speedup?tab=readme-ov-file


@field_validator("utc_trigger_time")
@classmethod
def _validate_utc_trigger_time(cls, value: str, info: ValidationInfo) -> str:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Better to use a Pydantic validator for this

field_name = str(issue["loc"][-1])
errors.add(field_name, issue["msg"])
else:
assert config is not None
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are inline asserts good here?

)
return cls(import_sensor=import_sensor, export_sensor=export_sensor)

def sensor_for(self, payload_type: EventPayloadType) -> Sensor | None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

get_sensor_for



def _fetch_events_queue() -> Queue:
return cast(Queue, current_app.queues[FETCH_EVENTS_QUEUE_NAME])
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't this be typed out of the box?

"""


import warnings # NOQA: E402
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why are these not at the top? Please add a comment

Copy link
Copy Markdown
Member

@nhoening nhoening left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Just a question

Comment thread uv.lock
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd like some clarification about dependencies. I'm not sure what the uv.lock file is used for?

Maybe it is useful for running only this plugin, and it installs FlexMeasures + dependencies?

How compatible is that with running multiple plugins?

In our practical experience, we have a FlexMeasures docker image, and we might create a dedicated one with some custom plugin with some extra requirements installed. And we might install extra plugins (like this one) even at the deployment step (e.g. with ansible).

)
db.session.commit()
finally:
self.schedule(ven_client, sensor_config, replace_existing=False)
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The chain of scheduling jobs would break if one job fails before this point. The finally safeguards against many sources of failure, but if a worker is restarted during job handling before reaching this point, that is problematic.

I'm flagging this as a potential follow-up to move to rq.cron.CronScheduler, which will require a dedicated rq-cron-scheduler service in docker-compose.yml running something like:

from rq.cron import CronScheduler

cron = CronScheduler(connection=app.redis_connection)

cron.start()

And then registering recurring jobs with something like:

cron.register(
    self._execute,
    queue_name=app.queues[FETCH_EVENTS_QUEUE_NAME],
    cron_string="0 0 * * *",
    kwargs={"ven_id": ven_client.id, "config_name": sensor_config.name},
)

Comment thread pyproject.toml
]
version = "0.1"
dependencies = [
"flexmeasures>=0.32.0",
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"flexmeasures>=0.32.0",
"flexmeasures>=0.33.0", # introduces the ingestion queue

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants