diff --git a/examples/nest-simulation/tests/test_examples.py b/examples/nest-simulation/tests/test_examples.py index f7c65953..28fe210f 100644 --- a/examples/nest-simulation/tests/test_examples.py +++ b/examples/nest-simulation/tests/test_examples.py @@ -62,7 +62,7 @@ def test_json_example(self): self.scaffold.compile() self._test_scaffold_results() results = self.scaffold.run_simulation("basal_activity") - self._test_simulation_results(results.spiketrains) + self._test_simulation_results(results.block.segments[0].spiketrains) def test_yaml_example(self): self.cfg = parse_configuration_file( @@ -72,7 +72,7 @@ def test_yaml_example(self): self.scaffold.compile() self._test_scaffold_results() results = self.scaffold.run_simulation("basal_activity") - self._test_simulation_results(results.spiketrains) + self._test_simulation_results(results.block.segments[0].spiketrains) def test_python_example(self): import scripts.guide_nest # noqa: F401 diff --git a/examples/neuron-simulation/tests/test_examples.py b/examples/neuron-simulation/tests/test_examples.py index 8ecd2208..0cefa8d1 100644 --- a/examples/neuron-simulation/tests/test_examples.py +++ b/examples/neuron-simulation/tests/test_examples.py @@ -67,7 +67,7 @@ def test_json_example(self): self.scaffold.compile() self._test_scaffold_results() results = self.scaffold.run_simulation("neuronsim") - self._test_simulation_results(results.analogsignals) + self._test_simulation_results(results.block.segments[0].analogsignals) def test_yaml_example(self): self.cfg = parse_configuration_file( @@ -77,7 +77,7 @@ def test_yaml_example(self): self.scaffold.compile() self._test_scaffold_results() results = self.scaffold.run_simulation("neuronsim") - self._test_simulation_results(results.analogsignals) + self._test_simulation_results(results.block.segments[0].analogsignals) def test_python_example(self): import scripts.guide_neuron # noqa: F401 diff --git a/packages/bsb-arbor/bsb_arbor/adapter.py b/packages/bsb-arbor/bsb_arbor/adapter.py index de875018..b5da822f 100644 --- a/packages/bsb-arbor/bsb_arbor/adapter.py +++ b/packages/bsb-arbor/bsb_arbor/adapter.py @@ -25,11 +25,11 @@ class ArborSimulationData(SimulationData): Container class for simulation data. """ - def __init__(self, simulation): + def __init__(self, simulation, filename): """ Container class for simulation data. """ - super().__init__(simulation) + super().__init__(simulation, filename=filename) self.arbor_sim: arbor.simulation = None @@ -375,11 +375,13 @@ def __init__(self, comm=None): super().__init__(comm) self.simdata: dict[ArborSimulation, ArborSimulationData] = {} - def prepare(self, simulation: "ArborSimulation") -> ArborSimulationData: + def prepare( + self, simulation: "ArborSimulation", filename=None + ) -> ArborSimulationData: """ Prepares the arbor simulation engine with the given simulation. """ - simdata = self._create_simdata(simulation) + simdata = self._create_simdata(simulation, filename) try: context = arbor.context(arbor.proc_allocation(threads=simulation.threads)) if self.comm.get_size() > 1: @@ -466,8 +468,8 @@ def get_recipe(self, simulation, simdata=None): self._cache_devices(simulation, simdata) return ArborRecipe(simulation, simdata) - def _create_simdata(self, simulation): - self.simdata[simulation] = simdata = ArborSimulationData(simulation) + def _create_simdata(self, simulation, filename): + self.simdata[simulation] = simdata = ArborSimulationData(simulation, filename) self._assign_chunks(simulation, simdata) return simdata diff --git a/packages/bsb-core/bsb/cli/commands/_commands.py b/packages/bsb-core/bsb/cli/commands/_commands.py index cc9c402a..429ea238 100644 --- a/packages/bsb-core/bsb/cli/commands/_commands.py +++ b/packages/bsb-core/bsb/cli/commands/_commands.py @@ -229,13 +229,11 @@ def handler(self, context): level=0, ) try: - result = network.run_simulation(sim_name) + network.run_simulation(sim_name, output_filename=root / f"{uuid4()}.nio") except NodeNotFoundError as e: append = ", " if len(network.simulations) else "" append += ", ".join(f"'{name}'" for name in extra_simulations) errr.wrap(type(e), e, append=append) - else: - result.write(root / f"{uuid4()}.nio", "ow") def get_options(self): return { diff --git a/packages/bsb-core/bsb/core.py b/packages/bsb-core/bsb/core.py index 9a1125f8..caf8ec25 100644 --- a/packages/bsb-core/bsb/core.py +++ b/packages/bsb-core/bsb/core.py @@ -449,7 +449,7 @@ def run_pipelines(self, fail_fast=True, pipelines=None): pool.schedule(pipelines) pool.execute() - def run_simulation(self, simulation_name: str): + def run_simulation(self, simulation_name: str, output_filename: str = None): """ Run a simulation starting from the default single-instance adapter. @@ -460,7 +460,7 @@ def run_simulation(self, simulation_name: str): adapter = get_simulation_adapter( simulation.simulator, comm=self._comm.get_communicator() ) - return adapter.simulate(simulation)[0] + return adapter.simulate(simulation, filename=output_filename)[0] def get_simulation(self, sim_name: str) -> Simulation: """ diff --git a/packages/bsb-core/bsb/simulation/adapter.py b/packages/bsb-core/bsb/simulation/adapter.py index b31e28a6..b365c303 100644 --- a/packages/bsb-core/bsb/simulation/adapter.py +++ b/packages/bsb-core/bsb/simulation/adapter.py @@ -66,7 +66,7 @@ def use_bar(self): class SimulationData: - def __init__(self, simulation: "Simulation", result=None): + def __init__(self, simulation: "Simulation", result=None, filename=None): self.chunks = None self.populations = dict() self.placement: dict[CellModel, PlacementSet] = { @@ -75,7 +75,7 @@ def __init__(self, simulation: "Simulation", result=None): self.connections = dict() self.devices = dict() if result is None: - result = SimulationResult(simulation) + result = SimulationResult(simulation, filename=filename) self.result: SimulationResult = result @@ -92,7 +92,7 @@ def __init__(self, comm=None): self._duration = None self.current_checkpoint = 0 - def simulate(self, *simulations, post_prepare=None): + def simulate(self, *simulations, post_prepare=None, filename=None): """ Simulate the given simulations. @@ -113,7 +113,7 @@ def simulate(self, *simulations, post_prepare=None): self._controllers.append(listener) for simulation in simulations: - data = self.prepare(simulation) + data = self.prepare(simulation, filename) alldata.append(data) for hook in simulation.post_prepare: hook(self, simulation, data) @@ -123,7 +123,7 @@ def simulate(self, *simulations, post_prepare=None): return self.collect(results) @abc.abstractmethod - def prepare(self, simulation): # pragma: nocover + def prepare(self, simulation, filename=None): # pragma: nocover """ Reset the simulation backend and prepare for the given simulation. diff --git a/packages/bsb-core/bsb/simulation/results.py b/packages/bsb-core/bsb/simulation/results.py index 1547d22e..f1b8f8e1 100644 --- a/packages/bsb-core/bsb/simulation/results.py +++ b/packages/bsb-core/bsb/simulation/results.py @@ -9,22 +9,41 @@ class SimulationResult: - def __init__(self, simulation): - from neo import Block + def __init__(self, simulation, filename=None): + from neo import Block, io tree = simulation.__tree__() with contextlib.suppress(KeyError): del tree["post_prepare"] - self.block = Block(name=simulation.name, config=tree) + if filename: + self.filename = filename + self.name = simulation.name + io = io.NixIO(filename, mode="rw") + io.write(Block(name=self.name, nix_name=self.name, config=tree)) + for i, nixblock in enumerate(io.nix_file.blocks): + if self.name == nixblock.name: + self.block_id = i + io.close() + else: + self.block = Block( + name=simulation.name, nix_name=simulation.name, config=tree + ) + self.recorders = [] @property - def spiketrains(self): - return self.block.segments[0].spiketrains + def analogsignals(self): + if hasattr(self, "block"): + return self.block.segments[0].analogsignals + else: + return [] @property - def analogsignals(self): - return self.block.segments[0].analogsignals + def spiketrains(self): + if hasattr(self, "block"): + return self.block.segments[0].spiketrains + else: + return [] def add(self, recorder): self.recorders.append(recorder) @@ -36,21 +55,29 @@ def create_recorder(self, flush: typing.Callable[["neo.core.Segment"], None]): return recorder def flush(self): - from neo import Segment + from neo import Segment, io segment = Segment() - self.block.segments.append(segment) for recorder in self.recorders: try: recorder.flush(segment) except Exception: traceback.print_exc() warn("Recorder errored out!") + if hasattr(self, "filename"): + out_stream = io.NixIO(self.filename, mode="rw") + block = out_stream.nix_file.blocks[self.block_id] + out_stream._write_segment(segment, block) + out_stream.close() + del segment + else: + self.block.segments.append(segment) def write(self, filename, mode): from neo import io - io.NixIO(filename, mode=mode).write(self.block) + if hasattr(self, "block"): + io.NixIO(filename, mode=mode).write(self.block) class SimulationRecorder: diff --git a/packages/bsb-core/tests/test_simulation.py b/packages/bsb-core/tests/test_simulation.py index 7ce18690..2a89d175 100644 --- a/packages/bsb-core/tests/test_simulation.py +++ b/packages/bsb-core/tests/test_simulation.py @@ -1,13 +1,15 @@ import unittest import numpy as np -from bsb_arbor import SpikeRecorder +from bsb_arbor import ArborSimulation, SpikeRecorder from bsb_test import FixedPosConfigFixture, NumpyTestCase, RandomStorageFixture +from neo import AnalogSignal, io from bsb import ( MPI, AttributeMissingError, Scaffold, + SimulationResult, config, get_simulation_adapter, options, @@ -312,7 +314,6 @@ def run_checkpoint(self, kwargs=None): return self._status -@unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") class TestAdapterControllers( FixedPosConfigFixture, RandomStorageFixture, @@ -396,6 +397,7 @@ def setUp(self): self.network = Scaffold(self.cfg, self.storage) self.network.compile() + @unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") def test_controller_registration(self): self.network.simulations.test.devices["rec_15"] = dict( device="spike_controller", @@ -427,6 +429,7 @@ def test_controller_registration(self): "Only rec_15 device should be registered", ) + @unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") def test_registration_of_listener(self): self.network.simulations.test.devices["rec_15"] = dict( device="spike_controller", @@ -451,6 +454,7 @@ def test_registration_of_listener(self): "The first controller should be the listener", ) + @unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") def test_incorrect_controller(self): @config.node class RottenController( @@ -479,6 +483,7 @@ def get_next_checkpoint(self): with self.assertRaises(AttributeMissingError): adapter.prepare(sim) + @unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") def test_record_checkpoint(self): """Create a test with an AdapterController that flushes every 10 steps, so with a simulation of 100 of duration it will create 10 segments plus @@ -516,6 +521,7 @@ def test_record_checkpoint(self): "Spike times in segment 6 fall outside the expected range (60–70).", ) + @unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") def test_two_controllers(self): """Test two AdapterController instances together, one configured with 15 steps and the other with 40 steps, it will flush at [ 15,30,40,45,60,75,80,90]. @@ -563,3 +569,142 @@ def test_two_controllers(self): & (segments[-1].spiketrains[0].magnitude < 100), "Spike times in last segment fall outside the expected range (90-100).", ) + + @unittest.skipIf(MPI.get_size() > 1, "Skipped during parallel testing.") + def test_checkpoints_with_triple_sim(self): + """This test checks that if two simulations are run the results are written in two + separate blocks inside the same file""" + self.network.simulations.test.devices["new_recorder"] = dict( + device="spike_controller", + targetting={ + "strategy": "cell_model", + "cell_models": ["test_cell"], + }, + step=10, + ) + sim_2 = dict( + simulator="arbor", + duration=50, + resolution=0.5, + cell_models={ + "test_cell": { + "model_strategy": "lif", + "constants": { + "C_m": 250, + "tau_m": 20, + "t_ref": 2.0, + "E_L": 0.0, + "E_R": 0.0, + "V_m": 0.0, + "V_th": 20, + }, + }, + "h_cell": { + "model_strategy": "lif", + "constants": { + "C_m": 250, + "tau_m": 20, + "t_ref": 2.0, + "E_L": 0.0, + "E_R": 0.0, + "V_m": 0.0, + "V_th": 20, + }, + }, + }, + connection_models={ + "test_to_h_cell": {"weight": 20.68015524367846, "delay": 1.5} + }, + devices=dict( + pg={ + "device": "poisson_generator", + "rate": 1600, + "targetting": {"strategy": "all"}, + "weight": 2000, + "delay": 1.5, + }, + new_recorder=dict( + device="spike_controller", + targetting={ + "strategy": "cell_model", + "cell_models": ["test_cell"], + }, + step=10, + ), + ), + ) + self.network.simulations["sim_2"] = ArborSimulation(sim_2) + self.network.simulations["sim_3"] = ArborSimulation(sim_2) + + nio_file = "out.nio" + self.network.run_simulation("test", output_filename=nio_file) + self.network.run_simulation("sim_2", output_filename=nio_file) + self.network.run_simulation("sim_3", output_filename=nio_file) + + written_results = io.NixIO(nio_file, "ro") + blocks = written_results.read_all_blocks() + self.assertEqual(len(blocks), 3) + self.assertEqual(len(blocks[0].segments), 11) + self.assertEqual(len(blocks[1].segments), 6) + self.assertEqual(len(blocks[2].segments), 6) + import os + + os.remove(nio_file) + + def test_RAM_usage(self): + """ + This test writes over 7 GB of data in 60 segments of 120 MB each. + + It verifies that memory usage, after the initial steps, remains stable + within a 5 MB threshold. It also checks that peak memory usage never + exceeds 500 MB. + If PLOT_GRAPH is set to True, it will also plot the graph of memory + values collected after every flush. + """ + import os + import tracemalloc + + from quantities import ms + + tracemalloc.start() + PLOT_GRAPH = False + MB = 1024 * 1024 + total_threshold = 500 + rank = MPI.get_rank() + + sim = self.network.simulations.test + nio_file = "out" + str(rank) + ".nio" + my_result = SimulationResult(sim, nio_file) + + def my_flush(segment): + # Creates a signal of 120 MB + signal = np.ones(15 * MB) + segment.analogsignals.append( + AnalogSignal(signal, sampling_period=0.1 * ms, units="mV") + ) + + my_result.create_recorder(my_flush) + + num_samples = 60 + mem_size = np.zeros(num_samples) + for sample in range(num_samples): + my_result.flush() + mem_size_after, mem_peak = tracemalloc.get_traced_memory() + mem_size[sample] = mem_size_after / MB + + _, mem_peak = tracemalloc.get_traced_memory() + mean = np.mean(mem_size[5::]) + max_dev = np.max(np.abs(mem_size[5::] - mean)) + if PLOT_GRAPH: + import matplotlib.pyplot as plt + + plt.plot(mem_size) + plt.xlabel("step") + plt.ylabel("Mem Size") + plt.ylim(0, 400) + plt.grid(True) + plt.show() + + self.assertClose(0, max_dev, atol=5.0) + self.assertLess(mem_peak / MB, total_threshold) + os.remove("out" + str(rank) + ".nio") diff --git a/packages/bsb-nest/bsb_nest/adapter.py b/packages/bsb-nest/bsb_nest/adapter.py index 6fc2e5bc..d10c9f62 100644 --- a/packages/bsb-nest/bsb_nest/adapter.py +++ b/packages/bsb-nest/bsb_nest/adapter.py @@ -51,14 +51,16 @@ def __init__(self, comm=None): self.loaded_modules = set() self._prev_chkpoint = 0 - def simulate(self, *simulations, post_prepare=None): + def simulate(self, *simulations, post_prepare=None, filename=None): try: self.reset_kernel() - return super().simulate(*simulations, post_prepare=post_prepare) + return super().simulate( + *simulations, post_prepare=post_prepare, filename=filename + ) finally: self.reset_kernel() - def prepare(self, simulation): + def prepare(self, simulation, filename=None): """ Prepare the simulation environment in NEST. @@ -80,7 +82,7 @@ def prepare(self, simulation): :rtype: bsb.simulation.adapter.SimulationData """ self.simdata[simulation] = SimulationData( - simulation, result=NestResult(simulation) + simulation, result=NestResult(simulation, filename) ) try: report("Installing NEST modules...", level=2) diff --git a/packages/bsb-nest/tests/test_nest.py b/packages/bsb-nest/tests/test_nest.py index e7128ef9..d1b5d66f 100644 --- a/packages/bsb-nest/tests/test_nest.py +++ b/packages/bsb-nest/tests/test_nest.py @@ -8,7 +8,12 @@ from bsb.core import Scaffold from bsb.services import MPI from bsb_test import NumpyTestCase, RandomStorageFixture, get_test_config -from nest.lib.hl_api_exceptions import NESTErrors +from packaging.version import Version + +if Version(nest.__version__) >= Version("3.10"): + from nest import NESTErrors +else: + from nest.lib.hl_api_exceptions import NESTErrors from scipy.optimize import curve_fit from bsb_nest import NestAdapter diff --git a/packages/bsb-neuron/bsb_neuron/adapter.py b/packages/bsb-neuron/bsb_neuron/adapter.py index d00ec34f..8402ca50 100644 --- a/packages/bsb-neuron/bsb_neuron/adapter.py +++ b/packages/bsb-neuron/bsb_neuron/adapter.py @@ -70,7 +70,7 @@ def engine(self): return engine - def prepare(self, simulation): + def prepare(self, simulation, filename=None): """ Prepare the simulation environment and data structures for running a NEURON simulation. @@ -90,7 +90,8 @@ def prepare(self, simulation): """ self.simdata[simulation] = NeuronSimulationData( - simulation, result=NeuronResult(simulation) + simulation, + result=NeuronResult(simulation, filename=filename), ) try: report("Preparing simulation", level=2) diff --git a/packages/bsb-neuron/tests/test_neuron.py b/packages/bsb-neuron/tests/test_neuron.py index 03f7c87c..63a905a3 100644 --- a/packages/bsb-neuron/tests/test_neuron.py +++ b/packages/bsb-neuron/tests/test_neuron.py @@ -2,9 +2,7 @@ import unittest from copy import copy -from bsb.core import Scaffold -from bsb.services import MPI -from bsb.simulation import get_simulation_adapter +from bsb import MPI, Scaffold, get_simulation_adapter from bsb_test import ( ConfigFixture, MorphologiesFixture, @@ -525,3 +523,80 @@ def test_500ch_manualloop(self): ], receiving_cells, ) + + +class TestCheckpoints( + RandomStorageFixture, + ConfigFixture, + NetworkFixture, + MorphologiesFixture, + unittest.TestCase, + config="complete", + morpho_filters=["3branch"], + engine_name="hdf5", +): + def setUp(self): + import os + + import psutil + + super().setUp() + p.parallel.gid_clear() + for ct in self.network.cell_types.values(): + ct.spatial.morphologies = ["3branch"] + self.network.chunk_size = [50, 50, 50] + hh_soma = { + "cable_types": { + "soma": { + "cable": {"Ra": 10, "cm": 1}, + "mechanisms": {"pas": {}, "hh": {}}, + } + }, + "synapse_types": {"ExpSyn": {}}, + } + devices = { + "spike_generator": { + "device": "spike_generator", + "start": 9, + "number": 8, + "weight": 1, + "delay": 1, + "targetting": { + "strategy": "cell_model", + "cell_models": ["A", "B", "C"], + }, + } + } + + for i in range(20): + devices[str(i)] = { + "device": "voltage_recorder", + "targetting": {"strategy": "cell_model", "cell_models": ["A", "B", "C"]}, + } + + self.network.simulations.add( + "test", + simulator="neuron", + duration=20000, + resolution=0.1, + temperature=32, + cell_models=dict( + A=ArborizedModel(model=hh_soma), + B=ArborizedModel(model=hh_soma), + C=ArborizedModel(model=hh_soma), + ), + connection_models=dict( + A_to_A=TransceiverModel(synapses=[dict(synapse="ExpSyn", delay=1.0)]), + A_to_B=TransceiverModel(synapses=[dict(synapse="ExpSyn", delay=1.0)]), + B_to_C=TransceiverModel(synapses=[dict(synapse="ExpSyn", delay=1.0)]), + C_to_A=TransceiverModel(synapses=[dict(synapse="ExpSyn", delay=1.0)]), + C_to_B=TransceiverModel(synapses=[dict(synapse="ExpSyn", delay=1.0)]), + ), + devices=devices, + ) + self.network.compile() + self.process = psutil.Process(os.getpid()) + # Baseline before the test + self.before_mem = self.process.memory_info().rss + +