import datetime
import textwrap

try:
    import graphviz
except ImportError:
    # Graphviz rendering is optional; without the package only provenance.yml is written.
    graphviz = None


def _sanitize_graphviz_id(value: str) -> str:
    """
    Return a Graphviz-safe identifier.

    Every character for which ``str.isalnum()`` is False is replaced by an underscore.
    Note that distinct inputs may map to the same identifier (e.g., ``'a-b'`` and ``'a_b'``).

    Args:
        value (str): The raw identifier (non-string values are converted with ``str()``).

    Returns:
        str: The sanitized identifier.
    """
    return ''.join(ch if ch.isalnum() else '_' for ch in str(value))


def _wrap_graph_label(text: str, width: int = 24) -> str:
    """
    Wrap long labels so graph nodes stay readable, preserving intentional newlines.

    Args:
        text (str): The label text. A falsy value yields an empty string.
        width (int, optional): The maximal line width passed to ``textwrap.wrap``.

    Returns:
        str: The wrapped label, lines joined by newline characters.
    """
    if not text:
        return ''
    wrapped_lines = list()
    for part in str(text).split('\n'):
        # textwrap.wrap() returns an empty list for an empty string; keep the blank line instead.
        wrapped_lines.extend(textwrap.wrap(part, width=width) or [''])
    return '\n'.join(wrapped_lines)


def _normalize_provenance_label(label):
    """
    Return a hashable label for graph bookkeeping.

    List / tuple labels are joined with '+' (the same convention the scheduler uses for the
    ``job_started`` event of multi-species jobs), so they can be used as dictionary keys.
    Any other value is returned unchanged.
    """
    if isinstance(label, (list, tuple)):
        return '+'.join(str(item) for item in label)
    return label


def _build_provenance_graph(provenance: dict):
    """
    Translate a provenance dictionary into a ``graphviz.Digraph``.

    Must only be called when the ``graphviz`` Python package was imported successfully.

    Args:
        provenance (dict): A provenance dictionary with an ``events`` list.

    Returns:
        graphviz.Digraph: The provenance graph (not yet rendered).
    """
    run_label = provenance.get('project', 'ARC run')
    graph = graphviz.Digraph(
        name='arc_provenance',
        comment=f'ARC provenance for {run_label}',
        graph_attr={'rankdir': 'LR', 'splines': 'true', 'overlap': 'false'},
        node_attr={'shape': 'box', 'style': 'rounded,filled', 'fillcolor': 'white', 'fontname': 'Helvetica'},
        edge_attr={'fontname': 'Helvetica'},
    )
    run_node_id = _sanitize_graphviz_id(f"run_{provenance.get('run_id', run_label)}")
    run_text = f'{run_label}'
    if provenance.get('started_at'):
        run_text += f"\nstart: {provenance['started_at']}"
    if provenance.get('ended_at'):
        run_text += f"\nend: {provenance['ended_at']}"
    graph.node(run_node_id, _wrap_graph_label(run_text, width=32), shape='oval', fillcolor='lightgoldenrod1')

    species_nodes = dict()  # label -> species node ID
    job_nodes = dict()  # job_key -> job node ID
    # Track the most recent decision node (troubleshoot / TS selection) per label,
    # so that follow-up jobs spawned by that decision connect from the diamond.
    last_decision_by_label = dict()

    # ``or list()`` also covers an explicit ``events: None`` entry.
    for event in provenance.get('events') or list():
        if not isinstance(event, dict):
            # A malformed entry must not prevent rendering the rest of the log.
            continue
        event_type = event.get('event_type', '')
        label = _normalize_provenance_label(event.get('label'))

        # The first event mentioning a label creates its species node, whatever the event type is.
        if label and label not in species_nodes:
            species_node_id = _sanitize_graphviz_id(f'species_{label}')
            species_text = f'{label}'
            if event.get('is_ts'):
                species_text += '\nTS'
            graph.node(species_node_id, _wrap_graph_label(species_text), fillcolor='aliceblue')
            graph.edge(run_node_id, species_node_id)
            species_nodes[label] = species_node_id

        if event_type == 'job_started':
            job_key = event.get('job_key', event.get('job_name', 'job'))
            job_node_id = _sanitize_graphviz_id(f'job_{job_key}')
            job_text = f"{event.get('job_type', 'job')}\n{event.get('job_name', job_key)}"
            if event.get('job_adapter'):
                job_text += f"\n{event['job_adapter']}"
            if event.get('level'):
                job_text += f"\n{event['level']}"
            graph.node(job_node_id, _wrap_graph_label(job_text), fillcolor='white')

            # Determine the source node for this job's incoming edge.
            parent_job = event.get('provenance_parent_job')
            reason = event.get('provenance_reason', '')
            if parent_job and label in last_decision_by_label:
                # A decision (troubleshoot / TS selection) preceded this job - connect from it.
                source_node_id = last_decision_by_label.pop(label)
            elif parent_job:
                # Rerun or other child job - connect from the parent job node.
                parent_key = f'{label}:{parent_job}'
                source_node_id = job_nodes.get(parent_key, species_nodes.get(label, run_node_id))
            else:
                # Normal first-launch job - connect from the species node.
                source_node_id = species_nodes.get(label, run_node_id)
            graph.edge(source_node_id, job_node_id, label=reason)
            job_nodes[job_key] = job_node_id

        elif event_type == 'job_finished':
            job_key = event.get('job_key')
            if job_key in job_nodes:
                status = event.get('status', 'unknown')
                fillcolor = {'done': 'honeydew', 'errored': 'mistyrose'}.get(status, 'lightyellow')
                # Re-declaring the node only adds the fill colour to the existing job node.
                graph.node(job_nodes[job_key], fillcolor=fillcolor)

                result_node_id = _sanitize_graphviz_id(
                    f"result_{event.get('event_id', len(job_nodes))}_{job_key}"
                )
                result_text = f"{status}"
                if event.get('run_time'):
                    result_text += f"\n{event['run_time']}"
                if event.get('keywords'):
                    result_text += f"\n{', '.join(event['keywords'])}"
                graph.node(result_node_id, _wrap_graph_label(result_text), shape='note', fillcolor='cornsilk')
                graph.edge(job_nodes[job_key], result_node_id)

        elif event_type in ('ts_guess_selected', 'ts_guess_selection_failed', 'job_troubleshooting'):
            decision_node_id = _sanitize_graphviz_id(f"decision_{event.get('event_id', 0)}")
            if event_type == 'ts_guess_selected':
                decision_text = f"Select TS guess {event.get('selected_index')}"
                if event.get('method'):
                    decision_text += f"\n{event['method']}"
                fillcolor = 'lavender'
            elif event_type == 'ts_guess_selection_failed':
                decision_text = 'TS guess selection\nfailed'
                fillcolor = 'mistyrose'
            else:
                decision_text = f"Troubleshoot {event.get('job_name', '')}"
                if event.get('methods'):
                    decision_text += f"\n{', '.join(event['methods'])}"
                fillcolor = 'moccasin'
            graph.node(decision_node_id, _wrap_graph_label(decision_text), shape='diamond', fillcolor=fillcolor)

            # Prefer the job that triggered the decision; fall back to the species node.
            source_job_key = event.get('job_key')
            source_node_id = job_nodes.get(source_job_key) if source_job_key else species_nodes.get(label)
            if source_node_id is None and label is not None:
                source_node_id = species_nodes.get(label)
            if source_node_id is not None:
                graph.edge(source_node_id, decision_node_id)
            if label is not None:
                last_decision_by_label[label] = decision_node_id

    return graph


def save_provenance_artifacts(project_directory: str,
                              provenance: dict,
                              ) -> dict:
    """
    Save provenance YAML and render Graphviz artifacts for an ARC run.

    The YAML file is written first, so it is persisted even if the graph cannot be built or rendered.
    The ``provenance`` dictionary is modified in place: an ``updated_at`` timestamp is added.

    Args:
        project_directory (str): The ARC project directory.
        provenance (dict): A provenance dictionary with an ``events`` list.

    Returns:
        dict: Paths to generated artifacts under the keys ``'yml'``, ``'dot'``, and ``'svg'``.
              ``'dot'`` and ``'svg'`` are ``None`` if the graphviz Python package is unavailable,
              and ``'svg'`` is ``None`` whenever the SVG was not rendered by this call
              (a file left over from a previous call is never reported).
    """
    output_directory = os.path.join(project_directory, 'output')
    os.makedirs(output_directory, exist_ok=True)
    yml_path = os.path.join(output_directory, 'provenance.yml')
    dot_path = os.path.join(output_directory, 'provenance.dot')
    svg_path = os.path.join(output_directory, 'provenance.svg')

    provenance['updated_at'] = datetime.datetime.now().isoformat(timespec='seconds')
    save_yaml_file(path=yml_path, content=provenance)

    if graphviz is None:
        logger.warning('The graphviz Python package is not available, so ARC will only save provenance.yml.')
        return {'yml': yml_path, 'dot': None, 'svg': None}

    graph = _build_provenance_graph(provenance)

    # Labels may contain non-ASCII characters; do not depend on the platform default encoding.
    with open(dot_path, 'w', encoding='utf-8') as f:
        f.write(graph.source)

    svg_rendered = False
    try:
        svg_data = graph.pipe(format='svg')
    except graphviz.ExecutableNotFound:
        logger.warning('Could not render ARC provenance SVG because Graphviz is not available on this system.')
    except graphviz.CalledProcessError as e:
        logger.warning(f'Could not render ARC provenance SVG because the Graphviz executable failed: {e}')
    else:
        with open(svg_path, 'wb') as f:
            f.write(svg_data)
        svg_rendered = True

    return {'yml': yml_path, 'dot': dot_path, 'svg': svg_path if svg_rendered else None}
+ self.assertEqual(plotter._wrap_graph_label(''), '') + + def test_save_provenance_artifacts(self): + """Test saving ARC provenance YAML / Graphviz artifacts.""" + project = 'arc_project_for_testing_delete_after_usage' + project_directory = os.path.join(ARC_PATH, 'Projects', project) + provenance = { + 'project': project, + 'run_id': 'run_1', + 'started_at': '2026-03-15T10:00:00', + 'ended_at': '2026-03-15T10:05:00', + 'events': [ + {'event_id': 1, 'event_type': 'species_initialized', 'timestamp': '2026-03-15T10:00:00', + 'label': 'spc1'}, + {'event_id': 2, 'event_type': 'species_initialized', 'timestamp': '2026-03-15T10:00:00', + 'label': 'TS0', 'is_ts': True}, + {'event_id': 3, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:00:01', + 'label': 'spc1', 'job_key': 'spc1:opt_a1', 'job_name': 'opt_a1', 'job_type': 'opt', + 'job_adapter': 'gaussian', 'level': 'b3lyp/6-31g(d)'}, + {'event_id': 4, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:01:00', + 'label': 'spc1', 'job_key': 'spc1:opt_a1', 'status': 'done', 'run_time': '0:01:00'}, + {'event_id': 5, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:01:01', + 'label': 'spc1', 'job_key': 'spc1:freq_a2', 'job_name': 'freq_a2', 'job_type': 'freq', + 'job_adapter': 'gaussian', 'level': 'b3lyp/6-31g(d)'}, + {'event_id': 6, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:01:30', + 'label': 'spc1', 'job_key': 'spc1:freq_a2', 'status': 'errored', + 'run_time': '0:00:30', 'keywords': ['memory']}, + {'event_id': 7, 'event_type': 'job_troubleshooting', 'timestamp': '2026-03-15T10:01:35', + 'label': 'spc1', 'job_key': 'spc1:freq_a2', 'job_name': 'freq_a2', 'job_type': 'freq', + 'methods': ['memory']}, + {'event_id': 8, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:01:40', + 'label': 'spc1', 'job_key': 'spc1:freq_a3', 'job_name': 'freq_a3', 'job_type': 'freq', + 'job_adapter': 'gaussian', 'provenance_parent_job': 'freq_a2', + 'provenance_reason': 'ess_troubleshoot'}, + {'event_id': 
9, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:02:00', + 'label': 'spc1', 'job_key': 'spc1:freq_a3', 'status': 'done', 'run_time': '0:00:20'}, + {'event_id': 10, 'event_type': 'job_started', 'timestamp': '2026-03-15T10:02:01', + 'label': 'TS0', 'job_key': 'TS0:tsg0', 'job_name': 'tsg0', 'job_type': 'tsg', + 'job_adapter': 'autotst'}, + {'event_id': 11, 'event_type': 'job_finished', 'timestamp': '2026-03-15T10:03:00', + 'label': 'TS0', 'job_key': 'TS0:tsg0', 'status': 'done'}, + {'event_id': 12, 'event_type': 'ts_guess_selected', 'timestamp': '2026-03-15T10:03:01', + 'label': 'TS0', 'selected_index': 0, 'method': 'autotst', 'energy': -154.321}, + ], + } + paths = plotter.save_provenance_artifacts(project_directory=project_directory, provenance=provenance) + self.assertTrue(os.path.isfile(paths['yml'])) + if paths['dot'] is not None: + self.assertTrue(os.path.isfile(paths['dot'])) + with open(paths['dot'], 'r') as f: + dot = f.read() + # Species and job nodes are present. + self.assertIn('spc1', dot) + self.assertIn('opt_a1', dot) + self.assertIn('TS0', dot) + # Troubleshoot diamond and edge label rendered. + self.assertIn('Troubleshoot', dot) + self.assertIn('ess_troubleshoot', dot) + # TS guess selection diamond rendered. + self.assertIn('Select TS guess 0', dot) + self.assertIn('autotst', dot) + # Errored job node coloured correctly. + self.assertIn('mistyrose', dot) + # Normal jobs (opt_a1, freq_a2) connect from the species node, not from each other. + self.assertIn('species_spc1 -> job_spc1_opt_a1', dot) + self.assertIn('species_spc1 -> job_spc1_freq_a2', dot) + # Troubleshoot follow-up connects from the decision diamond, not the species node. 
+ self.assertIn('decision_7 -> job_spc1_freq_a3', dot) + @classmethod def tearDownClass(cls): diff --git a/arc/scheduler.py b/arc/scheduler.py index aeafecd130..3b5f73c427 100644 --- a/arc/scheduler.py +++ b/arc/scheduler.py @@ -9,9 +9,8 @@ import pprint import shutil import time - import numpy as np -from typing import TYPE_CHECKING, List, Optional, Tuple, Union +from typing import Any, TYPE_CHECKING, List, Optional, Tuple, Union import arc.parser.parser as parser from arc import plotter @@ -297,12 +296,20 @@ def __init__(self, self.output_multi_spc = dict() self.report_e_elect = report_e_elect self.skip_nmd = skip_nmd + self.provenance = {'version': 1, + 'project': self.project, + 'run_id': f'{self.project}_{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}', + 'started_at': datetime.datetime.now().isoformat(timespec='seconds'), + 'events': list(), + } + self.provenance_path = os.path.join(self.project_directory, 'output', 'provenance.yml') self.species_dict, self.rxn_dict = dict(), dict() for species in self.species_list: self.species_dict[species.label] = species for rxn in self.rxn_list: self.rxn_dict[rxn.index] = rxn + self._initialize_provenance() if self.restart_dict is not None: self.output = self.restart_dict['output'] if 'output' in self.restart_dict else dict() self.output_multi_spc = self.restart_dict['output_multi_spc'] if 'output_multi_spc' in self.restart_dict else dict() @@ -325,6 +332,8 @@ def __init__(self, self.orbitals_level = orbitals_level self.unique_species_labels = list() self.save_restart = False + if self.restart_dict is not None: + self._sanitize_restart_output() if len(self.rxn_list): rxn_info_path = self.make_reaction_labels_info_file() @@ -368,6 +377,10 @@ def __init__(self, self.species_list.append(ts_species) self.species_dict[ts_species.label] = ts_species self.initialize_output_dict(ts_species.label) + self.record_provenance_event(event_type='species_initialized', + label=ts_species.label, + is_ts=True, + ) else: # The TS species 
def _initialize_provenance(self):
    """
    Load previously saved provenance events (if any) and record newly added species.

    If ``self.provenance_path`` exists and holds a dictionary with a valid ``events`` list
    (a list of dictionaries), these events replace the empty event list of ``self.provenance``.
    Other keys of the saved file (e.g., its ``run_id``) are not taken over.
    A ``species_initialized`` event is then recorded for every species in ``self.species_list``
    which does not already have one in the event log.
    """
    previous = None
    if os.path.isfile(self.provenance_path):
        try:
            previous = read_yaml_file(self.provenance_path)
        except Exception:
            # Provenance is best-effort bookkeeping: an unreadable file must not abort the run.
            logger.warning('Could not parse existing provenance.yml; starting a fresh provenance log.')
    if isinstance(previous, dict):
        raw_events = previous.get('events', list())
        if isinstance(raw_events, list) and all(isinstance(e, dict) for e in raw_events):
            self.provenance['events'] = raw_events
        else:
            logger.warning('Existing provenance.yml has invalid events; starting with an empty event log.')
    already_initialized = {e['label'] for e in self.provenance['events']
                           if e.get('event_type') == 'species_initialized' and isinstance(e.get('label'), str)}
    for species in self.species_list:
        if species.label not in already_initialized:
            self.record_provenance_event(event_type='species_initialized',
                                         label=species.label,
                                         is_ts=species.is_ts,
                                         )


def record_provenance_event(self,
                            event_type: str,
                            label: Optional[str] = None,
                            **data: Any,
                            ):
    """
    Append a provenance event and persist the event log.

    Args:
        event_type (str): The event type (e.g., 'job_started', 'job_finished').
        label (str, optional): The species label the event relates to.
        **data: Additional event fields. Fields whose value is ``None``, an empty string,
                or an empty list are omitted from the event (``0`` and ``False`` are kept).
    """
    # Events loaded from an existing file may carry a missing or non-integer ID; ignore those
    # rather than letting max() fail on mixed types.
    known_ids = [e.get('event_id') for e in self.provenance['events']]
    max_id = max((i for i in known_ids if isinstance(i, int)), default=0)
    event = {'event_id': max_id + 1,
             'event_type': event_type,
             'timestamp': datetime.datetime.now().isoformat(timespec='seconds'),
             }
    if label is not None:
        event['label'] = label
    for key, value in data.items():
        # Explicit type checks avoid ambiguous comparisons for array-like values.
        if value is None or (isinstance(value, (str, list)) and len(value) == 0):
            continue
        event[key] = value
    self.provenance['events'].append(event)
    self.save_provenance()


def save_provenance(self):
    """Persist the provenance event log to ``self.provenance_path``, creating its folder if needed."""
    # exist_ok=True avoids the check-then-create race of a separate isdir() test.
    os.makedirs(os.path.dirname(self.provenance_path), exist_ok=True)
    save_yaml_file(path=self.provenance_path, content=self.provenance)


def finalize_provenance(self):
    """Stamp the run's end time and render the final provenance artifacts via the plotter."""
    self.provenance['ended_at'] = datetime.datetime.now().isoformat(timespec='seconds')
    plotter.save_provenance_artifacts(project_directory=self.project_directory,
                                      provenance=self.provenance,
                                      )
""" max_job_time = max_job_time or self.max_job_time # if it's None, set to default @@ -898,6 +972,24 @@ def run_job(self, if job.server is not None and job.server not in self.servers: self.servers.append(job.server) self.check_max_simultaneous_jobs_limit(job.server) + level_repr = None if job.level is None else str(job.level) + provenance_label = '+'.join(label) if isinstance(label, list) else label + self.record_provenance_event( + event_type='job_started', + label=provenance_label, + is_ts=self.species_dict[label].is_ts if isinstance(label, str) and label in self.species_dict else None, + job_key=f'{provenance_label}:{job.job_name}', + job_name=job.job_name, + job_type=job.job_type, + job_adapter=job.job_adapter, + level=level_repr, + execution_type=job.execution_type, + ess_trsh_methods=job.ess_trsh_methods, + conformer=conformer, + tsg=tsg, + provenance_parent_job=provenance_parent_job, + provenance_reason=provenance_reason, + ) job.execute() self.save_restart_dict() @@ -1018,6 +1110,18 @@ def end_job(self, job: 'JobAdapter', self.timer = False job.write_completed_job_to_csv_file() logger.info(f' Ending job {job_name} for {label} (run time: {job.run_time})') + self.record_provenance_event( + event_type='job_finished', + label=label, + is_ts=self.species_dict[label].is_ts if isinstance(label, str) and label in self.species_dict else None, + job_key=f'{label}:{job.job_name}', + job_name=job.job_name, + job_type=job.job_type, + status=job.job_status[1]['status'] if job.job_status[1]['status'] else job.job_status[0], + keywords=job.job_status[1]['keywords'], + error=job.job_status[1]['error'], + run_time=str(job.run_time) if job.run_time is not None else None, + ) if job.job_status[0] != 'done': return False if job.job_adapter in ['gaussian', 'terachem'] and os.path.isfile(os.path.join(job.local_path, 'check.chk')) \ @@ -1074,6 +1178,8 @@ def _run_a_job(self, torsions=job.torsions, times_rerun=job.times_rerun + int(rerun), tsg=job.tsg, + 
provenance_parent_job=job.job_name, + provenance_reason='rerun', xyz=job.xyz, ) @@ -1161,14 +1267,18 @@ def run_ts_conformer_jobs(self, label: str): successful_tsgs = [tsg for tsg in self.species_dict[label].ts_guesses if tsg.success] if len(successful_tsgs) > 1: self.job_dict[label]['conf_opt'] = dict() - for i, tsg in enumerate(successful_tsgs): + for tsg in successful_tsgs: + if tsg.index is None: + existing_indices = [guess.index for guess in self.species_dict[label].ts_guesses + if guess.index is not None] + tsg.index = max(existing_indices or [-1]) + 1 + tsg.conformer_index = tsg.index # Set before run_job so restart state is consistent. self.run_job(label=label, xyz=tsg.initial_xyz, level_of_theory=self.ts_guess_level, job_type='conf_opt', - conformer=i, + conformer=tsg.index, ) - tsg.conformer_index = i # Store the conformer index in the TSGuess object to match them later. elif len(successful_tsgs) == 1: if 'opt' not in self.job_dict[label].keys() and 'composite' not in self.job_dict[label].keys(): # proceed only if opt (/composite) not already spawned @@ -1954,9 +2064,14 @@ def parse_conformer(self, xyz = parser.parse_geometry(log_file_path=job.local_path_to_output_file) energy = parser.parse_e_elect(log_file_path=job.local_path_to_output_file) if self.species_dict[label].is_ts: - self.species_dict[label].ts_guesses[i].energy = energy - self.species_dict[label].ts_guesses[i].opt_xyz = xyz - self.species_dict[label].ts_guesses[i].index = i + tsg = next((guess for guess in self.species_dict[label].ts_guesses + if guess.conformer_index == i), None) + if tsg is None: + logger.warning(f'Could not find TSGuess for conformer {i} of {label} ' + f'(expected a matching conformer_index); skipping.') + return False + tsg.energy = energy + tsg.opt_xyz = xyz if energy is not None: logger.debug(f'Energy for TSGuess {i} of {label} is {energy:.2f}') else: @@ -1972,8 +2087,14 @@ def parse_conformer(self, logger.warning(f'Conformer {i} for {label} did not converge.') if 
job.job_status[1]['status'] == 'errored' and job.times_rerun == 0: job.times_rerun += 1 - self.troubleshoot_ess(label=label, job=job, level_of_theory=job.level, conformer= job.conformer if job.conformer is not None else None) - return True + self.troubleshoot_ess(label=label, + job=job, + level_of_theory=job.level, + conformer=job.conformer if job.conformer is not None else None) + # Report "still troubleshooting" only if another job was actually queued. + # Conformer jobs are tracked in running_jobs as '{job_type}_{conformer}', not by job_name. + running_key = f'{job.job_type}_{job.conformer}' if job.conformer is not None else job.job_name + return label in self.running_jobs and running_key in self.running_jobs[label] if job.times_rerun == 0 and self.trsh_ess_jobs: self._run_a_job(job=job, label=label, rerun=True) return True @@ -2186,6 +2307,10 @@ def determine_most_likely_ts_conformer(self, label: str): logger.warning(f'Could not determine a likely TS conformer for {label}') self.species_dict[label].ts_number, self.species_dict[label].chosen_ts = None, None self.species_dict[label].populate_ts_checks() + self.record_provenance_event(event_type='ts_guess_selection_failed', + label=label, + is_ts=True, + ) return None else: rxn_txt = '' if self.species_dict[label].rxn_label is None \ @@ -2203,6 +2328,13 @@ def determine_most_likely_ts_conformer(self, label: str): self.species_dict[label].initial_xyz = tsg.opt_xyz self.species_dict[label].final_xyz = None self.species_dict[label].ts_guesses_exhausted = False + self.record_provenance_event(event_type='ts_guess_selected', + label=label, + is_ts=True, + selected_index=selected_i, + method=tsg.method, + energy=tsg.energy, + ) if tsg.success and tsg.energy is not None: # guess method and ts_level opt were both successful tsg.energy -= e_min im_freqs = f', imaginary frequencies {tsg.imaginary_freqs}' if tsg.imaginary_freqs is not None else '' @@ -2377,6 +2509,8 @@ def parse_opt_geo(self, level_of_theory=job.level, 
def _missing_required_paths(self, label: str) -> bool:
    """
    Check whether required output paths are missing for a species/TS.

    Args:
        label (str): The species label.

    Returns:
        bool: Whether required output paths are missing.
    """
    return bool(self._get_missing_required_paths(label))


def _get_missing_required_paths(self, label: str) -> set:
    """
    Get missing required output path job types for a species/TS.

    A job type counts as missing if it is required for this run (``self.job_types``, or
    ``self.composite_method`` for 'composite'), is flagged as done in
    ``self.output[label]['job_types']``, and has no path stored under the corresponding key of
    ``self.output[label]['paths']``. 'opt' and 'freq' are not required for single-atom species.

    Args:
        label (str): The species label.

    Returns:
        set: Job types with missing required output paths
             (empty if the label has no 'paths' entry in ``self.output``).
    """
    if label not in self.output or 'paths' not in self.output[label]:
        return set()
    # Job type -> key under which its output path is stored in self.output[label]['paths'].
    path_map = {
        'opt': 'geo',
        'freq': 'freq',
        'sp': 'sp',
        'composite': 'composite',
    }
    species = self.species_dict.get(label)
    is_monoatomic = species is not None and species.number_of_atoms == 1
    # An entry without 'job_types' (e.g., a partial restart dictionary) has nothing flagged as done.
    done_job_types = self.output[label].get('job_types') or dict()
    paths = self.output[label]['paths']
    missing = set()
    for job_type, path_key in path_map.items():
        if job_type == 'composite':
            required = self.composite_method is not None
        else:
            required = self.job_types.get(job_type, False)
        if not required:
            continue
        if is_monoatomic and job_type in ['opt', 'freq']:
            continue
        if done_job_types.get(job_type, False) and not paths.get(path_key, ''):
            missing.add(job_type)
    return missing


def _sanitize_restart_output(self) -> None:
    """
    Ensure restart output state is internally consistent (e.g., convergence without paths).

    For every label in ``self.output`` that is also in ``self.species_dict``: if the species is
    marked as converged although required output paths are missing, convergence is reset to
    ``False`` and the job types with missing paths are flagged as not done.
    """
    for label in list(self.output.keys()):
        if label not in self.species_dict:
            continue
        missing_job_types = self._get_missing_required_paths(label)
        if self.output[label].get('convergence') and missing_job_types:
            self.output[label]['convergence'] = False
            # NOTE(review): assumes the job_types reset belongs inside the convergence check - confirm.
            if 'job_types' in self.output[label]:
                for job_type in missing_job_types:
                    self.output[label]['job_types'][job_type] = False
@@ -3446,6 +3640,16 @@ def troubleshoot_ess(self, job.ess_trsh_methods = ess_trsh_methods if not couldnt_trsh: + self.record_provenance_event(event_type='job_troubleshooting', + label=label, + is_ts=self.species_dict[label].is_ts, + job_key=f'{label}:{job.job_name}', + job_name=job.job_name, + job_type=job.job_type, + methods=ess_trsh_methods, + keywords=job.job_status[1]['keywords'], + error=job.job_status[1]['error'], + ) self.run_job(label=label, xyz=xyz, level_of_theory=level_of_theory, @@ -3462,8 +3666,16 @@ def troubleshoot_ess(self, rotor_index=job.rotor_index, cpu_cores=cpu_cores, shift=shift, + provenance_parent_job=job.job_name, + provenance_reason='ess_troubleshoot', ) elif self.species_dict[label].is_ts and not self.species_dict[label].ts_guesses_exhausted: + # During TS conf_opt screening, avoid switching mid-batch since switch_ts() deletes all + # running jobs for this TS label and can discard other viable TS guesses still running. + if job.job_type == 'conf_opt': + logger.debug(f'Deferring TS switch for {label} during conf_opt batch screening.') + self.save_restart_dict() + return None logger.info(f'TS {label} did not converge. 
' f'Status is:\n{self.species_dict[label].ts_checks}\n' f'Searching for a better TS conformer...') @@ -3547,7 +3759,13 @@ def delete_all_species_jobs(self, label: str): logger.info(f'Deleted job {job_name}') job.delete() self.running_jobs[label] = list() - self.output[label]['paths'] = {key: '' if key != 'irc' else list() for key in self.output[label]['paths'].keys()} + if label in self.output: + self.output[label]['convergence'] = False + for key in ['opt', 'freq', 'sp', 'composite', 'fine']: + if key in self.output[label]['job_types']: + self.output[label]['job_types'][key] = False + self.output[label]['paths'] = {key: '' if key != 'irc' else list() + for key in self.output[label]['paths'].keys()} def restore_running_jobs(self): """ diff --git a/arc/scheduler_test.py b/arc/scheduler_test.py index 77e8123092..fcb9c39e9b 100644 --- a/arc/scheduler_test.py +++ b/arc/scheduler_test.py @@ -8,6 +8,7 @@ import unittest import os import shutil +from unittest import mock import arc.parser.parser as parser from arc.checks.ts import check_ts @@ -19,7 +20,7 @@ from arc.imports import settings from arc.reaction import ARCReaction from arc.species.converter import str_to_xyz -from arc.species.species import ARCSpecies +from arc.species.species import ARCSpecies, TSGuess default_levels_of_theory = settings['default_levels_of_theory'] @@ -757,13 +758,168 @@ def test_add_label_to_unique_species_labels(self): self.assertEqual(unique_label, 'new_species_15_1') self.assertEqual(self.sched2.unique_species_labels, ['methylamine', 'C2H6', 'CtripCO', 'new_species_15', 'new_species_15_0', 'new_species_15_1']) + def test_initialize_provenance_dedup_on_restart(self): + """Test that _initialize_provenance does not re-emit species_initialized for species already in the log.""" + spc = ARCSpecies(label='ethanol', smiles='CCO') + project_directory = os.path.join(ARC_PATH, 'Projects', 'arc_project_for_testing_delete_after_usage_prov') + os.makedirs(os.path.join(project_directory, 'output'), 
exist_ok=True) + # Write a fake provenance file that already has ethanol initialized. + from arc.common import save_yaml_file + save_yaml_file(path=os.path.join(project_directory, 'output', 'provenance.yml'), + content={'version': 1, 'project': 'test', 'run_id': 'old_run', + 'started_at': '2026-01-01T00:00:00', + 'events': [{'event_id': 1, 'event_type': 'species_initialized', + 'label': 'ethanol', 'is_ts': False}]}) + sched = Scheduler(project='test_prov_dedup', ess_settings=self.ess_settings, + species_list=[spc], + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + project_directory=project_directory, + testing=True, job_types=initialize_job_types()) + init_events = [e for e in sched.provenance['events'] + if e['event_type'] == 'species_initialized' and e.get('label') == 'ethanol'] + self.assertEqual(len(init_events), 1, 'species_initialized should not be duplicated on restart') + # New run should get its own run_id, not the old one. 
+ self.assertNotEqual(sched.provenance['run_id'], 'old_run') + shutil.rmtree(project_directory, ignore_errors=True) + + def test_sanitize_restart_output(self): + """Test that _sanitize_restart_output resets convergence when paths are missing.""" + spc = ARCSpecies(label='H2O', smiles='O') + output = { + 'H2O': { + 'paths': {'geo': '', 'freq': '', 'sp': '', 'composite': ''}, + 'restart': '', 'convergence': True, + 'job_types': {'conf_opt': False, 'conf_sp': False, 'opt': True, 'freq': True, 'sp': True, + 'rotors': False, 'irc': False, 'fine': False, 'composite': False}, + } + } + sched = Scheduler(project='test_sanitize', ess_settings=self.ess_settings, + species_list=[spc], + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + project_directory=self.project_directory, + testing=True, job_types=initialize_job_types(), + restart_dict={'output': output}) + self.assertFalse(sched.output['H2O']['convergence']) + for key in ['opt', 'freq', 'sp']: + self.assertFalse(sched.output['H2O']['job_types'][key]) + + def test_delete_all_species_jobs_resets_output(self): + """Test that delete_all_species_jobs clears convergence, job_types, and paths.""" + spc = ARCSpecies(label='CH4', smiles='C') + output = { + 'CH4': { + 'paths': {'geo': 'some/path.out', 'freq': 'freq.out', 'sp': 'sp.out', 'composite': ''}, + 'restart': '', 'convergence': True, + 'job_types': {'conf_opt': False, 'conf_sp': False, 'opt': True, 'freq': True, 'sp': True, + 'rotors': False, 'irc': False, 'fine': True, 'composite': False}, + } + } + sched = Scheduler(project='test_delete_jobs', ess_settings=self.ess_settings, + species_list=[spc], + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + project_directory=self.project_directory, + testing=True, 
job_types=initialize_job_types(), + restart_dict={'output': output}) + sched.running_jobs['CH4'] = [] + sched.delete_all_species_jobs(label='CH4') + self.assertFalse(sched.output['CH4']['convergence']) + for key in ['opt', 'freq', 'sp', 'fine']: + self.assertFalse(sched.output['CH4']['job_types'][key]) + self.assertEqual(sched.output['CH4']['paths']['geo'], '') + + def test_conformer_index_set_before_run_job(self): + """Test that tsg.conformer_index is assigned before run_job is called, so restart state is consistent.""" + ts_spc = ARCSpecies(label='TS0', is_ts=True, multiplicity=1, charge=0) + # Use geometries different enough to survive cluster_tsgs() deduplication. + ts_spc.ts_guesses = [ + TSGuess(method='autotst', index=0, success=True, + xyz={'symbols': ('C', 'H', 'H', 'H', 'H'), 'isotopes': (12, 1, 1, 1, 1), + 'coords': ((0, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (-1, 0, 0))}, + project_directory=self.project_directory), + TSGuess(method='gcn', index=1, success=True, + xyz={'symbols': ('C', 'H', 'H', 'H', 'H'), 'isotopes': (12, 1, 1, 1, 1), + 'coords': ((0, 0, 0), (2, 0, 0), (0, 2, 0), (0, 0, 2), (-2, 0, 0))}, + project_directory=self.project_directory), + ] + sched = Scheduler(project='test_conf_index_order', ess_settings=self.ess_settings, + species_list=[ts_spc], + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + ts_guess_level=Level(repr=default_levels_of_theory['ts_guesses']), + project_directory=self.project_directory, + testing=True, job_types=initialize_job_types()) + # Track conformer_index values observed inside run_job. 
+ observed = [] + + def capturing_run_job(**kwargs): + conformer = kwargs.get('conformer') + if conformer is not None: + tsg = next((g for g in ts_spc.ts_guesses if g.index == conformer), None) + observed.append((conformer, tsg.conformer_index if tsg else None)) + + with mock.patch.object(sched, 'run_job', side_effect=capturing_run_job), \ + mock.patch('arc.plotter.save_conformers_file'): + sched.run_ts_conformer_jobs(label='TS0') + + # Every call to run_job should have seen conformer_index already set. + self.assertTrue(len(observed) >= 2, f'Expected at least 2 conf_opt jobs, got {len(observed)}') + for conformer_idx, conformer_index_value in observed: + self.assertIsNotNone(conformer_index_value, + f'conformer_index was None when run_job was called for conformer {conformer_idx}') + self.assertEqual(conformer_idx, conformer_index_value) + + def test_provenance_records_ts_species_from_reactions(self): + """Test that TS species created from reactions get a species_initialized provenance event.""" + r_spc = ARCSpecies(label='nC3H7', smiles='[CH2]CC') + p_spc = ARCSpecies(label='iC3H7', smiles='C[CH]C') + rxn = ARCReaction(reactants=['nC3H7'], products=['iC3H7'], + r_species=[r_spc], p_species=[p_spc]) + rxn.index = 0 + sched = Scheduler(project='test_ts_prov', ess_settings=self.ess_settings, + species_list=[r_spc, p_spc], + rxn_list=[rxn], + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + project_directory=self.project_directory, + testing=True, job_types=initialize_job_types()) + init_labels = [e['label'] for e in sched.provenance['events'] + if e.get('event_type') == 'species_initialized'] + self.assertIn('nC3H7', init_labels) + self.assertIn('iC3H7', init_labels) + self.assertIn('TS0', init_labels, 'TS species created from a reaction should get a species_initialized event') + + def test_provenance_multi_species_label(self): + """Test that 
provenance handles multi-species (list) labels by joining them.""" + spc1 = ARCSpecies(label='H2', smiles='[H][H]') + spc2 = ARCSpecies(label='O2', smiles='[O][O]') + sched = Scheduler(project='test_multi_label', ess_settings=self.ess_settings, + species_list=[spc1, spc2], + opt_level=Level(repr=default_levels_of_theory['opt']), + freq_level=Level(repr=default_levels_of_theory['freq']), + sp_level=Level(repr=default_levels_of_theory['sp']), + project_directory=self.project_directory, + testing=True, job_types=initialize_job_types()) + sched.record_provenance_event(event_type='test_event', label='H2+O2') + event = sched.provenance['events'][-1] + self.assertEqual(event['label'], 'H2+O2') + self.assertIsInstance(event['label'], str) + @classmethod def tearDownClass(cls): """ A function that is run ONCE after all unit tests in this class. Delete all project directories created during these unit tests """ - projects = ['arc_project_for_testing_delete_after_usage3', 'arc_project_for_testing_delete_after_usage6'] + projects = ['arc_project_for_testing_delete_after_usage3', 'arc_project_for_testing_delete_after_usage6', + 'arc_project_for_testing_delete_after_usage_prov'] for project in projects: project_directory = os.path.join(ARC_PATH, 'Projects', project) shutil.rmtree(project_directory, ignore_errors=True) diff --git a/arc/species/species.py b/arc/species/species.py index a94ce01c00..3a2bf32d1c 100644 --- a/arc/species/species.py +++ b/arc/species/species.py @@ -1536,12 +1536,12 @@ def make_ts_report(self): self.ts_report += ':\n' if self.successful_methods: self.ts_report += 'Methods that successfully generated a TS guess:\n' - for successful_method in self.successful_methods: - self.ts_report += successful_method + ',' + unique_successful_methods = list(dict.fromkeys(self.successful_methods)) + self.ts_report += ','.join(unique_successful_methods) if self.unsuccessful_methods: - self.ts_report += '\nMethods that were unsuccessfully in generating a TS guess:\n' - 
for unsuccessful_method in self.unsuccessful_methods: - self.ts_report += unsuccessful_method + ',' + self.ts_report += '\nMethods that were unsuccessful in generating a TS guess:\n' + unique_unsuccessful_methods = list(dict.fromkeys(self.unsuccessful_methods)) + self.ts_report += ','.join(unique_unsuccessful_methods) if not self.ts_guesses_exhausted: self.ts_report += f'\nThe method that generated the best TS guess and its output used for the ' \ f'optimization: {self.chosen_ts_method}\n' diff --git a/arc/species/species_test.py b/arc/species/species_test.py index 8074dd8c96..7f0fcd6ec2 100644 --- a/arc/species/species_test.py +++ b/arc/species/species_test.py @@ -1201,7 +1201,7 @@ def test_from_dict(self): 'ts_guesses_exhausted': False, 'ts_number': 0, 'ts_report': 'TS method summary for TS0 in C3_1 <=> C3_2:\n' 'Methods that successfully generated a TS guess:\n' - 'autotst,autotst,autotst,autotst,gcn,gcn,gcn,gcn,gcn,gcn,gcn,gcn,gcn,gcn,kinbot,kinbot,\n' + 'autotst,gcn,kinbot\n' 'The method that generated the best TS guess and its output used ' 'for the optimization: gcn\n', 'tsg_spawned': True, 'unsuccessful_methods': []} diff --git a/environment.yml b/environment.yml index 5f22a9c40a..1ac6654f4f 100644 --- a/environment.yml +++ b/environment.yml @@ -24,6 +24,7 @@ dependencies: - conda-forge::ffmpeg - conda-forge::gprof2dot - conda-forge::graphviz + - conda-forge::python-graphviz - conda-forge::h5py - conda-forge::ipython - conda-forge::jupyter