diff --git a/src/power_grid_model_ds/_core/power_grid_model_interface.py b/src/power_grid_model_ds/_core/power_grid_model_interface.py index 48850ce9..77eca919 100644 --- a/src/power_grid_model_ds/_core/power_grid_model_interface.py +++ b/src/power_grid_model_ds/_core/power_grid_model_interface.py @@ -100,6 +100,25 @@ def calculate_power_flow( ) return self.output_data + def calculate_state_estimation( + self, + calculation_method: CalculationMethod = CalculationMethod.iterative_linear, + update_data: dict | None = None, + **kwargs, + ): + """Initialize the PowerGridModel and calculate state estimation over input data. + + If input data is not available, self.create_input_from_grid() will be called to create it. + + Returns output of the state estimation calculation (also stored in self.output_data) + """ + self.model = self.model or self.setup_model() + + self.output_data = self.model.calculate_state_estimation( + calculation_method=calculation_method, update_data=update_data, **kwargs + ) + return self.output_data + def _create_power_grid_array(self, array_name: str) -> np.ndarray: """Create power grid model array""" internal_array = getattr(self.grid, array_name) diff --git a/tests/integration/loadflow/test_power_grid_model.py b/tests/integration/loadflow/test_power_grid_model.py index 82e8a28d..6178add5 100644 --- a/tests/integration/loadflow/test_power_grid_model.py +++ b/tests/integration/loadflow/test_power_grid_model.py @@ -8,7 +8,7 @@ import numpy as np import pytest -from power_grid_model import ComponentType, TapChangingStrategy, initialize_array +from power_grid_model import ComponentType, MeasuredTerminalType, TapChangingStrategy, initialize_array from power_grid_model_ds._core.data_source.generator.grid_generators import RadialGridGenerator from power_grid_model_ds._core.model.grids.base import Grid @@ -114,6 +114,39 @@ def test_grid_with_automatic_tap_regulator(self, grid_with_tap_regulator: Grid): assert output["transformer_tap_regulator"]["tap_pos"][0] > 0 +class TestCalculateStateEstimation: + def test_simple_grid_with_sensors(self, simple_loadflow_grid: Grid): + core_interface = PowerGridModelInterface(grid=simple_loadflow_grid) + input_data = core_interface.create_input_from_grid() + + voltage_sensors = initialize_array("input", "sym_voltage_sensor", 2) + voltage_sensors["id"] = [10, 11] + voltage_sensors["measured_object"] = [0, 1] + voltage_sensors["u_sigma"] = [1.0, 1.0] + voltage_sensors["u_measured"] = [10_500.0, 10_497.5] + voltage_sensors["u_angle_measured"] = [0.0, -0.0002] + + power_sensors = initialize_array("input", "sym_power_sensor", 2) + power_sensors["id"] = [12, 13] + power_sensors["measured_object"] = [3, 4] + power_sensors["measured_terminal_type"] = [MeasuredTerminalType.source, MeasuredTerminalType.load] + power_sensors["power_sigma"] = [100.0, 100.0] + power_sensors["p_measured"] = [-250_000.0, 250_000.0] + power_sensors["q_measured"] = [-50_000.0, 50_000.0] + power_sensors["p_sigma"] = [100.0, 100.0] + power_sensors["q_sigma"] = [100.0, 100.0] + + input_data["sym_voltage_sensor"] = voltage_sensors + input_data["sym_power_sensor"] = power_sensors + + output = core_interface.calculate_state_estimation() + + assert output is core_interface.output_data + assert output["node"]["u"][0] == pytest.approx(10_498.75, 0.1) + assert output["node"]["u"][1] == pytest.approx(10_498.75, 0.1) + assert all(output["line"]["i_from"] > 0) + + @pytest.fixture def base_grid() -> Grid: grid_generator = RadialGridGenerator(grid_class=Grid, nr_nodes=5, nr_sources=1, nr_nops=0)