diff --git a/doc/release_notes.rst b/doc/release_notes.rst index 0268e08b..0bac59be 100644 --- a/doc/release_notes.rst +++ b/doc/release_notes.rst @@ -13,6 +13,10 @@ Upcoming Version * After ``model.solve()``, the solver object stays available on ``model.solver``. You can inspect it, reuse it, or release the underlying solver (and its license) by calling ``model.solver.close()`` or assigning ``model.solver = None``. It is also released automatically when the model is garbage-collected. * New ``SolverReport`` on the result (``result.report``) reports runtime, MIP gap, dual (best) bound, and iteration counts. It is shown in ``repr(result)`` and currently populated by CBC, HiGHS, Gurobi, Knitro, and cuPDLPx. +*Dualize LP models* + +* New ``Model.dualize()`` constructs the LP dual as a standalone model, lifting finite variable bounds into explicit constraints so they are reflected in the dual. Dual variables are named after the primal constraints. Works for linear problems with linear objective and constraints. (https://github.com/PyPSA/linopy/pull/626) + *A new way to call a solver (advanced)* Most users should keep calling ``model.solve(...)``. If you want more control, you can now build the solver yourself and run it in two steps: diff --git a/linopy/dual.py b/linopy/dual.py new file mode 100644 index 00000000..9bfea082 --- /dev/null +++ b/linopy/dual.py @@ -0,0 +1,737 @@ +""" +Linopy dual module. + +This module contains implementations for constructing the dual of a linear optimization problem. +""" + +from __future__ import annotations + +import logging +from typing import TYPE_CHECKING, Any, Literal + +import numpy as np +import xarray as xr + +from linopy.expressions import LinearExpression + +if TYPE_CHECKING: + from linopy.model import Model + +logger = logging.getLogger(__name__) + + +def _skip( + da: xr.DataArray, component_type: Literal["variable", "constraint"], name: str +) -> bool: + """Return True if the label array is empty or entirely masked (all -1).""" + if da.size == 0: + logger.debug(f"Skipping empty {component_type} '{name}'.") + return True + + if (da == -1).all(): + logger.debug(f"{component_type} '{name}' is fully masked, skipping.") + return True + return False + + +def _lift_bounds_to_constraints(m: Model) -> None: + """ + Convert finite variable bounds to explicit ``>=`` / ``<=`` constraints. + + Each finite lower bound becomes a ``'{var_name}-bound-lower'`` constraint and + each finite upper bound becomes a ``'{var_name}-bound-upper'`` constraint. + The variable bounds are then relaxed to ``[-inf, inf]`` to avoid + double-counting when the dual is formed. + + Bounds may vary elementwise within one variable block. Infinite bounds are + not converted into constraints. + + Parameters + ---------- + m : Model + Model to mutate in-place. + """ + logger.debug("Converting finite variable bounds to explicit constraints.") + logger.debug("Relaxing converted variable bounds to [-inf, inf].") + + for var_name, var in m.variables.items(): + label_mask = var.labels != -1 + lb = var.lower + ub = var.upper + + finite_lb = xr.DataArray( + np.isfinite(lb.values), + coords=lb.coords, + dims=lb.dims, + ) + finite_ub = xr.DataArray( + np.isfinite(ub.values), + coords=ub.coords, + dims=ub.dims, + ) + + lower_mask = label_mask & finite_lb + upper_mask = label_mask & finite_ub + + # Lower bound. + if f"{var_name}-bound-lower" not in m.constraints: + if bool(lower_mask.any()): + m.add_constraints( + var >= lb, + name=f"{var_name}-bound-lower", + mask=lower_mask, + ) + logger.debug(f"Added lower bound constraint for '{var_name}'.") + + # Remove converted bounds to avoid double-counting in the dual. + # Rely on the new constraints instead. + var.lower.values[lower_mask.values] = -np.inf + else: + logger.debug( + f"Variable '{var_name}' has no finite lower bound, skipping." + ) + + # Upper bound. + if f"{var_name}-bound-upper" not in m.constraints: + if bool(upper_mask.any()): + m.add_constraints( + var <= ub, + name=f"{var_name}-bound-upper", + mask=upper_mask, + ) + logger.debug(f"Added upper bound constraint for '{var_name}'.") + + # Remove converted bounds to avoid double-counting in the dual. + # Rely on the new constraints instead. + var.upper.values[upper_mask.values] = np.inf + else: + logger.debug( + f"Variable '{var_name}' has no finite upper bound, skipping." + ) + + +def _dual_bounds_from_constraint_signs( + signs: xr.DataArray, + labels: xr.DataArray, + primal_is_min: bool, +) -> tuple[xr.DataArray, xr.DataArray, xr.DataArray]: + """ + Return elementwise dual-variable bounds for constraint signs. + + ``signs`` is broadcast to ``labels``. Valid signs are ``=``, ``<=``, and + ``>=``. ``valid_sign`` is True where the broadcast sign is valid. + """ + signs = signs.broadcast_like(labels) + + is_eq = signs == "=" + is_le = signs == "<=" + is_ge = signs == ">=" + valid_sign = is_eq | is_le | is_ge + + # Bounds for invalid signs are irrelevant because invalid entries are + # masked out by the caller. Use finite placeholders to avoid NaNs. + lower = xr.zeros_like(labels, dtype=float) + upper = xr.zeros_like(labels, dtype=float) + + # Equality constraints: dual variable is free. + lower = lower.where(~is_eq, -np.inf) + upper = upper.where(~is_eq, np.inf) + + if primal_is_min: + # <= constraints in a min primal: dual <= 0. + lower = lower.where(~is_le, -np.inf) + upper = upper.where(~is_le, 0.0) + + # >= constraints in a min primal: dual >= 0. + lower = lower.where(~is_ge, 0.0) + upper = upper.where(~is_ge, np.inf) + else: + # <= constraints in a max primal: dual >= 0. + lower = lower.where(~is_le, 0.0) + upper = upper.where(~is_le, np.inf) + + # >= constraints in a max primal: dual <= 0. + lower = lower.where(~is_ge, -np.inf) + upper = upper.where(~is_ge, 0.0) + + return lower, upper, valid_sign + + +def _add_dual_variables(m: Model, m_dual: Model) -> dict: + """ + Add one dual variable to m_dual for each included constraint in m. + + Dual variable bounds encode the sign convention for each constraint type + and primal objective sense: + + ============ =========== ================ ================ + Constraint Primal sense lower upper + ============ =========== ================ ================ + = min / max -inf +inf (free) + <= min -inf 0 + <= max 0 +inf + >= min 0 +inf + >= max -inf 0 + ============ =========== ================ ================ + + Fully masked or empty constraints are skipped. Constraint arrays may contain + mixed signs; dual variable bounds are assigned elementwise. + + Parameters + ---------- + m : Model + Primal model. + m_dual : Model + Dual model to populate. + + Returns + ------- + dict + ``{constraint_name: dual_variable}`` for every dualized constraint. + """ + primal_is_min = m.objective.sense == "min" + + dual_vars = {} + for name, con in m.constraints.items(): + if _skip(con.labels, "constraint", name): + continue + + lower, upper, valid_sign = _dual_bounds_from_constraint_signs( + con.sign, + con.labels, + primal_is_min, + ) + + unmasked = con.labels != -1 + invalid_unmasked = unmasked & ~valid_sign + if bool(invalid_unmasked.any()): + logger.warning( + f"Constraint '{name}' has unrecognized signs; invalid entries " + "will be skipped." + ) + + mask = unmasked & valid_sign + if not bool(mask.any()): + logger.warning( + f"Constraint '{name}' has no entries with valid signs, skipping." + ) + continue + + logger.debug( + f"Adding dual variable for constraint '{name}' " + f"with shape {con.shape} and dims {con.labels.dims}." + ) + coords = ( + [con.labels.coords[dim] for dim in con.labels.dims] + if con.labels.dims + else None + ) + dual_vars[name] = m_dual.add_variables( + lower=lower, + upper=upper, + coords=coords, + name=name, + mask=mask, + ) + + return dual_vars + + +def _build_flat_con_to_dual_label_lookup(m: Model, dual_vars: dict) -> np.ndarray: + """ + Build a lookup from flat primal constraint labels to flat dual variable labels. + + The returned array maps each flat constraint label to the corresponding + flat dual variable label from ``dual_vars``. Entries are -1 for constraints + not in ``dual_vars`` or with masked labels. + + Parameters + ---------- + m : Model + Primal model. + dual_vars : dict + ``{constraint_name: dual_variable}`` as returned by ``_add_dual_variables()``. + + Returns + ------- + np.ndarray of int64 + Lookup array of length ``max_flat_con_label + 1``; empty if no valid + constraint labels exist. + """ + max_flat_con = -1 + for con_name in dual_vars: + flat = m.constraints[con_name].labels.values.ravel() + valid = flat[flat != -1] + if len(valid): + max_flat_con = max(max_flat_con, int(valid.max())) + + if max_flat_con < 0: + return np.array([], dtype=np.int64) + + lookup = np.full(max_flat_con + 1, -1, dtype=np.int64) + for con_name, dv in dual_vars.items(): + con_flat = m.constraints[con_name].labels.values.ravel().astype(np.int64) + dv_flat = dv.labels.values.ravel().astype(np.int64) + valid = (con_flat != -1) & (dv_flat != -1) + if valid.any(): + lookup[con_flat[valid]] = dv_flat[valid] + + return lookup + + +def _extract_dual_feas_entries( + A: Any, + vlabels: np.ndarray, + clabels: np.ndarray, + flat_con_to_dual: np.ndarray, +) -> tuple[np.ndarray, np.ndarray, np.ndarray]: + """ + Return ``(flat_var_label, flat_dual_label, coeff)`` for each included + nonzero entry of ``A``. + + Converts sparse matrix ``A`` to coordinate format, maps rows/columns to + flat labels via ``clabels`` / ``vlabels``, and drops entries that are masked + or belong to a constraint without a dual variable. + + Parameters + ---------- + A : scipy sparse matrix + Primal constraint matrix in any scipy sparse format. + vlabels : np.ndarray of int64 + Flat variable label per column of ``A``; -1 for masked columns. + clabels : np.ndarray of int64 + Flat constraint label per row of ``A``; -1 for masked rows. + flat_con_to_dual : np.ndarray of int64 + Lookup array as returned by ``_build_flat_con_to_dual_label_lookup()``. + + Returns + ------- + flat_v : np.ndarray of int64 + Flat primal variable label for each retained nonzero entry. + flat_d : np.ndarray of int64 + Corresponding flat dual variable label. + coeffs : np.ndarray of float64 + Corresponding coefficient from ``A``. + """ + A_coo = A.tocoo() + flat_v = vlabels[A_coo.col].astype(np.int64) + flat_c = clabels[A_coo.row].astype(np.int64) + coeffs = A_coo.data + + # Drop entries where either label is masked. + has_labels = (flat_v != -1) & (flat_c != -1) + flat_v, flat_c, coeffs = flat_v[has_labels], flat_c[has_labels], coeffs[has_labels] + + # Map primal constraint labels to flat dual variable labels. + n = len(flat_con_to_dual) + in_range = (flat_c >= 0) & (flat_c < n) + flat_d = np.full(len(flat_c), -1, dtype=np.int64) + flat_d[in_range] = flat_con_to_dual[flat_c[in_range]] + + # Drop entries with no corresponding dual variable. + has_dual = flat_d != -1 + return flat_v[has_dual], flat_d[has_dual], coeffs[has_dual] + + +def _build_obj_coeff_lookup(vlabels: np.ndarray, c_vec: np.ndarray) -> np.ndarray: + """ + Build a lookup from flat variable labels to objective coefficients. + + The returned array maps each valid flat variable label to its objective + coefficient. Labels that do not occur in ``vlabels`` but fall within the + lookup range have value 0.0. + + Parameters + ---------- + vlabels : np.ndarray of int64 + Flat variable label per column of ``A``; -1 for masked columns. + c_vec : np.ndarray of float64 + Objective coefficient per column, in the same ordering as ``vlabels``. + + Returns + ------- + np.ndarray of float64 + Lookup array of length ``max_flat_var_label + 1``; empty if no valid + variable labels exist. + """ + valid_mask = vlabels != -1 + valid_vlabels = vlabels[valid_mask] + if not len(valid_vlabels): + return np.array([], dtype=np.float64) + + lookup = np.zeros(int(valid_vlabels.max()) + 1, dtype=np.float64) + lookup[valid_vlabels.astype(np.int64)] = c_vec[valid_mask] + return lookup + + +def _build_label_to_flat_index_lookup(labels: np.ndarray) -> np.ndarray: + """ + Build a lookup from flat labels to flat indices. + + ``labels`` is expected to contain nonnegative integer labels, with -1 used + as the masked-label sentinel. + + ``lookup[label]`` gives the flat index of ``label`` in ``labels``. + Labels that do not occur in ``labels`` map to -1. If no valid labels exist, + returns an empty int64 array. + """ + valid = labels != -1 + if not valid.any(): + return np.array([], dtype=np.int64) + + lookup = np.full(int(labels[valid].max()) + 1, -1, dtype=np.int64) + lookup[labels[valid].astype(np.int64)] = np.where(valid)[0].astype(np.int64) + return lookup + + +def _lookup_flat_indices(labels: np.ndarray, lookup: np.ndarray) -> np.ndarray: + """ + Look up flat indices for flat labels. + + ``labels`` is expected to contain nonnegative integer labels, with -1 used + as the masked-label sentinel. ``lookup`` is expected to be an array as + returned by ``_build_label_to_flat_index_lookup()``. + + Labels outside the lookup range, including masked labels, map to -1. + """ + flat_indices = np.full(len(labels), -1, dtype=np.int64) + + in_range = (labels >= 0) & (labels < len(lookup)) + if in_range.any(): + flat_indices[in_range] = lookup[labels[in_range].astype(np.int64)] + + return flat_indices + + +def _term_slots_for_sorted_flat_indices(sorted_flat_indices: np.ndarray) -> np.ndarray: + """ + Return the term slot within each run of equal sorted flat indices. + + ``sorted_flat_indices`` must be non-empty and sorted in nondecreasing order. + Each run of equal values is treated as one group. + + Example: + ------- + ``[2, 2, 2, 5, 5, 9]`` becomes ``[0, 1, 2, 0, 1, 0]``. + """ + group_start = np.empty(len(sorted_flat_indices), dtype=bool) + group_start[0] = True + group_start[1:] = sorted_flat_indices[1:] != sorted_flat_indices[:-1] + + group_ids = np.cumsum(group_start) - 1 + group_start_pos = np.where(group_start)[0] + + return ( + np.arange(len(sorted_flat_indices), dtype=np.int64) - group_start_pos[group_ids] + ) + + +def _build_dual_feas_lhs( + var: Any, + flat_v: np.ndarray, + flat_d: np.ndarray, + nnz_data: np.ndarray, + m_dual: Model, +) -> LinearExpression: + """ + Build the dual-feasibility LHS for one primal variable. + + For each coordinate of ``var``, constructs the linear expression + ``sum_j A[j, var] * lambda[j]`` over the corresponding dual variables. + + Parameters + ---------- + var : linopy.Variable + Primal variable whose dual-feasibility LHS is being constructed. + flat_v : np.ndarray of int64 + Flat primal variable labels for all included nonzero entries, as returned + by ``_extract_dual_feas_entries()``. + flat_d : np.ndarray of int64 + Corresponding flat dual variable labels. + nnz_data : np.ndarray of float64 + Corresponding nonzero coefficients from the primal constraint matrix. + m_dual : Model + Dual model owning the dual variables. + + Returns + ------- + LinearExpression + Dual-feasibility LHS over ``var``'s coordinate space. Returns an empty + expression, equivalent to constant zero, when ``var`` has no included + constraint-matrix entries. + """ + var_flat = var.labels.values.ravel().astype(np.int64) + n_elements = len(var_flat) + + # Map flat variable labels to flat indices in this variable. + var_to_idx = _build_label_to_flat_index_lookup(var_flat) + if not len(var_to_idx): + return LinearExpression(None, m_dual) + + # Locate entries that belong to this variable via bounded lookup. + lin_idx = _lookup_flat_indices(flat_v, var_to_idx) + + # Keep only entries that belong to this variable. + keep = lin_idx != -1 + if not keep.any(): + return LinearExpression(None, m_dual) + + lin_idx = lin_idx[keep] + dual_labels = flat_d[keep] + coeffs = nnz_data[keep] + + # Sort by flat index so entries for the same element are contiguous. + order = np.argsort(lin_idx, kind="stable") + lin_idx = lin_idx[order] + dual_labels = dual_labels[order] + coeffs = coeffs[order] + + # Assign each term to a column slot within its variable-element group. + # Each run of equal ``lin_idx`` values corresponds to one variable element. + col_idx = _term_slots_for_sorted_flat_indices(lin_idx) + max_terms = int(col_idx.max()) + 1 + + # Populate (n_elements, max_terms) arrays via advanced indexing. + dual_labels_2d = np.full((n_elements, max_terms), -1, dtype=np.int64) + dual_coeffs_2d = np.zeros((n_elements, max_terms), dtype=np.float64) + dual_labels_2d[lin_idx, col_idx] = dual_labels + dual_coeffs_2d[lin_idx, col_idx] = coeffs + + # Wrap in a LinearExpression, reshaping to (*var_dims, _term). + target_shape = var.labels.shape + (max_terms,) + dims = list(var.labels.dims) + ["_term"] + ds = xr.Dataset( + { + "vars": xr.DataArray(dual_labels_2d.reshape(target_shape), dims=dims), + "coeffs": xr.DataArray(dual_coeffs_2d.reshape(target_shape), dims=dims), + }, + coords={dim: var.labels.coords[dim] for dim in var.labels.dims}, + ) + return LinearExpression(ds, m_dual) + + +def _add_dual_feasibility_constraints( + m: Model, + m_dual: Model, + dual_vars: dict, +) -> None: + """ + Add the stationarity constraint ``sum_i(A_ji * lambda_i) = c_j`` for each + primal variable. + + Sign conventions are already encoded in the dual variable bounds produced by + ``_add_dual_variables()``, so raw A coefficients are used without adjustment. + Variables with no constraint connections (e.g. unconstrained free variables) + are skipped; the dual is infeasible for such problems if ``c_j != 0``. + + Parameters + ---------- + m : Model + Primal model. + m_dual : Model + Dual model to populate. + dual_vars : dict + ``{constraint_name: dual_variable}`` as returned by ``_add_dual_variables()``. + """ + A = m.matrices.A + if A is None: + raise ValueError("Constraint matrix is None, model has no constraints.") + + vlabels = np.asarray(m.matrices.vlabels, dtype=np.int64) + clabels = np.asarray(m.matrices.clabels, dtype=np.int64) + + flat_con_to_dual = _build_flat_con_to_dual_label_lookup(m, dual_vars) + if not len(flat_con_to_dual): + logger.warning( + "No valid constraint labels found, skipping dual feasibility constraints." + ) + return + + flat_v, flat_d, nnz_data = _extract_dual_feas_entries( + A, vlabels, clabels, flat_con_to_dual + ) + c_lookup = _build_obj_coeff_lookup(vlabels, m.matrices.c) + + logger.debug("Building dual feasibility constraints for each primal variable.") + for var_name, var in m.variables.items(): + if _skip(var.labels, "variable", var_name): + continue + + mask = var.labels != -1 + var_flat = var.labels.values.ravel().astype(np.int64) + + # RHS: objective coefficient for each element of this variable. + c_arr = np.zeros(len(var_flat), dtype=np.float64) + in_c_range = (var_flat >= 0) & (var_flat < len(c_lookup)) + + if in_c_range.any(): + c_arr[in_c_range] = c_lookup[var_flat[in_c_range]] + + c_vals = xr.DataArray( + c_arr.reshape(var.labels.shape), + coords=var.labels.coords, + dims=var.labels.dims, + ) + + lhs = _build_dual_feas_lhs(var, flat_v, flat_d, nnz_data, m_dual) + + if lhs.is_constant: + # If c_j is zero, the condition is redundant. If c_j is nonzero, the + # condition is impossible because it reduces to 0 == c_j. Since this + # constant-only condition has no variables to add to the dual model, we + # report it and skip adding the constraint. + if np.any(c_arr[mask.values.ravel()] != 0): + logger.warning( + f"Variable '{var_name}' has no constraint connections but has " + "nonzero objective coefficients; the corresponding dual-feasibility " + "condition is infeasible." + ) + else: + logger.debug( + f"Variable '{var_name}' has no constraint connections and zero " + "objective coefficients; skipping redundant dual feasibility constraint." + ) + continue + + m_dual.add_constraints(lhs == c_vals, name=var_name, mask=mask) + + +def _add_dual_objective( + m: Model, + m_dual: Model, + dual_vars: dict, +) -> None: + """ + Construct and add ``sum(rhs_i * lambda_i)`` as the dual objective of m_dual. + + The uniform ``+`` sign is correct because sign conventions are already + encoded in the dual variable bounds: a ``<=`` constraint has ``lambda <= 0``, + so ``rhs * lambda`` contributes negatively without an explicit sign flip. + This aligns with linopy's native dual convention, so + ``m_dual.variables[con_name].solution`` can be compared directly with + ``m.constraints[con_name].dual`` after solving. + + The objective sense is flipped: ``min`` primal → ``max`` dual, and vice versa. + + Parameters + ---------- + m : Model + Primal model. + m_dual : Model + Dual model to populate. + dual_vars : dict + ``{constraint_name: dual_variable}`` as returned by ``_add_dual_variables()``. + """ + dual_obj: LinearExpression = LinearExpression(None, m_dual) + sense = "max" if m.objective.sense == "min" else "min" + + for name, con in m.constraints.items(): + if name not in dual_vars: + continue + + mask = con.labels != -1 + rhs_masked = con.rhs.where(mask, 0) + dual_obj += (rhs_masked * dual_vars[name]).sum() + + logger.debug(f"Constructed dual objective with {len(dual_obj.coeffs)} terms.") + logger.debug("Adding dual objective to model.") + m_dual.add_objective(dual_obj, sense=sense, overwrite=True) + + +def dualize( + m: Model, +) -> Model: + """ + Construct the dual of a linopy LP model. + + Transforms the primal model into its dual equivalent m_dual following + standard LP duality theory. The dual sense is flipped relative to the + primal (min -> max, max -> min), and dual variable bounds depend on + both constraint type and primal objective sense. + + For a minimization primal: + + Primal (min): + min c^T x + s.t. A_eq x = b_eq : λ free + A_leq x <= b_leq : μ <= 0 + A_geq x >= b_geq : ν >= 0 + + Dual (max): + max b_eq^T λ + b_leq^T μ + b_geq^T ν + s.t. A_eq^T λ + A_leq^T μ + A_geq^T ν = c + λ free, μ <= 0, ν >= 0 + + For a maximization primal the dual variable bounds are flipped: + μ >= 0 for <= constraints, ν <= 0 for >= constraints. + + The corresponding bound conventions are: + + ============ =========== ================ ================ + Constraint Primal sense lower upper + ============ =========== ================ ================ + = min / max -inf +inf (free) + <= min -inf 0 + <= max 0 +inf + >= min 0 +inf + >= max -inf 0 + ============ =========== ================ ================ + + Variable bounds are converted to explicit constraints before dualization + via _lift_bounds_to_constraints(), so that they appear in the constraint matrix + A and are correctly reflected in the dual. + + The dual variables in m_dual are named identically to their corresponding + primal constraints and are accessible via m_dual.variables[con_name]. + + Strong duality guarantees that at optimality: + primal objective = dual objective + + Note: This constructs a standalone dual model. Pathological or unsupported + primal formulations may lead to infeasible or unbounded dual models or may + cause this function to raise an error. Only linear primal models with linear + objectives and linear constraints are supported; finite variable bounds are + lifted into explicit constraints before dualization. + + Parameters + ---------- + m : Model + Primal linopy model to dualize. Must have a linear objective and either + linear constraints or finite variable bounds. + + Returns + ------- + Model + Dual model whose variables are named after the primal constraints and + constraints are named after the primal variables. + + Example + ------- + .. code-block:: python + + m_dual = m.dualize() + m_dual.solve(solver_name="gurobi", Method=2, Crossover=1) + gap = abs(m.objective.value - m_dual.objective.value) + """ + from linopy.model import Model + + m1 = m.copy() + m_dual = Model() + + if not m1.variables: + logger.warning("Primal model has no variables. Returning empty dual model.") + return m_dual + + _lift_bounds_to_constraints(m1) + + if not m1.constraints: + logger.warning( + "Primal model has no constraints after lifting variable bounds. " + "Returning empty dual model." + ) + return m_dual + + dual_vars = _add_dual_variables(m1, m_dual) + _add_dual_feasibility_constraints(m1, m_dual, dual_vars) + _add_dual_objective(m1, m_dual, dual_vars) + return m_dual diff --git a/linopy/model.py b/linopy/model.py index 48a8200b..ee78a114 100644 --- a/linopy/model.py +++ b/linopy/model.py @@ -55,6 +55,7 @@ Constraints, CSRConstraint, ) +from linopy.dual import dualize from linopy.expressions import ( LinearExpression, QuadraticExpression, @@ -2192,3 +2193,5 @@ def reset_solution(self) -> None: to_xpress = to_xpress to_block_files = to_block_files + + dualize = dualize diff --git a/test/test_dual.py b/test/test_dual.py new file mode 100644 index 00000000..1aab2ede --- /dev/null +++ b/test/test_dual.py @@ -0,0 +1,608 @@ +"""Tests for linopy/dual.py - LP dualization.""" + +from __future__ import annotations + +from typing import Any + +import numpy as np +import pandas as pd +import pytest +import xarray as xr +from _pytest.logging import LogCaptureFixture + +from linopy import Model +from linopy.dual import ( + _build_label_to_flat_index_lookup, + _build_obj_coeff_lookup, + _dual_bounds_from_constraint_signs, + _lookup_flat_indices, + _skip, + _term_slots_for_sorted_flat_indices, + dualize, +) +from linopy.solvers import licensed_solvers + +_lp_solver = next((s for s in ("highs", "glpk", "scip") if s in licensed_solvers), None) +needs_solver = pytest.mark.skipif(_lp_solver is None, reason="No LP solver available") + + +# Structural tests for important internal functions +def test_skip_empty_label_array() -> None: + """Empty label arrays are skipped.""" + labels = xr.DataArray(np.array([], dtype=np.int64), dims=["dim_0"]) + + assert _skip(labels, "variable", "x") + + +def test_skip_fully_masked_label_array() -> None: + """Fully masked label arrays are skipped.""" + labels = xr.DataArray(np.array([-1, -1], dtype=np.int64), dims=["dim_0"]) + + assert _skip(labels, "constraint", "c") + + +def test_build_label_to_flat_index_lookup() -> None: + """Flat labels are mapped to their positions in the flattened label array.""" + labels = np.array([10, -1, 12, 99], dtype=np.int64) + + lookup = _build_label_to_flat_index_lookup(labels) + + assert lookup[10] == 0 + assert lookup[12] == 2 + assert lookup[99] == 3 + assert lookup[11] == -1 + + +def test_build_label_to_flat_index_lookup_all_masked() -> None: + """An all-masked label array produces an empty lookup.""" + labels = np.array([-1, -1], dtype=np.int64) + + lookup = _build_label_to_flat_index_lookup(labels) + + assert lookup.dtype == np.int64 + assert len(lookup) == 0 + + +def test_lookup_flat_indices_bounds_safe() -> None: + """Out-of-range and masked labels safely map to -1.""" + lookup = np.array([-1, -1, 5, -1, 7], dtype=np.int64) + labels = np.array([-1, 0, 2, 4, 99], dtype=np.int64) + + flat_indices = _lookup_flat_indices(labels, lookup) + + np.testing.assert_array_equal( + flat_indices, + np.array([-1, -1, 5, 7, -1], dtype=np.int64), + ) + + +def test_term_slots_for_sorted_flat_indices() -> None: + """Repeated sorted flat indices are assigned increasing term slots.""" + sorted_flat_indices = np.array([2, 2, 2, 5, 5, 9], dtype=np.int64) + + slots = _term_slots_for_sorted_flat_indices(sorted_flat_indices) + + np.testing.assert_array_equal( + slots, + np.array([0, 1, 2, 0, 1, 0], dtype=np.int64), + ) + + +def test_build_obj_coeff_lookup_all_masked() -> None: + """All-masked variable labels produce an empty objective-coefficient lookup.""" + lookup = _build_obj_coeff_lookup( + np.array([-1, -1], dtype=np.int64), + np.array([1.0, 2.0], dtype=np.float64), + ) + + assert lookup.dtype == np.float64 + assert len(lookup) == 0 + + +def test_dual_bounds_from_mixed_constraint_signs_min() -> None: + """Mixed signs produce elementwise dual bounds for a minimization primal.""" + idx = pd.RangeIndex(3, name="i") + labels = xr.DataArray([0, 1, 2], dims=["i"], coords={"i": idx}) + signs = xr.DataArray(["<=", ">=", "="], dims=["i"], coords={"i": idx}) + + lower, upper, valid_sign = _dual_bounds_from_constraint_signs( + signs, + labels, + primal_is_min=True, + ) + + np.testing.assert_allclose(lower.values, np.array([-np.inf, 0.0, -np.inf])) + np.testing.assert_allclose(upper.values, np.array([0.0, np.inf, np.inf])) + np.testing.assert_array_equal(valid_sign.values, np.array([True, True, True])) + + +def test_dual_bounds_from_mixed_constraint_signs_max() -> None: + """Mixed signs produce elementwise dual bounds for a maximization primal.""" + idx = pd.RangeIndex(3, name="i") + labels = xr.DataArray([0, 1, 2], dims=["i"], coords={"i": idx}) + signs = xr.DataArray(["<=", ">=", "="], dims=["i"], coords={"i": idx}) + + lower, upper, valid_sign = _dual_bounds_from_constraint_signs( + signs, + labels, + primal_is_min=False, + ) + + np.testing.assert_allclose(lower.values, np.array([0.0, -np.inf, -np.inf])) + np.testing.assert_allclose(upper.values, np.array([np.inf, 0.0, np.inf])) + np.testing.assert_array_equal(valid_sign.values, np.array([True, True, True])) + + +# Structural tests (no solver required) +def test_dualize_empty_model() -> None: + """Dualizing an empty model returns an empty dual model.""" + m = Model() + m_dual = dualize(m) + assert len(m_dual.variables) == 0 + assert len(m_dual.constraints) == 0 + + +def test_only_lower_bound_lifted_to_dual_variable() -> None: + """A finite lower bound is lifted even when the upper bound is infinite.""" + m = Model() + x = m.add_variables(lower=1, upper=np.inf, name="x") + m.add_objective(x) + + m_dual = dualize(m) + + assert "x-bound-lower" in m_dual.variables + assert "x-bound-upper" not in m_dual.variables + + +def test_only_upper_bound_lifted_to_dual_variable() -> None: + """A finite upper bound is lifted even when the lower bound is infinite.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=3, name="x") + m.add_objective(x) + + m_dual = dualize(m) + + assert "x-bound-lower" not in m_dual.variables + assert "x-bound-upper" in m_dual.variables + + +def test_unbounded_variable_bounds_do_not_create_dual_variables() -> None: + """Infinite variable bounds are not lifted into dual variables.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + m.add_constraints(x == 1, name="c") + m.add_objective(x) + + m_dual = dualize(m) + + assert "x-bound-lower" not in m_dual.variables + assert "x-bound-upper" not in m_dual.variables + + +def test_dualize_model_with_variables_but_no_constraints_or_finite_bounds() -> None: + """A model with variables but no constraints or finite bounds returns an empty dual.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + m.add_objective(x) + + m_dual = dualize(m) + + assert len(m_dual.variables) == 0 + assert len(m_dual.constraints) == 0 + + +def test_variable_bounds_lifted_to_dual_variables() -> None: + """Finite variable bounds are lifted and dualized.""" + m = Model() + x = m.add_variables(lower=1, upper=3, name="x") + m.add_objective(x) + + m_dual = dualize(m) + + assert "x-bound-lower" in m_dual.variables + assert "x-bound-upper" in m_dual.variables + + +def test_dual_variables_named_after_primal_constraints() -> None: + """Dual variables use the names of their corresponding primal constraints.""" + m = Model() + x = m.add_variables(lower=0, coords=[pd.RangeIndex(3)], name="x") + m.add_constraints(x >= 1.0, name="lb_con") + m.add_objective(2.0 * x) + + m_dual = dualize(m) + assert "lb_con" in m_dual.variables + assert "x-bound-lower" in m_dual.variables + + +def test_dual_feasibility_constraints_named_after_primal_variables() -> None: + """Dual-feasibility constraints use the names of the primal variables.""" + m = Model() + x = m.add_variables(lower=0, coords=[pd.RangeIndex(3)], name="x") + m.add_constraints(x >= 1.0, name="lb_con") + m.add_objective(2.0 * x) + + m_dual = dualize(m) + assert "x" in m_dual.constraints + + +def test_dual_objective_sense_flipped_min() -> None: + """A minimization primal produces a maximization dual.""" + m = Model() + x = m.add_variables(lower=0, name="x") + m.add_constraints(x >= 1, name="c") + m.add_objective(x) # min + + m_dual = dualize(m) + assert m_dual.objective.sense == "max" + + +def test_dual_objective_sense_flipped_max() -> None: + """A maximization primal produces a minimization dual.""" + m = Model() + x = m.add_variables(upper=10, name="x") + m.add_constraints(x <= 5, name="c") + m.add_objective(x, sense="max") + + m_dual = dualize(m) + assert m_dual.objective.sense == "min" + + +def test_dual_sign_conventions_min() -> None: + """For a min primal: <= -> dual <= 0, >= -> dual >= 0, = -> dual free.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + m.add_constraints(x == 5, name="eq") + m.add_constraints(x <= 10, name="leq") + m.add_constraints(x >= -10, name="geq") + m.add_objective(x) + + m_dual = dualize(m) + assert m_dual.variables["eq"].lower.item() == -np.inf + assert m_dual.variables["eq"].upper.item() == np.inf + assert m_dual.variables["leq"].upper.item() == 0 + assert m_dual.variables["geq"].lower.item() == 0 + + +def test_dual_sign_conventions_max() -> None: + """For a max primal: <= -> dual >= 0, >= -> dual <= 0.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + m.add_constraints(x <= 10, name="leq") + m.add_constraints(x >= -10, name="geq") + m.add_objective(x, sense="max") + + m_dual = dualize(m) + assert m_dual.variables["leq"].lower.item() == 0 + assert m_dual.variables["geq"].upper.item() == 0 + + +def test_dual_feasibility_rhs_equals_objective_coefficients() -> None: + """The dual feasibility RHS equals the primal objective coefficient.""" + m = Model() + x = m.add_variables( + lower=-np.inf, upper=np.inf, coords=[pd.RangeIndex(4)], name="x" + ) + m.add_constraints(x == np.array([1.0, 2.0, 3.0, 4.0]), name="eq") + c = np.array([10.0, 20.0, 30.0, 40.0]) + m.add_objective(c * x) + + m_dual = dualize(m) + np.testing.assert_allclose(m_dual.constraints["x"].rhs.values, c) + + +def test_dual_multi_constraint_per_variable() -> None: + """A variable appearing in k constraints gets k dual-feasibility terms.""" + m = Model() + n = 3 + x = m.add_variables( + lower=-np.inf, upper=np.inf, coords=[pd.RangeIndex(n)], name="x" + ) + # Two equality constraints, both using x + m.add_constraints(x == 1.0, name="c1") + m.add_constraints(2.0 * x == 2.0, name="c2") + m.add_objective(5.0 * x) + + m_dual = dualize(m) + # dual feas constraint for x: lambda_c1 + 2*lambda_c2 = 5 + con_x = m_dual.constraints["x"] + # Should have 2 terms (one from c1, one from c2) + n_terms = (con_x.vars != -1).sum(dim="_term").max().item() + assert n_terms == 2 + + +def test_dual_with_masked_variable() -> None: + """Partially masked variables only produce constraints for unmasked elements.""" + m = Model() + mask = xr.DataArray([True, False, True], dims=["dim_0"]) + x = m.add_variables(lower=0, coords=[pd.RangeIndex(3)], name="x", mask=mask) + m.add_constraints(x >= 1.0, name="c", mask=mask) + m.add_objective(x) + + m_dual = dualize(m) + assert "x" in m_dual.constraints + assert ( + m_dual.constraints["x"].labels != -1 + ).sum().item() == 2 # only 2 unmasked elements + + +def test_dual_free_unconstrained_zero_cost_variable_no_error() -> None: + """A disconnected zero-cost free variable is skipped.""" + m = Model() + m.add_variables( + lower=-np.inf, upper=np.inf, name="x" + ) # no bounds → no bound constraints + y = m.add_variables(lower=-np.inf, upper=np.inf, name="y") + m.add_constraints(y == 5, name="eq") + m.add_objective(y) # x has no connections at all + + m_dual = dualize(m) # must not raise + assert "y" in m_dual.constraints + assert "x" not in m_dual.constraints # no constraint connections + + +def test_dual_free_unconstrained_variable_with_objective_warns( + caplog: LogCaptureFixture, +) -> None: + """A disconnected variable with nonzero objective coefficient is reported.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + y = m.add_variables(lower=-np.inf, upper=np.inf, name="y") + + m.add_constraints(y == 5, name="eq") + m.add_objective(x + y) + + with caplog.at_level("WARNING"): + m_dual = dualize(m) + + assert "y" in m_dual.constraints + assert "x" not in m_dual.constraints + assert any( + "corresponding dual-feasibility condition is infeasible" in rec.message + for rec in caplog.records + ) + + +def test_dual_multi_constraint_per_variable_coefficients() -> None: + """Dual-feasibility terms keep the correct A-matrix coefficients.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + + m.add_constraints(x == 1.0, name="c1") + m.add_constraints(2.0 * x == 2.0, name="c2") + m.add_objective(5.0 * x) + + m_dual = dualize(m) + coeffs = m_dual.constraints["x"].coeffs.values.ravel() + coeffs = coeffs[m_dual.constraints["x"].vars.values.ravel() != -1] + + np.testing.assert_allclose(np.sort(coeffs), np.array([1.0, 2.0])) + + +def test_dual_mixed_sign_constraint_array_min() -> None: + """Mixed elementwise constraint signs produce elementwise dual bounds.""" + m = Model() + idx = pd.RangeIndex(3, name="i") + x = m.add_variables(lower=-np.inf, upper=np.inf, coords=[idx], name="x") + + signs = xr.DataArray(["<=", ">=", "="], dims=["i"], coords={"i": idx}) + rhs = xr.DataArray([1.0, 2.0, 3.0], dims=["i"], coords={"i": idx}) + + m.add_constraints(x, signs, rhs, name="mixed") + m.add_objective(x.sum()) + + m_dual = dualize(m) + dv = m_dual.variables["mixed"] + + np.testing.assert_allclose( + dv.lower.values, + np.array([-np.inf, 0.0, -np.inf]), + ) + np.testing.assert_allclose( + dv.upper.values, + np.array([0.0, np.inf, np.inf]), + ) + + +def test_dual_mixed_sign_constraint_array_max() -> None: + """Mixed elementwise constraint signs follow maximization dual bounds.""" + m = Model() + idx = pd.RangeIndex(3, name="i") + x = m.add_variables(lower=-np.inf, upper=np.inf, coords=[idx], name="x") + + signs = xr.DataArray(["<=", ">=", "="], dims=["i"], coords={"i": idx}) + rhs = xr.DataArray([1.0, 2.0, 3.0], dims=["i"], coords={"i": idx}) + + m.add_constraints(x, signs, rhs, name="mixed") + m.add_objective(x.sum(), sense="max") + + m_dual = dualize(m) + dv = m_dual.variables["mixed"] + + np.testing.assert_allclose( + dv.lower.values, + np.array([0.0, -np.inf, -np.inf]), + ) + np.testing.assert_allclose( + dv.upper.values, + np.array([np.inf, 0.0, np.inf]), + ) + + +def test_mixed_variable_bounds_lift_only_finite_entries() -> None: + """Mixed finite/infinite bounds are lifted only for finite entries.""" + m = Model() + idx = pd.RangeIndex(3, name="i") + + lower = xr.DataArray([0.0, -np.inf, -np.inf], dims=["i"], coords={"i": idx}) + upper = xr.DataArray([np.inf, 5.0, np.inf], dims=["i"], coords={"i": idx}) + + x = m.add_variables(lower=lower, upper=upper, coords=[idx], name="x") + m.add_objective(x.sum()) + + m_dual = dualize(m) + + lower_labels = m_dual.variables["x-bound-lower"].labels + upper_labels = m_dual.variables["x-bound-upper"].labels + + assert (lower_labels != -1).sum().item() == 1 + assert (upper_labels != -1).sum().item() == 1 + + assert lower_labels.sel(i=0).item() != -1 + assert upper_labels.sel(i=1).item() != -1 + + +# Numerical tests (require solver) +def _solve(model: Model, **kwargs: Any) -> float: + """Solve a model with the available LP solver and return its objective value.""" + assert _lp_solver is not None + model.solve(solver_name=_lp_solver, io_api="lp", **kwargs) + + value = model.objective.value + assert value is not None + return float(value) + + +@needs_solver +def test_strong_duality_simple() -> None: + """Strong duality: primal obj == dual obj at optimality.""" + m = Model() + x = m.add_variables(lower=0, name="x") + y = m.add_variables(lower=0, name="y") + m.add_constraints(x + y >= 3, name="c1") + m.add_constraints(x + 2 * y >= 4, name="c2") + m.add_objective(5 * x + 4 * y) + + primal_obj = _solve(m) + dual_obj = _solve(m.dualize()) + assert abs(primal_obj - dual_obj) < 1e-5 + + +@needs_solver +def test_strong_duality_array_variable() -> None: + """Strong duality with array variables.""" + rng = np.random.default_rng(0) + n = 6 + + A = rng.random((4, n)) + 0.1 + b = rng.random(4) + 1.0 + c_obj = rng.random(n) + 0.1 + + m = Model() + x = m.add_variables(lower=0, coords=[pd.RangeIndex(n)], name="x") + + for i in range(4): + lhs: Any = sum(float(A[i, j]) * x[j] for j in range(n)) + m.add_constraints(lhs <= float(b[i]), name=f"c{i}") + + # Bounded because x >= 0 and A x <= b with A > 0. + obj: Any = sum(float(c_obj[j]) * x[j] for j in range(n)) + m.add_objective(obj, sense="max") + + primal_obj = _solve(m) + dual_obj = _solve(m.dualize()) + assert abs(primal_obj - dual_obj) < 1e-4 + + +@needs_solver +def test_strong_duality_mixed_constraint_types() -> None: + """Strong duality with =, <=, >= constraints.""" + m = Model() + x = m.add_variables(lower=-np.inf, upper=np.inf, name="x") + y = m.add_variables(lower=-np.inf, upper=np.inf, name="y") + m.add_constraints(x + y == 5, name="eq") + m.add_constraints(x - y <= 2, name="leq") + m.add_constraints(x >= 0, name="geq") + m.add_objective(2 * x + 3 * y) + + primal_obj = _solve(m) + dual_obj = _solve(m.dualize()) + assert abs(primal_obj - dual_obj) < 1e-5 + + +@needs_solver +def test_strong_duality_maximization() -> None: + """Strong duality for a maximization primal.""" + m = Model() + x = m.add_variables(lower=0, name="x") + y = m.add_variables(lower=0, name="y") + m.add_constraints(x + y <= 10, name="c1") + m.add_constraints(x <= 6, name="c2") + m.add_constraints(y <= 8, name="c3") + m.add_objective(3 * x + 5 * y, sense="max") + + primal_obj = _solve(m) + dual_obj = _solve(m.dualize()) + assert abs(primal_obj - dual_obj) < 1e-5 + + +@needs_solver +def test_dualize_mixed_signs_and_mixed_variable_bounds() -> None: + """Dualization handles mixed constraint signs and mixed variable bounds.""" + m = Model() + idx = pd.RangeIndex(3, name="i") + + lower = xr.DataArray([0.0, -np.inf, -np.inf], dims=["i"], coords={"i": idx}) + upper = xr.DataArray([np.inf, 5.0, np.inf], dims=["i"], coords={"i": idx}) + + x = m.add_variables( + lower=lower, + upper=upper, + coords=[idx], + name="x", + ) + + signs = xr.DataArray(["<=", ">=", "="], dims=["i"], coords={"i": idx}) + rhs = xr.DataArray([4.0, 2.0, 3.0], dims=["i"], coords={"i": idx}) + + m.add_constraints(x, signs, rhs, name="mixed") + m.add_objective(x.sum()) + + primal_obj = _solve(m) + + m_dual = dualize(m) + dual_obj = _solve(m_dual) + + assert abs(primal_obj - dual_obj) < 1e-5 + + mixed_dual = m_dual.variables["mixed"] + + np.testing.assert_allclose( + mixed_dual.lower.values, + np.array([-np.inf, 0.0, -np.inf]), + ) + np.testing.assert_allclose( + mixed_dual.upper.values, + np.array([0.0, np.inf, np.inf]), + ) + + assert "x-bound-lower" in m_dual.variables + assert "x-bound-upper" in m_dual.variables + + lower_bound_labels = m_dual.variables["x-bound-lower"].labels + upper_bound_labels = m_dual.variables["x-bound-upper"].labels + + assert (lower_bound_labels != -1).sum().item() == 1 + assert (upper_bound_labels != -1).sum().item() == 1 + + assert lower_bound_labels.sel(i=0).item() != -1 + assert upper_bound_labels.sel(i=1).item() != -1 + + con_x = m_dual.constraints["x"] + + # x[0]: mixed[0] + x-bound-lower[0] = 1 + # x[1]: mixed[1] + x-bound-upper[1] = 1 + # x[2]: mixed[2] = 1 + n_terms = (con_x.vars != -1).sum(dim="_term") + np.testing.assert_array_equal( + n_terms.values, + np.array([2, 2, 1]), + ) + + # The dual values from the standalone dual model should match Linopy's + # primal constraint duals for the original mixed constraint block. + np.testing.assert_allclose( + m.constraints["mixed"].dual.values, + m_dual.variables["mixed"].solution.values, + atol=1e-6, + )