diff --git a/README.md b/README.md index 9d5770d..20066c4 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ this git repository: pip install 'imas-streams @ git+https://github.com/iterorganization/IMAS-Streams.git' # If you wish to use the kafka features: pip install 'imas-streams[kafka] @ git+https://github.com/iterorganization/IMAS-Streams.git' +# If you wish to use the MUSCLE3 component: +pip install 'imas-streams[muscle3] @ git+https://github.com/iterorganization/IMAS-Streams.git' ``` ## Command line interface diff --git a/docs/muscle3.md b/docs/muscle3.md new file mode 100644 index 0000000..2dce9fd --- /dev/null +++ b/docs/muscle3.md @@ -0,0 +1,170 @@ +# IMAS-Streams MUSCLE3 integration + +IMAS-Streams contains two MUSCLE3 components: +1. `imas_streams_source` is a simple component that reads data from a single + IMAS data Stream in a Kafka topic, and makes that data available as + serialized IDSs to the MUSCLE3 simulation. Multiple time slices can be + batched in a single IDS. +2. `imas_streams` is a more complex component that reads data from multiple IMAS + data Streams on a Kafka cluster. It synchronizes the data when these streams + do not use the same time bases. It also publishes data back to one or more + Kafka topics. + +We provide more details on these component in the following sections. + + +## `imas_streams_source` component + + + +Data source reading Streaming IMAS data from a Kafka topic and making it +available in a MUSCLE3 simulation. + +The `ids_out` port sends one message for every `batch_size` time slices +streamed over the configured kafka topic. The type of IDS depends on the +configured kafka topic: please take care that this matches the IDS that is +expected for components receiving the message. + +You may use the `trigger` port to indicate that the previous message is +processed and a new message may be sent. If this port is not connected then +this component will send messages on the `ids_out` port as soon as they are +available. + + +### Example configuration + +```yaml +ymmsl_version: v0.2 +description: Example usage for imas_streams_source component + +imports: +- from imas_streams.data_source import implementation imas_streams_source + +models: + example: + decription: Simple example model + components: + source: + implementation: imas_streams_source + description: Data source + ports: + o_i: ids_out + s: trigger + physics_component: + implementation: my_physics_program + description: Physics simulation + ports: + f_init: equilibrium_in + o_f: trigger + conduits: + source.ids_out: physics_component.equilibrium_in + physics_component.trigger: source.trigger + +settings: + kafka_host: localhost:9092 + kafka_topic: test.equilibrium + +programs: + my_physics_program: + executable: /path/to/my/physics/program +``` + + +## `imas_streams` component + + + +This is a data source that reads Streaming IMAS data from multiple Kafka +topics and makes the data available in a MUSCLE3 simulation. + +To use this component in your simulation, you need to configure the +following: + +1. Define output ports for each IMAS Stream for the O_I operator. +2. Specify the Kafka host to connect to in the `kafka_host` setting. +3. Specify the Kafka topic names for each output port in the `kafka_topics` + setting as `: `. + +### Stream synchronization + +This component will synchronize messages from different streams if they don't +have the same time base: + +1. All streams must have data available before a message is sent to the MUSCLE3 + workflow. +2. Messages will be sent at the same frequency as the first stream (as + configured in the `kafka_topics` setting). +3. Messages for the other stream use the following interpolation strategy: + + If there is a data point at exactly the same moment as the first stream, + then that data is sent. + Otherwise, the data at the latest time before that time is sent. + +For example, if there are three streams with data at the following time points: + +- Stream A: data at t = [0, 1, 2, 3] +- Stream B: data at t = [1, 3] +- Stream C: data at t = [0, 1.5, 3, 4.5] + +If stream A is the first configured stream, then this component will send three +messages: + +1. The first message at t=1: this is the first moment that stream B has data + for. The data for stream A at t=0 is discarded. Since stream C doesn't have + data at t=1, the data at t=0 is sent instead. +2. The second message will be sent at t=2, which repeats the data for Stream B + at t=1, and uses the data at t=1.5 for Stream C. +3. The last message is sent at t=3, for which there is data on all three + streams. + +### Example configuration + +```yaml +ymmsl_version: v0.2 +description: Example usage for imas_streams component +imports: +- from imas_streams.data_source import implementation imas_streams +models: + example: + streams: + description: IMAS Streams data source and sink + implementation: imas_streams + ports: + O_I: magnetics_out pf_active_out + S: equilibrium_in + equilibrium: + description: Equilibrium reconstruction code + implementation: my_equilibrium_program + ports: + F_INIT: magnetics_in pf_active_in + O_F: equilibrium_out + conduits: + streams.magnetics_out: equilibrium.magnetics_in + streams.pf_active_out: equilibrium.pf_active_in + equilibrium.equilibrium_out: streams.equilibrium_in +settings: + streams.kafka_host: localhost:9092 + streams.kafka_topics: | + magnetics_out: kafka.topic.for.magnetics + pf_active_out: kafka.topic.for.pf_active + equilibrium_in: kafka.topic.for.equilibrium +programs: + my_equilibrium_program: + executable: /path/to/my/equilibrium/program +``` + +In this example, IMAS Streams from two topics are made available: + +1. The data on the topic `kafka.topic.for.magnetics` is sent on the output + port `magnetics_out`. +2. The data on the topic `kafka.topic.for.pf_active` is sent on the output + port `pf_active_out`. + +The data received on the S port `equilibrium_in` is published to +`kafka.topic.for.equilibrium`. diff --git a/pyproject.toml b/pyproject.toml index ad9bebe..2d7aab0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ classifiers = [ license = {file = "LICENSE.txt"} dependencies = [ "imas-python", + "packaging", "pydantic", ] dynamic = ["version"] @@ -22,7 +23,7 @@ requires-python = ">=3.11" [project.optional-dependencies] all = ["imas-streams[dev,kafka,muscle3]"] kafka = ["confluent-kafka"] -muscle3 = ["muscle3 >= 0.9.0"] +muscle3 = ["muscle3 >= 0.9.0", "imas-streams[kafka]"] dev = [ "ruff", "pytest", @@ -30,6 +31,8 @@ dev = [ "pytest-xdist", "pytest-randomly", "imas-python[xarray]", + # TODO: Remove once the next MUSCLE3 release is out + "muscle3 @ git+https://github.com/multiscale/muscle3.git@develop" ] [project.urls] @@ -43,7 +46,7 @@ include = ["imas_streams*"] imas-streams = "imas_streams.cli:main" [project.entry-points."ymmsl.module"] -"imas_streams.data_source" = "imas_streams.muscle3_datasource:DATA_SOURCE" +"imas_streams.data_source" = "imas_streams.muscle3_config:DATA_SOURCE" [tool.setuptools_scm] local_scheme = "no-local-version" diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index f763373..0e665ca 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -1,5 +1,4 @@ import logging -import sys import click import imas @@ -71,20 +70,37 @@ def kafka_to_imasentry( entry.put_slice(result) -@main.command() -def kafka_to_muscle3(): - """MUSCLE3 actor consuming streaming IMAS data from a Kafka topic and making it - available to a MUSCLE3 workflow. - """ +def _ensure_kafka_muscle3_dependencies(): + """Ensure optional dependencies are available""" # Ensure optional dependencies are available try: import libmuscle # noqa: F401 import imas_streams.kafka # noqa: F401 except ModuleNotFoundError: - click.echo("Error: please install the optional kafka and muscle3 dependencies.") - sys.exit(1) + raise click.ClickException( + "Please install the optional kafka and muscle3 dependencies." + ) from None + +@main.command() +def kafka_to_muscle3(): + """MUSCLE3 actor consuming streaming IMAS data from a Kafka topic and making it + available to a MUSCLE3 workflow. + """ + _ensure_kafka_muscle3_dependencies() from imas_streams.muscle3_datasource import data_source data_source() + + +@main.command +def dynamic_kafka_to_muscle3(): + """MUSCLE3 actor consuming multiple IMAS data streams from Kafka topics, making the + data available to a MUSCLE3 workflow, with the ability to publish the workflow data + back to Kafka. + """ + _ensure_kafka_muscle3_dependencies() + from imas_streams.muscle3_datasource import dynamic_data_source + + dynamic_data_source() diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index df1d46c..a94eacd 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -44,12 +44,13 @@ class KafkaSettings(BaseModel): model_config = ConfigDict(extra="forbid") -def _create_kafka_topic(settings: KafkaSettings): +def create_kafka_topic(settings: KafkaSettings): """Create a new kafka topic. This will raise an exception when the topic already exists, or if the topic could not be created (potentially due to missing permissions). """ + logger.info("Creating topic '%s'...", settings.topic_name) conf = {"bootstrap.servers": settings.host} admin_client = AdminClient(conf) @@ -70,6 +71,7 @@ def _create_kafka_topic(settings: KafkaSettings): for _topic, future in fs.items(): # This will raise an exception when the topic exists or could not be created future.result() + logger.info("Created topic '%s'", settings.topic_name) class KafkaProducer: @@ -95,7 +97,7 @@ def __init__( self._message_key = f"IMAS-Kafka-{base64.b64encode(random_id).decode()}" # Create the topic and send the metadata as first message - _create_kafka_topic(settings) + create_kafka_topic(settings) self._producer.produce( topic=self._settings.topic_name, value=self._metadata.model_dump_json().encode(), @@ -109,6 +111,7 @@ def __del__(self): """Cleanup Kafka Producer resources""" # Ensure all messages are sent self._producer.flush() + self._producer.close() def produce(self, message: bytes) -> None: """Produce a time frame to the configured Kafka topic.""" @@ -156,7 +159,7 @@ def __init__( settings: KafkaSettings, stream_consumer_cls: type[StreamConsumer], *, - timeout: int = DEFAULT_KAFKA_CONSUMER_TIMEOUT, + timeout: float = DEFAULT_KAFKA_CONSUMER_TIMEOUT, most_recent_only: bool = False, **stream_consumer_kwargs, ) -> None: diff --git a/src/imas_streams/muscle3_config.py b/src/imas_streams/muscle3_config.py new file mode 100644 index 0000000..9d68132 --- /dev/null +++ b/src/imas_streams/muscle3_config.py @@ -0,0 +1,118 @@ +import sys + +DATA_SOURCE = f""" +ymmsl_version: v0.2 + +description: Importable yMMSL configuration for imas_streams_source +programs: + imas_streams_source: + executable: {sys.executable} + args: -m imas_streams kafka-to-muscle3 + + ports: + o_i: ids_out + s: trigger + + description: | + # IMAS-Streams data source + + Data source reading Streaming IMAS data from a Kafka topic and making it + available in a MUSCLE3 simulation. + + The `ids_out` port sends one message for every `batch_size` time slices + streamed over the configured kafka topic. The type of IDS depends on the + configured kafka topic: please take care that this matches the IDS that is + expected for components receiving the message. + + You may use the `trigger` port to indicate that the previous message is + processed and a new message may be sent. If this port is not connected then + this component will send messages on the `ids_out` port as soon as they are + available. + + supported_settings: + kafka_host: > + str Bootstrap server address for Kafka (e.g. "localhost:9092" for a + locally running kafka). + kafka_topic: > + str Name of the kafka topic with streaming IMAS data to subscribe to. + batch_size: > + int Number of time slices to batch in a single MUSCLE3 message. + Default is one time slice per message. + most_recent_only: > + bool If not set, or set to false, all data in the IMAS Data Stream is + provided to the MUSCLE3 simulation. + This can be set to true to provide the last available time point with + each iteration. This mode is useful while data is being produced (e.g. + during an experimental pulse) and it is more important to have + up-to-date data than to process all time points. + + imas_streams: + executable: {sys.executable} + args: -m imas_streams dynamic-kafka-to-muscle3 + description: | + # Data source for multiple IMAS-Streams data streams + + This is a data source that reads Streaming IMAS data from multiple Kafka + topics and makes the data available in a MUSCLE3 simulation. + + ## Usage + + To use this component in your simulation, you need to configure the + following: + + 1. Define output ports for each IMAS Stream for the O_I operator. + 2. Specify the Kafka host to connect to in the `kafka_host` setting. + 3. Specify the Kafka topic names for each output port in the `kafka_topics` + setting as `: `. + + ## Example configuration + + ```yaml + ymmsl_version: v0.2 + description: Example usage for imas_streams component + imports: + - from imas_streams.data_source import implementation imas_streams + models: + example: + streams: + description: IMAS Streams data source and sink + implementation: imas_streams + ports: + O_I: magnetics_out pf_active_out + S: equilibrium_in + equilibrium: + description: Equilibrium reconstruction code + ports: + F_INIT: magnetics_in pf_active_in + O_F: equilibrium_out + conduits: + streams.magnetics_out: equilibrium.magnetics_in + streams.pf_active_out: equilibrium.pf_active_in + equilibrium.equilibrium_out: streams.equilibrium_in + settings: + streams.kafka_host: localhost:9092 + streams.kafka_topics: | + magnetics_out: kafka.topic.for.magnetics + pf_active_out: kafka.topic.for.pf_active + equilibrium_in: kafka.topic.for.equilibrium + ``` + + In this example, IMAS Streams from two topics are made available: + + 1. The data on the topic `kafka.topic.for.magnetics` is sent on the output + port `magnetics_out`. + 2. The data on the topic `kafka.topic.for.pf_active` is sent on the output + port `pf_active_out`. + + The data received on the S port `equilibrium_in` is published to + `kafka.topic.for.equilibrium`. + supported_settings: + kafka_host: > + str Bootstrap server address for Kafka (e.g. "localhost:9092" for a + locally running kafka). + kafka_topics: > + str List of kafka topics per output / input port, in the form of + `port_name: topic_name`. Each entry must be on a separate line. + kafka_timeout: float Timeout when receiving Kafka messages. +""" +"""yMMSL description of the imas_streams MUSCLE3 components""" diff --git a/src/imas_streams/muscle3_datasource.py b/src/imas_streams/muscle3_datasource.py index c08ef36..430109e 100644 --- a/src/imas_streams/muscle3_datasource.py +++ b/src/imas_streams/muscle3_datasource.py @@ -1,69 +1,26 @@ import logging -import sys - -from imas_streams import BatchedIDSConsumer +from collections.abc import Iterator + +import libmuscle +from confluent_kafka import Producer +from libmuscle import Instance, Message +from packaging.version import Version +from ymmsl import Operator + +from imas_streams import BatchedIDSConsumer, StreamingIDSConsumer +from imas_streams.kafka import ( + DEFAULT_KAFKA_CONSUMER_TIMEOUT, + KafkaConsumer, + KafkaSettings, + create_kafka_topic, +) logger = logging.getLogger(__name__) -DATA_SOURCE = f""" -ymmsl_version: v0.2 - -description: Importable yMMSL configuration for imas_streams_source -programs: - imas_streams_source: - executable: {sys.executable} - args: -m imas_streams kafka-to-muscle3 - - ports: - o_i: ids_out - s: trigger - - description: | - # IMAS-Streams data source - - Data source reading Streaming IMAS data from a Kafka topic and making it - available in a MUSCLE3 simulation. - - The `ids_out` port sends one message for every `batch_size` time slices - streamed over the configured kafka topic. The type of IDS depends on the - configured kafka topic: please take care that this matches the IDS that is - expected for components receiving the message. - - You may use the `trigger` port to indicate that the previous message is - processed and a new message may be sent. If this port is not connected then - this component will send messages on the `ids_out` port as soon as they are - available. - - supported_settings: - kafka_host: > - str Bootstrap server address for Kafka (e.g. "localhost:9092" for a - locally running kafka). - kafka_topic: > - str Name of the kafka topic with streaming IMAS data to subscribe to. - batch_size: > - int Number of time slices to batch in a single MUSCLE3 message. - Default is one time slice per message. - most_recent_only: > - bool If not set, or set to false, all data in the IMAS Data Stream is - provided to the MUSCLE3 simulation. - This can be set to true to provide the last available time point with - each iteration. This mode is useful while data is being produced (e.g. - during an experimental pulse) and it is more important to have - up-to-date data than to process all time points. -""" -"""yMMSL description of the imas_streams_source actor""" - - def data_source(): - # Local imports for all optional dependencies - import libmuscle - from libmuscle import Instance, Message - from ymmsl import Operator - - from imas_streams.kafka import KafkaConsumer, KafkaSettings - - if tuple(map(int, libmuscle.__version__.split(".")[:2])) < (0, 9): + """MUSCLE3 data source streaming data from a single IMAS Stream on a Kafka topic.""" + if Version(libmuscle.__version__) < Version("0.9"): raise RuntimeError("This actor requires libmuscle version 0.9.0 or later") logger.info("Creating libmuscle instance") @@ -113,3 +70,184 @@ def data_source(): logger.info("IMAS data stream ended") logger.info("Reuse loop finished") + + +def dynamic_data_source(): + """MUSCLE3 data source supporting streaming from multiple Kafka topics and + publishing data back to Kafka. + """ + # Check which version of M3 supports dynamic O_I and S ports + if Version(libmuscle.__version__) <= Version("0.9.1"): + # N.B. Develop branch with version 0.9.2.dev1 also works + raise RuntimeError("This actor requires libmuscle version 0.10.0 or later") + DynamicDataSource().run() + + +class DynamicDataSource: + def __init__(self) -> None: + logger.info("Creating libmuscle instance") + # Don't specify ports to allow dynamic input/output ports + self.instance = Instance() + + # Check the dynamic port configuration + ports = self.instance.list_ports() + for operator in [Operator.F_INIT, Operator.O_F]: + if ports.get(operator): + raise RuntimeError( + f"imas_streams does not support {operator.name} ports, but the " + f"following ports were defined: {', '.join(ports[operator])}" + ) + self.output_ports = [ + port + for port in ports.get(Operator.O_I, []) + if self.instance.is_connected(port) + ] + self.input_ports = [ + port + for port in ports.get(Operator.S, []) + if self.instance.is_connected(port) + ] + if not self.output_ports or not self.input_ports: + raise RuntimeError( + "imas_streams needs at least one O_I port and one S port." + ) + + self.consumers: dict[str, KafkaConsumer] = {} + """Kafka consumer per output port.""" + self.producer: Producer + """Kafka producer.""" + + def run(self) -> None: + """Run MUSCLE3 reuse loop.""" + while self.instance.reuse_instance(): + logger.info("Reading settings") + kafka_host = self.instance.get_setting("kafka_host", "str") + kafka_topics = self.instance.get_setting("kafka_topics", "str") + self.kafka_timeout = self.instance.get_setting( + "kafka_timeout", "float", default=DEFAULT_KAFKA_CONSUMER_TIMEOUT + ) + + logger.info("Setting up Kafka Producer") + self.producer = Producer({"bootstrap.servers": kafka_host}) + topic_per_port = self._parse_topics(kafka_topics) + logger.info("Setting up Kafka Consumers for each stream") + for port, topic in topic_per_port.items(): + if port in self.output_ports: + self.consumers[port] = KafkaConsumer( + KafkaSettings(host=kafka_host, topic_name=topic), + StreamingIDSConsumer, + return_copy=False, + timeout=self.kafka_timeout, + ) + else: + create_kafka_topic(KafkaSettings(host=kafka_host, topic_name=topic)) + + for msgs in self.generate_serialized_idss(): + for port, (t, data) in msgs.items(): + self.instance.send(port, Message(t, data=data)) + + for port in self.input_ports: + msg = self.instance.receive(port) + # FIXME: This publishes serialized IDSs instead of streaming IMAS + # data. Our test case (EFIT++) doesn't produce data that adheres to + # the the IMAS-Streams assumptions (see README.md) so we cannot do + # better at the moment, unfortunately... + self.producer.produce( + topic=topic_per_port[port], + value=msg.data, + ) + self.producer.poll(0) + + # Cleanup + self.consumers = {} + self.producer.flush() + + def _parse_topics(self, kafka_topics: str) -> dict[str, str]: + """Parse kafka topics and return a dict {port_name: topic_name}.""" + topic_per_port: dict[str, str] = {} + for line in kafka_topics.splitlines(): + if not line.strip(): + continue + port, _, topic = map(str.strip, line.partition(":")) + if not topic or not port: + raise RuntimeError( + f"Invalid line encountered in 'kafka_topics' setting: '{line}'" + ) + + if port in self.output_ports or port in self.input_ports: + topic_per_port[port] = topic + else: + logger.info( + "Ignoring kafka topic '%s' for disconnected port '%s'", topic, port + ) + + # Exception handling: each port needs to have a topic configured: + if len(topic_per_port) != len(self.output_ports) + len(self.input_ports): + missing_output = [p for p in self.output_ports if p not in topic_per_port] + missing_input = [p for p in self.input_ports if p not in topic_per_port] + missing_msgs = [] + if missing_output: + missing_msgs.append(f"output ports: {', '.join(missing_output)}") + if missing_input: + missing_msgs.append(f"input ports: {', '.join(missing_input)}") + missing_msg = " and ".join(missing_msgs) + raise RuntimeError( + f"Kafka topic is missing for {missing_msg}. Please add a line to the " + "'kafka_topics' setting for each port." + ) + + return topic_per_port + + def generate_serialized_idss(self) -> Iterator[dict[str, tuple[float, bytes]]]: + """Generate synchronized, serialized IDSs for the subscribed streams.""" + # Receive once on each stream: + streams = { + port: consumer.stream(timeout=self.kafka_timeout) + for port, consumer in self.consumers.items() + } + idss = {port: next(stream) for port, stream in streams.items()} + + latest_starttime = max(ids.time[0] for ids in idss.values()) + main_port = next(iter(streams)) + main_ids = idss[main_port] + main_stream = streams[main_port] + + # Skip ahead the main stream + if main_ids.time[0] < latest_starttime: + logger.info("Skipping messages until start time of latest stream") + while main_ids.time[0] < latest_starttime: + next(main_stream) + + # Generate time-synchronized serialized IDSs + curdata: dict[str, tuple[float, bytes]] = { + port: (ids.time[0], ids.serialize()) for port, ids in idss.items() + } + while True: + # Get the last message <= main_ids.time[0] for each stream + for port, ids in idss.items(): + if ids is main_ids: + continue + while ids.time[0] <= main_ids.time[0]: + # Note: we may serialize too much here. For example, when the main + # stream produces data at 10 Hz, and a secondary stream at 20 Hz, we + # need to throw away every other serialized IDS. If this becomes a + # bottleneck we could optimize in two ways: + # 1. Stash the data at the streaming IMAS level instead of a + # serialized IDS. Apply the buffer and serialize the IDS only + # when needed. + # 2. Improve serialization and directly copy the bytes from the + # Streaming IDS data frame into the serialized IDS (assuming we + # use the flexbuffers serialization protocol). + curdata[port] = (ids.time[0], ids.serialize()) + try: + next(streams[port]) + except StopIteration: + break + yield curdata + + # Fetch the next message of the main stream + try: + next(main_stream) + except StopIteration: + return + curdata[main_port] = (main_ids.time[0], main_ids.serialize()) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d306ee3 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,11 @@ +import os + +import pytest + + +@pytest.fixture +def kafka_host(): + value = os.getenv("KAFKA_HOST") + if not value: + pytest.fail("Cannot connect to Kafka server: KAFKA_HOST not set.") + return value diff --git a/tests/test_kafka.py b/tests/test_kafka.py index aa646e3..49f70be 100644 --- a/tests/test_kafka.py +++ b/tests/test_kafka.py @@ -1,6 +1,5 @@ import contextlib import logging -import os import imas import pytest @@ -16,14 +15,6 @@ logger = logging.getLogger(__name__) -@pytest.fixture -def kafka_host(): - value = os.getenv("KAFKA_HOST") - if not value: - pytest.fail("Cannot connect to Kafka server: KAFKA_HOST not set.") - return value - - @pytest.fixture def test_magnetics(): ids = imas.IDSFactory().magnetics() diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index 858fc6e..9b42e13 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -1,10 +1,20 @@ +import contextlib +import sys from pathlib import Path +from unittest.mock import Mock, patch +import confluent_kafka.admin +import imas +import numpy as np import pytest import ymmsl -from ymmsl.v0_2 import Configuration, Reference, resolve +from libmuscle import Message +from ymmsl.v0_2 import Configuration, Operator, Reference, resolve -from imas_streams.muscle3_datasource import DATA_SOURCE +from imas_streams import StreamingIDSConsumer, StreamingIDSProducer +from imas_streams.kafka import KafkaProducer, KafkaSettings +from imas_streams.muscle3_config import DATA_SOURCE +from imas_streams.muscle3_datasource import DynamicDataSource ymmsl_config = """ ymmsl_version: v0.2 @@ -31,10 +41,6 @@ """ -@pytest.mark.xfail( - tuple(map(int, ymmsl.__version__.split(".")[:3])) < (0, 15, 1), - reason="Test needs YMMSL Entry Points plugins", -) def test_load_ymmsl_config(): config = ymmsl.load_as(Configuration, ymmsl_config) resolve(Reference([]), config) @@ -53,3 +59,210 @@ def test_load_ymmsl_config_from_ymmsl_path( config = ymmsl.load_as(Configuration, ymmsl_config) resolve(Reference([]), config) config.check_consistent() + + +def test_dynamic_port_configuration(): + with patch("imas_streams.muscle3_datasource.Instance") as mock: + list_ports = mock.return_value.list_ports + list_ports.return_value = {} + with pytest.raises(RuntimeError, match="needs at least one"): + DynamicDataSource() + list_ports.return_value = {Operator.F_INIT: ["test"]} + with pytest.raises(RuntimeError, match="does not support F_INIT ports"): + DynamicDataSource() + list_ports.return_value = {Operator.O_F: ["test"]} + with pytest.raises(RuntimeError, match="does not support O_F ports"): + DynamicDataSource() + list_ports.return_value = {Operator.O_I: ["out"], Operator.S: ["in"]} + DynamicDataSource() + + +def test_dynamic_port_topics(caplog: pytest.LogCaptureFixture): + caplog.set_level("INFO") + with patch("imas_streams.muscle3_datasource.Instance") as mock: + list_ports = mock.return_value.list_ports + list_ports.return_value = {Operator.O_I: ["out"], Operator.S: ["in"]} + + source = DynamicDataSource() + with pytest.raises(RuntimeError, match="Invalid line"): + source._parse_topics("invalid line") + with pytest.raises(RuntimeError, match="Invalid line"): + source._parse_topics("valid: x\ninvalid") + with pytest.raises(RuntimeError, match="topic is missing for input"): + source._parse_topics("a:x\nout:ok") + with pytest.raises(RuntimeError, match="topic is missing for output"): + source._parse_topics("a:x\nin:ok") + + parsed = source._parse_topics("in:topic1\nout:topic2") + assert parsed == {"in": "topic1", "out": "topic2"} + + caplog.clear() + parsed = source._parse_topics(""" + in : topic1 + out : this.is.a.longer:name.for.the.topic + ignored: this.should.be.logged + """) + assert parsed == {"in": "topic1", "out": "this.is.a.longer:name.for.the.topic"} + assert len(caplog.record_tuples) == 1 + assert "Ignoring kafka topic" in caplog.record_tuples[0][2] + + +class MockKafkaConsumer: + """Mock object for KafkaConsumer to test IDS synchronization""" + + def __init__(self, times: list[float]) -> None: + """Mock Kafka stream with empty equilibrium IDSs at the provided time points.""" + self.times = times + self.ids = imas.IDSFactory().equilibrium() + self.ids.ids_properties.homogeneous_time = 1 + self.ids.time = [0.0] + self.producer = StreamingIDSProducer(self.ids) + self.consumer = StreamingIDSConsumer(self.producer.metadata, return_copy=False) + + def stream(self, timeout=None): + for time in self.times: + self.ids.time = [float(time)] + message = self.producer.create_message(self.ids) + yield self.consumer.process_message(message) + + +def extract_times(data: dict[str, tuple[float, bytes]]) -> dict[str, float]: + """Extract time values from the yielded data of + DynamicDataSource.generate_serialized_idss.""" + return {k: v[0] for k, v in data.items()} + + +def test_dynamic_ids_synchronization(): + self = Mock() + self.consumers = { + "main": MockKafkaConsumer([0, 1, 2, 3, 4, 5]), + "lockstep": MockKafkaConsumer([0, 1, 2, 3, 4, 5]), + "slower": MockKafkaConsumer([0, 4, 8]), + "faster": MockKafkaConsumer([0, 0.8, 1.6, 2.4, 3.2, 4, 4.8, 5.6]), + } + + generated = [ + extract_times(x) for x in DynamicDataSource.generate_serialized_idss(self) + ] + assert generated == [ + dict(main=0, lockstep=0, slower=0, faster=0), + dict(main=1, lockstep=1, slower=0, faster=0.8), + dict(main=2, lockstep=2, slower=0, faster=1.6), + dict(main=3, lockstep=3, slower=0, faster=2.4), + dict(main=4, lockstep=4, slower=4, faster=4), + dict(main=5, lockstep=5, slower=4, faster=4.8), + ] + + +def test_dynamic_ids_synchronization_with_offset(): + self = Mock() + self.consumers = { + "main": MockKafkaConsumer([0, 1, 2, 3]), + "delayed": MockKafkaConsumer([1.5, 2, 2.5]), + "early": MockKafkaConsumer([-1, 1, 3]), + } + + generated = [ + extract_times(x) for x in DynamicDataSource.generate_serialized_idss(self) + ] + assert generated == [ + dict(main=2, delayed=2, early=1), + dict(main=3, delayed=2.5, early=3), + ] + + +def test_dynamic_ids_synchronization_with_offset2(): + self = Mock() + self.consumers = { + # Now delayed will be the 'main' stream for determining time output! + "delayed": MockKafkaConsumer([1.5, 2, 2.5]), + "main": MockKafkaConsumer([0, 1, 2, 3]), + "early": MockKafkaConsumer([-1, 1, 3]), + } + + generated = [ + extract_times(x) for x in DynamicDataSource.generate_serialized_idss(self) + ] + assert generated == [ + dict(main=1, delayed=1.5, early=1), + dict(main=2, delayed=2, early=1), + dict(main=2, delayed=2.5, early=1), + ] + + +def test_dynamic_data_source_actor(muscle3_tester, kafka_host): + # Ensure topics are cleared before start + with confluent_kafka.admin.AdminClient({"bootstrap.servers": kafka_host}) as client: + fs = client.delete_topics( + ["test.magnetics", "test.pf_active", "test.equilibrium"] + ) + for _topic, future in fs.items(): + # Raises an exception when the topic did not exists or could not be deleted + with contextlib.suppress(confluent_kafka.KafkaException): + future.result() + + # Populate magnetics and pf_active topics + times = {"magnetics": np.linspace(0, 10, 11), "pf_active": np.linspace(-1, 12, 27)} + for ids_name in ["magnetics", "pf_active"]: + ids = imas.IDSFactory().new(ids_name) + ids.ids_properties.homogeneous_time = 1 + ids.time = times[ids_name][:1] + prod = StreamingIDSProducer(ids) + kprod = KafkaProducer( + KafkaSettings(host=kafka_host, topic_name=f"test.{ids_name}"), + prod.metadata, + ) + for t in times[ids_name]: + ids.time = [t] + kprod.produce(bytes(prod.create_message(ids))) + del kprod # Run cleanup logic + + # Start muscle3 actor + tester = muscle3_tester.start_implementation( + f""" + ymmsl_version: v0.2 + programs: + imas_streams: + ports: + o_i: magnetics_out pf_active_out + s: equilibrium_in + executable: {sys.executable} + args: -m imas_streams dynamic-kafka-to-muscle3 + settings: + kafka_host: {kafka_host} + kafka_timeout: 10.0 + kafka_topics: | + magnetics_out: test.magnetics + pf_active_out: test.pf_active + equilibrium_in: test.equilibrium + """, + "imas_streams", + default_timeout=20, + ) + + mag, pfa, eq = map(imas.IDSFactory().new, ["magnetics", "pf_active", "equilibrium"]) + eq.ids_properties.homogeneous_time = 1 + for expected_time in np.linspace(0, 10, 11): + mag.deserialize(tester.receive("magnetics_out").data) + assert np.array_equal(mag.time, [expected_time]) + pfa.deserialize(tester.receive("pf_active_out").data) + assert np.array_equal(pfa.time, [expected_time]) + eq.time = [expected_time] + tester.send("equilibrium_in", Message(expected_time, data=eq.serialize())) + + # Check that the messages were sent to Kafka + consumer = confluent_kafka.Consumer( + { + "bootstrap.servers": kafka_host, + "group.id": "pytest", + "auto.offset.reset": "earliest", + } + ) + consumer.subscribe(["test.equilibrium"]) + for expected_time in np.linspace(0, 10, 11): + msg = consumer.poll(10) + assert msg is not None, f"Missing message for t={expected_time}" + data = msg.value() + assert data is not None + eq.deserialize(data) + assert np.array_equal(eq.time, [expected_time])