diff --git a/news/quickrun_resume.rst b/news/quickrun_resume.rst new file mode 100644 index 000000000..c594c443a --- /dev/null +++ b/news/quickrun_resume.rst @@ -0,0 +1,24 @@ +**Added:** + +* Added ``--resume`` flag to ``openfe quickrun``. + Quickrun now temporarily caches ``protocolDAG`` information and, when used with the ``--resume`` flag, quickrun will attempt to resume execution of an incomplete transformation. + +**Changed:** + +* + +**Deprecated:** + +* + +**Removed:** + +* + +**Fixed:** + +* + +**Security:** + +* diff --git a/src/openfecli/commands/quickrun.py b/src/openfecli/commands/quickrun.py index f34410d69..f5c1b33a4 100644 --- a/src/openfecli/commands/quickrun.py +++ b/src/openfecli/commands/quickrun.py @@ -1,8 +1,10 @@ # This code is part of OpenFE and is licensed under the MIT license. # For details, see https://github.com/OpenFreeEnergy/openfe +import hashlib import json import pathlib +import warnings import click @@ -15,6 +17,12 @@ def _format_exception(exception) -> str: return f"{exception[0]}: {exception[1][0]}" +def _hash_quickrun_inputs(output, transformation): + string_rep = f"{output.absolute()}{transformation.key}" + hasher = hashlib.md5(string_rep.encode(), usedforsecurity=False) + return hasher.hexdigest() + + @click.command("quickrun", short_help="Run a given transformation, saved as a JSON file") @click.argument("transformation", type=click.File(mode="r"), required=True) @click.option( @@ -30,8 +38,14 @@ def _format_exception(exception) -> str: type=click.Path(dir_okay=False, file_okay=False, path_type=pathlib.Path), help="Filepath at which to create and write the JSON-formatted results.", ) # fmt: skip +@click.option( + "--resume", + is_flag=True, + default=False, + help=("Attempt to resume this transformation's execution using the cache."), +) @print_duration -def quickrun(transformation, work_dir, output): +def quickrun(transformation, work_dir, output, resume): """Run the transformation (edge) in the given JSON file. 
Simulation JSON files can be created with the @@ -51,7 +65,9 @@ def quickrun(transformation, work_dir, output): import logging import os import sys + from json import JSONDecodeError + from gufe import ProtocolDAG from gufe.protocols.protocoldag import execute_DAG from gufe.tokenization import JSON_HANDLER from gufe.transformations.transformation import Transformation @@ -81,12 +97,14 @@ def quickrun(transformation, work_dir, output): # turn warnings into log message (don't show stack trace) logging.captureWarnings(True) + click.secho(f"\nCurrent directory: {os.getcwd()}/") if work_dir is None: + click.secho("Using current directory as working directory.") work_dir = pathlib.Path(os.getcwd()) else: + click.secho(f"Creating working directory: {work_dir}/") work_dir.mkdir(exist_ok=True, parents=True) - write("Loading file...") trans = Transformation.from_json(transformation) if output is None: @@ -94,13 +112,56 @@ def quickrun(transformation, work_dir, output): else: output.parent.mkdir(exist_ok=True, parents=True) - write("Planning simulations for this edge...") - dag = trans.create() - write("Starting the simulations for this edge...") + click.secho(f"Loading transformation from: {transformation.name}") + click.secho(f"When simulation is complete, results will be written to: {output}\n") + + resume_command = f"openfe quickrun {transformation.name} -o {output} -d {work_dir} --resume\n" + + click.secho( + "If this simulation is interrupted or fails, you may attempt to resume execution using:", + bold=True, + ) + click.secho(resume_command) + + # Attempt to either deserialize or freshly create DAG + cache_basedir = work_dir / "quickrun_cache" + hashed_key = _hash_quickrun_inputs(output, trans) + cached_dag_path = cache_basedir / f"dag-cache-{hashed_key}.json" + + if cached_dag_path.is_file(): + if resume: + write(f"Attempting to resume execution using '{cached_dag_path}'") + try: + dag = ProtocolDAG.from_json(cached_dag_path) + except JSONDecodeError: + # we can't 
tell the user which gufe-generated cache dir to delete, since we'd need to load the JSON to know the DAG's key + # however, just removing the cached_dag_path is sufficient to trigger a fresh DAG to be generated, and the gufe-generated cached dir will just be stale. + errmsg = f"Recovery failed, please remove {cached_dag_path} before continuing to create a new protocol." + raise click.ClickException(errmsg) + + write("Success. Resuming execution...") + else: + errmsg = f"Transformation has been started but is incomplete. Please remove {cached_dag_path} and rerun, or resume execution using the ``--resume`` flag." + raise click.ClickException(click.style(errmsg, fg="red")) + + else: + if resume: + write( + f"openfe quickrun was run with --resume, but no cached results found at {cached_dag_path}. Starting new execution." + ) + + # Create the DAG instead and then serialize for later resuming + write("Planning simulations for this edge...") + dag = trans.create() + cache_basedir.mkdir(exist_ok=True) + dag.to_json(cached_dag_path) + + write("\nStarting the simulations for this edge...\n") dagresult = execute_DAG( dag, shared_basedir=work_dir, scratch_basedir=work_dir, + cache_basedir=cache_basedir, keep_shared=True, raise_error=False, n_retries=2, @@ -126,6 +187,9 @@ def quickrun(transformation, work_dir, output): with open(output, mode="w") as outf: json.dump(out_dict, outf, cls=JSON_HANDLER.encoder) + # remove the cached dag since the job has completed + os.remove(cached_dag_path) + write(f"Here is the result:\n\tdG = {estimate} ± {uncertainty}\n") write("") diff --git a/src/openfecli/tests/commands/test_quickrun.py b/src/openfecli/tests/commands/test_quickrun.py index 11ff676e6..9cdcc7c3b 100644 --- a/src/openfecli/tests/commands/test_quickrun.py +++ b/src/openfecli/tests/commands/test_quickrun.py @@ -1,13 +1,17 @@ import json +import os import pathlib from importlib import resources +from unittest import mock import pytest from click.testing import CliRunner from gufe 
import Transformation from gufe.tokenization import JSON_HANDLER -from openfecli.commands.quickrun import quickrun +from openfecli.commands.quickrun import _hash_quickrun_inputs, quickrun + +from ..utils import assert_click_success @pytest.fixture @@ -18,35 +22,34 @@ def json_file(): return json_file -@pytest.mark.parametrize( - "extra_args", - [ - {}, - {"-d": "foo_dir", "-o": "foo.json"}, - ], -) +@pytest.mark.parametrize("extra_args", [{}, {"-d": "foo_dir", "-o": "foo.json"}]) def test_quickrun(extra_args, json_file): extras = sum([list(kv) for kv in extra_args.items()], []) runner = CliRunner() with runner.isolated_filesystem(): + # figure out what cached json should be trans = Transformation.from_json(json_file) + work_dir = extra_args.get("-d", ".") outfile = pathlib.Path(extra_args.get("-o", f"{trans.key}_results.json")) + hashed_key = _hash_quickrun_inputs(outfile, trans) # output json shouldn't be created before quickrun is executed assert not pathlib.Path(outfile).exists() - result = runner.invoke(quickrun, [json_file] + extras) - assert result.exit_code == 0 + + assert_click_success(result) assert "Here is the result" in result.output - # output json should exist and have results after execution + # cache should be deleted when job is complete + assert not pathlib.Path(work_dir, "quickrun_cache", f"dag-cache-{hashed_key}.json").exists() + + # output json should exist with data when job is complete assert pathlib.Path(outfile).exists() with open(outfile, mode="r") as outf: dct = json.load(outf, cls=JSON_HANDLER.decoder) assert set(dct) == {"estimate", "uncertainty", "protocol_result", "unit_results"} - # TODO: need a protocol that drops files to actually do this! 
# if directory := extra_args.get('-d'): # dirpath = pathlib.Path(directory) @@ -55,6 +58,26 @@ def test_quickrun(extra_args, json_file): # assert len(list(dirpath.iterdir())) > 0 +@pytest.mark.parametrize("extra_args", [{}, {"-d": "foo_dir", "-o": "foo.json"}]) +def test_quickrun_interrupted(extra_args, json_file): + """If quickrun starts but is unable to complete, the cached DAG should exist.""" + extras = sum([list(kv) for kv in extra_args.items()], []) + + runner = CliRunner() + with runner.isolated_filesystem(): + # figure out what cached json should be + trans = Transformation.from_json(json_file) + work_dir = pathlib.Path(extra_args.get("-d", ".")).absolute() + outfile = pathlib.Path(extra_args.get("-o", f"{trans.key}_results.json")) + hashed_key = _hash_quickrun_inputs(outfile, trans) + + with mock.patch("gufe.protocols.protocoldag.execute_DAG", side_effect=RuntimeError): + result = runner.invoke(quickrun, [json_file] + extras) + + assert "Here is the result" not in result.output + assert pathlib.Path(work_dir, "quickrun_cache", f"dag-cache-{hashed_key}.json").exists() + + def test_quickrun_output_file_exists(json_file): """Fail if the output file already exists.""" runner = CliRunner() @@ -71,7 +94,7 @@ def test_quickrun_output_file_in_nonexistent_directory(json_file): with runner.isolated_filesystem(): outfile = pathlib.Path("not_dir/foo.json") result = runner.invoke(quickrun, [json_file, "-o", outfile]) - assert result.exit_code == 0 + assert_click_success(result) assert outfile.parent.is_dir() @@ -82,7 +105,7 @@ def test_quickrun_dir_created_at_runtime(json_file): outdir = "not_dir" outfile = outdir + "foo.json" result = runner.invoke(quickrun, [json_file, "-d", outdir, "-o", outfile]) - assert result.exit_code == 0 + assert_click_success(result) def test_quickrun_unit_error(): @@ -98,3 +121,74 @@ def test_quickrun_unit_error(): # to be stored in JSON # not sure whether that means we should always be storing all # protocol dag results maybe? 
+ + +def test_quickrun_existing_cache_error(json_file): + """In the default case where resume=False, if the cache exists, quickrun should error out and not attempt to execute.""" + trans = Transformation.from_json(json_file) + dag = trans.create() + + runner = CliRunner() + with runner.isolated_filesystem(): + outfile = pathlib.Path(f"{trans.key}_results.json") + hashed_key = _hash_quickrun_inputs(outfile, trans) + pathlib.Path("quickrun_cache").mkdir() + dag.to_json(pathlib.Path("quickrun_cache", f"dag-cache-{hashed_key}.json")) + result = runner.invoke(quickrun, [json_file]) + assert result.exit_code == 1 + assert "Attempting to resume" not in result.output + assert "Transformation has been started but is incomplete." in result.stderr + + +def test_quickrun_resume_from_cache(json_file): + trans = Transformation.from_json(json_file) + dag = trans.create() + + runner = CliRunner() + with runner.isolated_filesystem(): + outfile = pathlib.Path(f"{trans.key}_results.json") + hashed_key = _hash_quickrun_inputs(outfile, trans) + pathlib.Path("quickrun_cache").mkdir() + dag_cache = pathlib.Path("quickrun_cache", f"dag-cache-{hashed_key}.json") + dag.to_json(dag_cache) + result = runner.invoke(quickrun, [json_file, "--resume"]) + + assert_click_success(result) + assert f"resume execution using '{dag_cache.absolute()}" in result.output + assert "Success" in result.output + + +def test_quickrun_resume_invalid_cache(json_file): + """Fail if the output file doesn't load properly.""" + trans = Transformation.from_json(json_file) + + runner = CliRunner() + with runner.isolated_filesystem(): + outfile = pathlib.Path(f"{trans.key}_results.json") + hashed_key = _hash_quickrun_inputs(outfile, trans) + pathlib.Path("quickrun_cache").mkdir() + dag_cache = pathlib.Path("quickrun_cache", f"dag-cache-{hashed_key}.json") + dag_cache.touch() + result = runner.invoke(quickrun, [json_file, "--resume"]) + + assert result.exit_code == 1 + assert f"resume execution using 
'{dag_cache.absolute()}" in result.output + assert "Recovery failed" in result.stderr + + +def test_quickrun_resume_missing_cache(json_file): + """If --resume is passed but there's no cache, just echo a message and start from scratch.""" + runner = CliRunner() + with runner.isolated_filesystem(): + # determine what the cache to be looked for should be named + trans = Transformation.from_json(json_file) + outfile = pathlib.Path(f"{trans.key}_results.json") + hashed_key = _hash_quickrun_inputs(outfile, trans) + dag_cache = pathlib.Path("quickrun_cache", f"dag-cache-{hashed_key}.json") + + result = runner.invoke(quickrun, [json_file, "--resume"]) + assert_click_success(result) + assert ( + f"openfe quickrun was run with --resume, but no cached results found at {dag_cache.absolute()}" + in result.output + )