diff --git a/CHANGELOG.md b/CHANGELOG.md index c41f56a78..e86e14ce1 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,7 +6,7 @@ - Include parameter names in type stubs - Added pre-commit hook for automatic stub regeneration (see `.pre-commit-config.yaml`) - Wrapped `isObjIntegral()` and test -- Added `structured_optimization_trace` recipe for structured optimization progress tracking +- Added `structured_optimization_trace` recipe with attach-style in-memory tracing and context-managed JSONL tracing with final `run_end` records - Added methods: `getPrimalDualIntegral()` - `getSolVal()` supports `MatrixExpr` now ### Fixed diff --git a/src/pyscipopt/recipes/structured_optimization_trace.py b/src/pyscipopt/recipes/structured_optimization_trace.py index 4ba73def7..cdf54e94f 100644 --- a/src/pyscipopt/recipes/structured_optimization_trace.py +++ b/src/pyscipopt/recipes/structured_optimization_trace.py @@ -1,37 +1,185 @@ +import json + from pyscipopt import SCIP_EVENTTYPE, Eventhdlr, Model +_TRACE_HANDLER_KEY = "_structured_optimization_trace_handler" + + +class _TraceEventhdlr(Eventhdlr): + def __init__(self): + self.trace = None + self.write_run_end_active = False + self._caught_events = set() + + def eventinit(self): + for event_type in ( + SCIP_EVENTTYPE.BESTSOLFOUND, + SCIP_EVENTTYPE.DUALBOUNDIMPROVED, + ): + if event_type not in self._caught_events: + self.model.catchEvent(event_type, self) + self._caught_events.add(event_type) + + def eventexec(self, event): + if self.trace is not None: + self.trace._handle_event(event) + + +class _StructuredOptimizationTrace: + """Internal trace controller shared by both public APIs.""" + + def __init__(self, model: Model, path=None, write_run_end=True): + self.model = model + self.path = path + self.write_run_end = write_run_end + self._fh = None + self._handler = None + self._last_snapshot: dict[str, object] = {} + + def __enter__(self): + if not hasattr(self.model, "data") or self.model.data is None: + self.model.data = {} + + self._handler = self.model.data.get(_TRACE_HANDLER_KEY) + if self._handler is None: + self._handler = _TraceEventhdlr() + self.model.includeEventhdlr( + self._handler, + "structured_trace", + "Structured optimization trace handler", + ) + self.model.data[_TRACE_HANDLER_KEY] = self._handler + + if self._handler.write_run_end_active: + raise RuntimeError( + "structured optimization trace is already active for this model" + ) + + self.model.data["trace"] = [] + + if self.path is not None: + self._fh = open(self.path, "w", encoding="utf-8") + + self._handler.trace = self + if self.write_run_end: + self._handler.write_run_end_active = True + + return self + + def __exit__(self, exc_type, exc, tb): + fields = {} + if self._last_snapshot: + fields.update(self._last_snapshot) + + if exc_type is None: + fields["status"] = "finished" + else: + fields.update( + { + "status": "exception", + "exception": exc_type.__name__, + "message": str(exc) if exc is not None else None, + } + ) + + try: + if self.write_run_end: + self._write_event("run_end", fields) + finally: + if self._fh is not None: + try: + self._fh.close() + finally: + self._fh = None + + if self._handler is not None and self._handler.trace is self: + self._handler.trace = None + if self.write_run_end: + self._handler.write_run_end_active = False + + return False + + def _handle_event(self, event): + event_type = event.getType() + if event_type == SCIP_EVENTTYPE.BESTSOLFOUND: + self._write_snapshot("bestsol_found") + elif event_type == SCIP_EVENTTYPE.DUALBOUNDIMPROVED: + self._write_snapshot("dualbound_improved") + + def _snapshot_now(self): + return { + "time": self.model.getSolvingTime(), + "primalbound": self.model.getPrimalbound(), + "dualbound": self.model.getDualbound(), + "gap": self.model.getGap(), + "nodes": self.model.getNNodes(), + "nsol": self.model.getNSols(), + } + + def _write_snapshot(self, event_type): + snapshot = self._snapshot_now() + self._last_snapshot = snapshot + self._write_event(event_type, snapshot) + + def _write_event(self, event_type, fields=None): + event = {"type": event_type} + if fields: + event.update(fields) + + self.model.data["trace"].append(event) + if self._fh is not None: + self._fh.write(json.dumps(event) + "\n") + self._fh.flush() + + +def structured_optimization_trace(model: Model, path=None): + """ + Create a context manager for structured optimization progress tracing. + + Records progress events in ``model.data["trace"]`` while the model is + optimized inside the context. If ``path`` is given, the same records are + written as JSONL and flushed after each write. + + On exit, appends a final ``run_end`` record and closes JSONL output. If the + context exits with a Python exception, the ``run_end`` record includes + exception metadata and the exception is re-raised. + + Args: + model: SCIP Model. + path: Optional JSONL output path. If None, records are only stored in + ``model.data["trace"]``. + + Usage: + with structured_optimization_trace(model, path="trace.jsonl"): + model.optimize() + + with structured_optimization_trace(model): + model.optimizeNogil() + """ + return _StructuredOptimizationTrace(model, path=path, write_run_end=True) + def attach_structured_optimization_trace(model: Model): """ - Attaches an event handler that records optimization progress in structured JSONL format. + Attaches an event handler that records structured optimization progress. + + This is the attach-style API for simple in-memory tracing. It records + progress events in ``model.data["trace"]`` but does not manage a Python + finalization scope, so it does not emit a final ``run_end`` record. + + Use ``structured_optimization_trace(model, path=...)`` as a context manager + when JSONL output, ``run_end`` records, flushing, closing, or exception + finalization are required. Args: model: SCIP Model - """ - class _TraceEventhdlr(Eventhdlr): - def eventinit(self): - self.model.catchEvent(SCIP_EVENTTYPE.BESTSOLFOUND, self) - self.model.catchEvent(SCIP_EVENTTYPE.DUALBOUNDIMPROVED, self) - - def eventexec(self, event): - record = { - "time": self.model.getSolvingTime(), - "primalbound": self.model.getPrimalbound(), - "dualbound": self.model.getDualbound(), - "gap": self.model.getGap(), - "nodes": self.model.getNNodes(), - "nsol": self.model.getNSols(), - } - self.model.data["trace"].append(record) - - if not hasattr(model, "data") or model.data is None: - model.data = {} - model.data["trace"] = [] - - hdlr = _TraceEventhdlr() - model.includeEventhdlr( - hdlr, "structured_trace", "Structured optimization trace handler" - ) + Usage: + attach_structured_optimization_trace(model) + model.optimize() + trace = model.data["trace"] + """ + trace = _StructuredOptimizationTrace(model, write_run_end=False) + trace.__enter__() return model diff --git a/tests/test_recipe_structured_optimization_trace.py b/tests/test_recipe_structured_optimization_trace.py index 334d5107c..157bfe360 100644 --- a/tests/test_recipe_structured_optimization_trace.py +++ b/tests/test_recipe_structured_optimization_trace.py @@ -1,17 +1,40 @@ +import json + +import pytest from helpers.utils import bin_packing_model from pyscipopt.recipes.structured_optimization_trace import ( attach_structured_optimization_trace, + structured_optimization_trace, ) -def test_structured_optimization_trace(): - from random import randint - - model = bin_packing_model(sizes=[randint(1, 40) for _ in range(120)], capacity=50) +def _model(): + model = bin_packing_model(sizes=list(range(1, 41)) * 3, capacity=50) model.setParam("limits/time", 5) + return model + + +def _assert_progress_records(records): + required_fields = {"time", "primalbound", "dualbound", "gap", "nodes", "nsol"} + + for record in records: + if record["type"] != "run_end": + assert required_fields <= set(record.keys()) + + primalbounds = [r["primalbound"] for r in records if "primalbound" in r] + for i in range(1, len(primalbounds)): + assert primalbounds[i] <= primalbounds[i - 1] + + dualbounds = [r["dualbound"] for r in records if "dualbound" in r] + for i in range(1, len(dualbounds)): + assert dualbounds[i] >= dualbounds[i - 1] + +def test_attach_structured_optimization_trace_in_memory(): + model = _model() model.data = {"test": True} + model = attach_structured_optimization_trace(model) assert "test" in model.data @@ -19,14 +42,89 @@ def test_structured_optimization_trace(): model.optimize() - required_fields = {"time", "primalbound", "dualbound", "gap", "nodes", "nsol"} - for record in model.data["trace"]: - assert required_fields <= set(record.keys()) + assert model.data["trace"] + assert all("type" in record for record in model.data["trace"]) + assert "run_end" not in [r["type"] for r in model.data["trace"]] + _assert_progress_records(model.data["trace"]) - primalbounds = [r["primalbound"] for r in model.data["trace"]] - for i in range(1, len(primalbounds)): - assert primalbounds[i] <= primalbounds[i - 1] - dualbounds = [r["dualbound"] for r in model.data["trace"]] - for i in range(1, len(dualbounds)): - assert dualbounds[i] >= dualbounds[i - 1] +@pytest.mark.parametrize("optimize", ["optimize", "optimizeNogil"]) +def test_structured_optimization_trace_context_in_memory(optimize): + model = _model() + model.data = {"test": True} + + with structured_optimization_trace(model): + getattr(model, optimize)() + + assert "test" in model.data + assert "trace" in model.data + + types = [r["type"] for r in model.data["trace"]] + assert "run_end" in types + assert model.data["trace"][-1]["type"] == "run_end" + assert model.data["trace"][-1]["status"] == "finished" + _assert_progress_records(model.data["trace"]) + + +def test_structured_optimization_trace_file_output(tmp_path): + model = _model() + path = tmp_path / "trace.jsonl" + + with structured_optimization_trace(model, path=str(path)): + model.optimize() + + assert path.exists() + + records = [json.loads(line) for line in path.read_text().splitlines()] + assert records == model.data["trace"] + assert records[-1]["type"] == "run_end" + assert records[-1]["status"] == "finished" + _assert_progress_records(records) + + +def test_structured_optimization_trace_records_run_end_on_exception(): + model = _model() + + with pytest.raises(ValueError): + with structured_optimization_trace(model): + raise ValueError("test error") + + assert model.data["trace"] == [ + { + "type": "run_end", + "status": "exception", + "exception": "ValueError", + "message": "test error", + } + ] + + +def test_structured_optimization_trace_reuses_handler_for_repeated_contexts(tmp_path): + model = _model() + first_path = tmp_path / "first.jsonl" + second_path = tmp_path / "second.jsonl" + + with structured_optimization_trace(model, path=str(first_path)): + pass + + handler = model.data["_structured_optimization_trace_handler"] + + with structured_optimization_trace(model, path=str(second_path)): + pass + + assert model.data["_structured_optimization_trace_handler"] is handler + + first_records = [json.loads(line) for line in first_path.read_text().splitlines()] + second_records = [json.loads(line) for line in second_path.read_text().splitlines()] + + assert first_records == [{"type": "run_end", "status": "finished"}] + assert second_records == [{"type": "run_end", "status": "finished"}] + + +def test_structured_optimization_trace_rejects_nested_contexts(): + model = _model() + + with structured_optimization_trace(model): + with pytest.raises(RuntimeError): + with structured_optimization_trace(model): + pass