diff --git a/docs/guide/cli/cli_basics.rst b/docs/guide/cli/cli_basics.rst index 15e3faa6a..31a600d08 100644 --- a/docs/guide/cli/cli_basics.rst +++ b/docs/guide/cli/cli_basics.rst @@ -69,6 +69,8 @@ the subcommand name, e.g., ``openfe quickrun --help``, which returns exist, it will be created at runtime. -o PATH Filepath at which to create and write the JSON- formatted results. + --resume Attempt to resume this transformation's execution + using the cache. -h, --help Show this message and exit. For more details on various commands, see the :ref:`cli-reference`. diff --git a/docs/guide/cli/index.rst b/docs/guide/cli/index.rst index 481492eb4..e3a565740 100644 --- a/docs/guide/cli/index.rst +++ b/docs/guide/cli/index.rst @@ -13,3 +13,4 @@ into non-Python workflows. .. toctree:: cli_basics cli_yaml + quickrun diff --git a/docs/guide/cli/quickrun.rst b/docs/guide/cli/quickrun.rst new file mode 100644 index 000000000..f60df5712 --- /dev/null +++ b/docs/guide/cli/quickrun.rst @@ -0,0 +1,85 @@ +.. _userguide_cli_quickrun: + +Using Quickrun to execute Transformations +========================================= + +The ``openfe quickrun`` command executes a single alchemical Transformation. +This is currently the primary way to execute Transformations after they +have been created during network planning. + + +Basic Usage +----------- + +To run a Transformation (``transformation.json``) and save results to ``results.json``: + +.. code:: none + + openfe quickrun transformation.json -d workdir/ -o workdir/results.json + +The ``-d`` / ``--work-dir`` flag controls where working files (checkpoints, +trajectory data, etc.) are written. If it is omitted, the current directory +will be used. + +The ``-o`` flag controls where the results file will be written. If it is omitted, +results are written to a file named ``<transformation key>_results.json`` in the working directory, where ``<transformation key>`` is a unique identifier. 
+ + +Resuming a halted Job +--------------------- + +When ``openfe quickrun`` starts, it saves a plan of the simulation to a +cache file before execution begins: + +.. code:: none + + <work_dir>/quickrun_cache/<transformation key>-protocolDAG.json + +This cache is automatically removed once the job completes successfully. + +If a job is interrupted (e.g. due to a wall-time limit, node failure, or +manual cancellation), you can resume the interrupted job by passing the ``--resume`` flag: + +.. code:: none + + openfe quickrun transformation.json -d workdir/ -o workdir/results.json --resume + +The planned simulation cache will be used to identify where in the simulation +process it is and, if supported by the Transformation Protocol, how to resume. + +.. note:: + + The same ``-d`` / ``--work-dir`` used in the original run + must be specified so that ``quickrun`` can locate the cache file. + +If you pass ``--resume`` but no cache file is found (e.g. the job never +started), the following warning is printed and a fresh execution begins: + +.. code:: none + + No checkpoint found at <work_dir>/quickrun_cache/<transformation key>-protocolDAG.json! + Starting new execution. + +If the cache file is corrupted (e.g. due to an incomplete write at +the moment of interruption), ``quickrun --resume`` will raise an error with instructions to rerun the simulation: + +.. code:: none + + Recovery failed, please remove <work_dir>/quickrun_cache/<transformation key>-protocolDAG.json + and any results from your working directory before continuing to create a new protocol, or run without `--resume`. + +If you do not pass the ``--resume`` flag, the code will detect the partially +complete transformation and prevent you from accidentally starting a duplicate +run. The following error will be raised: + +.. code:: none + + RuntimeError: Transformation has been started but is incomplete. Please + remove <work_dir>/quickrun_cache/<transformation key>-protocolDAG.json and rerun, or resume + execution using the ``--resume`` flag. 
+ +See Also +-------- + +- :ref:`cli-reference` - full CLI reference for ``openfe quickrun`` +- :ref:`rbfe_cli_tutorial` - a tutorial on how to use the CLI to run hybrid topology relative binding free energy calculations. diff --git a/news/quickrun_resume.rst b/news/quickrun_resume.rst new file mode 100644 index 000000000..e53e52738 --- /dev/null +++ b/news/quickrun_resume.rst @@ -0,0 +1,24 @@ +**Added:** + +* Added ``--resume`` flag to ``openfe quickrun``. + Quickrun now temporarily caches ``protocolDAG`` information and when used with the ``--resume`` flag, quickrun will attempt to resume execution of an incomplete transformation. + +**Changed:** + +* +**Deprecated:** + +* +**Removed:** + +* +**Fixed:** + +* +**Security:** + +* diff --git a/src/openfe/tests/protocols/openmm_abfe/test_abfe_protocol_results.py b/src/openfe/tests/protocols/openmm_abfe/test_abfe_protocol_results.py index a6519ddab..4b387f429 100644 --- a/src/openfe/tests/protocols/openmm_abfe/test_abfe_protocol_results.py +++ b/src/openfe/tests/protocols/openmm_abfe/test_abfe_protocol_results.py @@ -83,12 +83,12 @@ def patcher(): yield -def test_gather(benzene_complex_dag, patcher, tmpdir): +def test_gather(benzene_complex_dag, patcher, tmp_path): # check that .gather behaves as expected dagres = gufe.protocols.execute_DAG( benzene_complex_dag, - shared_basedir=tmpdir, - scratch_basedir=tmpdir, + shared_basedir=tmp_path, + scratch_basedir=tmp_path, keep_shared=True, ) diff --git a/src/openfe/tests/protocols/openmm_ahfe/test_ahfe_protocol_results.py b/src/openfe/tests/protocols/openmm_ahfe/test_ahfe_protocol_results.py index 66e12d302..11300977b 100644 --- a/src/openfe/tests/protocols/openmm_ahfe/test_ahfe_protocol_results.py +++ b/src/openfe/tests/protocols/openmm_ahfe/test_ahfe_protocol_results.py @@ -103,12 +103,12 @@ def patcher(): yield -def test_gather(benzene_solvation_dag, patcher, tmpdir): +def test_gather(benzene_solvation_dag, patcher, tmp_path): # check that .gather behaves as 
expected dagres = gufe.protocols.execute_DAG( benzene_solvation_dag, - shared_basedir=tmpdir, - scratch_basedir=tmpdir, + shared_basedir=tmp_path, + scratch_basedir=tmp_path, keep_shared=True, ) diff --git a/src/openfe/tests/protocols/openmm_md/test_plain_md_protocol.py b/src/openfe/tests/protocols/openmm_md/test_plain_md_protocol.py index 33e550d9b..31b2626d3 100644 --- a/src/openfe/tests/protocols/openmm_md/test_plain_md_protocol.py +++ b/src/openfe/tests/protocols/openmm_md/test_plain_md_protocol.py @@ -508,7 +508,7 @@ def test_unit_tagging(solvent_protocol_dag, tmpdir): assert len(repeats) == 3 -def test_gather(solvent_protocol_dag, tmpdir): +def test_gather(solvent_protocol_dag, tmp_path): # check .gather behaves as expected with mock.patch( "openfe.protocols.openmm_md.plain_md_methods.PlainMDProtocolUnit.run", @@ -519,8 +519,8 @@ def test_gather(solvent_protocol_dag, tmpdir): ): dagres = gufe.protocols.execute_DAG( solvent_protocol_dag, - shared_basedir=tmpdir, - scratch_basedir=tmpdir, + shared_basedir=tmp_path, + scratch_basedir=tmp_path, keep_shared=True, ) diff --git a/src/openfe/tests/protocols/openmm_rfe/test_hybrid_top_protocol.py b/src/openfe/tests/protocols/openmm_rfe/test_hybrid_top_protocol.py index d98482aba..d63ed2734 100644 --- a/src/openfe/tests/protocols/openmm_rfe/test_hybrid_top_protocol.py +++ b/src/openfe/tests/protocols/openmm_rfe/test_hybrid_top_protocol.py @@ -1257,12 +1257,12 @@ def test_unit_tagging(solvent_protocol_dag, unit_mock_patcher, tmpdir): assert len(setup_results) == len(sim_results) == len(analysis_results) == 3 -def test_gather(solvent_protocol_dag, unit_mock_patcher, tmpdir): +def test_gather(solvent_protocol_dag, unit_mock_patcher, tmp_path): # check .gather behaves as expected dagres = gufe.protocols.execute_DAG( solvent_protocol_dag, - shared_basedir=tmpdir, - scratch_basedir=tmpdir, + shared_basedir=tmp_path, + scratch_basedir=tmp_path, keep_shared=True, ) diff --git 
a/src/openfe/tests/protocols/openmm_septop/test_septop_protocol.py b/src/openfe/tests/protocols/openmm_septop/test_septop_protocol.py index f78a6b6f8..7b39a4107 100644 --- a/src/openfe/tests/protocols/openmm_septop/test_septop_protocol.py +++ b/src/openfe/tests/protocols/openmm_septop/test_septop_protocol.py @@ -1295,7 +1295,7 @@ def test_unit_tagging(benzene_toluene_dag, tmpdir): assert len(complex_repeats) == len(solv_repeats) == 2 -def test_gather(benzene_toluene_dag, tmpdir): +def test_gather(benzene_toluene_dag, tmp_path): # check that .gather behaves as expected with ( mock.patch( @@ -1339,8 +1339,8 @@ def test_gather(benzene_toluene_dag, tmpdir): ): dagres = gufe.protocols.execute_DAG( benzene_toluene_dag, - shared_basedir=tmpdir, - scratch_basedir=tmpdir, + shared_basedir=tmp_path, + scratch_basedir=tmp_path, keep_shared=True, ) diff --git a/src/openfecli/commands/quickrun.py b/src/openfecli/commands/quickrun.py index f34410d69..15e099237 100644 --- a/src/openfecli/commands/quickrun.py +++ b/src/openfecli/commands/quickrun.py @@ -3,6 +3,7 @@ import json import pathlib +import warnings import click @@ -30,8 +31,14 @@ def _format_exception(exception) -> str: type=click.Path(dir_okay=False, file_okay=False, path_type=pathlib.Path), help="Filepath at which to create and write the JSON-formatted results.", ) # fmt: skip +@click.option( + "--resume", + is_flag=True, + default=False, + help=("Attempt to resume this transformation's execution using the cache."), +) @print_duration -def quickrun(transformation, work_dir, output): +def quickrun(transformation, work_dir, output, resume): """Run the transformation (edge) in the given JSON file. 
Simulation JSON files can be created with the @@ -51,7 +58,9 @@ def quickrun(transformation, work_dir, output): import logging import os import sys + from json import JSONDecodeError + from gufe import ProtocolDAG from gufe.protocols.protocoldag import execute_DAG from gufe.tokenization import JSON_HANDLER from gufe.transformations.transformation import Transformation @@ -94,13 +103,37 @@ def quickrun(transformation, work_dir, output): else: output.parent.mkdir(exist_ok=True, parents=True) - write("Planning simulations for this edge...") - dag = trans.create() + # Attempt to either deserialize or freshly create DAG + trans_DAG_json = work_dir / "quickrun_cache" / f"{trans.key}-protocolDAG.json" + + if trans_DAG_json.is_file(): + if resume: + write(f"Attempting to resume execution using existing edges from '{trans_DAG_json}'") + try: + dag = ProtocolDAG.from_json(trans_DAG_json) + except JSONDecodeError: + errmsg = f"Recovery failed, please remove {trans_DAG_json} and any results from your working directory before continuing to create a new protocol, or run without `--resume`." + raise click.ClickException(errmsg) + else: + errmsg = f"Transformation has been started but is incomplete. Please remove {trans_DAG_json} and rerun, or resume execution using the ``--resume`` flag." + raise RuntimeError(errmsg) + + else: + if resume: + warnings.warn(f"No checkpoint found at {trans_DAG_json}! 
Starting new execution.") + + # Create the DAG instead and then serialize for later resuming + write("Planning simulations for this edge...") + dag = trans.create() + pathlib.Path(work_dir, "quickrun_cache").mkdir(exist_ok=True) + dag.to_json(trans_DAG_json) + write("Starting the simulations for this edge...") dagresult = execute_DAG( dag, shared_basedir=work_dir, scratch_basedir=work_dir, + unitresults_basedir=work_dir, keep_shared=True, raise_error=False, n_retries=2, @@ -126,6 +159,9 @@ def quickrun(transformation, work_dir, output): with open(output, mode="w") as outf: json.dump(out_dict, outf, cls=JSON_HANDLER.encoder) + # remove the checkpoint since the job has completed + os.remove(trans_DAG_json) + write(f"Here is the result:\n\tdG = {estimate} ± {uncertainty}\n") write("") diff --git a/src/openfecli/tests/commands/test_quickrun.py b/src/openfecli/tests/commands/test_quickrun.py index 86fe00b26..786e3a484 100644 --- a/src/openfecli/tests/commands/test_quickrun.py +++ b/src/openfecli/tests/commands/test_quickrun.py @@ -1,14 +1,18 @@ import json +import os import pathlib from importlib import resources +from unittest import mock -import click import pytest from click.testing import CliRunner +from gufe import Transformation from gufe.tokenization import JSON_HANDLER from openfecli.commands.quickrun import quickrun +from ..utils import assert_click_success + @pytest.fixture def json_file(): @@ -18,21 +22,21 @@ def json_file(): return json_file -@pytest.mark.parametrize( - "extra_args", - [ - {}, - {"-d": "foo_dir", "-o": "foo.json"}, - ], -) +@pytest.mark.parametrize("extra_args", [{}, {"-d": "foo_dir", "-o": "foo.json"}]) def test_quickrun(extra_args, json_file): extras = sum([list(kv) for kv in extra_args.items()], []) runner = CliRunner() with runner.isolated_filesystem(): result = runner.invoke(quickrun, [json_file] + extras) - assert result.exit_code == 0 + + assert_click_success(result) assert "Here is the result" in result.output + trans = 
Transformation.from_json(json_file) + # checkpoint should be deleted when job is complete + assert not pathlib.Path( + extra_args.get("-d", ""), "quickrun_cache", f"{trans.key}-protocolDAG.json" + ).exists() if outfile := extra_args.get("-o"): assert pathlib.Path(outfile).exists() @@ -49,6 +53,23 @@ def test_quickrun(extra_args, json_file): # assert len(list(dirpath.iterdir())) > 0 +@pytest.mark.parametrize("extra_args", [{}, {"-d": "foo_dir", "-o": "foo.json"}]) +def test_quickrun_interrupted(extra_args, json_file): + """If a quickrun is unable to complete, the protocolDAG.json checkpoint should exist.""" + extras = sum([list(kv) for kv in extra_args.items()], []) + + runner = CliRunner() + with runner.isolated_filesystem(): + with mock.patch("gufe.protocols.protocoldag.execute_DAG", side_effect=RuntimeError): + result = runner.invoke(quickrun, [json_file] + extras) + + assert "Here is the result" not in result.output + trans = Transformation.from_json(json_file) + assert pathlib.Path( + extra_args.get("-d", ""), "quickrun_cache", f"{trans.key}-protocolDAG.json" + ).exists() + + def test_quickrun_output_file_exists(json_file): """Fail if the output file already exists.""" runner = CliRunner() @@ -65,7 +86,7 @@ def test_quickrun_output_file_in_nonexistent_directory(json_file): with runner.isolated_filesystem(): outfile = pathlib.Path("not_dir/foo.json") result = runner.invoke(quickrun, [json_file, "-o", outfile]) - assert result.exit_code == 0 + assert_click_success(result) assert outfile.parent.is_dir() @@ -76,7 +97,7 @@ def test_quickrun_dir_created_at_runtime(json_file): outdir = "not_dir" outfile = outdir + "foo.json" result = runner.invoke(quickrun, [json_file, "-d", outdir, "-o", outfile]) - assert result.exit_code == 0 + assert_click_success(result) def test_quickrun_unit_error(): @@ -92,3 +113,55 @@ def test_quickrun_unit_error(): # to be stored in JSON # not sure whether that means we should always be storing all # protocol dag results maybe? 
+ + +def test_quickrun_existing_checkpoint(json_file): + """In the default case where resume=False, if the checkpoint exists, quickrun should error out and not attempt to execute.""" + trans = Transformation.from_json(json_file) + dag = trans.create() + + runner = CliRunner() + with runner.isolated_filesystem(): + pathlib.Path("quickrun_cache").mkdir() + dag.to_json(pathlib.Path("quickrun_cache", f"{trans.key}-protocolDAG.json")) + result = runner.invoke(quickrun, [json_file]) + assert isinstance(result.exception, RuntimeError) + assert "Attempting to resume" not in result.output + + +def test_quickrun_resume_from_checkpoint(json_file): + trans = Transformation.from_json(json_file) + dag = trans.create() + + runner = CliRunner() + with runner.isolated_filesystem(): + pathlib.Path("quickrun_cache").mkdir() + dag.to_json(pathlib.Path("quickrun_cache", f"{trans.key}-protocolDAG.json")) + result = runner.invoke(quickrun, [json_file, "--resume"]) + + assert_click_success(result) + assert "Attempting to resume" in result.output + + +def test_quickrun_resume_invalid_checkpoint(json_file): + """Fail if the checkpoint file doesn't load properly.""" + trans = Transformation.from_json(json_file) + + runner = CliRunner() + with runner.isolated_filesystem(): + pathlib.Path("quickrun_cache").mkdir() + pathlib.Path("quickrun_cache", f"{trans.key}-protocolDAG.json").touch() + result = runner.invoke(quickrun, [json_file, "--resume"]) + + assert result.exit_code == 1 + assert "Attempting to resume" in result.output + assert "Recovery failed" in result.stderr + + +def test_quickrun_resume_missing_checkpoint(json_file): + """If --resume is passed but there's no checkpoint, just warn and keep going.""" + runner = CliRunner() + with runner.isolated_filesystem(): + with pytest.warns(): + result = runner.invoke(quickrun, [json_file, "--resume"]) + assert result.exit_code == 0