Skip to content
Open
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ this git repository:
pip install 'imas-streams @ git+https://github.com/iterorganization/IMAS-Streams.git'
# If you wish to use the kafka features:
pip install 'imas-streams[kafka] @ git+https://github.com/iterorganization/IMAS-Streams.git'
# If you wish to use the MUSCLE3 component:
pip install 'imas-streams[muscle3] @ git+https://github.com/iterorganization/IMAS-Streams.git'
```

## Command line interface
Expand Down
170 changes: 170 additions & 0 deletions docs/muscle3.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,170 @@
# IMAS-Streams MUSCLE3 integration

IMAS-Streams contains two MUSCLE3 components:
1. `imas_streams_source` is a simple component that reads data from a single
IMAS data Stream in a Kafka topic, and makes that data available as
serialized IDSs to the MUSCLE3 simulation. Multiple time slices can be
batched in a single IDS.
2. `imas_streams` is a more complex component that reads data from multiple IMAS
data Streams on a Kafka cluster. It synchronizes the data when these streams
do not use the same time bases. It also publishes data back to one or more
Kafka topics.

We provide more details on these component in the following sections.


## `imas_streams_source` component

<!--
N.B. this is copied from muscle3_config.py, ideally we'll use the sphinx-ymmsl
plugin to keep a single source of truth.
-->

Data source reading Streaming IMAS data from a Kafka topic and making it
available in a MUSCLE3 simulation.

The `ids_out` port sends one message for every `batch_size` time slices
streamed over the configured kafka topic. The type of IDS depends on the
configured kafka topic: please take care that this matches the IDS that is
expected for components receiving the message.

You may use the `trigger` port to indicate that the previous message is
processed and a new message may be sent. If this port is not connected then
this component will send messages on the `ids_out` port as soon as they are
available.


### Example configuration

```yaml
ymmsl_version: v0.2
description: Example usage for imas_streams_source component

imports:
- from imas_streams.data_source import implementation imas_streams_source

models:
example:
decription: Simple example model
components:
source:
implementation: imas_streams_source
description: Data source
ports:
o_i: ids_out
s: trigger
physics_component:
implementation: my_physics_program
description: Physics simulation
ports:
f_init: equilibrium_in
o_f: trigger
conduits:
source.ids_out: physics_component.equilibrium_in
physics_component.trigger: source.trigger

settings:
kafka_host: localhost:9092
kafka_topic: test.equilibrium

programs:
my_physics_program:
executable: /path/to/my/physics/program
```


## `imas_streams` component

<!--
N.B. this is copied from muscle3_config.py, ideally we'll use the sphinx-ymmsl
plugin to keep a single source of truth.
-->

This is a data source that reads Streaming IMAS data from multiple Kafka
topics and makes the data available in a MUSCLE3 simulation.

To use this component in your simulation, you need to configure the
following:

1. Define output ports for each IMAS Stream for the O_I operator.
2. Specify the Kafka host to connect to in the `kafka_host` setting.
3. Specify the Kafka topic names for each output port in the `kafka_topics`
setting as `<output_port>: <kafka_topic>`.

### Stream synchronization

This component will synchronize messages from different streams if they don't
have the same time base:

1. All streams must have data available before a message is sent to the MUSCLE3
workflow.
2. Messages will be sent at the same frequency as the first stream (as
configured in the `kafka_topics` setting).
3. Messages for the other stream use the following interpolation strategy:

If there is a data point at exactly the same moment as the first stream,
then that data is sent.
Otherwise, the data at the latest time before that time is sent.

For example, if there are three streams with data at the following time points:

- Stream A: data at t = [0, 1, 2, 3]
- Stream B: data at t = [1, 3]
- Stream C: data at t = [0, 1.5, 3, 4.5]

If stream A is the first configured stream, then this component will send three
messages:

1. The first message at t=1: this is the first moment that stream B has data
for. The data for stream A at t=0 is discarded. Since stream C doesn't have
data at t=1, the data at t=0 is sent instead.
2. The second message will be sent at t=2, which repeats the data for Stream B
at t=1, and uses the data at t=1.5 for Stream C.
3. The last message is sent at t=3, for which there is data on all three
streams.

### Example configuration

```yaml
ymmsl_version: v0.2
description: Example usage for imas_streams component
imports:
- from imas_streams.data_source import implementation imas_streams
models:
example:
streams:
description: IMAS Streams data source and sink
implementation: imas_streams
ports:
O_I: magnetics_out pf_active_out
S: equilibrium_in
equilibrium:
description: Equilibrium reconstruction code
implementation: my_equilibrium_program
ports:
F_INIT: magnetics_in pf_active_in
O_F: equilibrium_out
conduits:
streams.magnetics_out: equilibrium.magnetics_in
streams.pf_active_out: equilibrium.pf_active_in
equilibrium.equilibrium_out: streams.equilibrium_in
settings:
streams.kafka_host: localhost:9092
streams.kafka_topics: |
magnetics_out: kafka.topic.for.magnetics
pf_active_out: kafka.topic.for.pf_active
equilibrium_in: kafka.topic.for.equilibrium
programs:
my_equilibrium_program:
executable: /path/to/my/equilibrium/program
```

In this example, IMAS Streams from two topics are made available:

1. The data on the topic `kafka.topic.for.magnetics` is sent on the output
port `magnetics_out`.
2. The data on the topic `kafka.topic.for.pf_active` is sent on the output
port `pf_active_out`.

The data received on the S port `equilibrium_in` is published to
`kafka.topic.for.equilibrium`.
7 changes: 5 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ classifiers = [
license = {file = "LICENSE.txt"}
dependencies = [
"imas-python",
"packaging",
"pydantic",
]
dynamic = ["version"]
Expand All @@ -22,14 +23,16 @@ requires-python = ">=3.11"
[project.optional-dependencies]
all = ["imas-streams[dev,kafka,muscle3]"]
kafka = ["confluent-kafka"]
muscle3 = ["muscle3 >= 0.9.0"]
muscle3 = ["muscle3 >= 0.9.0", "imas-streams[kafka]"]
dev = [
"ruff",
"pytest",
"pytest-cov",
"pytest-xdist",
"pytest-randomly",
"imas-python[xarray]",
# TODO: Remove once the next MUSCLE3 release is out
"muscle3 @ git+https://github.com/multiscale/muscle3.git@develop"
]

[project.urls]
Expand All @@ -43,7 +46,7 @@ include = ["imas_streams*"]
imas-streams = "imas_streams.cli:main"

[project.entry-points."ymmsl.module"]
"imas_streams.data_source" = "imas_streams.muscle3_datasource:DATA_SOURCE"
"imas_streams.data_source" = "imas_streams.muscle3_config:DATA_SOURCE"

[tool.setuptools_scm]
local_scheme = "no-local-version"
Expand Down
32 changes: 24 additions & 8 deletions src/imas_streams/cli.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
import logging
import sys

import click
import imas
Expand Down Expand Up @@ -71,20 +70,37 @@ def kafka_to_imasentry(
entry.put_slice(result)


@main.command()
def kafka_to_muscle3():
"""MUSCLE3 actor consuming streaming IMAS data from a Kafka topic and making it
available to a MUSCLE3 workflow.
"""
def _ensure_kafka_muscle3_dependencies():
"""Ensure optional dependencies are available"""
# Ensure optional dependencies are available
try:
import libmuscle # noqa: F401

import imas_streams.kafka # noqa: F401
except ModuleNotFoundError:
click.echo("Error: please install the optional kafka and muscle3 dependencies.")
sys.exit(1)
raise click.ClickException(
"Please install the optional kafka and muscle3 dependencies."
) from None


@main.command()
def kafka_to_muscle3():
"""MUSCLE3 actor consuming streaming IMAS data from a Kafka topic and making it
available to a MUSCLE3 workflow.
"""
_ensure_kafka_muscle3_dependencies()
from imas_streams.muscle3_datasource import data_source

data_source()


@main.command
def dynamic_kafka_to_muscle3():
"""MUSCLE3 actor consuming multiple IMAS data streams from Kafka topics, making the
data available to a MUSCLE3 workflow, with the ability to publish the workflow data
back to Kafka.
"""
_ensure_kafka_muscle3_dependencies()
from imas_streams.muscle3_datasource import dynamic_data_source

dynamic_data_source()
9 changes: 6 additions & 3 deletions src/imas_streams/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,13 @@ class KafkaSettings(BaseModel):
model_config = ConfigDict(extra="forbid")


def _create_kafka_topic(settings: KafkaSettings):
def create_kafka_topic(settings: KafkaSettings):
"""Create a new kafka topic.

This will raise an exception when the topic already exists, or if the topic could
not be created (potentially due to missing permissions).
"""
logger.info("Creating topic '%s'...", settings.topic_name)
conf = {"bootstrap.servers": settings.host}
admin_client = AdminClient(conf)

Expand All @@ -70,6 +71,7 @@ def _create_kafka_topic(settings: KafkaSettings):
for _topic, future in fs.items():
# This will raise an exception when the topic exists or could not be created
future.result()
logger.info("Created topic '%s'", settings.topic_name)


class KafkaProducer:
Expand All @@ -95,7 +97,7 @@ def __init__(
self._message_key = f"IMAS-Kafka-{base64.b64encode(random_id).decode()}"

# Create the topic and send the metadata as first message
_create_kafka_topic(settings)
create_kafka_topic(settings)
self._producer.produce(
topic=self._settings.topic_name,
value=self._metadata.model_dump_json().encode(),
Expand All @@ -109,6 +111,7 @@ def __del__(self):
"""Cleanup Kafka Producer resources"""
# Ensure all messages are sent
self._producer.flush()
self._producer.close()

def produce(self, message: bytes) -> None:
"""Produce a time frame to the configured Kafka topic."""
Expand Down Expand Up @@ -156,7 +159,7 @@ def __init__(
settings: KafkaSettings,
stream_consumer_cls: type[StreamConsumer],
*,
timeout: int = DEFAULT_KAFKA_CONSUMER_TIMEOUT,
timeout: float = DEFAULT_KAFKA_CONSUMER_TIMEOUT,
most_recent_only: bool = False,
**stream_consumer_kwargs,
) -> None:
Expand Down
Loading
Loading