diff --git a/CHANGELOG.md b/CHANGELOG.md index fc98efe41..1a7b727cd 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -11,6 +11,7 @@ This project adheres to [Semantic Versioning], with the exception that minor rel ### Changed +- 🎨 Improve the RL state machine logic ([#677]) ([**@flowerthrower**]) - 🐛 Support BQSKit conversion of IQM's native `r` gate ([#679]) ([**@flowerthrower**]) - 🔧 Replace `mypy` with `ty` ([#572]) ([**@denialhaag**]) - 🐛 Fix instruction duration unit in estimated success probability calculation ([#445]) ([**@Shaobo-Zhou**]) @@ -47,6 +48,7 @@ _📚 Refer to the [GitHub Release Notes](https://github.com/munich-quantum-tool +[#677]: https://github.com/munich-quantum-toolkit/predictor/pull/677 [#679]: https://github.com/munich-quantum-toolkit/predictor/pull/679 [#572]: https://github.com/munich-quantum-toolkit/predictor/pull/572 [#489]: https://github.com/munich-quantum-toolkit/predictor/pull/489 diff --git a/src/mqt/predictor/reward.py b/src/mqt/predictor/reward.py index 7c1ce1bba..e8c1b4742 100644 --- a/src/mqt/predictor/reward.py +++ b/src/mqt/predictor/reward.py @@ -23,7 +23,6 @@ if TYPE_CHECKING: from qiskit import QuantumCircuit - from qiskit.circuit import QuantumRegister, Qubit from qiskit.transpiler import Target from sklearn.ensemble import RandomForestRegressor @@ -62,44 +61,22 @@ def expected_fidelity(qc: QuantumCircuit, device: Target, precision: int = 10) - if gate_type != "barrier": assert len(qargs) in [1, 2] - first_qubit_idx = calc_qubit_index(qargs, qc.qregs, 0) + first_qubit_idx = qc.find_bit(qargs[0]).index if len(qargs) == 1: specific_fidelity = 1 - device[gate_type][first_qubit_idx,].error else: - second_qubit_idx = calc_qubit_index(qargs, qc.qregs, 1) - specific_fidelity = 1 - device[gate_type][first_qubit_idx, second_qubit_idx].error - + second_qubit_idx = qc.find_bit(qargs[1]).index + try: + specific_fidelity = 1 - device[gate_type][first_qubit_idx, second_qubit_idx].error + except KeyError: + msg = f"Error rate for gate {gate_type} on qubits {first_qubit_idx} and {second_qubit_idx} not found in device properties." + raise KeyError(msg) from None res *= specific_fidelity return float(np.round(res, precision).item()) -def calc_qubit_index(qargs: list[Qubit], qregs: list[QuantumRegister], index: int) -> int: - """Calculates the global qubit index for a given quantum circuit and qubit index. - - Arguments: - qargs: The qubits of the quantum circuit. - qregs: The quantum registers of the quantum circuit. - index: The index of the qubit in the qargs list. - - Returns: - The global qubit index of the given qubit in the quantum circuit. - - Raises: - ValueError: If the qubit index is not found in the quantum registers. - """ - offset = 0 - for reg in qregs: - if qargs[index] not in reg: - offset += reg.size - else: - qubit_index: int = offset + reg.index(qargs[index]) - return qubit_index - error_msg = f"Global qubit index for local qubit {index} index not found." - raise ValueError(error_msg) - - def estimated_success_probability(qc: QuantumCircuit, device: Target, precision: int = 10) -> float: """Calculates the estimated success probability of a given quantum circuit on a given device. @@ -125,7 +102,7 @@ def estimated_success_probability(qc: QuantumCircuit, device: Target, precision: if gate_type == "barrier" or gate_type == "id": continue assert len(qargs) in (1, 2) - first_qubit_idx = calc_qubit_index(qargs, qc.qregs, 0) + first_qubit_idx = qc.find_bit(qargs[0]).index active_qubits.add(first_qubit_idx) if len(qargs) == 1: # single-qubit gate @@ -140,7 +117,7 @@ def estimated_success_probability(qc: QuantumCircuit, device: Target, precision: )) exec_time_per_qubit[first_qubit_idx] += duration else: # multi-qubit gate - second_qubit_idx = calc_qubit_index(qargs, qc.qregs, 1) + second_qubit_idx = qc.find_bit(qargs[1]).index active_qubits.add(second_qubit_idx) duration = device[gate_type][first_qubit_idx, second_qubit_idx].duration op_times.append((gate_type, [first_qubit_idx, second_qubit_idx], duration, "s")) @@ -191,7 +168,7 @@ def estimated_success_probability(qc: QuantumCircuit, device: Target, precision: continue assert len(qargs) in (1, 2) - first_qubit_idx = calc_qubit_index(qargs, qc.qregs, 0) + first_qubit_idx = scheduled_circ.find_bit(qargs[0]).index if len(qargs) == 1: if gate_type == "measure": @@ -213,8 +190,12 @@ def estimated_success_probability(qc: QuantumCircuit, device: Target, precision: continue res *= 1 - device[gate_type][first_qubit_idx,].error else: - second_qubit_idx = calc_qubit_index(qargs, qc.qregs, 1) - res *= 1 - device[gate_type][first_qubit_idx, second_qubit_idx].error + second_qubit_idx = scheduled_circ.find_bit(qargs[1]).index + try: + res *= 1 - device[gate_type][first_qubit_idx, second_qubit_idx].error + except KeyError: + msg = f"Error rate for gate {gate_type} on qubits {first_qubit_idx} and {second_qubit_idx} not found in device properties." + raise KeyError(msg) from None if qiskit_version >= "2.0.0": for i in range(device.num_qubits): diff --git a/src/mqt/predictor/rl/actions.py b/src/mqt/predictor/rl/actions.py index 7c4f8bbdd..598dfe0e6 100644 --- a/src/mqt/predictor/rl/actions.py +++ b/src/mqt/predictor/rl/actions.py @@ -11,7 +11,6 @@ from __future__ import annotations import os -import sys from collections import defaultdict from dataclasses import dataclass from enum import Enum @@ -90,6 +89,7 @@ from qiskit.passmanager.base_tasks import Task TaskList = list[Task | TketBasePass | PreProcessTKETRoutingAfterQiskitLayout] + from qiskit.passmanager import PropertySet class CompilationOrigin(str, Enum): @@ -146,7 +146,7 @@ class DeviceDependentAction(Action): Callable[..., tuple[Any, ...] | Circuit], ] ) - do_while: Callable[[dict[str, Circuit]], bool] | None = None + do_while: Callable[[PropertySet], bool] | None = None # Registry of actions @@ -332,7 +332,7 @@ def remove_action(name: str) -> None: circuit, optimization_level=1 if os.getenv("GITHUB_ACTIONS") == "true" else 2, synthesis_epsilon=1e-1 if os.getenv("GITHUB_ACTIONS") == "true" else 1e-8, - max_synthesis_size=2 if os.getenv("GITHUB_ACTIONS") == "true" else 3, + max_synthesis_size=3, seed=10, num_workers=1 if os.getenv("GITHUB_ACTIONS") == "true" else -1, ), @@ -431,7 +431,7 @@ def remove_action(name: str) -> None: with_mapping=True, optimization_level=1 if os.getenv("GITHUB_ACTIONS") == "true" else 2, synthesis_epsilon=1e-1 if os.getenv("GITHUB_ACTIONS") == "true" else 1e-8, - max_synthesis_size=2 if os.getenv("GITHUB_ACTIONS") == "true" and sys.platform != "linux" else 3, + max_synthesis_size=3, seed=10, num_workers=1 if os.getenv("GITHUB_ACTIONS") == "true" else -1, ) @@ -461,7 +461,7 @@ def remove_action(name: str) -> None: model=MachineModel(bqskit_circuit.num_qudits, gate_set=get_bqskit_native_gates(device)), optimization_level=1 if os.getenv("GITHUB_ACTIONS") == "true" else 2, synthesis_epsilon=1e-1 if os.getenv("GITHUB_ACTIONS") == "true" else 1e-8, - max_synthesis_size=2 if os.getenv("GITHUB_ACTIONS") == "true" and sys.platform != "linux" else 3, + max_synthesis_size=3, seed=10, num_workers=1 if os.getenv("GITHUB_ACTIONS") == "true" else -1, ) diff --git a/src/mqt/predictor/rl/predictor.py b/src/mqt/predictor/rl/predictor.py index 2654f34fe..c1578125a 100644 --- a/src/mqt/predictor/rl/predictor.py +++ b/src/mqt/predictor/rl/predictor.py @@ -89,6 +89,7 @@ def train_model( timesteps: int = 1000, verbose: int = 2, test: bool = False, + seed: int | None = None, ) -> None: """Trains all models for the given reward functions and device. @@ -96,12 +97,16 @@ def train_model( timesteps: The number of timesteps to train the model. Defaults to 1000. verbose: The verbosity level. Defaults to 2. test: Whether to train the model for testing purposes. Defaults to False. + seed: The random seed to use for reproducible training. Set to None to use true randomness. + Defaults to None. """ + if seed is not None: + set_random_seed(seed) if test: - set_random_seed(0) # for reproducibility - n_steps = 10 + # minimum training overhead + n_steps = max(timesteps, 2) n_epochs = 1 - batch_size = 10 + batch_size = n_steps progress_bar = False else: # default PPO values @@ -120,6 +125,7 @@ def train_model( n_steps=n_steps, batch_size=batch_size, n_epochs=n_epochs, + seed=seed, ) # Training Loop: In each iteration, the agent collects n_steps steps (rollout), # updates the policy for n_epochs, and then repeats the process until total_timesteps steps have been taken. diff --git a/src/mqt/predictor/rl/predictorenv.py b/src/mqt/predictor/rl/predictorenv.py index a1d1a3ecb..95d632463 100644 --- a/src/mqt/predictor/rl/predictorenv.py +++ b/src/mqt/predictor/rl/predictorenv.py @@ -22,7 +22,7 @@ from bqskit import Circuit from gymnasium.spaces import Space from qiskit.passmanager.base_tasks import Task - from qiskit.transpiler import Target + from qiskit.transpiler import Layout, Target from mqt.predictor.reward import figure_of_merit from mqt.predictor.rl.actions import Action @@ -41,7 +41,6 @@ from qiskit import QuantumCircuit from qiskit.passmanager.flow_controllers import DoWhileController from qiskit.transpiler import CouplingMap, PassManager, TranspileLayout -from qiskit.transpiler.passes import CheckMap, GatesInBasis from qiskit.transpiler.passes.layout.vf2_layout import VF2LayoutStopReason from mqt.predictor.hellinger import get_hellinger_model_path @@ -94,7 +93,7 @@ def __init__( self.actions_routing_indices = [] self.actions_mapping_indices = [] self.actions_opt_indices = [] - self.actions_final_optimization_indices = [] + self.actions_final_optimization_indices = [] # TODO: currently not used; will be improved by addressing issue https://github.com/munich-quantum-toolkit/predictor/issues/666 self.used_actions: list[str] = [] self.device = device @@ -147,9 +146,13 @@ def __init__( self.reward_function = reward_function self.action_space = Discrete(len(self.action_set.keys())) self.num_steps = 0 - self.layout: TranspileLayout | None = None self.num_qubits_uncompiled_circuit = 0 + # Canonical layout state for the current circuit. It is mirrored to + # QuantumCircuit.layout for callers, but kept here because TKET and + # BQSKit conversions do not preserve Qiskit's layout metadata. + self.layout: TranspileLayout | None = None + self.has_parameterized_gates = False self.rng = np.random.default_rng(10) @@ -191,23 +194,26 @@ def step(self, action: int) -> tuple[dict[str, Any], float, bool, bool, dict[Any self.state: QuantumCircuit = altered_qc self.num_steps += 1 + # in case a Qiskit.QuantumCircuit has `unitary` gates in it, decompose them (otherwise qiskit will throw an error when applying BasisTranslator) + # TODO: will be improved by addressing issue https://github.com/munich-quantum-toolkit/predictor/issues/668 + if self.state.count_ops().get("unitary"): + self.state = self.state.decompose(gates_to_decompose="unitary") + + self.state._layout = self.layout # noqa: SLF001 + self.valid_actions = self.determine_valid_actions_for_state() if len(self.valid_actions) == 0: msg = "No valid actions left." raise RuntimeError(msg) if action == self.action_terminate_index: + assert action in self.valid_actions, "Terminate action is not valid but was chosen." reward_val = self.calculate_reward() done = True else: reward_val = 0 done = False - # in case the Qiskit.QuantumCircuit has unitary or u gates in it, decompose them (because otherwise qiskit will throw an error when applying the BasisTranslator - if self.state.count_ops().get("unitary"): - self.state = self.state.decompose(gates_to_decompose="unitary") - - self.state._layout = self.layout # noqa: SLF001 obs = create_feature_dict(self.state) return obs, reward_val, done, False, {} @@ -258,7 +264,7 @@ def reset( self.layout = None - self.valid_actions = self.actions_opt_indices + self.actions_synthesis_indices + self.valid_actions = self.actions_synthesis_indices + self.actions_opt_indices self.error_occurred = False @@ -270,15 +276,21 @@ def action_masks(self) -> list[bool]: """Returns a list of valid actions for the current state.""" action_mask = [action in self.valid_actions for action in self.action_set] - # it is not clear how tket will handle the layout, so we remove all actions that are from "origin"=="tket" if a layout is set - if self.layout is not None: + has_layout = self.layout is not None + + if has_layout: + # TKET layout/optimization actions must not run after a Qiskit layout has been set + # (it is not clear how tket will handle the layout). TKET routing actions are + # designed to work after a Qiskit layout via PreProcessTKETRoutingAfterQiskitLayout. action_mask = [ - action_mask[i] and self.action_set[i].origin != CompilationOrigin.TKET for i in range(len(action_mask)) + action_mask[i] + and (self.action_set[i].origin != CompilationOrigin.TKET or i in self.actions_routing_indices) + for i in range(len(action_mask)) ] - if self.has_parameterized_gates or self.layout is not None: + if self.has_parameterized_gates or has_layout: # remove all actions that are from "origin"=="bqskit" because they are not supported for parameterized gates - # or after layout since using BQSKit after a layout is set may result in an error + # or after layout since using BQSKit after a layout is set can result in an error action_mask = [ action_mask[i] and self.action_set[i].origin != CompilationOrigin.BQSKIT for i in range(len(action_mask)) @@ -344,9 +356,16 @@ def _apply_qiskit_action(self, action: Action, action_index: int) -> QuantumCirc ): altered_qc = self._handle_qiskit_layout_postprocessing(action, pm, altered_qc) - elif action_index in self.actions_routing_indices and self.layout: + elif ( + action_index in self.actions_routing_indices and self.layout and pm.property_set["final_layout"] is not None + ): self.layout.final_layout = pm.property_set["final_layout"] + # BasisTranslator errors on unitary gates; decompose them immediately so + # the circuit is always in a consistent state after a Qiskit action. + if altered_qc.count_ops().get("unitary"): + altered_qc = altered_qc.decompose(gates_to_decompose="unitary") + return altered_qc def _handle_qiskit_layout_postprocessing( @@ -359,12 +378,19 @@ def _handle_qiskit_layout_postprocessing( assert self.layout is not None altered_qc, _ = postprocess_vf2postlayout(altered_qc, post_layout, self.layout) elif action.name == "VF2Layout": - assert pm.property_set["VF2Layout_stop_reason"] == VF2LayoutStopReason.SOLUTION_FOUND - assert pm.property_set["layout"] + if pm.property_set["VF2Layout_stop_reason"] != VF2LayoutStopReason.SOLUTION_FOUND: + logger.warning( + "VF2Layout pass did not find a solution. Reason: %s", + pm.property_set["VF2Layout_stop_reason"], + ) + else: + assert pm.property_set["layout"] else: assert pm.property_set["layout"] if pm.property_set["layout"]: + # Layout/mapping passes create the base logical-to-physical mapping; + # later routing actions only update final_layout. self.layout = TranspileLayout( initial_layout=pm.property_set["layout"], input_qubit_mapping=pm.property_set["original_qubit_indices"], @@ -387,7 +413,7 @@ def _apply_tket_action(self, action: Action, action_index: int) -> QuantumCircui qbs = tket_qc.qubits tket_qc.rename_units({qbs[i]: Qubit("q", i) for i in range(len(qbs))}) - altered_qc = tk_to_qiskit(tket_qc) + altered_qc = tk_to_qiskit(tket_qc, replace_implicit_swaps=True) if action_index in self.actions_routing_indices: assert self.layout is not None @@ -430,27 +456,101 @@ def _apply_bqskit_action(self, action: Action, action_index: int) -> QuantumCirc return bqskit_to_qiskit(bqskit_compiled_qc) - def determine_valid_actions_for_state(self) -> list[int]: - """Determines and returns the valid actions for the current state.""" - check_nat_gates = GatesInBasis(basis_gates=self.device.operation_names) - check_nat_gates(self.state) - only_nat_gates = check_nat_gates.property_set["all_gates_in_basis"] + def is_circuit_laid_out(self, circuit: QuantumCircuit, layout: TranspileLayout | Layout) -> bool: + """True if every logical qubit in the circuit has a physical assignment.""" + if isinstance(layout, TranspileLayout): + # Use final_layout if available; otherwise fallback to initial_layout + layout = layout.final_layout or layout.initial_layout - if not only_nat_gates: - actions = self.actions_synthesis_indices + self.actions_opt_indices - if self.layout is not None: - actions += self.actions_routing_indices - return actions + v2p = layout.get_virtual_bits() + return all(q in v2p for q in circuit.qubits) - check_mapping = CheckMap(coupling_map=self.device.build_coupling_map()) - check_mapping(self.state) - mapped = check_mapping.property_set["is_swap_mapped"] + def is_circuit_synthesized(self, circuit: QuantumCircuit) -> bool: + """Check if the circuit uses only native gates of the device. - if mapped and self.layout is not None: # The circuit is correctly mapped. - return [self.action_terminate_index, *self.actions_opt_indices] + Verifies that every gate name in the circuit is present in + ``device.operation_names``, equivalent to the ``GatesInBasis`` pass. + + Args: + circuit: QuantumCircuit to check. + + Returns: + True if all gates are native to the device. + """ + native_names = set(self.device.operation_names) + return all( + instr.operation.name in native_names or instr.operation.name in ("barrier", "measure") + for instr in circuit.data + ) - if self.layout is not None: # The circuit is not yet mapped but a layout is set. - return self.actions_routing_indices + def is_circuit_routed(self, circuit: QuantumCircuit, coupling_map: CouplingMap) -> bool: + """Check if a circuit is fully routed to the device, including directionality. - # No layout applied yet - return self.actions_mapping_indices + self.actions_layout_indices + self.actions_opt_indices + A circuit is considered routed if all two-qubit gates are on qubit pairs + that exist as directed edges in the device coupling map. + + After a layout pass the circuit's qubits are already physical qubits, so + ``circuit.find_bit(q).index`` gives the physical index directly — + consistent with how ``reward.py`` looks up gate calibrations. + + Args: + circuit: QuantumCircuit to check. + coupling_map: CouplingMap of the target device. + + Returns: + True if fully routed, False otherwise. + """ + directed_edges = set(coupling_map.get_edges()) + for instr in circuit.data: + if len(instr.qubits) == 2: + q0 = circuit.find_bit(instr.qubits[0]).index + q1 = circuit.find_bit(instr.qubits[1]).index + if (q0, q1) not in directed_edges: + return False + return True + + def determine_valid_actions_for_state(self) -> list[int]: + """Determine valid actions based on circuit state: synthesized, mapped, routed.""" + synthesized = self.is_circuit_synthesized(self.state) + laid_out = self.is_circuit_laid_out(self.state, self.layout) if self.layout else False + # Routing is only allowed after layout + routed = ( + self.is_circuit_routed(self.state, CouplingMap(self.device.build_coupling_map())) if laid_out else False + ) + + actions = [] + + # Initial state + if not synthesized and not laid_out and not routed: + actions.extend(self.actions_synthesis_indices) + actions.extend(self.actions_opt_indices) + + if synthesized and not laid_out and not routed: + actions.extend(self.actions_mapping_indices) + actions.extend(self.actions_layout_indices) + actions.extend(self.actions_opt_indices) + + # Possible state because optimization can destroy the native gate set + # State is not explicitly *depicted* in original paper + if not synthesized and laid_out and not routed: + actions.extend(self.actions_synthesis_indices) + actions.extend(self.actions_routing_indices) + actions.extend(self.actions_opt_indices) + + # Possible state because there are layout-only passes + # State is not explicitly *depicted* in original paper + if synthesized and laid_out and not routed: + actions.extend(self.actions_routing_indices) + + # Possible state because routing may add SWAP gates which are not necessarily native gates + # State is not explicitly *depicted* in original paper + if not synthesized and laid_out and routed: + actions.extend(self.actions_synthesis_indices) + actions.extend(self.actions_opt_indices) + + # Final state + if synthesized and laid_out and routed: + actions.extend([self.action_terminate_index]) + actions.extend(self.actions_opt_indices) + + return actions diff --git a/tests/compilation/test_integration_further_SDKs.py b/tests/compilation/test_integration_further_SDKs.py index f02b0211b..b9b5c077f 100644 --- a/tests/compilation/test_integration_further_SDKs.py +++ b/tests/compilation/test_integration_further_SDKs.py @@ -241,7 +241,7 @@ def test_tket_routing(available_actions_dict: dict[PassType, list[Action]]) -> N qbs = tket_qc.qubits tket_qc.rename_units({qbs[i]: Qubit("q", i) for i in range(len(qbs))}) - mapped_qc = tk_to_qiskit(tket_qc) + mapped_qc = tk_to_qiskit(tket_qc, replace_implicit_swaps=True) final_layout = final_layout_pytket_to_qiskit(tket_qc, mapped_qc) diff --git a/tests/compilation/test_predictor_rl.py b/tests/compilation/test_predictor_rl.py index 5aa086d3c..5cebe65d8 100644 --- a/tests/compilation/test_predictor_rl.py +++ b/tests/compilation/test_predictor_rl.py @@ -16,12 +16,14 @@ import pytest from mqt.bench import BenchmarkLevel, get_benchmark from mqt.bench.targets import get_device +from qiskit import QuantumCircuit from qiskit.circuit.library import CXGate from qiskit.qasm2 import dump -from qiskit.transpiler import InstructionProperties, Target +from qiskit.transpiler import InstructionProperties, Layout, Target, TranspileLayout from qiskit.transpiler.passes import GatesInBasis from mqt.predictor.rl import Predictor, rl_compile +from mqt.predictor.rl import predictorenv as predictorenv_module from mqt.predictor.rl.actions import ( CompilationOrigin, DeviceIndependentAction, @@ -84,10 +86,7 @@ def test_qcompile_with_newly_trained_models() -> None: ): rl_compile(qc, device=device, figure_of_merit=figure_of_merit) - predictor.train_model( - timesteps=100, - test=True, - ) + predictor.train_model(timesteps=512, test=True, seed=0) qc_compiled, compilation_information = rl_compile(qc, device=device, figure_of_merit=figure_of_merit) @@ -120,6 +119,72 @@ def test_warning_for_unidirectional_device() -> None: Predictor(figure_of_merit="expected_fidelity", device=target) +def test_predictor_env_actions_after_layout_with_non_native_unrouted_circuit() -> None: + """Test valid actions for a laid-out circuit that still needs synthesis and routing.""" + device = get_device("ibm_falcon_27") + env = predictorenv_module.PredictorEnv(device=device) + qc = QuantumCircuit(3) + qc.h(0) + qc.cx(0, 2) + env.reset(qc) + + env.layout = TranspileLayout( + initial_layout=Layout({qubit: index for index, qubit in enumerate(qc.qubits)}), + input_qubit_mapping={qubit: index for index, qubit in enumerate(qc.qubits)}, + final_layout=None, + _output_qubit_list=qc.qubits, + _input_qubit_count=qc.num_qubits, + ) + + valid_actions = env.determine_valid_actions_for_state() + + assert set(env.actions_synthesis_indices).issubset(valid_actions) + assert set(env.actions_routing_indices).issubset(valid_actions) + assert set(env.actions_opt_indices).issubset(valid_actions) + assert env.action_terminate_index not in valid_actions + + +def test_predictor_env_qiskit_routing_updates_final_layout(monkeypatch: pytest.MonkeyPatch) -> None: + """Test that Qiskit routing actions update the tracked final layout.""" + device = get_device("ibm_falcon_27") + env = predictorenv_module.PredictorEnv(device=device) + qc = QuantumCircuit(2) + qc.cx(0, 1) + env.reset(qc) + + initial_layout = Layout({qubit: index for index, qubit in enumerate(qc.qubits)}) + final_layout = Layout({qc.qubits[0]: 1, qc.qubits[1]: 0}) + env.layout = TranspileLayout( + initial_layout=initial_layout, + input_qubit_mapping={qubit: index for index, qubit in enumerate(qc.qubits)}, + final_layout=None, + _output_qubit_list=qc.qubits, + _input_qubit_count=qc.num_qubits, + ) + + class FakePassManager: + """Minimal PassManager replacement that exposes a final layout.""" + + def __init__(self, _passes: object) -> None: + self.property_set = {"final_layout": final_layout} + + def run(self, circuit: QuantumCircuit) -> QuantumCircuit: + return circuit + + monkeypatch.setattr(predictorenv_module, "PassManager", FakePassManager) + action = DeviceIndependentAction( + name="SyntheticQiskitRouting", + pass_type=PassType.ROUTING, + transpile_pass=[], + origin=CompilationOrigin.QISKIT, + ) + + altered_qc = env._apply_qiskit_action(action, env.actions_routing_indices[0]) # noqa: SLF001 + + assert altered_qc is env.state + assert env.layout.final_layout is final_layout + + def test_register_action() -> None: """Test the register_action function.""" action = DeviceIndependentAction( diff --git a/tests/compilation/test_reward.py b/tests/compilation/test_reward.py index 5e36fcc8a..e0843dbc9 100644 --- a/tests/compilation/test_reward.py +++ b/tests/compilation/test_reward.py @@ -10,15 +10,16 @@ from __future__ import annotations -from typing import TYPE_CHECKING +import re import pytest from mqt.bench import BenchmarkLevel, get_benchmark from mqt.bench.targets import get_device -from qiskit import transpile +from qiskit import QuantumCircuit, transpile from qiskit.circuit.library import CXGate, Measure, XGate from qiskit.transpiler import InstructionProperties, Target +from mqt.predictor import reward as reward_module from mqt.predictor.reward import crit_depth, esp_data_available, estimated_success_probability, expected_fidelity try: @@ -30,9 +31,6 @@ QISKIT_PRE_2_0 = True -if TYPE_CHECKING: - from qiskit import QuantumCircuit - @pytest.fixture def device() -> Target: @@ -132,3 +130,37 @@ def test_esp_data_available_invalid_target(kwargs: dict[str, float | bool]) -> N """Test that `esp_data_available` returns False for invalid device configurations.""" target = make_target(**kwargs) # ty: ignore[invalid-argument-type] assert not esp_data_available(target) + + +@pytest.mark.parametrize("reward_function", ["expected_fidelity", "estimated_success_probability"]) +def test_reward_missing_two_qubit_error(reward_function: str, monkeypatch: pytest.MonkeyPatch) -> None: + """Test that reward functions report missing two-qubit error rates descriptively.""" + target = make_target() + del target["cx"][0, 1] + + qc = QuantumCircuit(2) + if reward_function == "estimated_success_probability": + qc.x(0) + + scheduled_qc = QuantumCircuit(2) + scheduled_qc.cx(0, 1) + + def fake_transpile(*_args: object, **_kwargs: object) -> QuantumCircuit: + return scheduled_qc + + def estimate_duration(*, target: Target) -> float: + assert target.num_qubits == 2 + return 0.0 + + monkeypatch.setattr(reward_module, "qiskit_version", "2.0.0") + monkeypatch.setattr(reward_module, "transpile", fake_transpile) + monkeypatch.setattr( + scheduled_qc, "estimate_duration", estimate_duration, raising=False + ) # not available in qiskit<2.0 + else: + qc.cx(0, 1) + + reward = estimated_success_probability if reward_function == "estimated_success_probability" else expected_fidelity + expected_message = "Error rate for gate cx on qubits 0 and 1 not found in device properties." + with pytest.raises(KeyError, match=re.escape(expected_message)): + reward(qc, target) diff --git a/tests/hellinger_distance/test_estimated_hellinger_distance.py b/tests/hellinger_distance/test_estimated_hellinger_distance.py index 1d08c6189..3d274c13b 100644 --- a/tests/hellinger_distance/test_estimated_hellinger_distance.py +++ b/tests/hellinger_distance/test_estimated_hellinger_distance.py @@ -196,10 +196,7 @@ def test_train_and_qcompile_with_hellinger_model(source_path: Path, target_path: # 1. Train the reinforcement learning model for circuit compilation rl_predictor = rl_Predictor(device=device, figure_of_merit=figure_of_merit) - rl_predictor.train_model( - timesteps=5, - test=True, - ) + rl_predictor.train_model(timesteps=5, test=True, seed=0) # 2. Setup and train the machine learning model for device selection ml_predictor = ml_Predictor(devices=[device], figure_of_merit=figure_of_merit)