diff --git a/doc/release_notes.rst b/doc/release_notes.rst index bae61140..49db41a8 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -37,6 +37,11 @@ Most users should keep calling ``model.solve(...)``. If you want more control, y * New ``linopy.licensed_solvers``: the subset of installed solvers that currently pass a license check. Handy in tests and for picking a solver at runtime. * New helpers for explicit license checks: ``linopy.solvers.check_solver_licenses("gurobi", "mosek")``, ``Gurobi.license_status()``, ``Gurobi.is_available()``. They return a ``LicenseStatus`` dataclass (``name``, ``ok``, ``message``). +*Constraints — indicator constraints* + +* Add indicator constraints for solvers that support them. They are part of the unified constraints container: ``model.add_indicator_constraints`` returns a ``Constraint`` and the constraint is stored in ``model.constraints`` (filterable via ``model.constraints.indicator`` / ``model.constraints.regular``), so it round-trips through netCDF and ``model.copy()``. + + *Constraints — CSR-backed storage* * Add ``CSRConstraint``: a memory-efficient immutable constraint representation backed by scipy CSR sparse matrices. Up to 90% memory savings for constraints with many terms and 30–120× faster matrix generation for direct solver APIs. diff --git a/linopy/common.py b/linopy/common.py index 5c4a30df..9ee9777d 100644 --- a/linopy/common.py +++ b/linopy/common.py @@ -786,7 +786,11 @@ def __init__(self, constraints: Any) -> None: @cached_property def clabels(self) -> np.ndarray: """Active constraint labels in build order, shape (n_active_cons,).""" - label_lists = [c.active_labels() for c in self._constraints.data.values()] + label_lists = [ + c.active_labels() + for c in self._constraints.data.values() + if not c.is_indicator + ] return ( np.concatenate(label_lists) if label_lists else np.array([], dtype=np.intp) ) diff --git a/linopy/constants.py b/linopy/constants.py index a1f4fb76..9ae61d9b 100644 --- a/linopy/constants.py +++ b/linopy/constants.py @@ -92,6 +92,10 @@ class PerformanceWarning(UserWarning): SOS_DIM_ATTR = "sos_dim" SOS_BIG_M_ATTR = "big_m_upper" +# Indicator constraint attribute keys +INDICATOR_BINARY_VAR_ATTR = "indicator_binary_var" +INDICATOR_BINARY_VAL_ATTR = "indicator_binary_val" + class EvolvingAPIWarning(FutureWarning): """ diff --git a/linopy/constraints.py b/linopy/constraints.py index b74dee5c..2b76a072 100644 --- a/linopy/constraints.py +++ b/linopy/constraints.py @@ -179,6 +179,21 @@ def dual(self) -> DataArray: def dual(self, value: DataArray) -> None: """Set the dual values DataArray.""" + @property + @abstractmethod + def is_indicator(self) -> bool: + """Whether the constraint is an indicator constraint.""" + + @property + @abstractmethod + def binary_var(self) -> DataArray | None: + """Get the indicator binary variable labels, or None.""" + + @property + @abstractmethod + def binary_val(self) -> int | np.ndarray | None: + """Get the indicator triggering value(s), or None.""" + @abstractmethod def has_variable(self, variable: variables.Variable) -> bool: """Check if the constraint references any of the given variable labels.""" @@ -509,6 +524,8 @@ class CSRConstraint(ConstraintBase): "_name", "_cindex", "_dual", + "_binvar_labels", + "_binval", ) def __init__( @@ -522,6 +539,8 @@ def __init__( name: str = "", cindex: int | None = None, dual: np.ndarray | None = None, + binvar_labels: np.ndarray | None = None, + binval: int | np.ndarray | None = None, ) -> None: self._csr = csr self._con_labels = con_labels @@ -532,6 +551,8 @@ def __init__( self._name = name self._cindex = cindex self._dual = dual + self._binvar_labels = binvar_labels + self._binval = binval @property def model(self) -> Model: @@ -686,6 +707,20 @@ def dual(self, value: DataArray) -> None: vals = np.asarray(value).ravel() self._dual = vals[self.active_positions] + @property + def is_indicator(self) -> bool: + return self._binvar_labels is not None + + @property + def binary_var(self) -> DataArray | None: + if self._binvar_labels is None: + return None + return self._active_to_dataarray(self._binvar_labels, fill=-1) + + @property + def binary_val(self) -> int | np.ndarray | None: + return self._binval + def _to_dataset(self, nterm: int) -> Dataset: """ Reconstruct labels/coeffs/vars Dataset from the CSR matrix. @@ -743,9 +778,18 @@ def _to_dataset(self, nterm: int) -> Dataset: def data(self) -> Dataset: """Reconstruct the xarray Dataset from the CSR representation.""" ds = self._to_dataset(self.nterm) - extra = {"sign": self.sign, "rhs": self.rhs} + extra: dict[str, Any] = {"sign": self.sign, "rhs": self.rhs} if self._dual is not None: extra["dual"] = self._active_to_dataarray(self._dual, fill=np.nan) + if self._binvar_labels is not None: + extra["binary_var"] = self._active_to_dataarray( + self._binvar_labels, fill=-1 + ) + binval = self._binval + if isinstance(binval, np.ndarray) and binval.ndim > 0: + extra["binary_val"] = self._active_to_dataarray(binval, fill=-1) + else: + extra["binary_val"] = binval return assign_multiindex_safe(ds, **extra).assign_attrs(self.attrs) def __repr__(self) -> str: @@ -833,6 +877,14 @@ def to_netcdf_ds(self) -> Dataset: } if isinstance(self._sign, str): attrs["sign"] = self._sign + if self._binvar_labels is not None: + attrs["is_indicator"] = True + data_vars["_binvar_labels"] = DataArray(self._binvar_labels, dims=["_flat"]) + binval = self._binval + if isinstance(binval, np.ndarray) and binval.ndim > 0: + data_vars["_binval"] = DataArray(binval, dims=["_flat"]) + else: + attrs["binval"] = int(binval) # type: ignore[arg-type] return Dataset(data_vars, attrs=attrs) @classmethod @@ -859,8 +911,23 @@ def from_netcdf_ds(cls, ds: Dataset, model: Model, name: str) -> CSRConstraint: con_labels = np.arange(cindex, cindex + len(rhs), dtype=np.intp) else: con_labels = np.arange(len(rhs), dtype=np.intp) + binvar_labels: np.ndarray | None = None + binval: int | np.ndarray | None = None + if "_binvar_labels" in ds: + binvar_labels = ds["_binvar_labels"].values + binval = ds["_binval"].values if "_binval" in ds else attrs["binval"] return cls( - csr, con_labels, rhs, sign, coords, model, name, cindex=cindex, dual=dual + csr, + con_labels, + rhs, + sign, + coords, + model, + name, + cindex=cindex, + dual=dual, + binvar_labels=binvar_labels, + binval=binval, ) def has_variable(self, variable: variables.Variable) -> bool: @@ -1023,6 +1090,16 @@ def from_mutable( dual = ( con.data["dual"].values.ravel()[active_mask] if "dual" in con.data else None ) + binvar_labels: np.ndarray | None = None + binval: int | np.ndarray | None = None + binary_var = con.binary_var + if binary_var is not None: + binvar_labels = binary_var.values.ravel()[active_mask] + bv = con.binary_val + if isinstance(bv, np.ndarray) and bv.ndim > 0: + binval = bv.ravel()[active_mask] + else: + binval = bv return cls( csr, con_labels, @@ -1033,6 +1110,8 @@ def from_mutable( con.name, cindex=cindex, dual=dual, + binvar_labels=binvar_labels, + binval=binval, ) @@ -1157,6 +1236,20 @@ def rhs(self, value: ExpressionLike) -> None: self.lhs = self.lhs - value.reset_const() self._data = assign_multiindex_safe(self.data, rhs=value.const) + @property + def is_indicator(self) -> bool: + return "binary_var" in self._data + + @property + def binary_var(self) -> DataArray | None: + return self._data.get("binary_var") + + @property + def binary_val(self) -> int | np.ndarray | None: + if "binary_val" in self._data: + return self._data["binary_val"].values + return None + @property def lhs(self) -> expressions.LinearExpression: data = self.data[["coeffs", "vars"]].rename({self.term_dim: TERM_DIM}) @@ -1661,7 +1754,7 @@ def ncons(self) -> int: This excludes constraints with missing labels or where all variables are masked (vars == -1). """ - return sum(con.ncons for con in self.data.values()) + return sum(con.ncons for con in self.data.values() if not con.is_indicator) @property def inequalities(self) -> Constraints: @@ -1677,6 +1770,20 @@ def equalities(self) -> Constraints: """ return self[[n for n, s in self.items() if (s.sign == EQUAL).all()]] + @property + def indicator(self) -> Constraints: + """ + Get the subset of constraints which are indicator constraints. + """ + return self[[n for n, c in self.items() if c.is_indicator]] + + @property + def regular(self) -> Constraints: + """ + Get the subset of constraints which are not indicator constraints. + """ + return self[[n for n, c in self.items() if not c.is_indicator]] + def sanitize_zeros(self) -> None: """ Filter out terms with zero and close-to-zero coefficient. @@ -1865,9 +1972,13 @@ def to_matrix(self) -> tuple[scipy.sparse.csr_array, np.ndarray]: csrs = [] con_labels_list = [] for c in self.data.values(): + if c.is_indicator: + continue csr, con_labels = c.to_matrix(label_index) csrs.append(csr) con_labels_list.append(con_labels) + if not csrs: + raise ValueError("No constraints available to convert to matrix.") return scipy.sparse.vstack(csrs, format="csr"), np.concatenate(con_labels_list) def reset_dual(self) -> None: diff --git a/linopy/io.py b/linopy/io.py index 62f1ca13..7e1e99cf 100644 --- a/linopy/io.py +++ b/linopy/io.py @@ -459,6 +459,61 @@ def sos_to_file( _format_and_write(df, columns, f) +def indicator_constraints_to_file( + m: Model, + f: BufferedWriter, + explicit_coordinate_names: bool = False, +) -> None: + """ + Write indicator constraints to the s.t. section of an LP file. + + Indicator constraints appear in the Subject To section with the format: + ``ic0: x0 = 1 -> +1.0 x1 <= 5.0`` + """ + if not len(m.constraints.indicator): + return + + if not len(m.constraints.regular): + f.write(b"\n\ns.t.\n\n") + + print_variable_scalar, _ = get_printers_scalar( + m, explicit_coordinate_names=explicit_coordinate_names + ) + + for con in m.constraints.indicator.data.values(): + ic_data = con.data + labels_flat = ic_data.labels.values.flatten() + binary_var_flat = ic_data.binary_var.values.flatten() + binary_val_flat = np.broadcast_to( + ic_data.binary_val.values, labels_flat.shape + ).flatten() + coeffs_flat = ic_data.coeffs.values.reshape(len(labels_flat), -1) + vars_flat = ic_data.vars.values.reshape(len(labels_flat), -1) + sign_flat = np.broadcast_to(ic_data.sign.values, labels_flat.shape).flatten() + rhs_flat = np.broadcast_to(ic_data.rhs.values, labels_flat.shape).flatten() + + for i in range(len(labels_flat)): + if labels_flat[i] == -1: + continue + + bvar_name = print_variable_scalar(binary_var_flat[i : i + 1])[0] + valid = vars_flat[i] != -1 + var_names = print_variable_scalar(vars_flat[i][valid]) + + terms = [] + for coeff, var_name in zip(coeffs_flat[i][valid], var_names): + coeff = float(coeff) + prefix = "+" if coeff >= 0 else "" + terms.append(f"{prefix}{coeff} {var_name}") + + lhs_str = " ".join(terms) + line = ( + f"ic{labels_flat[i]}: {bvar_name} = {int(binary_val_flat[i])} -> " + f"{lhs_str} {sign_flat[i]} {float(rhs_flat[i])}\n" + ) + f.write(line.encode()) + + def constraints_to_file( m: Model, f: BufferedWriter, @@ -467,7 +522,8 @@ def constraints_to_file( slice_size: int = 2_000_000, explicit_coordinate_names: bool = False, ) -> None: - if not len(m.constraints): + regular = m.constraints.regular + if not len(regular): return print_variable, print_constraint = get_printers( @@ -475,10 +531,10 @@ def constraints_to_file( ) f.write(b"\n\ns.t.\n\n") - names = m.constraints + names = list(regular) if progress: names = tqdm( - list(names), + names, desc="Writing constraints.", colour=TQDM_COLOR, ) @@ -486,7 +542,7 @@ def constraints_to_file( # to make this even faster, we can use polars expression # https://docs.pola.rs/user-guide/expressions/plugins/#output-data-types for name in names: - con = m.constraints[name] + con = regular[name] for con_slice in con.iterate_slices(slice_size): df = con_slice.to_polars() @@ -547,6 +603,11 @@ def to_lp_file( slice_size=slice_size, explicit_coordinate_names=explicit_coordinate_names, ) + indicator_constraints_to_file( + m, + f=f, + explicit_coordinate_names=explicit_coordinate_names, + ) bounds_to_file( m, f=f, @@ -1092,7 +1153,7 @@ def copy(m: Model, include_solution: bool = False, deep: bool = True) -> Model: Model A deep or shallow copy of the model. """ - from linopy.constraints import Constraint, Constraints + from linopy.constraints import Constraint, ConstraintBase, Constraints from linopy.expressions import LinearExpression from linopy.model import Model, Objective from linopy.variables import Variable, Variables @@ -1122,15 +1183,18 @@ def copy(m: Model, include_solution: bool = False, deep: bool = True) -> Model: new_model, ) + def _copy_con_data(con: ConstraintBase) -> xr.Dataset: + d = con.mutable().data + if include_solution: + return d.copy(deep=deep) + cols = m.constraints.dataset_attrs + ( + ["binary_var", "binary_val"] if con.is_indicator else [] + ) + return d[cols].copy(deep=deep) + new_model._constraints = Constraints( { - name: Constraint( - con.mutable().data.copy(deep=deep) - if include_solution - else con.mutable().data[m.constraints.dataset_attrs].copy(deep=deep), - new_model, - name, - ) + name: Constraint(_copy_con_data(con), new_model, name) for name, con in m.constraints.items() }, new_model, diff --git a/linopy/matrices.py b/linopy/matrices.py index 21b50d2d..34ee4b76 100644 --- a/linopy/matrices.py +++ b/linopy/matrices.py @@ -72,29 +72,55 @@ def _build_vars(self) -> None: def _build_cons(self) -> None: m = self._parent - - if not len(m.constraints): - self.clabels: ndarray = np.array([], dtype=np.intp) - self.b: ndarray = np.array([]) - self.sense: ndarray = np.array([], dtype=object) - self.A: scipy.sparse.csr_array | None = None - return - label_index = m.variables.label_index - csrs = [] - b_list = [] - sense_list = [] + label_to_pos = label_index.label_to_pos + + reg_csrs, reg_b, reg_sense = [], [], [] + ind_csrs, ind_b, ind_sense, ind_binvar, ind_binval = [], [], [], [], [] for c in m.constraints.data.values(): - csr, _, b, sense = c.to_matrix_with_rhs(label_index) - csrs.append(csr) - b_list.append(b) - sense_list.append(sense) - - self.A = cast(scipy.sparse.csr_array, scipy.sparse.vstack(csrs, format="csr")) - self.clabels = m.constraints.label_index.clabels - self.b = np.concatenate(b_list) if b_list else np.array([]) - self.sense = ( - np.concatenate(sense_list) if sense_list else np.array([], dtype=object) + if c.is_indicator: + cc = c if isinstance(c, CSRConstraint) else c.freeze() + csr, _, b, sense = cc.to_matrix_with_rhs(label_index) + ind_csrs.append(csr) + ind_b.append(b) + ind_sense.append(sense) + ind_binvar.append(label_to_pos[cc._binvar_labels]) + binval = cast("int | np.ndarray", cc._binval) + n = len(b) + if np.ndim(binval) == 0: + ind_binval.append(np.full(n, int(binval), dtype=np.intp)) + else: + ind_binval.append(np.asarray(binval, dtype=np.intp).ravel()) + else: + csr, _, b, sense = c.to_matrix_with_rhs(label_index) + reg_csrs.append(csr) + reg_b.append(b) + reg_sense.append(sense) + + self.clabels: ndarray = m.constraints.label_index.clabels + self.A: scipy.sparse.csr_array | None = ( + cast(scipy.sparse.csr_array, scipy.sparse.vstack(reg_csrs, format="csr")) + if reg_csrs + else None + ) + self.b: ndarray = np.concatenate(reg_b) if reg_b else np.array([]) + self.sense: ndarray = ( + np.concatenate(reg_sense) if reg_sense else np.array([], dtype=object) + ) + self.indicator_A: scipy.sparse.csr_array | None = ( + cast(scipy.sparse.csr_array, scipy.sparse.vstack(ind_csrs, format="csr")) + if ind_csrs + else None + ) + self.indicator_b: ndarray = np.concatenate(ind_b) if ind_b else np.array([]) + self.indicator_sense: ndarray = ( + np.concatenate(ind_sense) if ind_sense else np.array([], dtype=object) + ) + self.indicator_binvar: ndarray = ( + np.concatenate(ind_binvar) if ind_binvar else np.array([], dtype=np.intp) + ) + self.indicator_binval: ndarray = ( + np.concatenate(ind_binval) if ind_binval else np.array([], dtype=np.intp) ) @cached_property @@ -157,6 +183,8 @@ def dual(self) -> ndarray: dual_list = [] has_dual = False for c in m.constraints.data.values(): + if c.is_indicator: + continue if isinstance(c, CSRConstraint): # _dual is active-only if c._dual is not None: diff --git a/linopy/model.py b/linopy/model.py index 4fea3176..24ae56b4 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -297,6 +297,16 @@ def constraints(self) -> Constraints: """ return self._constraints + @property + def indicator_constraints(self) -> Constraints: + """ + Indicator constraints assigned to the model. + + Returns the subset of ``model.constraints`` for which + ``is_indicator`` is True. + """ + return self.constraints.indicator + @property def objective(self) -> Objective: """ @@ -1099,6 +1109,112 @@ def add_constraints( freeze = self.freeze_constraints return self.constraints.add(constraint, freeze=freeze and not self.chunk) + def add_indicator_constraints( + self, + binary_var: Variable, + binary_val: int, + lhs: ConstraintLike | ExpressionLike | VariableLike, + sign: SignLike | None = None, + rhs: ConstantLike | None = None, + name: str | None = None, + ) -> ConstraintBase: + """ + Add indicator constraints to the model. + + An indicator constraint has the form: + (binary_var == binary_val) => (linear_constraint) + + The linear constraint is only enforced when binary_var equals + binary_val. These constraints are handled natively by solvers + like Gurobi and CPLEX via general constraints. + + Parameters + ---------- + binary_var : linopy.Variable + Binary variable serving as the indicator. Must have binary=True. + binary_val : int + Triggering value, must be 0 or 1. + lhs : linopy.Constraint, linopy.LinearExpression, or linopy.Variable + The conditionally enforced constraint. If a LinearExpression or + Variable is passed, ``sign`` and ``rhs`` must also be provided. + sign : str, optional + Constraint sign ('<=', '>=', '='). Required when ``lhs`` is an + expression. + rhs : numeric, optional + Right-hand side. Required when ``lhs`` is an expression. + name : str, optional + Name for the indicator constraint group. + + Returns + ------- + linopy.constraints.ConstraintBase + The added indicator constraint. + """ + if not binary_var.attrs.get("binary", False): + raise ValueError( + "Indicator variable must be binary. " + f"Variable '{binary_var.name}' is not binary." + ) + + if binary_val not in (0, 1): + raise ValueError(f"binary_val must be 0 or 1, got {binary_val}.") + + if name in list(self.constraints): + raise ValueError(f"Constraint '{name}' already assigned to model.") + elif name is None: + name = f"indcon{self._connameCounter}" + self._connameCounter += 1 + + # Build constraint data from lhs + if isinstance(lhs, Constraint): + if sign is not None or rhs is not None: + raise ValueError("sign and rhs must be None when lhs is a Constraint.") + data = lhs.data + elif isinstance(lhs, LinearExpression): + if sign is None or rhs is None: + raise ValueError( + "sign and rhs are required when lhs is a LinearExpression." + ) + sign = maybe_replace_signs(as_dataarray(sign)) + data = lhs.to_constraint(sign, rhs).data + elif isinstance(lhs, Variable | ScalarVariable | ScalarLinearExpression): + if sign is None or rhs is None: + raise ValueError("sign and rhs are required when lhs is a Variable.") + sign = maybe_replace_signs(as_dataarray(sign)) + data = lhs.to_linexpr().to_constraint(sign, rhs).data + elif isinstance(lhs, AnonymousScalarConstraint): + if sign is not None or rhs is not None: + raise ValueError("sign and rhs must be None when lhs is a Constraint.") + data = lhs.to_constraint().data + else: + raise TypeError( + f"lhs must be a Constraint, LinearExpression, or Variable, " + f"got {type(lhs)}." + ) + + data["binary_var"] = binary_var.labels + data["binary_val"] = binary_val + + data["labels"] = -1 + (data,) = xr.broadcast(data, exclude=[TERM_DIM]) + + start = self._cCounter + end = start + data.labels.size + data.labels.values = np.arange(start, end).reshape(data.labels.shape) + self._cCounter += data.labels.size + + data = data.assign_attrs(label_range=(start, end), name=name) + + con = Constraint(data, name=name, model=self, skip_broadcast=True) + freeze = self.freeze_constraints + return self.constraints.add(con, freeze=freeze and not self.chunk) + + def remove_indicator_constraints(self, name: str) -> None: + """ + Remove indicator constraint by name. + """ + self.constraints.remove(name) + def add_objective( self, expr: Variable @@ -1890,6 +2006,8 @@ def assign_result( if len(result.solution.dual): dual = result.solution.dual for _, con in self.constraints.items(): + if con.is_indicator: + continue start, end = con.range coords = {dim: con.coords[dim] for dim in con.coord_dims} con.dual = xr.DataArray( diff --git a/linopy/solvers.py b/linopy/solvers.py index 5b36a7d5..fbd7815e 100644 --- a/linopy/solvers.py +++ b/linopy/solvers.py @@ -137,6 +137,7 @@ class SolverFeature(Enum): GPU_ONLY = auto() IIS_COMPUTATION = auto() SOS_CONSTRAINTS = auto() + INDICATOR_CONSTRAINTS = auto() SEMI_CONTINUOUS_VARIABLES = auto() SOLVER_ATTRIBUTE_ACCESS = auto() MIP_DUAL_BOUND_REPORT = auto() @@ -568,6 +569,14 @@ def _validate_model(self) -> None: "`model.apply_sos_reformulation()`, or use a solver that supports SOS." ) + if model.indicator_constraints and not cls.supports( + SolverFeature.INDICATOR_CONSTRAINTS + ): + raise ValueError( + f"Solver {solver_name} does not support indicator constraints. " + "Use a solver that supports them." + ) + def _build_direct(self, **build_kwargs: Any) -> None: """Build the native solver model from ``self.model``. Override per-solver.""" raise NotImplementedError( @@ -1506,6 +1515,7 @@ class Gurobi(Solver["gurobipy.Env | dict[str, Any] | None"]): SolverFeature.SOLUTION_FILE_NOT_NEEDED, SolverFeature.IIS_COMPUTATION, SolverFeature.SOS_CONSTRAINTS, + SolverFeature.INDICATOR_CONSTRAINTS, SolverFeature.SEMI_CONTINUOUS_VARIABLES, SolverFeature.SOLVER_ATTRIBUTE_ACCESS, SolverFeature.MIP_DUAL_BOUND_REPORT, @@ -1591,8 +1601,7 @@ def _build_solver_model( if model.objective.sense == "max": gm.ModelSense = -1 - if len(model.constraints): - assert M.A is not None + if M.A is not None: c = gm.addMConstr(M.A, x, M.sense, M.b) if set_names: names = print_constraints(M.clabels) @@ -1601,6 +1610,27 @@ def _build_solver_model( for sos_type, positions, weights in _iter_sos_sets(model): gm.addSOS(sos_type, x[positions.tolist()].tolist(), weights.tolist()) + if M.indicator_A is not None: + sense_map = { + "<": gurobipy.GRB.LESS_EQUAL, + ">": gurobipy.GRB.GREATER_EQUAL, + "=": gurobipy.GRB.EQUAL, + } + x_list = x.tolist() + A = M.indicator_A + for i in range(A.shape[0]): + lhs = gurobipy.LinExpr() + start, end = A.indptr[i], A.indptr[i + 1] + for col, coeff in zip(A.indices[start:end], A.data[start:end]): + lhs.add(x_list[int(col)], float(coeff)) + gm.addGenConstrIndicator( + x_list[int(M.indicator_binvar[i])], + bool(M.indicator_binval[i]), + lhs, + sense_map[str(M.indicator_sense[i])], + float(M.indicator_b[i]), + ) + gm.update() return gm @@ -1813,6 +1843,7 @@ class Cplex(Solver[None]): SolverFeature.LP_FILE_NAMES, SolverFeature.READ_MODEL_FROM_FILE, SolverFeature.SOS_CONSTRAINTS, + SolverFeature.INDICATOR_CONSTRAINTS, SolverFeature.SEMI_CONTINUOUS_VARIABLES, } ) diff --git a/test/test_indicator_constraints.py b/test/test_indicator_constraints.py new file mode 100644 index 00000000..bba6498a --- /dev/null +++ b/test/test_indicator_constraints.py @@ -0,0 +1,425 @@ +"""Tests for indicator constraint support.""" + +from pathlib import Path + +import numpy as np +import pandas as pd +import pytest + +import linopy +from linopy import Model, available_solvers +from linopy.constraints import Constraint, CSRConstraint +from linopy.variables import Variable + +requires_gurobi = pytest.mark.skipif( + "gurobi" not in available_solvers, reason="Gurobi not installed" +) + + +@pytest.fixture +def mbx() -> tuple[Model, Variable, Variable]: + """Model with a scalar binary ``b`` and continuous ``x`` in [0, 10].""" + m = Model() + b = m.add_variables(name="b", binary=True) + x = m.add_variables(lower=0, upper=10, name="x") + return m, b, x + + +@pytest.fixture +def coords_mbx() -> tuple[Model, Variable, Variable, pd.RangeIndex]: + """Model with index-aligned binary ``b`` and continuous ``x`` over a length-3 index.""" + m = Model() + idx = pd.RangeIndex(3, name="i") + b = m.add_variables(coords=[idx], name="b", binary=True) + x = m.add_variables(coords=[idx], lower=0, upper=10, name="x") + return m, b, x, idx + + +class TestConstruction: + """Creating indicator constraints and validating their arguments.""" + + def test_basic_fields(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Indicator constraint is created with correct fields.""" + m, b, x = mbx + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + assert "ic0" in m.indicator_constraints + ic = m.indicator_constraints["ic0"] + for field in ( + "coeffs", + "vars", + "sign", + "rhs", + "binary_var", + "binary_val", + "labels", + ): + assert field in ic + + def test_auto_name(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Omitting ``name`` auto-generates an ``indcon`` name.""" + m, b, x = mbx + con = m.add_indicator_constraints(b, 1, x, "<=", 5) + assert con.name.startswith("indcon") + assert con.name in m.indicator_constraints + + @pytest.mark.parametrize( + ("build_lhs", "kwargs"), + [ + (lambda x: x <= 5, {}), + (lambda x: 2 * x, {"sign": "<=", "rhs": 5}), + (lambda x: 1 * x.at[()] <= 5, {}), + ], + ids=["constraint", "linexpr", "scalarcon"], + ) + def test_valid_lhs( + self, mbx: tuple[Model, Variable, Variable], build_lhs: object, kwargs: dict + ) -> None: + """Each accepted lhs form (constraint, expression, scalar) builds an indicator.""" + m, b, x = mbx + ic = m.add_indicator_constraints(b, 1, build_lhs(x), name="ic0", **kwargs) # type: ignore[operator] + assert ic.is_indicator + assert "ic0" in m.indicator_constraints + + @pytest.mark.parametrize( + ("build_lhs", "kwargs", "exc", "match"), + [ + (lambda x: x <= 5, {"sign": "<=", "rhs": 5}, ValueError, "must be None"), + ( + lambda x: 2 * x, + {}, + ValueError, + "required when lhs is a LinearExpression", + ), + (lambda x: x, {}, ValueError, "required when lhs is a Variable"), + ( + lambda x: 1 * x.at[()] <= 5, + {"sign": "<=", "rhs": 5}, + ValueError, + "must be None", + ), + (lambda x: "not-a-constraint", {}, TypeError, "must be a Constraint"), + ], + ids=[ + "con+sign", + "linexpr-no-sign", + "var-no-sign", + "scalarcon+sign", + "bad-type", + ], + ) + def test_invalid_lhs( + self, + mbx: tuple[Model, Variable, Variable], + build_lhs: object, + kwargs: dict, + exc: type[Exception], + match: str, + ) -> None: + """Invalid lhs / sign / rhs combinations raise with a clear message.""" + m, b, x = mbx + with pytest.raises(exc, match=match): + m.add_indicator_constraints(b, 1, build_lhs(x), name="ic0", **kwargs) # type: ignore[operator] + + def test_non_binary_var_raises(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Non-binary variable raises ValueError.""" + m, _, x = mbx + y = m.add_variables(lower=0, upper=1, name="y") + with pytest.raises(ValueError, match="must be binary"): + m.add_indicator_constraints(y, 1, x, "<=", 5) + + def test_bad_binary_val_raises(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Invalid binary_val raises ValueError.""" + m, b, x = mbx + with pytest.raises(ValueError, match="must be 0 or 1"): + m.add_indicator_constraints(b, 2, x, "<=", 5) + + def test_duplicate_name_raises(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Duplicate name raises ValueError.""" + m, b, x = mbx + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + with pytest.raises(ValueError, match="already assigned"): + m.add_indicator_constraints(b, 0, x, ">=", 0, name="ic0") + + +class TestContainer: + """How indicator constraints live in (and stay separate within) the container.""" + + def test_separate_from_regular(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Indicator constraints are separate from regular constraints.""" + m, b, x = mbx + m.add_constraints(x >= 0, name="regular") + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + assert "regular" in m.constraints + assert "regular" in m.constraints.regular + assert "ic0" in m.constraints + assert "ic0" in m.constraints.indicator + assert "ic0" not in m.constraints.regular + assert "ic0" in m.indicator_constraints + + def test_in_unified_container(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Indicator constraint lives in the unified container, not in regular.""" + m, b, x = mbx + m.add_constraints(x >= 0, name="regular") + ic = m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + assert isinstance(ic, Constraint) + assert ic.is_indicator + assert "ic0" in m.constraints + assert "ic0" in m.constraints.indicator + assert "ic0" not in m.constraints.regular + assert "ic0" in m.indicator_constraints + assert "regular" in m.constraints.regular + assert "regular" not in m.constraints.indicator + + def test_remove(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Indicator constraints can be removed.""" + m, b, x = mbx + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + assert "ic0" in m.indicator_constraints + m.remove_indicator_constraints("ic0") + assert "ic0" not in m.indicator_constraints + + def test_regular_constraint_has_no_indicator_fields( + self, mbx: tuple[Model, Variable, Variable] + ) -> None: + """Regular constraints report no indicator metadata, frozen or not.""" + m, _, x = mbx + m.add_constraints(x <= 5, name="c") + con = m.constraints["c"] + assert not con.is_indicator + assert con.binary_var is None + assert con.binary_val is None + + frozen = con.freeze() + assert not frozen.is_indicator + assert frozen.binary_var is None + assert frozen.binary_val is None + + def test_with_coords( + self, coords_mbx: tuple[Model, Variable, Variable, pd.RangeIndex] + ) -> None: + """Indicator constraints work with multi-dimensional coords.""" + m, b, x, idx = coords_mbx + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + assert m.indicator_constraints["ic0"].labels.size == idx.size + + +class TestPersistence: + """Indicator metadata survives freeze, copy, and netCDF round-trips.""" + + def test_freeze_roundtrip(self, mbx: tuple[Model, Variable, Variable]) -> None: + """is_indicator and binary fields survive freeze and freeze->mutable.""" + m, b, x = mbx + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + frozen = m.constraints["ic0"].freeze() + assert isinstance(frozen, CSRConstraint) + assert frozen.is_indicator + assert frozen.binary_var is not None + assert frozen.binary_val == 1 + + mutable = frozen.mutable() + assert isinstance(mutable, Constraint) + assert mutable.is_indicator + assert mutable.binary_var is not None + assert np.all(mutable.binary_val == 1) + + def test_copy_preserves(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Model.copy preserves is_indicator and the binary fields.""" + m, b, x = mbx + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + ic = m.copy().constraints["ic0"] + assert ic.is_indicator + assert ic.binary_var is not None + assert np.all(ic.binary_val == 1) + + @pytest.mark.parametrize("freeze_constraints", [False, True]) + def test_netcdf_roundtrip(self, tmp_path: Path, freeze_constraints: bool) -> None: + """is_indicator and binary fields survive a netCDF round-trip.""" + m = Model(freeze_constraints=freeze_constraints) + b = m.add_variables(name="b", binary=True) + x = m.add_variables(lower=0, upper=10, name="x") + m.add_constraints(x >= 0, name="regular") + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + fn = tmp_path / "model.nc" + m.to_netcdf(fn) + ic = linopy.read_netcdf(fn).constraints["ic0"] + + assert ic.is_indicator + assert ic.binary_var is not None + assert np.all(ic.binary_val == 1) + + def test_array_binval_roundtrip(self, tmp_path: Path) -> None: + """A coords-based indicator has per-element binary_val that round-trips.""" + m = Model(freeze_constraints=True) + idx = pd.RangeIndex(3, name="i") + b = m.add_variables(coords=[idx], name="b", binary=True) + x = m.add_variables(coords=[idx], lower=0, upper=10, name="x") + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + frozen = m.constraints["ic0"] + assert isinstance(frozen, CSRConstraint) + assert isinstance(frozen.binary_val, np.ndarray) and frozen.binary_val.ndim == 1 + assert "binary_val" in frozen.data + + fn = tmp_path / "model.nc" + m.to_netcdf(fn) + ic = linopy.read_netcdf(fn).constraints["ic0"] + assert ic.is_indicator + assert np.all(ic.binary_val == 1) + + +class TestMatrices: + """Indicator rows are split out of the regular constraint matrix.""" + + def test_matrix_split( + self, coords_mbx: tuple[Model, Variable, Variable, pd.RangeIndex] + ) -> None: + """Regular A excludes indicator rows; indicator arrays carry them.""" + m, b, x, idx = coords_mbx + m.add_constraints(x >= 0, name="regular") + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + n = idx.size + assert m.matrices.A is not None + assert m.matrices.indicator_A is not None + assert m.matrices.A.shape[0] == n + assert m.matrices.indicator_A.shape[0] == n + assert len(m.matrices.clabels) == n + assert m.ncons == n + + np.testing.assert_array_equal(m.matrices.indicator_binval, np.full(n, 1)) + np.testing.assert_array_equal(m.matrices.indicator_b, np.full(n, 5.0)) + np.testing.assert_array_equal(m.matrices.indicator_sense, np.full(n, "<")) + assert m.matrices.indicator_binvar.shape == (n,) + + def test_to_matrix_skips_indicator( + self, coords_mbx: tuple[Model, Variable, Variable, pd.RangeIndex] + ) -> None: + """Constraints.to_matrix drops indicator rows; an indicator-only set raises.""" + m, b, x, idx = coords_mbx + m.add_constraints(x <= 5, name="regular") + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + A, labels = m.constraints.to_matrix() + assert A.shape[0] == idx.size + + with pytest.raises(ValueError, match="No constraints available"): + m.constraints.indicator.to_matrix() + + +class TestLPFile: + """LP export of indicator constraints.""" + + def test_with_regular_constraints( + self, mbx: tuple[Model, Variable, Variable], tmp_path: Path + ) -> None: + """LP file contains general constraints section.""" + m, b, x = mbx + m.add_constraints(x >= 0, name="dummy") + m.add_objective(x) + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + fn = tmp_path / "test.lp" + m.to_file(fn) + content = fn.read_text() + assert "= 1 ->" in content + label = int(m.indicator_constraints["ic0"].labels.item()) + assert f"ic{label}:" in content + + def test_indicator_only( + self, mbx: tuple[Model, Variable, Variable], tmp_path: Path + ) -> None: + """An LP file with no regular constraints still writes the indicator section.""" + m, b, x = mbx + m.add_objective(x) + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + fn = tmp_path / "test.lp" + m.to_file(fn) + content = fn.read_text() + assert "s.t." in content + assert "= 1 ->" in content + + +class TestSolve: + """Solving models with indicator constraints.""" + + @requires_gurobi + @pytest.mark.parametrize( + ("io_api", "trigger", "expected"), + [ + (None, 1, 5), + (None, 0, 10), + ("direct", 1, 5), + ("direct", 0, 10), + ], + ids=["lp-active", "lp-inactive", "direct-active", "direct-inactive"], + ) + def test_gurobi_enforces_at_trigger( + self, + mbx: tuple[Model, Variable, Variable], + io_api: str | None, + trigger: int, + expected: float, + ) -> None: + """ + The indicator is enforced (x<=5) only when b matches the trigger value. + + With b fixed to 1, an active trigger caps x at 5; an inactive one leaves + x free to its upper bound of 10. + """ + m, b, x = mbx + m.add_constraints(b >= 1, name="fix_b") + m.add_indicator_constraints(b, trigger, x, "<=", 5, name="ic0") + m.add_objective(x, sense="max") + m.solve(solver_name="gurobi", io_api=io_api) + assert m.objective.value is not None + assert np.isclose(m.objective.value, expected, atol=1e-6) + + @requires_gurobi + def test_gurobi_multiple(self) -> None: + """Multiple indicators combine: x <= min(10, 5) = 5.""" + m = Model() + b1 = m.add_variables(name="b1", binary=True) + b2 = m.add_variables(name="b2", binary=True) + x = m.add_variables(lower=0, upper=20, name="x") + m.add_constraints(b1 >= 1, name="fix_b1") + m.add_constraints(b2 >= 1, name="fix_b2") + m.add_indicator_constraints(b1, 1, x, "<=", 10, name="ic1") + m.add_indicator_constraints(b2, 1, x, "<=", 5, name="ic2") + m.add_objective(x, sense="max") + m.solve(solver_name="gurobi") + assert m.objective.value is not None + assert np.isclose(m.objective.value, 5, atol=1e-6) + + def test_unsupported_solver_raises( + self, mbx: tuple[Model, Variable, Variable] + ) -> None: + """Solvers without indicator support raise ValueError.""" + m, b, x = mbx + m.add_constraints(x >= 0, name="dummy") + m.add_objective(x) + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + + for solver in ["glpk", "highs", "mosek", "mindopt"]: + if solver in available_solvers: + with pytest.raises( + ValueError, match="does not support indicator constraints" + ): + m.solve(solver_name=solver) + + @requires_gurobi + def test_mip_has_no_duals(self, mbx: tuple[Model, Variable, Variable]) -> None: + """Indicator rows are skipped when collecting duals; MIPs expose none.""" + m, b, x = mbx + m.add_constraints(b >= 1, name="fix_b") + m.add_indicator_constraints(b, 1, x, "<=", 5, name="ic0") + m.add_objective(x, sense="max") + m.solve(solver_name="gurobi") + with pytest.raises(AttributeError, match="dual"): + _ = m.matrices.dual