From e5a69f784c9053d43b938b5ce93e4a36b95f6e55 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Wed, 3 Jun 2026 17:33:30 +0200 Subject: [PATCH 1/6] Add CLI to stream data from an IMAS DBEntry --- src/imas_streams/cli.py | 85 +++++++++++++++++++++++++++++++++++++++-- 1 file changed, 81 insertions(+), 4 deletions(-) diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index f763373..4ebc5ae 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -3,8 +3,9 @@ import click import imas +from imas.ids_defs import CLOSEST_INTERP, IDS_TIME_MODE_HOMOGENEOUS -from imas_streams import BatchedIDSConsumer +from imas_streams import BatchedIDSConsumer, StreamingIDSProducer @click.group(invoke_without_command=True, no_args_is_help=True) @@ -22,6 +23,84 @@ def main() -> None: ) +@main.command() +@click.argument("imas_uri") +@click.argument("kafka_host") +@click.argument("kafka_topic") +def imasentry_to_kafka(imas_uri: str, kafka_host: str, kafka_topic: str) -> None: + """Stream data from an existing IMAS data entry to a Kafka topic. + + The input data must be limited to dynamic floating point data, and array shapes must + remain constant for all time slices. An error will be displayed when this is not + adhered to. + + \b + Arguments: + IMAS_URI IMAS URI (including IDS and optionally occurrence) with the data to + be streamed. For example: "imas:hdf5?path=./testdata#magnetics". + KAFKA_HOST Kafka host and port (aka bootstrap.servers). E.g. 'localhost:9092'. + KAFKA_TOPIC Name of the kafka topic to stream the data to. + """ + # Local import: kafka is an optional dependency + from imas_streams.kafka import KafkaProducer, KafkaSettings + + # Extract IDS/occurrence + base_uri, _, ids_and_occurrence = imas_uri.partition("#") + idsname, _, occurrence = ids_and_occurrence.partition(":") + if not idsname: + raise click.UsageError( + f"Invalid IMAS URI '{imas_uri}': no IDS name given. Hint: " + "add '#' to your URI." + ) + if occurrence: + try: + occurrence = int(occurrence) + except ValueError: + raise click.UsageError( + f"Invalid IMAS URI '{imas_uri}': " + f"occurrence '{occurrence}' is not an integer." + ) from None + else: + occurrence = 0 + + with imas.DBEntry(base_uri, "r") as entry: + # Ensure IDS uses homogeneous time, extract all time points + lazy_ids = entry.get(idsname, occurrence, lazy=True, autoconvert=False) + if lazy_ids.ids_properties.homogeneous_time != IDS_TIME_MODE_HOMOGENEOUS: + raise click.ClickException("The loaded IDS is not using homogeneous time.") + times = lazy_ids.time[:] + del lazy_ids + + # Get first time slice to obtain the static and metadata + ids = entry.get_slice( + idsname, times[0], CLOSEST_INTERP, occurrence, autoconvert=False + ) + ids_producer = StreamingIDSProducer(ids) + + kafka_producer = KafkaProducer( + KafkaSettings(host=kafka_host, topic_name=kafka_topic), + ids_producer.metadata, + ) + + # Send first time slice + kafka_producer.produce(bytes(ids_producer.create_message(ids))) + + # Send remaining time slices + with click.progressbar( + times[1:], label="Streaming time slices", show_pos=True + ) as bar: + for time in bar: + ids = entry.get_slice( + idsname, + time, + CLOSEST_INTERP, + occurrence, + autoconvert=False, + lazy=True, + ) + kafka_producer.produce(bytes(ids_producer.create_message(ids))) + + @main.command() @click.argument("kafka_host") @click.argument("kafka_topic") @@ -43,11 +122,9 @@ def kafka_to_imasentry( ): """Consume streaming IMAS data from Kafka and store data in an IMAS Data Entry. - N.B. This program requires the optional kafka dependency. - \b Arguments: - KAFKA_HOST Kafka host and port (aka bootstrap.servers). E.g. 'localhost:9092' + KAFKA_HOST Kafka host and port (aka bootstrap.servers). E.g. 'localhost:9092'. KAFKA_TOPIC Name of the kafka topic with streaming IMAS data. IMAS_URI IMAS URI to store the data at, for example 'imas:hdf5?path=./out'. The program will not overwrite existing data (unless the --overwrite From 4effc2648b4e6b5a2239ae0e735e4c95fd31eede Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Thu, 4 Jun 2026 16:36:43 +0200 Subject: [PATCH 2/6] Add option to stream DBEntry to kafka without get_slice --- src/imas_streams/cli.py | 32 +++++++++++++++++++++++----- src/imas_streams/producer.py | 41 +++++++++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 6 deletions(-) diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 4ebc5ae..27eacb0 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -27,7 +27,14 @@ def main() -> None: @click.argument("imas_uri") @click.argument("kafka_host") @click.argument("kafka_topic") -def imasentry_to_kafka(imas_uri: str, kafka_host: str, kafka_topic: str) -> None: +@click.option( + "--get", + is_flag=True, + help="Get full IDS instead of iteratively requesting a time slice with get_slice.", +) +def imasentry_to_kafka( + imas_uri: str, kafka_host: str, kafka_topic: str, get: bool +) -> None: """Stream data from an existing IMAS data entry to a Kafka topic. The input data must be limited to dynamic floating point data, and array shapes must @@ -63,31 +70,46 @@ def imasentry_to_kafka(imas_uri: str, kafka_host: str, kafka_topic: str) -> None else: occurrence = 0 + logging.info("Opening data entry...") with imas.DBEntry(base_uri, "r") as entry: + logging.info("Reading IDS...") # Ensure IDS uses homogeneous time, extract all time points lazy_ids = entry.get(idsname, occurrence, lazy=True, autoconvert=False) if lazy_ids.ids_properties.homogeneous_time != IDS_TIME_MODE_HOMOGENEOUS: raise click.ClickException("The loaded IDS is not using homogeneous time.") times = lazy_ids.time[:] del lazy_ids + logging.info("Found %d time slices to stream", len(times)) # Get first time slice to obtain the static and metadata ids = entry.get_slice( idsname, times[0], CLOSEST_INTERP, occurrence, autoconvert=False ) ids_producer = StreamingIDSProducer(ids) - kafka_producer = KafkaProducer( KafkaSettings(host=kafka_host, topic_name=kafka_topic), ids_producer.metadata, ) - # Send first time slice - kafka_producer.produce(bytes(ids_producer.create_message(ids))) + if get: + logging.info("Loading full IDS...") + ids = entry.get(idsname, occurrence) + logging.info("IDS loaded.") + + with click.progressbar( + ids_producer.messages_from_batch(ids), + length=len(times), + label="Streaming time slices", + show_pos=True, + update_min_steps=1001, + ) as bar: + for data in bar: + kafka_producer.produce(bytes(data)) + return # Send remaining time slices with click.progressbar( - times[1:], label="Streaming time slices", show_pos=True + times, label="Streaming time slices", show_pos=True ) as bar: for time in bar: ids = entry.get_slice( diff --git a/src/imas_streams/producer.py b/src/imas_streams/producer.py index 7c2bb93..be08de8 100644 --- a/src/imas_streams/producer.py +++ b/src/imas_streams/producer.py @@ -1,4 +1,5 @@ import copy +from collections.abc import Iterator import imas import numpy as np @@ -9,7 +10,9 @@ from imas_streams.metadata import DynamicData, StreamingIMASMetadata -def _metadata_from_time_slice(time_slice: IDSToplevel, static_paths: list[str]): +def _metadata_from_time_slice( + time_slice: IDSToplevel, static_paths: list[str] +) -> StreamingIMASMetadata: # -- Data sanity checks -- # The IDS must use homogeneous time mode if time_slice.ids_properties.homogeneous_time != IDS_TIME_MODE_HOMOGENEOUS: @@ -191,6 +194,7 @@ def metadata(self) -> StreamingIMASMetadata: return self._metadata def create_message(self, time_slice: IDSToplevel) -> bytearray: + """Create a single IMAS Streams message from the provided time slice.""" buffer = bytearray(self._buffersize) curindex = 0 for dyndata in self._metadata.dynamic_data: @@ -214,3 +218,38 @@ def create_message(self, time_slice: IDSToplevel) -> bytearray: if not (curindex == len(buffer) == self._buffersize): raise RuntimeError("Internal error: incorrect size of data buffer") return buffer + + def messages_from_batch(self, ids: IDSToplevel) -> Iterator[bytearray]: + """Create an IMAS Streams message for each time slice in the provided IDS. + + N.B. This method currently doesn't support streaming dynamic arrays of + structures. + """ + if ids.ids_properties.homogeneous_time != IDS_TIME_MODE_HOMOGENEOUS: + raise ValueError("The provided IDS doesn't use homogeneous time") + + nodes = [] + for dyndata in self._metadata.dynamic_data: + node = ids[dyndata.path] + if ( + dyndata.path != "time" + and not node.metadata.coordinates[-1].is_time_coordinate + ): + raise NotImplementedError( + "messages_from_batch() does not implement streaming data in dynamic" + " arrays of structures. Please use create_message() instead." + ) + nodes.append(node) + + buffer = bytearray(self._buffersize) + for i in range(len(ids.time)): + curindex = 0 + for node in nodes: + arr: np.ndarray = node.value[..., i] + nbytes = arr.nbytes + buffer[curindex : curindex + nbytes] = arr.tobytes() + curindex += nbytes + + if not (curindex == len(buffer) == self._buffersize): + raise RuntimeError("Internal error: incorrect size of data buffer") + yield buffer From e4cee1456732db56ca58dafce5071f13f49949ad Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 5 Jun 2026 09:44:53 +0200 Subject: [PATCH 3/6] Remove ymmsl version check in tests --- tests/test_muscle3.py | 4 ---- 1 file changed, 4 deletions(-) diff --git a/tests/test_muscle3.py b/tests/test_muscle3.py index 858fc6e..26542b6 100644 --- a/tests/test_muscle3.py +++ b/tests/test_muscle3.py @@ -31,10 +31,6 @@ """ -@pytest.mark.xfail( - tuple(map(int, ymmsl.__version__.split(".")[:3])) < (0, 15, 1), - reason="Test needs YMMSL Entry Points plugins", -) def test_load_ymmsl_config(): config = ymmsl.load_as(Configuration, ymmsl_config) resolve(Reference([]), config) From 1ca9272519d1d2e96adce1003b505e4db15192d6 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 5 Jun 2026 10:33:55 +0200 Subject: [PATCH 4/6] Add CLI option to restrict number of time slices streamed from a data entry --- src/imas_streams/cli.py | 17 ++++++++++++++--- 1 file changed, 14 insertions(+), 3 deletions(-) diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 27eacb0..702393b 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -7,6 +7,8 @@ from imas_streams import BatchedIDSConsumer, StreamingIDSProducer +_PROGRESS_BAR_UPDATE_MINSTEP = 1001 + @click.group(invoke_without_command=True, no_args_is_help=True) @click.version_option() @@ -32,8 +34,9 @@ def main() -> None: is_flag=True, help="Get full IDS instead of iteratively requesting a time slice with get_slice.", ) +@click.option("-n", default=0, help="Maximum number of time slices to stream") def imasentry_to_kafka( - imas_uri: str, kafka_host: str, kafka_topic: str, get: bool + imas_uri: str, kafka_host: str, kafka_topic: str, get: bool, n: int ) -> None: """Stream data from an existing IMAS data entry to a Kafka topic. @@ -80,6 +83,9 @@ def imasentry_to_kafka( times = lazy_ids.time[:] del lazy_ids logging.info("Found %d time slices to stream", len(times)) + if n and n < len(times): + logging.info("Streaming first %d time slices", n) + times = times[:n] # Get first time slice to obtain the static and metadata ids = entry.get_slice( @@ -101,10 +107,15 @@ def imasentry_to_kafka( length=len(times), label="Streaming time slices", show_pos=True, - update_min_steps=1001, + update_min_steps=_PROGRESS_BAR_UPDATE_MINSTEP, ) as bar: - for data in bar: + for i, data in enumerate(bar): + if i == n: + break kafka_producer.produce(bytes(data)) + # Make bar go to 100% + bar.make_step(len(times) % _PROGRESS_BAR_UPDATE_MINSTEP) + bar.render_progress() return # Send remaining time slices From dfd32ff5aabbc0fe372e4c97000a9fa8dd00be70 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 5 Jun 2026 13:12:23 +0200 Subject: [PATCH 5/6] Fix bug with num samples --- src/imas_streams/cli.py | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 702393b..6408d7a 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -86,6 +86,7 @@ def imasentry_to_kafka( if n and n < len(times): logging.info("Streaming first %d time slices", n) times = times[:n] + n = len(times) # Get first time slice to obtain the static and metadata ids = entry.get_slice( @@ -104,7 +105,7 @@ def imasentry_to_kafka( with click.progressbar( ids_producer.messages_from_batch(ids), - length=len(times), + length=n, label="Streaming time slices", show_pos=True, update_min_steps=_PROGRESS_BAR_UPDATE_MINSTEP, @@ -114,7 +115,7 @@ def imasentry_to_kafka( break kafka_producer.produce(bytes(data)) # Make bar go to 100% - bar.make_step(len(times) % _PROGRESS_BAR_UPDATE_MINSTEP) + bar.make_step(n % _PROGRESS_BAR_UPDATE_MINSTEP) bar.render_progress() return From 7ae7a59e3f65b4aca72dd840df871c6fc09c8775 Mon Sep 17 00:00:00 2001 From: Maarten Sebregts Date: Fri, 5 Jun 2026 16:29:38 +0200 Subject: [PATCH 6/6] Disable autoconvert with full get --- src/imas_streams/cli.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/imas_streams/cli.py b/src/imas_streams/cli.py index 6408d7a..608066a 100644 --- a/src/imas_streams/cli.py +++ b/src/imas_streams/cli.py @@ -100,7 +100,7 @@ def imasentry_to_kafka( if get: logging.info("Loading full IDS...") - ids = entry.get(idsname, occurrence) + ids = entry.get(idsname, occurrence, autoconvert=False) logging.info("IDS loaded.") with click.progressbar(