From d4a888b48ceedea5bd2847b850174543a5ab31d6 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Mon, 11 May 2026 16:22:21 +0200 Subject: [PATCH 01/10] [WIP] Add muscle3 source component that reads from multiple Kafka topics --- pyproject.toml | 2 +- src/imas_streams/cli.py | 20 ++++ src/imas_streams/muscle3_config.py | 115 +++++++++++++++++++ src/imas_streams/muscle3_datasource.py | 148 +++++++++++++++---------- tests/test_muscle3.py | 2 +- 5 files changed, 228 insertions(+), 59 deletions(-) create mode 100644 src/imas_streams/muscle3_config.py diff --git a/pyproject.toml b/pyproject.toml index ad9bebe..eaaf54b 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -43,7 +43,7 @@ include = ["imas_streams*"] imas-streams = "imas_streams.cli:main" [project.entry-points."ymmsl.module"] -"imas_streams.data_source" = "imas_streams.muscle3_datasource:DATA_SOURCE" +"imas_streams.data_source" = "imas_streams.muscle3_config:DATA_SOURCE" [tool.setuptools_scm] local_scheme = "no-local-version" diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index f763373..78b22aa 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -88,3 +88,23 @@ def kafka_to_muscle3(): from imas_streams.muscle3_datasource import data_source data_source() + + +@main.command +def dynamic_kafka_to_muscle3(): + """MUSCLE3 actor consuming multiple IMAS data streams from Kafka topics, making the + data available to a MUSCLE3 workflow, with the ability to publish the workflow data + back to Kafka. + """ + # Ensure optional dependencies are available + try: + import libmuscle # noqa: F401 + + import imas_streams.kafka # noqa: F401 + except ModuleNotFoundError: + click.echo("Error: please install the optional kafka and muscle3 dependencies.") + sys.exit(1) + + from imas_streams.muscle3_datasource import dynamic_data_source + + dynamic_data_source() diff --git a/src/imas_streams/muscle3_config.py b/src/imas_streams/muscle3_config.py new file mode 100644 index 0000000..b9c19b2 --- /dev/null +++ b/src/imas_streams/muscle3_config.py @@ -0,0 +1,115 @@ +import sys + +DATA_SOURCE = f""" +ymmsl_version: v0.2 + +description: Importable yMMSL configuration for imas_streams_source +programs: + imas_streams_source: + executable: {sys.executable} + args: -m imas_streams kafka-to-muscle3 + + ports: + o_i: ids_out + s: trigger + + description: | + # IMAS-Streams data source + + Data source reading Streaming IMAS data from a Kafka topic and making it + available in a MUSCLE3 simulation. + + The `ids_out` port sends one message for every `batch_size` time slices + streamed over the configured kafka topic. The type of IDS depends on the + configured kafka topic: please take care that this matches the IDS that is + expected for components receiving the message. + + You may use the `trigger` port to indicate that the previous message is + processed and a new message may be sent. If this port is not connected then + this component will send messages on the `ids_out` port as soon as they are + available. + + supported_settings: + kafka_host: > + str Bootstrap server address for Kafka (e.g. "localhost:9092" for a + locally running kafka). + kafka_topic: > + str Name of the kafka topic with streaming IMAS data to subscribe to. + batch_size: > + int Number of time slices to batch in a single MUSCLE3 message. + Default is one time slice per message. + most_recent_only: > + bool If not set, or set to false, all data in the IMAS Data Stream is + provided to the MUSCLE3 simulation. + This can be set to true to provide the last available time point with + each iteration. This mode is useful while data is being produced (e.g. + during an experimental pulse) and it is more important to have + up-to-date data than to process all time points. + + imas_streams: + executable: {sys.executable} + args: -m imas_streams dynamic-kafka-to-muscle3 + description: | + # Data source for multiple IMAS-Streams data streams + + This is a data source that reads Streaming IMAS data from multiple Kafka + topics and makes the data available in a MUSCLE3 simulation. + + ## Usage + + To use this component in your simulation, you need to configure the + following: + + 1. Define output ports for each IMAS Stream for the O_I operator. + 2. Specify the Kafka host to connect to in the `kafka_host` setting. + 3. Specify the Kafka topic names for each output port in the `kafka_topics` + setting as `: `. + + ## Example configuration + + ```yaml + imports: + - from imas_streams.data_source import implementation imas_streams + models: + example: + streams: + description: IMAS Streams data source and sink + implementation: imas_streams + ports: + O_I: magnetics_out pf_active_out + S: equilibrium_in + equilibrium: + description: Equilibrium reconstruction code + ports: + F_INIT: magnetics_in pf_active_in + O_F: equilibrium_out + conduits: + streams.magnetics_out: equilibrium.magnetics_in + streams.pf_active_out: equilibrium.pf_active_in + equilibrium.equilibrium_out: streams.equilibrium_in + settings: + streams.kafka_host: localhost:9092 + streams.kafka_topics: | + magnetics_out: kafka.topic.for.magnetics + pf_active_out: kafka.topic.for.pf_active + equilibrium_in: kafka.topic.for.equilibrium + ``` + + In this example, IMAS Streams from two topics are made available: + + 1. The data on the topic `kafka.topic.for.magnetics` is sent on the output + port `magnetics_out`. + 2. The data on the topic `kafka.topic.for.pf_active` is sent on the output + port `pf_active_out`. + + The data received on the S port `equilibrium_in` is published to + `kafka.topic.for.equilibrium`. + supported_settings: + kafka_host: > + str Bootstrap server address for Kafka (e.g. "localhost:9092" for a + locally running kafka). + kafka_topics: > + str List of kafka topics per output / input port, in the form of + `port_name: topic_name`. Each entry must be on a separate line. +""" +"""yMMSL description of the imas_streams MUSCLE3 components""" diff --git a/src/imas_streams/muscle3_datasource.py b/src/imas_streams/muscle3_datasource.py index c08ef36..d4102b7 100644 --- a/src/imas_streams/muscle3_datasource.py +++ b/src/imas_streams/muscle3_datasource.py @@ -1,68 +1,17 @@ import logging -import sys + +import libmuscle +from libmuscle import Instance, Message +from ymmsl import Operator from imas_streams import BatchedIDSConsumer +from imas_streams.kafka import KafkaConsumer, KafkaSettings logger = logging.getLogger(__name__) -DATA_SOURCE = f""" -ymmsl_version: v0.2 - -description: Importable yMMSL configuration for imas_streams_source -programs: - imas_streams_source: - executable: {sys.executable} - args: -m imas_streams kafka-to-muscle3 - - ports: - o_i: ids_out - s: trigger - - description: | - # IMAS-Streams data source - - Data source reading Streaming IMAS data from a Kafka topic and making it - available in a MUSCLE3 simulation. - - The `ids_out` port sends one message for every `batch_size` time slices - streamed over the configured kafka topic. The type of IDS depends on the - configured kafka topic: please take care that this matches the IDS that is - expected for components receiving the message. - - You may use the `trigger` port to indicate that the previous message is - processed and a new message may be sent. If this port is not connected then - this component will send messages on the `ids_out` port as soon as they are - available. - - supported_settings: - kafka_host: > - str Bootstrap server address for Kafka (e.g. "localhost:9092" for a - locally running kafka). - kafka_topic: > - str Name of the kafka topic with streaming IMAS data to subscribe to. - batch_size: > - int Number of time slices to batch in a single MUSCLE3 message. - Default is one time slice per message. - most_recent_only: > - bool If not set, or set to false, all data in the IMAS Data Stream is - provided to the MUSCLE3 simulation. - This can be set to true to provide the last available time point with - each iteration. This mode is useful while data is being produced (e.g. - during an experimental pulse) and it is more important to have - up-to-date data than to process all time points. -""" -"""yMMSL description of the imas_streams_source actor""" - - def data_source(): - # Local imports for all optional dependencies - import libmuscle - from libmuscle import Instance, Message - from ymmsl import Operator - - from imas_streams.kafka import KafkaConsumer, KafkaSettings - + """MUSCLE3 data source streaming data from a single IMAS Stream on a Kafka topic.""" if tuple(map(int, libmuscle.__version__.split(".")[:2])) < (0, 9): raise RuntimeError("This actor requires libmuscle version 0.9.0 or later") @@ -113,3 +62,88 @@ def data_source(): logger.info("IMAS data stream ended") logger.info("Reuse loop finished") + + +def dynamic_data_source(): + """MUSCLE3 data source supporting streaming from multiple Kafka topics and + publishing data back to Kafka. + """ + # FIXME: Check which version of M3 supports dynamic O_I and S ports + # See PR: https://github.com/multiscale/muscle3/pull/350 + if tuple(map(int, libmuscle.__version__.split(".")[:2])) < (0, 10): + raise RuntimeError("This actor requires libmuscle version 0.10.0 or later") + + logger.info("Creating libmuscle instance") + instance = Instance() # Don't specify ports to allow dynamic input/output ports + + # Check the dynamic port configuration + ports = instance.list_ports() + for operator in [Operator.F_INIT, Operator.O_F]: + if ports.get(operator): + raise RuntimeError( + f"imas_streams does not support {operator.name} ports, but the " + f"following ports were defined: {', '.join(ports[Operator.F_INIT])}" + ) + output_ports = [port for port in ports[Operator.O_I] if instance.is_connected(port)] + input_ports = [port for port in ports[Operator.S] if instance.is_connected(port)] + if not output_ports or not input_ports: + raise RuntimeError("imas_streams needs at least one O_I port and one S port.") + + while instance.reuse_instance(): + logger.info("Reading settings") + kafka_host = instance.get_setting("kafka_host", "str") + kafka_topics = instance.get_setting("kafka_topics", "str") + + topic_per_port = _parse_topics(kafka_topics, output_ports, input_ports) + output_port_topics = { + port: topic + for port, topic in topic_per_port.items() + if port in output_ports + } + + # TODO: + # 1 Create kafka clients for each topic we want to receive data for + # 2 Create kafka producer for each topic we need to send data on + # 3 Synchronize messages from different streams + # 4 Read message in streams, instance.send() on the respective ports + # 5 instance.receive() on input ports, create streaming IMAS data frame and send + # to Kafka + # 6 Repeat 4-6 until stream has ended + + +def _parse_topics( + kafka_topics: str, output_ports: list[str], input_ports: list[str] +) -> dict[str, str]: + """Parse kafka topics and return a dict {port_name: topic_name}.""" + topic_per_port: dict[str, str] = {} + for line in kafka_topics.splitlines(): + if not line.strip(): + continue + port, _, topic = map(str.strip, line.partition(":")) + if not topic or not port: + raise RuntimeError( + f"Invalid line encountered in 'kafka_topics' setting: '{line}'" + ) + + if port in output_ports or port in input_ports: + topic_per_port[port] = topic + else: + logger.info( + "Ignoring kafka topic '%s' for disconnected port '%s'", topic, port + ) + + if len(topic_per_port) != len(output_ports) + len(input_ports): + missing_output = [port for port in output_ports if port not in topic_per_port] + missing_input = [port for port in input_ports if port not in topic_per_port] + missing_msgs = [] + if missing_output: + missing_msgs.append(f"output ports: {', '.join(missing_output)}") + if missing_input: + missing_msgs.append(f"input ports: {', '.join(missing_input)}") + missing_msg = " and ".join(missing_msgs) + raise RuntimeError( + f"Kafka topic is missing for {missing_msg}. Please add a line to the " + "'kafka_topics' setting for each port." + ) + + return topic_per_port diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index 858fc6e..c05e4f0 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -4,7 +4,7 @@ import ymmsl from ymmsl.v0_2 import Configuration, Reference, resolve -from imas_streams.muscle3_datasource import DATA_SOURCE +from imas_streams.muscle3_config import DATA_SOURCE ymmsl_config = """ ymmsl_version: v0.2 From d936b3d82464ccc0d7529b4e31eb86545a7f9c3a Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 20 May 2026 16:55:03 +0200 Subject: [PATCH 02/10] [WIP] Add muscle3 source component that reads from multiple Kafka topics --- src/imas_streams/kafka.py | 4 +- src/imas_streams/muscle3_datasource.py | 210 ++++++++++++++++--------- 2 files changed, 141 insertions(+), 73 deletions(-) diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index df1d46c..f6c2fde 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -44,7 +44,7 @@ class KafkaSettings(BaseModel): model_config = ConfigDict(extra="forbid") -def _create_kafka_topic(settings: KafkaSettings): +def create_kafka_topic(settings: KafkaSettings): """Create a new kafka topic. This will raise an exception when the topic already exists, or if the topic could @@ -95,7 +95,7 @@ def __init__( self._message_key = f"IMAS-Kafka-{base64.b64encode(random_id).decode()}" # Create the topic and send the metadata as first message - _create_kafka_topic(settings) + create_kafka_topic(settings) self._producer.produce( topic=self._settings.topic_name, value=self._metadata.model_dump_json().encode(), diff --git a/src/imas_streams/muscle3_datasource.py b/src/imas_streams/muscle3_datasource.py index d4102b7..31f8aa3 100644 --- a/src/imas_streams/muscle3_datasource.py +++ b/src/imas_streams/muscle3_datasource.py @@ -1,11 +1,13 @@ import logging +from collections.abc import Iterator import libmuscle +from confluent_kafka import Producer from libmuscle import Instance, Message from ymmsl import Operator -from imas_streams import BatchedIDSConsumer -from imas_streams.kafka import KafkaConsumer, KafkaSettings +from imas_streams import BatchedIDSConsumer, StreamingIDSConsumer +from imas_streams.kafka import KafkaConsumer, KafkaSettings, create_kafka_topic logger = logging.getLogger(__name__) @@ -68,82 +70,148 @@ def dynamic_data_source(): """MUSCLE3 data source supporting streaming from multiple Kafka topics and publishing data back to Kafka. """ - # FIXME: Check which version of M3 supports dynamic O_I and S ports - # See PR: https://github.com/multiscale/muscle3/pull/350 + # Check which version of M3 supports dynamic O_I and S ports if tuple(map(int, libmuscle.__version__.split(".")[:2])) < (0, 10): raise RuntimeError("This actor requires libmuscle version 0.10.0 or later") - - logger.info("Creating libmuscle instance") - instance = Instance() # Don't specify ports to allow dynamic input/output ports - - # Check the dynamic port configuration - ports = instance.list_ports() - for operator in [Operator.F_INIT, Operator.O_F]: - if ports.get(operator): + DynamicDataSource().run() + + +class DynamicDataSource: + def __init__(self) -> None: + logger.info("Creating libmuscle instance") + # Don't specify ports to allow dynamic input/output ports + self.instance = Instance() + + # Check the dynamic port configuration + ports = self.instance.list_ports() + for operator in [Operator.F_INIT, Operator.O_F]: + if ports.get(operator): + raise RuntimeError( + f"imas_streams does not support {operator.name} ports, but the " + f"following ports were defined: {', '.join(ports[operator])}" + ) + self.output_ports = [ + port for port in ports[Operator.O_I] if self.instance.is_connected(port) + ] + self.input_ports = [ + port for port in ports[Operator.S] if self.instance.is_connected(port) + ] + if not self.output_ports or not self.input_ports: raise RuntimeError( - f"imas_streams does not support {operator.name} ports, but the " - f"following ports were defined: {', '.join(ports[Operator.F_INIT])}" + "imas_streams needs at least one O_I port and one S port." ) - output_ports = [port for port in ports[Operator.O_I] if instance.is_connected(port)] - input_ports = [port for port in ports[Operator.S] if instance.is_connected(port)] - if not output_ports or not input_ports: - raise RuntimeError("imas_streams needs at least one O_I port and one S port.") - - while instance.reuse_instance(): - logger.info("Reading settings") - kafka_host = instance.get_setting("kafka_host", "str") - kafka_topics = instance.get_setting("kafka_topics", "str") - topic_per_port = _parse_topics(kafka_topics, output_ports, input_ports) - output_port_topics = { - port: topic - for port, topic in topic_per_port.items() - if port in output_ports - } - - # TODO: - # 1 Create kafka clients for each topic we want to receive data for - # 2 Create kafka producer for each topic we need to send data on - # 3 Synchronize messages from different streams - # 4 Read message in streams, instance.send() on the respective ports - # 5 instance.receive() on input ports, create streaming IMAS data frame and send - # to Kafka - # 6 Repeat 4-6 until stream has ended - - -def _parse_topics( - kafka_topics: str, output_ports: list[str], input_ports: list[str] -) -> dict[str, str]: - """Parse kafka topics and return a dict {port_name: topic_name}.""" - topic_per_port: dict[str, str] = {} - for line in kafka_topics.splitlines(): - if not line.strip(): - continue - port, _, topic = map(str.strip, line.partition(":")) - if not topic or not port: + self.consumers: dict[str, KafkaConsumer] = {} + """Kafka consumer per output port.""" + self.producer: Producer + """Kafka producer.""" + + def run(self) -> None: + """Run MUSCLE3 reuse loop.""" + while self.instance.reuse_instance(): + logger.info("Reading settings") + kafka_host = self.instance.get_setting("kafka_host", "str") + kafka_topics = self.instance.get_setting("kafka_topics", "str") + + self.producer = Producer({"bootstrap.servers": kafka_host}) + topic_per_port = self._parse_topics(kafka_topics) + for port, topic in topic_per_port.items(): + if port in self.output_ports: + self.consumers[port] = KafkaConsumer( + KafkaSettings(host=kafka_host, topic_name=topic), + StreamingIDSConsumer, + return_copy=False, + ) + else: + create_kafka_topic(KafkaSettings(host=kafka_host, topic_name=topic)) + + for msgs in self.generate_serialized_idss(): + for port, (t, data) in msgs.items(): + self.instance.send(port, Message(t, data=data)) + + for port in self.input_ports: + msg = self.instance.receive(port) + self.producer.produce( + topic=topic_per_port[port], + value=msg.data, + ) + self.producer.poll(0) + + # Cleanup + self.consumers = {} + self.producer.flush() + + def _parse_topics(self, kafka_topics: str) -> dict[str, str]: + """Parse kafka topics and return a dict {port_name: topic_name}.""" + topic_per_port: dict[str, str] = {} + for line in kafka_topics.splitlines(): + if not line.strip(): + continue + port, _, topic = map(str.strip, line.partition(":")) + if not topic or not port: + raise RuntimeError( + f"Invalid line encountered in 'kafka_topics' setting: '{line}'" + ) + + if port in self.output_ports or port in self.input_ports: + topic_per_port[port] = topic + else: + logger.info( + "Ignoring kafka topic '%s' for disconnected port '%s'", topic, port + ) + + if len(topic_per_port) != len(self.output_ports) + len(self.input_ports): + missing_output = [p for p in self.output_ports if p not in topic_per_port] + missing_input = [p for p in self.input_ports if p not in topic_per_port] + missing_msgs = [] + if missing_output: + missing_msgs.append(f"output ports: {', '.join(missing_output)}") + if missing_input: + missing_msgs.append(f"input ports: {', '.join(missing_input)}") + missing_msg = " and ".join(missing_msgs) raise RuntimeError( - f"Invalid line encountered in 'kafka_topics' setting: '{line}'" + f"Kafka topic is missing for {missing_msg}. Please add a line to the " + "'kafka_topics' setting for each port." ) - if port in output_ports or port in input_ports: - topic_per_port[port] = topic - else: - logger.info( - "Ignoring kafka topic '%s' for disconnected port '%s'", topic, port - ) + return topic_per_port - if len(topic_per_port) != len(output_ports) + len(input_ports): - missing_output = [port for port in output_ports if port not in topic_per_port] - missing_input = [port for port in input_ports if port not in topic_per_port] - missing_msgs = [] - if missing_output: - missing_msgs.append(f"output ports: {', '.join(missing_output)}") - if missing_input: - missing_msgs.append(f"input ports: {', '.join(missing_input)}") - missing_msg = " and ".join(missing_msgs) - raise RuntimeError( - f"Kafka topic is missing for {missing_msg}. Please add a line to the " - "'kafka_topics' setting for each port." - ) + def generate_serialized_idss(self) -> Iterator[dict[str, tuple[float, bytes]]]: + """Generate synchronized, serialized IDSs for the subscribed streams.""" + # Receive once on each stream: + streams = {port: consumer.stream() for port, consumer in self.consumers.items()} + idss = {port: next(stream) for port, stream in streams.items()} - return topic_per_port + latest_starttime = max(ids.time[0] for ids in idss.values()) + main_ids = next(iter(idss.values())) + main_stream = next(iter(streams.values())) + + # Skip ahead the main stream + if main_ids.time[0] < latest_starttime: + logger.info("Skipping messages until start time of latest stream") + while main_ids.time[0] < latest_starttime: + next(main_stream) + + # Generate time-synchronized serialized IDSs + curdata: dict[str, tuple[float, bytes]] = { + port: (ids.time[0], ids.serialize()) for port, ids in idss.items() + } + while True: + # Get the last message <= main_ids.time[0] for each stream + for port, ids in idss.items(): + if ids is main_ids: + continue + while ids.time[0] <= main_ids.time[0]: + # Note: we may serialize too much here. For example, when the main + # stream produces data at 10 Hz, and a secondary stream at 20 Hz, we + # need to throw away every other serialized IDS. + # N.B. if this becomes a bottleneck we could optimize the IDS + # serialization and directly copy the bytes from the Streaming IDS + # data frame into the serialized IDS (assuming we use the + # flexbuffers serialization protocol). + curdata[port] = (ids.time[0], ids.serialize()) + next(streams[port]) + yield curdata + + # Fetch the next message of the main stream + next(main_stream) From 564749b98394d7302ce55527f1c6836667c3c0bf Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 22 May 2026 16:47:28 +0200 Subject: [PATCH 03/10] Add tests for muscle3 dynamic data source And fix a some bugs --- pyproject.toml | 2 + src/imas_streams/muscle3_datasource.py | 44 ++++++++---- tests/test_muscle3.py | 92 +++++++++++++++++++++++++- 3 files changed, 124 insertions(+), 14 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index eaaf54b..76bf364 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -30,6 +30,8 @@ dev = [ "pytest-xdist", "pytest-randomly", "imas-python[xarray]", + # TODO: Remove once the next MUSCLE3 release is out + "muscle3 @ git+https://github.com/multiscale/muscle3.git@develop" ] [project.urls] diff --git a/src/imas_streams/muscle3_datasource.py b/src/imas_streams/muscle3_datasource.py index 31f8aa3..67b4fc1 100644 --- a/src/imas_streams/muscle3_datasource.py +++ b/src/imas_streams/muscle3_datasource.py @@ -71,7 +71,7 @@ def dynamic_data_source(): publishing data back to Kafka. """ # Check which version of M3 supports dynamic O_I and S ports - if tuple(map(int, libmuscle.__version__.split(".")[:2])) < (0, 10): + if tuple(map(int, libmuscle.__version__.split(".")[:3])) < (0, 9, 2): raise RuntimeError("This actor requires libmuscle version 0.10.0 or later") DynamicDataSource().run() @@ -91,10 +91,14 @@ def __init__(self) -> None: f"following ports were defined: {', '.join(ports[operator])}" ) self.output_ports = [ - port for port in ports[Operator.O_I] if self.instance.is_connected(port) + port + for port in ports.get(Operator.O_I, []) + if self.instance.is_connected(port) ] self.input_ports = [ - port for port in ports[Operator.S] if self.instance.is_connected(port) + port + for port in ports.get(Operator.S, []) + if self.instance.is_connected(port) ] if not self.output_ports or not self.input_ports: raise RuntimeError( @@ -131,6 +135,10 @@ def run(self) -> None: for port in self.input_ports: msg = self.instance.receive(port) + # FIXME: This publishes serialized IDSs instead of streaming IMAS + # data. Our test case (EFIT++) doesn't produce data that adheres to + # the the IMAS-Streams assumptions (see README.md) so we cannot do + # better at the moment, unfortunately... self.producer.produce( topic=topic_per_port[port], value=msg.data, @@ -160,6 +168,7 @@ def _parse_topics(self, kafka_topics: str) -> dict[str, str]: "Ignoring kafka topic '%s' for disconnected port '%s'", topic, port ) + # Exception handling: each port needs to have a topic configured: if len(topic_per_port) != len(self.output_ports) + len(self.input_ports): missing_output = [p for p in self.output_ports if p not in topic_per_port] missing_input = [p for p in self.input_ports if p not in topic_per_port] @@ -183,8 +192,9 @@ def generate_serialized_idss(self) -> Iterator[dict[str, tuple[float, bytes]]]: idss = {port: next(stream) for port, stream in streams.items()} latest_starttime = max(ids.time[0] for ids in idss.values()) - main_ids = next(iter(idss.values())) - main_stream = next(iter(streams.values())) + main_port = next(iter(streams)) + main_ids = idss[main_port] + main_stream = streams[main_port] # Skip ahead the main stream if main_ids.time[0] < latest_starttime: @@ -204,14 +214,24 @@ def generate_serialized_idss(self) -> Iterator[dict[str, tuple[float, bytes]]]: while ids.time[0] <= main_ids.time[0]: # Note: we may serialize too much here. For example, when the main # stream produces data at 10 Hz, and a secondary stream at 20 Hz, we - # need to throw away every other serialized IDS. - # N.B. if this becomes a bottleneck we could optimize the IDS - # serialization and directly copy the bytes from the Streaming IDS - # data frame into the serialized IDS (assuming we use the - # flexbuffers serialization protocol). + # need to throw away every other serialized IDS. If this becomes a + # bottleneck we could optimize in two ways: + # 1. Stash the data at the streaming IMAS level instead of a + # serialized IDS. Apply the buffer and serialize the IDS only + # when needed. + # 2. Improve serialization and directly copy the bytes from the + # Streaming IDS data frame into the serialized IDS (assuming we + # use the flexbuffers serialization protocol). curdata[port] = (ids.time[0], ids.serialize()) - next(streams[port]) + try: + next(streams[port]) + except StopIteration: + break yield curdata # Fetch the next message of the main stream - next(main_stream) + try: + next(main_stream) + except StopIteration: + return + curdata[main_port] = (main_ids.time[0], main_ids.serialize()) diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index c05e4f0..2acbec5 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -1,10 +1,14 @@ from pathlib import Path +from unittest.mock import Mock, patch +import imas import pytest import ymmsl -from ymmsl.v0_2 import Configuration, Reference, resolve +from ymmsl.v0_2 import Configuration, Operator, Reference, resolve +from imas_streams import StreamingIDSConsumer, StreamingIDSProducer from imas_streams.muscle3_config import DATA_SOURCE +from imas_streams.muscle3_datasource import DynamicDataSource ymmsl_config = """ ymmsl_version: v0.2 @@ -32,7 +36,7 @@ @pytest.mark.xfail( - tuple(map(int, ymmsl.__version__.split(".")[:3])) < (0, 15, 1), + tuple(map(int, ymmsl.__version__.partition("-")[0].split(".")[:3])) < (0, 15, 1), reason="Test needs YMMSL Entry Points plugins", ) def test_load_ymmsl_config(): @@ -53,3 +57,87 @@ def test_load_ymmsl_config_from_ymmsl_path( config = ymmsl.load_as(Configuration, ymmsl_config) resolve(Reference([]), config) config.check_consistent() + + +def test_dynamic_port_configuration(): + with patch("imas_streams.muscle3_datasource.Instance") as mock: + list_ports = mock.return_value.list_ports + list_ports.return_value = {} + with pytest.raises(RuntimeError, match="needs at least one"): + DynamicDataSource() + list_ports.return_value = {Operator.F_INIT: ["test"]} + with pytest.raises(RuntimeError, match="does not support F_INIT ports"): + DynamicDataSource() + list_ports.return_value = {Operator.O_F: ["test"]} + with pytest.raises(RuntimeError, match="does not support O_F ports"): + DynamicDataSource() + list_ports.return_value = {Operator.O_I: ["out"], Operator.S: ["in"]} + DynamicDataSource() + + +def test_dynamic_port_topics(caplog: pytest.LogCaptureFixture): + caplog.set_level("INFO") + with patch("imas_streams.muscle3_datasource.Instance") as mock: + list_ports = mock.return_value.list_ports + list_ports.return_value = {Operator.O_I: ["out"], Operator.S: ["in"]} + + source = DynamicDataSource() + with pytest.raises(RuntimeError, match="Invalid line"): + source._parse_topics("invalid line") + with pytest.raises(RuntimeError, match="Invalid line"): + source._parse_topics("valid: x\ninvalid") + with pytest.raises(RuntimeError, match="topic is missing for input"): + source._parse_topics("a:x\nout:ok") + with pytest.raises(RuntimeError, match="topic is missing for output"): + source._parse_topics("a:x\nin:ok") + + parsed = source._parse_topics("in:topic1\nout:topic2") + assert parsed == {"in": "topic1", "out": "topic2"} + + caplog.clear() + parsed = source._parse_topics(""" + in : topic1 + out : this.is.a.longer:name.for.the.topic + ignored: this.should.be.logged + """) + assert parsed == {"in": "topic1", "out": "this.is.a.longer:name.for.the.topic"} + assert len(caplog.record_tuples) == 1 + assert "Ignoring kafka topic" in caplog.record_tuples[0][2] + + +class MockKafkaConsumer: + def __init__(self, times: list[float]) -> None: + self.times = times + self.ids = imas.IDSFactory().equilibrium() + self.ids.ids_properties.homogeneous_time = 1 + self.ids.time = [0.0] + self.producer = StreamingIDSProducer(self.ids) + self.consumer = StreamingIDSConsumer(self.producer.metadata, return_copy=False) + + def stream(self): + for time in self.times: + self.ids.time = [float(time)] + message = self.producer.create_message(self.ids) + yield self.consumer.process_message(message) + + +def test_dynamic_ids_synchronization(): + self = Mock() + self.consumers = { + "main": MockKafkaConsumer([0, 1, 2, 3, 4, 5]), + "lockstep": MockKafkaConsumer([0, 1, 2, 3, 4, 5]), + "slower": MockKafkaConsumer([0, 4, 8]), + "faster": MockKafkaConsumer([0, 0.8, 1.6, 2.4, 3.2, 4, 4.8, 5.6]), + } + + def extract_times(data: dict[str, tuple[float, bytes]]) -> dict[str, float]: + return {k: v[0] for k, v in data.items()} + + generated = [x.copy() for x in DynamicDataSource.generate_serialized_idss(self)] + assert len(generated) == 6 + assert extract_times(generated[0]) == dict(main=0, lockstep=0, slower=0, faster=0) + assert extract_times(generated[1]) == dict(main=1, lockstep=1, slower=0, faster=0.8) + assert extract_times(generated[2]) == dict(main=2, lockstep=2, slower=0, faster=1.6) + assert extract_times(generated[3]) == dict(main=3, lockstep=3, slower=0, faster=2.4) + assert extract_times(generated[4]) == dict(main=4, lockstep=4, slower=4, faster=4) + assert extract_times(generated[5]) == dict(main=5, lockstep=5, slower=4, faster=4.8) From 5ac0e05f0ff01b44e2e85be346b92d4d9d73b535 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 26 May 2026 17:06:14 +0200 Subject: [PATCH 04/10] Improve tests for IDS synchronization --- tests/test_muscle3.py | 67 ++++++++++++++++++++++++++++++++++++------- 1 file changed, 56 insertions(+), 11 deletions(-) diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index 2acbec5..b4ba45a 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -106,7 +106,10 @@ def test_dynamic_port_topics(caplog: pytest.LogCaptureFixture): class MockKafkaConsumer: + """Mock object for KafkaConsumer to test IDS synchronization""" + def __init__(self, times: list[float]) -> None: + """Mock Kafka stream with empty equilibrium IDSs at the provided time points.""" self.times = times self.ids = imas.IDSFactory().equilibrium() self.ids.ids_properties.homogeneous_time = 1 @@ -121,6 +124,12 @@ def stream(self): yield self.consumer.process_message(message) +def extract_times(data: dict[str, tuple[float, bytes]]) -> dict[str, float]: + """Extract time values from the yielded data of + DynamicDataSource.generate_serialized_idss.""" + return {k: v[0] for k, v in data.items()} + + def test_dynamic_ids_synchronization(): self = Mock() self.consumers = { @@ -130,14 +139,50 @@ def test_dynamic_ids_synchronization(): "faster": MockKafkaConsumer([0, 0.8, 1.6, 2.4, 3.2, 4, 4.8, 5.6]), } - def extract_times(data: dict[str, tuple[float, bytes]]) -> dict[str, float]: - return {k: v[0] for k, v in data.items()} - - generated = [x.copy() for x in DynamicDataSource.generate_serialized_idss(self)] - assert len(generated) == 6 - assert extract_times(generated[0]) == dict(main=0, lockstep=0, slower=0, faster=0) - assert extract_times(generated[1]) == dict(main=1, lockstep=1, slower=0, faster=0.8) - assert extract_times(generated[2]) == dict(main=2, lockstep=2, slower=0, faster=1.6) - assert extract_times(generated[3]) == dict(main=3, lockstep=3, slower=0, faster=2.4) - assert extract_times(generated[4]) == dict(main=4, lockstep=4, slower=4, faster=4) - assert extract_times(generated[5]) == dict(main=5, lockstep=5, slower=4, faster=4.8) + generated = [ + extract_times(x) for x in DynamicDataSource.generate_serialized_idss(self) + ] + assert generated == [ + dict(main=0, lockstep=0, slower=0, faster=0), + dict(main=1, lockstep=1, slower=0, faster=0.8), + dict(main=2, lockstep=2, slower=0, faster=1.6), + dict(main=3, lockstep=3, slower=0, faster=2.4), + dict(main=4, lockstep=4, slower=4, faster=4), + dict(main=5, lockstep=5, slower=4, faster=4.8), + ] + + +def test_dynamic_ids_synchronization_with_offset(): + self = Mock() + self.consumers = { + "main": MockKafkaConsumer([0, 1, 2, 3]), + "delayed": MockKafkaConsumer([1.5, 2, 2.5]), + "early": MockKafkaConsumer([-1, 1, 3]), + } + + generated = [ + extract_times(x) for x in DynamicDataSource.generate_serialized_idss(self) + ] + assert generated == [ + dict(main=2, delayed=2, early=1), + dict(main=3, delayed=2.5, early=3), + ] + + +def test_dynamic_ids_synchronization_with_offset2(): + self = Mock() + self.consumers = { + # Now delayed will be the 'main' stream for determining time output! + "delayed": MockKafkaConsumer([1.5, 2, 2.5]), + "main": MockKafkaConsumer([0, 1, 2, 3]), + "early": MockKafkaConsumer([-1, 1, 3]), + } + + generated = [ + extract_times(x) for x in DynamicDataSource.generate_serialized_idss(self) + ] + assert generated == [ + dict(main=1, delayed=1.5, early=1), + dict(main=2, delayed=2, early=1), + dict(main=2, delayed=2.5, early=1), + ] From 9fb6c297ae11aa9ba54d69c3db946f843b1473c0 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 28 May 2026 15:58:23 +0200 Subject: [PATCH 05/10] Add integration test for dynamic data source actor Also improves logging and resource cleanup for Kafka Producers, and adds a timeout setting to the actor. --- src/imas_streams/kafka.py | 5 +- src/imas_streams/muscle3_config.py | 1 + src/imas_streams/muscle3_datasource.py | 18 +++++- tests/conftest.py | 11 ++++ tests/test_kafka.py | 9 --- tests/test_muscle3.py | 86 +++++++++++++++++++++++++- 6 files changed, 117 insertions(+), 13 deletions(-) create mode 100644 tests/conftest.py diff --git a/src/imas_streams/kafka.py b/src/imas_streams/kafka.py index f6c2fde..a94eacd 100644 --- a/src/imas_streams/kafka.py +++ b/src/imas_streams/kafka.py @@ -50,6 +50,7 @@ def create_kafka_topic(settings: KafkaSettings): This will raise an exception when the topic already exists, or if the topic could not be created (potentially due to missing permissions). """ + logger.info("Creating topic '%s'...", settings.topic_name) conf = {"bootstrap.servers": settings.host} admin_client = AdminClient(conf) @@ -70,6 +71,7 @@ def create_kafka_topic(settings: KafkaSettings): for _topic, future in fs.items(): # This will raise an exception when the topic exists or could not be created future.result() + logger.info("Created topic '%s'", settings.topic_name) class KafkaProducer: @@ -109,6 +111,7 @@ def __del__(self): """Cleanup Kafka Producer resources""" # Ensure all messages are sent self._producer.flush() + self._producer.close() def produce(self, message: bytes) -> None: """Produce a time frame to the configured Kafka topic.""" @@ -156,7 +159,7 @@ def __init__( settings: KafkaSettings, stream_consumer_cls: type[StreamConsumer], *, - timeout: int = DEFAULT_KAFKA_CONSUMER_TIMEOUT, + timeout: float = DEFAULT_KAFKA_CONSUMER_TIMEOUT, most_recent_only: bool = False, **stream_consumer_kwargs, ) -> None: diff --git a/src/imas_streams/muscle3_config.py b/src/imas_streams/muscle3_config.py index b9c19b2..49b451b 100644 --- a/src/imas_streams/muscle3_config.py +++ b/src/imas_streams/muscle3_config.py @@ -111,5 +111,6 @@ kafka_topics: > str List of kafka topics per output / input port, in the form of `port_name: topic_name`. Each entry must be on a separate line. + kafka_timeout: float Timeout when receiving Kafka messages. """ """yMMSL description of the imas_streams MUSCLE3 components""" diff --git a/src/imas_streams/muscle3_datasource.py b/src/imas_streams/muscle3_datasource.py index 67b4fc1..b8134ec 100644 --- a/src/imas_streams/muscle3_datasource.py +++ b/src/imas_streams/muscle3_datasource.py @@ -7,7 +7,12 @@ from ymmsl import Operator from imas_streams import BatchedIDSConsumer, StreamingIDSConsumer -from imas_streams.kafka import KafkaConsumer, KafkaSettings, create_kafka_topic +from imas_streams.kafka import ( + DEFAULT_KAFKA_CONSUMER_TIMEOUT, + KafkaConsumer, + KafkaSettings, + create_kafka_topic, +) logger = logging.getLogger(__name__) @@ -116,15 +121,21 @@ def run(self) -> None: logger.info("Reading settings") kafka_host = self.instance.get_setting("kafka_host", "str") kafka_topics = self.instance.get_setting("kafka_topics", "str") + self.kafka_timeout = self.instance.get_setting( + "kafka_timeout", "float", default=DEFAULT_KAFKA_CONSUMER_TIMEOUT + ) + logger.info("Setting up Kafka Producer") self.producer = Producer({"bootstrap.servers": kafka_host}) topic_per_port = self._parse_topics(kafka_topics) + logger.info("Setting up Kafka Consumers for each stream") for port, topic in topic_per_port.items(): if port in self.output_ports: self.consumers[port] = KafkaConsumer( KafkaSettings(host=kafka_host, topic_name=topic), StreamingIDSConsumer, return_copy=False, + timeout=self.kafka_timeout, ) else: create_kafka_topic(KafkaSettings(host=kafka_host, topic_name=topic)) @@ -188,7 +199,10 @@ def _parse_topics(self, kafka_topics: str) -> dict[str, str]: def generate_serialized_idss(self) -> Iterator[dict[str, tuple[float, bytes]]]: """Generate synchronized, serialized IDSs for the subscribed streams.""" # Receive once on each stream: - streams = {port: consumer.stream() for port, consumer in self.consumers.items()} + streams = { + port: consumer.stream(timeout=self.kafka_timeout) + for port, consumer in self.consumers.items() + } idss = {port: next(stream) for port, stream in streams.items()} latest_starttime = max(ids.time[0] for ids in idss.values()) diff --git a/tests/conftest.py b/tests/conftest.py new file mode 100644 index 0000000..d306ee3 --- /dev/null +++ b/tests/conftest.py @@ -0,0 +1,11 @@ +import os + +import pytest + + +@pytest.fixture +def kafka_host(): + value = os.getenv("KAFKA_HOST") + if not value: + pytest.fail("Cannot connect to Kafka server: KAFKA_HOST not set.") + return value diff --git a/tests/test_kafka.py b/tests/test_kafka.py index aa646e3..49f70be 100644 --- a/tests/test_kafka.py +++ b/tests/test_kafka.py @@ -1,6 +1,5 @@ import contextlib import logging -import os import imas import pytest @@ -16,14 +15,6 @@ logger = logging.getLogger(__name__) -@pytest.fixture -def kafka_host(): - value = os.getenv("KAFKA_HOST") - if not value: - pytest.fail("Cannot connect to Kafka server: KAFKA_HOST not set.") - return value - - @pytest.fixture def test_magnetics(): ids = imas.IDSFactory().magnetics() diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index b4ba45a..ddc7065 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -1,12 +1,18 @@ +import contextlib +import sys from pathlib import Path from unittest.mock import Mock, patch +import confluent_kafka.admin import imas +import numpy as np import pytest import ymmsl +from libmuscle import Message from ymmsl.v0_2 import Configuration, Operator, Reference, resolve from imas_streams import StreamingIDSConsumer, StreamingIDSProducer +from imas_streams.kafka import KafkaProducer, KafkaSettings from imas_streams.muscle3_config import DATA_SOURCE from imas_streams.muscle3_datasource import DynamicDataSource @@ -117,7 +123,7 @@ def __init__(self, times: list[float]) -> None: self.producer = StreamingIDSProducer(self.ids) self.consumer = StreamingIDSConsumer(self.producer.metadata, return_copy=False) - def stream(self): + def stream(self, timeout=None): for time in self.times: self.ids.time = [float(time)] message = self.producer.create_message(self.ids) @@ -186,3 +192,81 @@ def test_dynamic_ids_synchronization_with_offset2(): dict(main=2, delayed=2, early=1), dict(main=2, delayed=2.5, early=1), ] + + +def test_dynamic_data_source_actor(muscle3_tester, kafka_host): + # Ensure topics are cleared before start + with confluent_kafka.admin.AdminClient({"bootstrap.servers": kafka_host}) as client: + fs = client.delete_topics( + ["test.magnetics", "test.pf_active", "test.equilibrium"] + ) + for _topic, future in fs.items(): + # Raises an exception when the topic did not exists or could not be deleted + with contextlib.suppress(confluent_kafka.KafkaException): + future.result() + + # Populate magnetics and pf_active topics + times = {"magnetics": np.linspace(0, 10, 11), "pf_active": np.linspace(-1, 12, 27)} + for ids_name in ["magnetics", "pf_active"]: + ids = imas.IDSFactory().new(ids_name) + ids.ids_properties.homogeneous_time = 1 + ids.time = times[ids_name][:1] + prod = StreamingIDSProducer(ids) + kprod = KafkaProducer( + KafkaSettings(host=kafka_host, topic_name=f"test.{ids_name}"), + prod.metadata, + ) + for t in times[ids_name]: + ids.time = [t] + kprod.produce(bytes(prod.create_message(ids))) + del kprod # Run cleanup logic + + # Start muscle3 actor + tester = muscle3_tester.start_implementation( + f""" + ymmsl_version: v0.2 + programs: + imas_streams: + ports: + o_i: magnetics_out pf_active_out + s: equilibrium_in + executable: {sys.executable} + args: -m imas_streams dynamic-kafka-to-muscle3 + settings: + kafka_host: {kafka_host} + kafka_timeout: 5.0 + kafka_topics: | + magnetics_out: test.magnetics + pf_active_out: test.pf_active + equilibrium_in: test.equilibrium + """, + "imas_streams", + default_timeout=10, + ) + + mag, pfa, eq = map(imas.IDSFactory().new, ["magnetics", "pf_active", "equilibrium"]) + eq.ids_properties.homogeneous_time = 1 + for expected_time in np.linspace(0, 10, 11): + mag.deserialize(tester.receive("magnetics_out").data) + assert np.array_equal(mag.time, [expected_time]) + pfa.deserialize(tester.receive("pf_active_out").data) + assert np.array_equal(pfa.time, [expected_time]) + eq.time = [expected_time] + tester.send("equilibrium_in", Message(expected_time, data=eq.serialize())) + + # Check that the messages were sent to Kafka + consumer = confluent_kafka.Consumer( + { + "bootstrap.servers": kafka_host, + "group.id": "pytest", + "auto.offset.reset": "earliest", + } + ) + consumer.subscribe(["test.equilibrium"]) + for expected_time in np.linspace(0, 10, 11): + msg = consumer.poll(10) + assert msg is not None, f"Missing message for t={expected_time}" + data = msg.value() + assert data is not None + eq.deserialize(data) + assert np.array_equal(eq.time, [expected_time]) From 80e27b01961301ffce5920d00183250f251e31da Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 28 May 2026 16:01:15 +0200 Subject: [PATCH 06/10] Refactor checks for optional dependencies (DRY) --- src/imas_streams/cli.py | 24 ++++++++++-------------- 1 file changed, 10 insertions(+), 14 deletions(-) diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 78b22aa..25c1cd4 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -71,11 +71,8 @@ def kafka_to_imasentry( entry.put_slice(result) -@main.command() -def kafka_to_muscle3(): - """MUSCLE3 actor consuming streaming IMAS data from a Kafka topic and making it - available to a MUSCLE3 workflow. - """ +def _ensure_kafka_muscle3_dependencies(): + """Ensure optional dependencies are available""" # Ensure optional dependencies are available try: import libmuscle # noqa: F401 @@ -85,6 +82,13 @@ def kafka_to_muscle3(): click.echo("Error: please install the optional kafka and muscle3 dependencies.") sys.exit(1) + +@main.command() +def kafka_to_muscle3(): + """MUSCLE3 actor consuming streaming IMAS data from a Kafka topic and making it + available to a MUSCLE3 workflow. + """ + _ensure_kafka_muscle3_dependencies() from imas_streams.muscle3_datasource import data_source data_source() @@ -96,15 +100,7 @@ def dynamic_kafka_to_muscle3(): data available to a MUSCLE3 workflow, with the ability to publish the workflow data back to Kafka. """ - # Ensure optional dependencies are available - try: - import libmuscle # noqa: F401 - - import imas_streams.kafka # noqa: F401 - except ModuleNotFoundError: - click.echo("Error: please install the optional kafka and muscle3 dependencies.") - sys.exit(1) - + _ensure_kafka_muscle3_dependencies() from imas_streams.muscle3_datasource import dynamic_data_source dynamic_data_source() From 744091f5b4f9a402596db8cc25e6bba998d11c17 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 28 May 2026 17:02:08 +0200 Subject: [PATCH 07/10] Increase timeouts for CI --- tests/test_muscle3.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index ddc7065..61f881c 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -234,14 +234,14 @@ def test_dynamic_data_source_actor(muscle3_tester, kafka_host): args: -m imas_streams dynamic-kafka-to-muscle3 settings: kafka_host: {kafka_host} - kafka_timeout: 5.0 + kafka_timeout: 10.0 kafka_topics: | magnetics_out: test.magnetics pf_active_out: test.pf_active equilibrium_in: test.equilibrium """, "imas_streams", - default_timeout=10, + default_timeout=20, ) mag, pfa, eq = map(imas.IDSFactory().new, ["magnetics", "pf_active", "equilibrium"]) From 54d0a7b2aec690a04b9c31166cbe0d68d088994c Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 29 May 2026 14:05:16 +0200 Subject: [PATCH 08/10] Update documentation --- README.md | 2 + docs/muscle3.md | 170 +++++++++++++++++++++++++++++ pyproject.toml | 2 +- src/imas_streams/muscle3_config.py | 2 + 4 files changed, 175 insertions(+), 1 deletion(-) create mode 100644 docs/muscle3.md diff --git a/README.md b/README.md index 9d5770d..20066c4 100644 --- a/README.md +++ b/README.md @@ -16,6 +16,8 @@ this git repository: pip install 'imas-streams @ git+https://github.com/iterorganization/IMAS-Streams.git' # If you wish to use the kafka features: pip install 'imas-streams[kafka] @ git+https://github.com/iterorganization/IMAS-Streams.git' +# If you wish to use the MUSCLE3 component: +pip install 'imas-streams[muscle3] @ git+https://github.com/iterorganization/IMAS-Streams.git' ``` ## Command line interface diff --git a/docs/muscle3.md b/docs/muscle3.md new file mode 100644 index 0000000..2dce9fd --- /dev/null +++ b/docs/muscle3.md @@ -0,0 +1,170 @@ +# IMAS-Streams MUSCLE3 integration + +IMAS-Streams contains two MUSCLE3 components: +1. `imas_streams_source` is a simple component that reads data from a single + IMAS data Stream in a Kafka topic, and makes that data available as + serialized IDSs to the MUSCLE3 simulation. Multiple time slices can be + batched in a single IDS. +2. `imas_streams` is a more complex component that reads data from multiple IMAS + data Streams on a Kafka cluster. It synchronizes the data when these streams + do not use the same time bases. It also publishes data back to one or more + Kafka topics. + +We provide more details on these component in the following sections. + + +## `imas_streams_source` component + + + +Data source reading Streaming IMAS data from a Kafka topic and making it +available in a MUSCLE3 simulation. + +The `ids_out` port sends one message for every `batch_size` time slices +streamed over the configured kafka topic. The type of IDS depends on the +configured kafka topic: please take care that this matches the IDS that is +expected for components receiving the message. + +You may use the `trigger` port to indicate that the previous message is +processed and a new message may be sent. If this port is not connected then +this component will send messages on the `ids_out` port as soon as they are +available. + + +### Example configuration + +```yaml +ymmsl_version: v0.2 +description: Example usage for imas_streams_source component + +imports: +- from imas_streams.data_source import implementation imas_streams_source + +models: + example: + decription: Simple example model + components: + source: + implementation: imas_streams_source + description: Data source + ports: + o_i: ids_out + s: trigger + physics_component: + implementation: my_physics_program + description: Physics simulation + ports: + f_init: equilibrium_in + o_f: trigger + conduits: + source.ids_out: physics_component.equilibrium_in + physics_component.trigger: source.trigger + +settings: + kafka_host: localhost:9092 + kafka_topic: test.equilibrium + +programs: + my_physics_program: + executable: /path/to/my/physics/program +``` + + +## `imas_streams` component + + + +This is a data source that reads Streaming IMAS data from multiple Kafka +topics and makes the data available in a MUSCLE3 simulation. + +To use this component in your simulation, you need to configure the +following: + +1. Define output ports for each IMAS Stream for the O_I operator. +2. Specify the Kafka host to connect to in the `kafka_host` setting. +3. Specify the Kafka topic names for each output port in the `kafka_topics` + setting as `: `. + +### Stream synchronization + +This component will synchronize messages from different streams if they don't +have the same time base: + +1. All streams must have data available before a message is sent to the MUSCLE3 + workflow. +2. Messages will be sent at the same frequency as the first stream (as + configured in the `kafka_topics` setting). +3. Messages for the other stream use the following interpolation strategy: + + If there is a data point at exactly the same moment as the first stream, + then that data is sent. + Otherwise, the data at the latest time before that time is sent. + +For example, if there are three streams with data at the following time points: + +- Stream A: data at t = [0, 1, 2, 3] +- Stream B: data at t = [1, 3] +- Stream C: data at t = [0, 1.5, 3, 4.5] + +If stream A is the first configured stream, then this component will send three +messages: + +1. The first message at t=1: this is the first moment that stream B has data + for. The data for stream A at t=0 is discarded. Since stream C doesn't have + data at t=1, the data at t=0 is sent instead. +2. The second message will be sent at t=2, which repeats the data for Stream B + at t=1, and uses the data at t=1.5 for Stream C. +3. The last message is sent at t=3, for which there is data on all three + streams. + +### Example configuration + +```yaml +ymmsl_version: v0.2 +description: Example usage for imas_streams component +imports: +- from imas_streams.data_source import implementation imas_streams +models: + example: + streams: + description: IMAS Streams data source and sink + implementation: imas_streams + ports: + O_I: magnetics_out pf_active_out + S: equilibrium_in + equilibrium: + description: Equilibrium reconstruction code + implementation: my_equilibrium_program + ports: + F_INIT: magnetics_in pf_active_in + O_F: equilibrium_out + conduits: + streams.magnetics_out: equilibrium.magnetics_in + streams.pf_active_out: equilibrium.pf_active_in + equilibrium.equilibrium_out: streams.equilibrium_in +settings: + streams.kafka_host: localhost:9092 + streams.kafka_topics: | + magnetics_out: kafka.topic.for.magnetics + pf_active_out: kafka.topic.for.pf_active + equilibrium_in: kafka.topic.for.equilibrium +programs: + my_equilibrium_program: + executable: /path/to/my/equilibrium/program +``` + +In this example, IMAS Streams from two topics are made available: + +1. The data on the topic `kafka.topic.for.magnetics` is sent on the output + port `magnetics_out`. +2. The data on the topic `kafka.topic.for.pf_active` is sent on the output + port `pf_active_out`. + +The data received on the S port `equilibrium_in` is published to +`kafka.topic.for.equilibrium`. diff --git a/pyproject.toml b/pyproject.toml index 76bf364..8a85514 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -22,7 +22,7 @@ requires-python = ">=3.11" [project.optional-dependencies] all = ["imas-streams[dev,kafka,muscle3]"] kafka = ["confluent-kafka"] -muscle3 = ["muscle3 >= 0.9.0"] +muscle3 = ["muscle3 >= 0.9.0", "imas-streams[kafka]"] dev = [ "ruff", "pytest", diff --git a/src/imas_streams/muscle3_config.py b/src/imas_streams/muscle3_config.py index 49b451b..9d68132 100644 --- a/src/imas_streams/muscle3_config.py +++ b/src/imas_streams/muscle3_config.py @@ -68,6 +68,8 @@ ## Example configuration ```yaml + ymmsl_version: v0.2 + description: Example usage for imas_streams component imports: - from imas_streams.data_source import implementation imas_streams models: From 49bc89f67bc2449189d5a3824567665ca773e5e1 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Tue, 2 Jun 2026 13:46:58 +0200 Subject: [PATCH 09/10] Update CLI: packaging.version.Version and exception handling --- pyproject.toml | 1 + src/imas_streams/cli.py | 6 +++--- src/imas_streams/muscle3_datasource.py | 6 ++++-- 3 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pyproject.toml b/pyproject.toml index 8a85514..2d7aab0 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -14,6 +14,7 @@ classifiers = [ license = {file = "LICENSE.txt"} dependencies = [ "imas-python", + "packaging", "pydantic", ] dynamic = ["version"] diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 25c1cd4..0e665ca 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -1,5 +1,4 @@ import logging -import sys import click import imas @@ -79,8 +78,9 @@ def _ensure_kafka_muscle3_dependencies(): import imas_streams.kafka # noqa: F401 except ModuleNotFoundError: - click.echo("Error: please install the optional kafka and muscle3 dependencies.") - sys.exit(1) + raise click.ClickException( + "Please install the optional kafka and muscle3 dependencies." + ) from None @main.command() diff --git a/src/imas_streams/muscle3_datasource.py b/src/imas_streams/muscle3_datasource.py index b8134ec..430109e 100644 --- a/src/imas_streams/muscle3_datasource.py +++ b/src/imas_streams/muscle3_datasource.py @@ -4,6 +4,7 @@ import libmuscle from confluent_kafka import Producer from libmuscle import Instance, Message +from packaging.version import Version from ymmsl import Operator from imas_streams import BatchedIDSConsumer, StreamingIDSConsumer @@ -19,7 +20,7 @@ def data_source(): """MUSCLE3 data source streaming data from a single IMAS Stream on a Kafka topic.""" - if tuple(map(int, libmuscle.__version__.split(".")[:2])) < (0, 9): + if Version(libmuscle.__version__) < Version("0.9"): raise RuntimeError("This actor requires libmuscle version 0.9.0 or later") logger.info("Creating libmuscle instance") @@ -76,7 +77,8 @@ def dynamic_data_source(): publishing data back to Kafka. """ # Check which version of M3 supports dynamic O_I and S ports - if tuple(map(int, libmuscle.__version__.split(".")[:3])) < (0, 9, 2): + if Version(libmuscle.__version__) <= Version("0.9.1"): + # N.B. Develop branch with version 0.9.2.dev1 also works raise RuntimeError("This actor requires libmuscle version 0.10.0 or later") DynamicDataSource().run() From 6096ef0ea23ec4f872e1abe246599fc308bdcfff Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 4 Jun 2026 09:34:58 +0200 Subject: [PATCH 10/10] Remove xfail from test: functionality is now released --- tests/test_muscle3.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index 61f881c..9b42e13 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -41,10 +41,6 @@ """ -@pytest.mark.xfail( - tuple(map(int, ymmsl.__version__.partition("-")[0].split(".")[:3])) < (0, 15, 1), - reason="Test needs YMMSL Entry Points plugins", -) def test_load_ymmsl_config(): config = ymmsl.load_as(Configuration, ymmsl_config) resolve(Reference([]), config)