From e4ea63636ad305c9e55029f52d6bad0c1ed6df1d Mon Sep 17 00:00:00 2001 From: Matthew Treinish Date: Tue, 18 Feb 2025 17:59:08 -0500 Subject: [PATCH] Remove v1 primitive implementations This commit removes the deprecated implementation of the v1 primitives interface. These were deprecated in the 1.2 release and have been superseded by the implementation of the v2 primtives interface. --- qiskit/primitives/__init__.py | 8 - qiskit/primitives/backend_estimator.py | 486 ----------------- qiskit/primitives/backend_sampler.py | 222 -------- qiskit/primitives/estimator.py | 172 ------ qiskit/primitives/sampler.py | 162 ------ ...eprecated-primitives-c200e72332c1bc16.yaml | 16 + .../primitives/test_backend_estimator.py | 511 ------------------ .../python/primitives/test_backend_sampler.py | 496 ----------------- test/python/primitives/test_estimator.py | 437 --------------- test/python/primitives/test_sampler.py | 440 --------------- 10 files changed, 16 insertions(+), 2934 deletions(-) delete mode 100644 qiskit/primitives/backend_estimator.py delete mode 100644 qiskit/primitives/backend_sampler.py delete mode 100644 qiskit/primitives/estimator.py delete mode 100644 qiskit/primitives/sampler.py create mode 100644 releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml delete mode 100644 test/python/primitives/test_backend_estimator.py delete mode 100644 test/python/primitives/test_backend_sampler.py delete mode 100644 test/python/primitives/test_estimator.py delete mode 100644 test/python/primitives/test_sampler.py diff --git a/qiskit/primitives/__init__.py b/qiskit/primitives/__init__.py index c4fb9855115f..597d84b832fe 100644 --- a/qiskit/primitives/__init__.py +++ b/qiskit/primitives/__init__.py @@ -437,8 +437,6 @@ BaseEstimator BaseEstimatorV1 - Estimator - BackendEstimator EstimatorResult @@ -450,14 +448,10 @@ BaseSampler BaseSamplerV1 - Sampler - BackendSampler SamplerResult """ -from .backend_estimator import BackendEstimator -from .backend_sampler import BackendSampler from .base import ( BaseEstimator, BaseEstimatorV1, @@ -480,9 +474,7 @@ ObservableLike, ObservablesArrayLike, ) -from .estimator import Estimator from .primitive_job import BasePrimitiveJob, PrimitiveJob -from .sampler import Sampler from .statevector_estimator import StatevectorEstimator from .statevector_sampler import StatevectorSampler from .backend_estimator_v2 import BackendEstimatorV2 diff --git a/qiskit/primitives/backend_estimator.py b/qiskit/primitives/backend_estimator.py deleted file mode 100644 index 5f9f8d20c75e..000000000000 --- a/qiskit/primitives/backend_estimator.py +++ /dev/null @@ -1,486 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Estimator V1 implementation for an arbitrary Backend object.""" - -from __future__ import annotations - -from collections.abc import Sequence -from itertools import accumulate - -import numpy as np - -from qiskit.circuit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.compiler import transpile -from qiskit.exceptions import QiskitError -from qiskit.providers import BackendV1, BackendV2, Options -from qiskit.quantum_info import Pauli, PauliList -from qiskit.quantum_info.operators.base_operator import BaseOperator -from qiskit.result import Counts, Result -from qiskit.transpiler import CouplingMap, PassManager -from qiskit.transpiler.passes import ( - ApplyLayout, - EnlargeWithAncilla, - FullAncillaAllocation, - Optimize1qGatesDecomposition, - SetLayout, -) -from qiskit.utils.deprecation import deprecate_func - -from .base import BaseEstimator, EstimatorResult -from .primitive_job import PrimitiveJob -from .utils import _circuit_key, _observable_key, init_observable - - -def _run_circuits( - circuits: QuantumCircuit | list[QuantumCircuit], - backend: BackendV1 | BackendV2, - clear_metadata: bool = True, - **run_options, -) -> tuple[list[Result], list[dict]]: - """Remove metadata of circuits and run the circuits on a backend. - Args: - circuits: The circuits - backend: The backend - clear_metadata: Clear circuit metadata before passing to backend.run if - True. - **run_options: run_options - Returns: - The result and the metadata of the circuits - """ - if isinstance(circuits, QuantumCircuit): - circuits = [circuits] - metadata = [] - for circ in circuits: - metadata.append(circ.metadata) - if clear_metadata: - circ.metadata = {} - if isinstance(backend, BackendV1): - max_circuits = getattr(backend.configuration(), "max_experiments", None) - elif isinstance(backend, BackendV2): - max_circuits = backend.max_circuits - else: - raise RuntimeError("Backend version not supported") - if max_circuits: - jobs = [ - backend.run(circuits[pos : pos + max_circuits], **run_options) - for pos in range(0, len(circuits), max_circuits) - ] - result = [x.result() for x in jobs] - else: - result = [backend.run(circuits, **run_options).result()] - return result, metadata - - -def _prepare_counts(results: list[Result]): - counts = [] - for res in results: - count = res.get_counts() - if not isinstance(count, list): - count = [count] - counts.extend(count) - return counts - - -class BackendEstimator(BaseEstimator[PrimitiveJob[EstimatorResult]]): - """Evaluates expectation value using Pauli rotation gates. - - The :class:`~.BackendEstimator` class is a generic implementation of the - :class:`~.BaseEstimator` (V1) interface that is used to wrap a :class:`~.BackendV2` - (or :class:`~.BackendV1`) object in the :class:`~.BaseEstimator` V1 API. It - facilitates using backends that do not provide a native - :class:`~.BaseEstimator` V1 implementation in places that work with - :class:`~.BaseEstimator` V1. - However, if you're using a provider that has a native implementation of - :class:`~.BaseEstimatorV1` ( :class:`~.BaseEstimator`) or - :class:`~.BaseEstimatorV2`, it is a better - choice to leverage that native implementation as it will likely include - additional optimizations and be a more efficient implementation. - The generic nature of this class precludes doing any provider- or - backend-specific optimizations. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseEstimatorV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `BackendEstimator` class is `BackendEstimatorV2`.", - ) - def __init__( - self, - backend: BackendV1 | BackendV2, - options: dict | None = None, - abelian_grouping: bool = True, - bound_pass_manager: PassManager | None = None, - skip_transpilation: bool = False, - ): - """Initialize a new BackendEstimator (V1) instance - - Args: - backend: (required) the backend to run the primitive on - options: Default options. - abelian_grouping: Whether the observable should be grouped into - commuting - bound_pass_manager: An optional pass manager to run after - parameter binding. - skip_transpilation: If this is set to True the internal compilation - of the input circuits is skipped and the circuit objects - will be directly executed when this object is called. - """ - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._observables = [] - - self._abelian_grouping = abelian_grouping - - self._backend = backend - - self._transpile_options = Options() - self._bound_pass_manager = bound_pass_manager - - self._preprocessed_circuits: list[tuple[QuantumCircuit, list[QuantumCircuit]]] | None = None - self._transpiled_circuits: list[QuantumCircuit] | None = None - - self._grouping = list(zip(range(len(self._circuits)), range(len(self._observables)))) - self._skip_transpilation = skip_transpilation - - self._circuit_ids = {} - self._observable_ids = {} - - @property - def transpile_options(self) -> Options: - """Return the transpiler options for transpiling the circuits.""" - return self._transpile_options - - def set_transpile_options(self, **fields): - """Set the transpiler options for transpiler. - Args: - **fields: The fields to update the options - """ - self._transpiled_circuits = None - self._transpile_options.update_options(**fields) - - @property - def preprocessed_circuits( - self, - ) -> list[tuple[QuantumCircuit, list[QuantumCircuit]]]: - """ - Transpiled quantum circuits produced by preprocessing - Returns: - List of the transpiled quantum circuit - """ - self._preprocessed_circuits = self._preprocessing() - return self._preprocessed_circuits - - @property - def transpiled_circuits(self) -> list[QuantumCircuit]: - """ - Transpiled quantum circuits. - Returns: - List of the transpiled quantum circuit - Raises: - QiskitError: if the instance has been closed. - """ - self._transpile() - return self._transpiled_circuits - - @property - def backend(self) -> BackendV1 | BackendV2: - """ - Returns: - The backend which this estimator object based on - """ - return self._backend - - def _transpile(self): - """Split Transpile""" - self._transpiled_circuits = [] - for common_circuit, diff_circuits in self.preprocessed_circuits: - # 1. transpile a common circuit - if self._skip_transpilation: - transpiled_circuit = common_circuit.copy() - final_index_layout = list(range(common_circuit.num_qubits)) - else: - transpiled_circuit = transpile( # pylint:disable=unexpected-keyword-arg - common_circuit, self.backend, **self.transpile_options.__dict__ - ) - if transpiled_circuit.layout is not None: - final_index_layout = transpiled_circuit.layout.final_index_layout() - else: - final_index_layout = list(range(transpiled_circuit.num_qubits)) - - # 2. transpile diff circuits - passmanager = _passmanager_for_measurement_circuits(final_index_layout, self.backend) - diff_circuits = passmanager.run(diff_circuits) - # 3. combine - transpiled_circuits = [] - for diff_circuit in diff_circuits: - transpiled_circuit_copy = transpiled_circuit.copy() - # diff_circuit is supposed to have a classical register whose name is different from - # those of the transpiled_circuit - clbits = diff_circuit.cregs[0] - for creg in transpiled_circuit_copy.cregs: - if clbits.name == creg.name: - raise QiskitError( - "Classical register for measurements conflict with those of the input " - f"circuit: {clbits}. " - "Recommended to avoid register names starting with '__'." - ) - transpiled_circuit_copy.add_register(clbits) - transpiled_circuit_copy.compose(diff_circuit, clbits=clbits, inplace=True) - transpiled_circuit_copy.metadata = diff_circuit.metadata - transpiled_circuits.append(transpiled_circuit_copy) - self._transpiled_circuits += transpiled_circuits - - def _call( - self, - circuits: Sequence[int], - observables: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> EstimatorResult: - - # Transpile - self._grouping = list(zip(circuits, observables)) - transpiled_circuits = self.transpiled_circuits - num_observables = [len(m) for (_, m) in self.preprocessed_circuits] - accum = [0] + list(accumulate(num_observables)) - - # Bind parameters - parameter_dicts = [ - dict(zip(self._parameters[i], value)) for i, value in zip(circuits, parameter_values) - ] - bound_circuits = [ - ( - transpiled_circuits[circuit_index] - if len(p) == 0 - else transpiled_circuits[circuit_index].assign_parameters(p) - ) - for i, (p, n) in enumerate(zip(parameter_dicts, num_observables)) - for circuit_index in range(accum[i], accum[i] + n) - ] - bound_circuits = self._bound_pass_manager_run(bound_circuits) - - # Run - result, metadata = _run_circuits(bound_circuits, self._backend, **run_options) - - return self._postprocessing(result, accum, metadata) - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - observables: tuple[BaseOperator, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - index = self._circuit_ids.get(_circuit_key(circuit)) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[_circuit_key(circuit)] = len(self._circuits) - self._circuits.append(circuit) - self._parameters.append(circuit.parameters) - observable_indices = [] - for observable in observables: - observable = init_observable(observable) - index = self._observable_ids.get(_observable_key(observable)) - if index is not None: - observable_indices.append(index) - else: - observable_indices.append(len(self._observables)) - self._observable_ids[_observable_key(observable)] = len(self._observables) - self._observables.append(observable) - job = PrimitiveJob( - self._call, circuit_indices, observable_indices, parameter_values, **run_options - ) - job._submit() - return job - - @staticmethod - def _measurement_circuit(num_qubits: int, pauli: Pauli): - # Note: if pauli is I for all qubits, this function generates a circuit to measure only - # the first qubit. - # Although such an operator can be optimized out by interpreting it as a constant (1), - # this optimization requires changes in various methods. So it is left as future work. - qubit_indices = np.arange(pauli.num_qubits)[pauli.z | pauli.x] - if not np.any(qubit_indices): - qubit_indices = [0] - meas_circuit = QuantumCircuit( - QuantumRegister(num_qubits, "q"), ClassicalRegister(len(qubit_indices), f"__c_{pauli}") - ) - for clbit, i in enumerate(qubit_indices): - if pauli.x[i]: - if pauli.z[i]: - meas_circuit.sdg(i) - meas_circuit.h(i) - meas_circuit.measure(i, clbit) - return meas_circuit, qubit_indices - - def _preprocessing(self) -> list[tuple[QuantumCircuit, list[QuantumCircuit]]]: - """ - Preprocessing for evaluation of expectation value using pauli rotation gates. - """ - preprocessed_circuits = [] - for group in self._grouping: - circuit = self._circuits[group[0]] - observable = self._observables[group[1]] - diff_circuits: list[QuantumCircuit] = [] - if self._abelian_grouping: - for obs in observable.group_commuting(qubit_wise=True): - basis = Pauli( - (np.logical_or.reduce(obs.paulis.z), np.logical_or.reduce(obs.paulis.x)) - ) - meas_circuit, indices = self._measurement_circuit(circuit.num_qubits, basis) - paulis = PauliList.from_symplectic( - obs.paulis.z[:, indices], - obs.paulis.x[:, indices], - obs.paulis.phase, - ) - meas_circuit.metadata = { - "paulis": paulis, - "coeffs": np.real_if_close(obs.coeffs), - } - diff_circuits.append(meas_circuit) - else: - for basis, obs in zip(observable.paulis, observable): - meas_circuit, indices = self._measurement_circuit(circuit.num_qubits, basis) - paulis = PauliList.from_symplectic( - obs.paulis.z[:, indices], - obs.paulis.x[:, indices], - obs.paulis.phase, - ) - meas_circuit.metadata = { - "paulis": paulis, - "coeffs": np.real_if_close(obs.coeffs), - } - diff_circuits.append(meas_circuit) - - preprocessed_circuits.append((circuit.copy(), diff_circuits)) - return preprocessed_circuits - - def _postprocessing( - self, result: list[Result], accum: list[int], metadata: list[dict] - ) -> EstimatorResult: - """ - Postprocessing for evaluation of expectation value using pauli rotation gates. - """ - counts = _prepare_counts(result) - expval_list = [] - var_list = [] - shots_list = [] - - for i, j in zip(accum, accum[1:]): - - combined_expval = 0.0 - combined_var = 0.0 - - for k in range(i, j): - meta = metadata[k] - paulis = meta["paulis"] - coeffs = meta["coeffs"] - - count = counts[k] - - expvals, variances = _pauli_expval_with_variance(count, paulis) - - # Accumulate - combined_expval += np.dot(expvals, coeffs) - combined_var += np.dot(variances, coeffs**2) - - expval_list.append(combined_expval) - var_list.append(combined_var) - shots_list.append(sum(counts[i].values())) - - metadata = [{"variance": var, "shots": shots} for var, shots in zip(var_list, shots_list)] - - return EstimatorResult(np.real_if_close(expval_list), metadata) - - def _bound_pass_manager_run(self, circuits): - if self._bound_pass_manager is None: - return circuits - else: - output = self._bound_pass_manager.run(circuits) - if not isinstance(output, list): - output = [output] - return output - - -def _paulis2inds(paulis: PauliList) -> list[int]: - """Convert PauliList to diagonal integers. - These are integer representations of the binary string with a - 1 where there are Paulis, and 0 where there are identities. - """ - # Treat Z, X, Y the same - nonid = paulis.z | paulis.x - - # bits are packed into uint8 in little endian - # e.g., i-th bit corresponds to coefficient 2^i - packed_vals = np.packbits(nonid, axis=1, bitorder="little") - power_uint8 = 1 << (8 * np.arange(packed_vals.shape[1], dtype=object)) - inds = packed_vals @ power_uint8 - return inds.tolist() - - -def _parity(integer: int) -> int: - """Return the parity of an integer""" - return bin(integer).count("1") % 2 - - -def _pauli_expval_with_variance(counts: Counts, paulis: PauliList) -> tuple[np.ndarray, np.ndarray]: - """Return array of expval and variance pairs for input Paulis. - Note: All non-identity Pauli's are treated as Z-paulis, assuming - that basis rotations have been applied to convert them to the - diagonal basis. - """ - # Diag indices - size = len(paulis) - diag_inds = _paulis2inds(paulis) - - expvals = np.zeros(size, dtype=float) - denom = 0 # Total shots for counts dict - for bin_outcome, freq in counts.items(): - split_outcome = bin_outcome.split(" ", 1)[0] if " " in bin_outcome else bin_outcome - outcome = int(split_outcome, 2) - denom += freq - for k in range(size): - coeff = (-1) ** _parity(diag_inds[k] & outcome) - expvals[k] += freq * coeff - - # Divide by total shots - expvals /= denom - - # Compute variance - variances = 1 - expvals**2 - return expvals, variances - - -def _passmanager_for_measurement_circuits(layout, backend) -> PassManager: - passmanager = PassManager([SetLayout(layout)]) - if isinstance(backend, BackendV2): - opt1q = Optimize1qGatesDecomposition(target=backend.target) - else: - opt1q = Optimize1qGatesDecomposition(basis=backend.configuration().basis_gates) - passmanager.append(opt1q) - if isinstance(backend, BackendV2) and isinstance(backend.coupling_map, CouplingMap): - coupling_map = backend.coupling_map - passmanager.append(FullAncillaAllocation(coupling_map)) - passmanager.append(EnlargeWithAncilla()) - elif isinstance(backend, BackendV1) and backend.configuration().coupling_map is not None: - coupling_map = CouplingMap(backend.configuration().coupling_map) - passmanager.append(FullAncillaAllocation(coupling_map)) - passmanager.append(EnlargeWithAncilla()) - passmanager.append(ApplyLayout()) - return passmanager diff --git a/qiskit/primitives/backend_sampler.py b/qiskit/primitives/backend_sampler.py deleted file mode 100644 index 905fa2371542..000000000000 --- a/qiskit/primitives/backend_sampler.py +++ /dev/null @@ -1,222 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Sampler V1 implementation for an arbitrary Backend object.""" - -from __future__ import annotations - -import math -from collections.abc import Sequence -from typing import Any - -from qiskit.circuit.quantumcircuit import QuantumCircuit -from qiskit.providers.backend import BackendV1, BackendV2 -from qiskit.providers.options import Options -from qiskit.result import QuasiDistribution, Result -from qiskit.transpiler.passmanager import PassManager -from qiskit.utils.deprecation import deprecate_func - -from .backend_estimator import _prepare_counts, _run_circuits -from .base import BaseSampler, SamplerResult -from .primitive_job import PrimitiveJob -from .utils import _circuit_key - - -class BackendSampler(BaseSampler[PrimitiveJob[SamplerResult]]): - """A :class:`~.BaseSampler` (V1) implementation that provides a wrapper for - leveraging the Sampler V1 interface from any backend. - - This class provides a sampler interface from any backend and doesn't do - any measurement mitigation, it just computes the probability distribution - from the counts. It facilitates using backends that do not provide a - native :class:`~.BaseSampler` V1 implementation in places that work with - :class:`~.BaseSampler` V1. - However, if you're using a provider that has a native implementation of - :class:`~.BaseSamplerV1` ( :class:`~.BaseSampler`) or - :class:`~.BaseESamplerV2`, it is a better - choice to leverage that native implementation as it will likely include - additional optimizations and be a more efficient implementation. - The generic nature of this class precludes doing any provider- or - backend-specific optimizations. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseSamplerV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `BackendSampler` class is `BackendSamplerV2`.", - ) - def __init__( - self, - backend: BackendV1 | BackendV2, - options: dict | None = None, - bound_pass_manager: PassManager | None = None, - skip_transpilation: bool = False, - ): - """Initialize a new BackendSampler (V1) instance - - Args: - backend: (required) the backend to run the sampler primitive on - options: Default options. - bound_pass_manager: An optional pass manager to run after - parameter binding. - skip_transpilation: If this is set to True the internal compilation - of the input circuits is skipped and the circuit objects - will be directly executed when this objected is called. - Raises: - ValueError: If backend is not provided - """ - - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._backend = backend - self._transpile_options = Options() - self._bound_pass_manager = bound_pass_manager - self._preprocessed_circuits: list[QuantumCircuit] | None = None - self._transpiled_circuits: list[QuantumCircuit] = [] - self._skip_transpilation = skip_transpilation - self._circuit_ids = {} - - @property - def preprocessed_circuits(self) -> list[QuantumCircuit]: - """ - Preprocessed quantum circuits produced by preprocessing - Returns: - List of the transpiled quantum circuit - Raises: - QiskitError: if the instance has been closed. - """ - return list(self._circuits) - - @property - def transpiled_circuits(self) -> list[QuantumCircuit]: - """ - Transpiled quantum circuits. - Returns: - List of the transpiled quantum circuit - Raises: - QiskitError: if the instance has been closed. - """ - if self._skip_transpilation: - self._transpiled_circuits = list(self._circuits) - elif len(self._transpiled_circuits) < len(self._circuits): - # transpile only circuits that are not transpiled yet - self._transpile() - return self._transpiled_circuits - - @property - def backend(self) -> BackendV1 | BackendV2: - """ - Returns: - The backend which this sampler object based on - """ - return self._backend - - @property - def transpile_options(self) -> Options: - """Return the transpiler options for transpiling the circuits.""" - return self._transpile_options - - def set_transpile_options(self, **fields): - """Set the transpiler options for transpiler. - Args: - **fields: The fields to update the options. - Returns: - self. - Raises: - QiskitError: if the instance has been closed. - """ - self._transpile_options.update_options(**fields) - - def _call( - self, - circuits: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> SamplerResult: - - # This line does the actual transpilation - transpiled_circuits = self.transpiled_circuits - bound_circuits = [ - ( - transpiled_circuits[i] - if len(value) == 0 - else transpiled_circuits[i].assign_parameters( - (dict(zip(self._parameters[i], value))) - ) - ) - for i, value in zip(circuits, parameter_values) - ] - bound_circuits = self._bound_pass_manager_run(bound_circuits) - # Run - result, _metadata = _run_circuits(bound_circuits, self._backend, **run_options) - return self._postprocessing(result, bound_circuits) - - def _postprocessing( - self, result: list[Result], circuits: list[QuantumCircuit] - ) -> SamplerResult: - counts = _prepare_counts(result) - shots = sum(counts[0].values()) - - probabilities = [] - metadata: list[dict[str, Any]] = [{} for _ in range(len(circuits))] - for count in counts: - prob_dist = {k: v / shots for k, v in count.items()} - probabilities.append( - QuasiDistribution(prob_dist, shots=shots, stddev_upper_bound=math.sqrt(1 / shots)) - ) - for metadatum in metadata: - metadatum["shots"] = shots - - return SamplerResult(probabilities, metadata) - - def _transpile(self): - from qiskit.compiler import transpile - - start = len(self._transpiled_circuits) - self._transpiled_circuits.extend( - transpile( # pylint:disable=unexpected-keyword-arg - self.preprocessed_circuits[start:], - self.backend, - **self.transpile_options.__dict__, - ), - ) - - def _bound_pass_manager_run(self, circuits): - if self._bound_pass_manager is None: - return circuits - else: - output = self._bound_pass_manager.run(circuits) - if not isinstance(output, list): - output = [output] - return output - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - index = self._circuit_ids.get(_circuit_key(circuit)) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[_circuit_key(circuit)] = len(self._circuits) - self._circuits.append(circuit) - self._parameters.append(circuit.parameters) - job = PrimitiveJob(self._call, circuit_indices, parameter_values, **run_options) - job._submit() - return job diff --git a/qiskit/primitives/estimator.py b/qiskit/primitives/estimator.py deleted file mode 100644 index d8e0a00f45e1..000000000000 --- a/qiskit/primitives/estimator.py +++ /dev/null @@ -1,172 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -""" -Estimator V1 reference implementation -""" - -from __future__ import annotations - -from collections.abc import Sequence -from typing import Any - -import numpy as np - -from qiskit.circuit import QuantumCircuit -from qiskit.exceptions import QiskitError -from qiskit.quantum_info.operators.base_operator import BaseOperator -from qiskit.utils.deprecation import deprecate_func - -from .base import BaseEstimator, EstimatorResult -from .primitive_job import PrimitiveJob -from .utils import ( - _circuit_key, - _observable_key, - _statevector_from_circuit, - init_observable, -) - - -class Estimator(BaseEstimator[PrimitiveJob[EstimatorResult]]): - """ - Reference implementation of :class:`BaseEstimator` (V1). - - :Run Options: - - - **shots** (None or int) -- - The number of shots. If None, it calculates the expectation values - with full state vector simulation. - Otherwise, it samples from normal distributions with standard errors as standard - deviations using normal distribution approximation. - - - **seed** (np.random.Generator or int) -- - Set a fixed seed or generator for the normal distribution. If shots is None, - this option is ignored. - - .. note:: - The result of this class is exact if the circuit contains only unitary operations. - On the other hand, the result could be stochastic if the circuit contains a non-unitary - operation such as a reset for a some subsystems. - The stochastic result can be made reproducible by setting ``seed``, e.g., - ``Estimator(options={"seed":123})``. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseEstimatorV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `Estimator` class is `StatevectorEstimator`.", - ) - def __init__(self, *, options: dict | None = None): - """ - Args: - options: Default options. - - Raises: - QiskitError: if some classical bits are not used for measurements. - """ - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._observables = [] - self._circuit_ids = {} - self._observable_ids = {} - - def _call( - self, - circuits: Sequence[int], - observables: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> EstimatorResult: - shots = run_options.pop("shots", None) - seed = run_options.pop("seed", None) - if seed is None: - rng = np.random.default_rng() - elif isinstance(seed, np.random.Generator): - rng = seed - else: - rng = np.random.default_rng(seed) - - # Initialize metadata - metadata: list[dict[str, Any]] = [{} for _ in range(len(circuits))] - - bound_circuits = [] - for i, value in zip(circuits, parameter_values): - if len(value) != len(self._parameters[i]): - raise QiskitError( - f"The number of values ({len(value)}) does not match " - f"the number of parameters ({len(self._parameters[i])})." - ) - bound_circuits.append( - self._circuits[i] - if len(value) == 0 - else self._circuits[i].assign_parameters(dict(zip(self._parameters[i], value))) - ) - sorted_observables = [self._observables[i] for i in observables] - expectation_values = [] - for circ, obs, metadatum in zip(bound_circuits, sorted_observables, metadata): - if circ.num_qubits != obs.num_qubits: - raise QiskitError( - f"The number of qubits of a circuit ({circ.num_qubits}) does not match " - f"the number of qubits of a observable ({obs.num_qubits})." - ) - final_state = _statevector_from_circuit(circ, rng) - expectation_value = final_state.expectation_value(obs) - if shots is None: - expectation_values.append(expectation_value) - else: - expectation_value = np.real_if_close(expectation_value) - sq_obs = (obs @ obs).simplify(atol=0) - sq_exp_val = np.real_if_close(final_state.expectation_value(sq_obs)) - variance = sq_exp_val - expectation_value**2 - variance = max(variance, 0) - standard_error = np.sqrt(variance / shots) - expectation_value_with_error = rng.normal(expectation_value, standard_error) - expectation_values.append(expectation_value_with_error) - metadatum["variance"] = variance - metadatum["shots"] = shots - - return EstimatorResult(np.real_if_close(expectation_values), metadata) - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - observables: tuple[BaseOperator, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - key = _circuit_key(circuit) - index = self._circuit_ids.get(key) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[key] = len(self._circuits) - self._circuits.append(circuit) - self._parameters.append(circuit.parameters) - observable_indices = [] - for observable in observables: - observable = init_observable(observable) - index = self._observable_ids.get(_observable_key(observable)) - if index is not None: - observable_indices.append(index) - else: - observable_indices.append(len(self._observables)) - self._observable_ids[_observable_key(observable)] = len(self._observables) - self._observables.append(observable) - job = PrimitiveJob( - self._call, circuit_indices, observable_indices, parameter_values, **run_options - ) - job._submit() - return job diff --git a/qiskit/primitives/sampler.py b/qiskit/primitives/sampler.py deleted file mode 100644 index 9155e50ad2ce..000000000000 --- a/qiskit/primitives/sampler.py +++ /dev/null @@ -1,162 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -""" -Sampler V1 reference implementation -""" - -from __future__ import annotations - -from collections.abc import Sequence -from typing import Any - -import numpy as np - -from qiskit.circuit import QuantumCircuit -from qiskit.exceptions import QiskitError -from qiskit.quantum_info import Statevector -from qiskit.result import QuasiDistribution -from qiskit.utils.deprecation import deprecate_func - -from .base import BaseSampler, SamplerResult -from .primitive_job import PrimitiveJob -from .utils import ( - _circuit_key, - bound_circuit_to_instruction, - final_measurement_mapping, - init_circuit, -) - - -class Sampler(BaseSampler[PrimitiveJob[SamplerResult]]): - """ - Sampler V1 class. - - :class:`~Sampler` is a reference implementation of :class:`~BaseSampler` (V1). - - :Run Options: - - - **shots** (None or int) -- - The number of shots. If None, it calculates the probabilities. - Otherwise, it samples from multinomial distributions. - - - **seed** (np.random.Generator or int) -- - Set a fixed seed or generator for the multinomial distribution. If shots is None, this - option is ignored. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseSamplerV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `Sampler` class is `StatevectorSampler`.", - ) - def __init__(self, *, options: dict | None = None): - """ - Args: - options: Default options. - - Raises: - QiskitError: if some classical bits are not used for measurements. - """ - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._qargs_list = [] - self._circuit_ids = {} - - def _call( - self, - circuits: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> SamplerResult: - shots = run_options.pop("shots", None) - seed = run_options.pop("seed", None) - if seed is None: - rng = np.random.default_rng() - elif isinstance(seed, np.random.Generator): - rng = seed - else: - rng = np.random.default_rng(seed) - - # Initialize metadata - metadata: list[dict[str, Any]] = [{} for _ in range(len(circuits))] - - bound_circuits = [] - qargs_list = [] - for i, value in zip(circuits, parameter_values): - if len(value) != len(self._parameters[i]): - raise QiskitError( - f"The number of values ({len(value)}) does not match " - f"the number of parameters ({len(self._parameters[i])})." - ) - bound_circuits.append( - self._circuits[i] - if len(value) == 0 - else self._circuits[i].assign_parameters(dict(zip(self._parameters[i], value))) - ) - qargs_list.append(self._qargs_list[i]) - probabilities = [ - Statevector(bound_circuit_to_instruction(circ)).probabilities_dict( - qargs=qargs, decimals=16 - ) - for circ, qargs in zip(bound_circuits, qargs_list) - ] - if shots is not None: - for i, prob_dict in enumerate(probabilities): - counts = rng.multinomial(shots, np.fromiter(prob_dict.values(), dtype=float)) - probabilities[i] = { - key: count / shots for key, count in zip(prob_dict.keys(), counts) if count > 0 - } - for metadatum in metadata: - metadatum["shots"] = shots - quasis = [QuasiDistribution(p, shots=shots) for p in probabilities] - - return SamplerResult(quasis, metadata) - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - key = _circuit_key(circuit) - index = self._circuit_ids.get(key) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[key] = len(self._circuits) - circuit, qargs = self._preprocess_circuit(circuit) - self._circuits.append(circuit) - self._qargs_list.append(qargs) - self._parameters.append(circuit.parameters) - job = PrimitiveJob(self._call, circuit_indices, parameter_values, **run_options) - job._submit() - return job - - @staticmethod - def _preprocess_circuit(circuit: QuantumCircuit): - circuit = init_circuit(circuit) - q_c_mapping = final_measurement_mapping(circuit) - if set(range(circuit.num_clbits)) != set(q_c_mapping.values()): - raise QiskitError( - "Some classical bits are not used for measurements." - f" the number of classical bits ({circuit.num_clbits})," - f" the used classical bits ({set(q_c_mapping.values())})." - ) - c_q_mapping = sorted((c, q) for q, c in q_c_mapping.items()) - qargs = [q for _, q in c_q_mapping] - circuit = circuit.remove_final_measurements(inplace=False) - return circuit, qargs diff --git a/releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml b/releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml new file mode 100644 index 000000000000..77bfff966bc7 --- /dev/null +++ b/releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml @@ -0,0 +1,16 @@ +--- +upgrade_primitives: + - | + The deprecated primitive V1 implementations and V1-exclusive non-versioned + type aliases are now removed in favor of their V2 counterparts. The + following classes have been removed: + + * ``Estimator``, replaced with :class:`.StatevectorEstimator` + * ``Sampler``, replaced with :class:`.StatevectorSampler` + * ``BackendEstimator``, replaced with :class:`.BackendEstimatorV2` + * ``BackendSampler``, replaced with :class:`.BackendSamplerV2` + + The follwoing type aliases were removed as well: + + * ``BaseEstimator`` + * ``BaseSampler`` diff --git a/test/python/primitives/test_backend_estimator.py b/test/python/primitives/test_backend_estimator.py deleted file mode 100644 index 49d5efa2d782..000000000000 --- a/test/python/primitives/test_backend_estimator.py +++ /dev/null @@ -1,511 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for BackendEstimator.""" - -import unittest -from unittest.mock import patch -from multiprocessing import Manager -import numpy as np -from ddt import ddt - -from qiskit.circuit import QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.primitives import BackendEstimator, EstimatorResult -from qiskit.providers.fake_provider import Fake7QPulseV1, GenericBackendV2 -from qiskit.providers.backend_compat import BackendV2Converter -from qiskit.quantum_info import SparsePauliOp -from qiskit.transpiler import PassManager -from qiskit.utils import optionals -from test import QiskitTestCase # pylint: disable=wrong-import-order -from test import combine # pylint: disable=wrong-import-order -from test.python.transpiler._dummy_passes import DummyAP # pylint: disable=wrong-import-order - - -BACKENDS = [ - Fake7QPulseV1(), - BackendV2Converter(Fake7QPulseV1()), - GenericBackendV2(num_qubits=5, seed=42), -] - - -class CallbackPass(DummyAP): - """A dummy analysis pass that calls a callback when executed""" - - def __init__(self, message, callback): - super().__init__() - self.message = message - self.callback = callback - - def run(self, dag): - self.callback(self.message) - - -@ddt -class TestBackendEstimator(QiskitTestCase): - """Test Estimator""" - - def setUp(self): - super().setUp() - self._rng = np.random.default_rng(12) - self.ansatz = RealAmplitudes(num_qubits=2, reps=2) - self.observable = SparsePauliOp.from_list( - [ - ("II", -1.052373245772859), - ("IZ", 0.39793742484318045), - ("ZI", -0.39793742484318045), - ("ZZ", -0.01128010425623538), - ("XX", 0.18093119978423156), - ] - ) - self.expvals = -1.0284380963435145, -1.284366511861733 - - self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) - self.params = tuple(psi.parameters for psi in self.psi) - self.hamiltonian = ( - SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), - SparsePauliOp.from_list([("IZ", 1)]), - SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), - ) - self.theta = ( - [0, 1, 1, 2, 3, 5], - [0, 1, 1, 2, 3, 5, 8, 13], - [1, 2, 3, 4, 5, 6], - ) - - @combine(backend=BACKENDS) - def test_estimator_run(self, backend): - """Test Estimator.run()""" - psi1, psi2 = self.psi - hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian - theta1, theta2, theta3 = self.theta - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - - # Specify the circuit and observable by indices. - # calculate [ ] - with self.assertWarns(DeprecationWarning): - job = estimator.run([psi1], [hamiltonian1], [theta1]) - result = job.result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1.5555572817900956], rtol=0.5, atol=0.2) - - # Objects can be passed instead of indices. - # Note that passing objects has an overhead - # since the corresponding indices need to be searched. - # User can append a circuit and observable. - # calculate [ ] - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([psi2], [hamiltonian1], [theta2]).result() - np.testing.assert_allclose(result2.values, [2.97797666], rtol=0.5, atol=0.2) - - # calculate [ , ] - with self.assertWarns(DeprecationWarning): - result3 = estimator.run( - [psi1, psi1], [hamiltonian2, hamiltonian3], [theta1] * 2 - ).result() - np.testing.assert_allclose(result3.values, [-0.551653, 0.07535239], rtol=0.5, atol=0.2) - - # calculate [ , - # , - # ] - with self.assertWarns(DeprecationWarning): - result4 = estimator.run( - [psi1, psi2, psi1], - [hamiltonian1, hamiltonian2, hamiltonian3], - [theta1, theta2, theta3], - ).result() - np.testing.assert_allclose( - result4.values, [1.55555728, 0.17849238, -1.08766318], rtol=0.5, atol=0.2 - ) - - @combine(backend=BACKENDS) - def test_estimator_run_no_params(self, backend): - """test for estimator without parameters""" - circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - est.set_options(seed_simulator=123) - result = est.run([circuit], [self.observable]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.284366511861733], rtol=0.05) - - @combine(backend=BACKENDS, creg=[True, False]) - def test_run_1qubit(self, backend, creg): - """Test for 1-qubit cases""" - qc = QuantumCircuit(1, 1) if creg else QuantumCircuit(1) - qc2 = QuantumCircuit(1, 1) if creg else QuantumCircuit(1) - qc2.x(0) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("Z", 1)]) - - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - est.set_options(seed_simulator=123) - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1], rtol=0.1) - - @combine(backend=BACKENDS, creg=[True, False]) - def test_run_2qubits(self, backend, creg): - """Test for 2-qubit cases (to check endian)""" - backend.set_options(seed_simulator=123) - qc = QuantumCircuit(2, 1) if creg else QuantumCircuit(2) - qc2 = QuantumCircuit(2, 1) if creg else QuantumCircuit(2, 1) - qc2.x(0) - - op = SparsePauliOp.from_list([("II", 1)]) - op2 = SparsePauliOp.from_list([("ZI", 1)]) - op3 = SparsePauliOp.from_list([("IZ", 1)]) - - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1], rtol=0.1) - - @combine(backend=BACKENDS) - def test_run_errors(self, backend): - """Test for errors""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(2) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("II", 1)]) - - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - est.set_options(seed_simulator=123) - with self.assertRaises(ValueError): - est.run([qc], [op2], [[]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op], [[1e4]]).result() - with self.assertRaises(ValueError): - est.run([qc2], [op2], [[1, 2]]).result() - with self.assertRaises(ValueError): - est.run([qc, qc2], [op2], [[1]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op, op2], [[1]]).result() - - @combine(backend=BACKENDS) - def test_run_numpy_params(self, backend): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - - target = estimator.run([qc] * k, [op] * k, params_list).result() - - with self.subTest("ndarrary"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - with self.subTest("list of ndarray"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - @combine(backend=BACKENDS) - def test_run_with_shots_option(self, backend): - """test with shots option.""" - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - result = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=1024, - seed_simulator=15, - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641], rtol=0.1) - - @combine(backend=BACKENDS) - def test_options(self, backend): - """Test for options""" - with self.subTest("init"): - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend, options={"shots": 3000}) - self.assertEqual(estimator.options.get("shots"), 3000) - with self.subTest("set_options"): - estimator.set_options(shots=1024, seed_simulator=15) - self.assertEqual(estimator.options.get("shots"), 1024) - self.assertEqual(estimator.options.get("seed_simulator"), 15) - with self.subTest("run"): - with self.assertWarns(DeprecationWarning): - result = estimator.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641], rtol=0.1) - - def test_job_size_limit_v2(self): - """Test BackendEstimator respects job size limit""" - - class FakeBackendLimitedCircuits(GenericBackendV2): - """Generic backend V2 with job size limit.""" - - @property - def max_circuits(self): - return 1 - - backend = FakeBackendLimitedCircuits(num_qubits=5, seed=42) - backend.set_options(seed_simulator=123) - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - with patch.object(backend, "run") as run_mock: - with self.assertWarns(DeprecationWarning): - estimator.run([qc] * k, [op] * k, params_list).result() - self.assertEqual(run_mock.call_count, 10) - - def test_job_size_limit_v1(self): - """Test BackendEstimator respects job size limit - REMOVE ONCE Fake7QPulseV1 GETS REMOVED""" - with self.assertWarns(DeprecationWarning): - backend = Fake7QPulseV1() - config = backend.configuration() - config.max_experiments = 1 - backend._configuration = config - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - with patch.object(backend, "run") as run_mock: - with self.assertWarns(DeprecationWarning): - estimator.run([qc] * k, [op] * k, params_list).result() - self.assertEqual(run_mock.call_count, 10) - - def test_no_max_circuits(self): - """Test BackendEstimator works with BackendV1 and no max_experiments set. - REMOVE ONCE Fake7QPulseV1 GETS REMOVED""" - with self.assertWarns(DeprecationWarning): - backend = Fake7QPulseV1() - config = backend.configuration() - del config.max_experiments - backend._configuration = config - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - target = estimator.run([qc] * k, [op] * k, params_list).result() - with self.subTest("ndarrary"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - with self.subTest("list of ndarray"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - def test_bound_pass_manager(self): - """Test bound pass manager.""" - - qc = QuantumCircuit(2) - op = SparsePauliOp.from_list([("II", 1)]) - - with self.subTest("Test single circuit"): - messages = [] - - def callback(msg): - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator( - backend=GenericBackendV2(num_qubits=5, seed=42), bound_pass_manager=bound_pass - ) - _ = estimator.run(qc, op).result() - expected = [ - "bound_pass_manager", - ] - self.assertEqual(messages, expected) - - with self.subTest("Test circuit batch"): - with Manager() as manager: - # The multiprocessing manager is used to share data - # between different processes. Pass Managers parallelize - # execution for batches of circuits, so this is necessary - # to keep track of the callback calls for num_circuits > 1 - messages = manager.list() - - def callback(msg): # pylint: disable=function-redefined - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator( - backend=GenericBackendV2(num_qubits=5, seed=42), - bound_pass_manager=bound_pass, - ) - _ = estimator.run([qc, qc], [op, op]).result() - expected = [ - "bound_pass_manager", - "bound_pass_manager", - ] - self.assertEqual(list(messages), expected) - - @combine(backend=BACKENDS) - def test_layout(self, backend): - """Test layout for split transpilation.""" - with self.subTest("initial layout test"): - qc = QuantumCircuit(3) - qc.x(0) - qc.cx(0, 1) - qc.cx(0, 2) - op = SparsePauliOp("IZI") - backend.set_options(seed_simulator=15) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend) - estimator.set_transpile_options(seed_transpiler=15, optimization_level=1) - value = estimator.run(qc, op, shots=10000).result().values[0] - if optionals.HAS_AER: - ref_value = -0.9954 if isinstance(backend, GenericBackendV2) else -0.934 - else: - ref_value = -1 - self.assertEqual(value, ref_value) - - with self.subTest("final layout test"): - qc = QuantumCircuit(3) - qc.x(0) - qc.cx(0, 1) - qc.cx(0, 2) - op = SparsePauliOp("IZI") - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend) - estimator.set_transpile_options( - initial_layout=[0, 1, 2], seed_transpiler=15, optimization_level=1 - ) - estimator.set_options(seed_simulator=15) - value = estimator.run(qc, op, shots=10000).result().values[0] - if optionals.HAS_AER: - ref_value = -0.9954 if isinstance(backend, GenericBackendV2) else -0.8902 - else: - ref_value = -1 - self.assertEqual(value, ref_value) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - def test_circuit_with_measurement(self): - """Test estimator with a dynamic circuit""" - from qiskit_aer import AerSimulator - - bell = QuantumCircuit(2) - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - observable = SparsePauliOp("ZZ") - - backend = AerSimulator() - backend.set_options(seed_simulator=15) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend, skip_transpilation=True) - estimator.set_transpile_options(seed_transpiler=15) - result = estimator.run(bell, observable).result() - self.assertAlmostEqual(result.values[0], 1, places=1) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - def test_dynamic_circuit(self): - """Test estimator with a dynamic circuit""" - from qiskit_aer import AerSimulator - - qc = QuantumCircuit(2, 1) - with qc.for_loop(range(5)): - qc.h(0) - qc.cx(0, 1) - qc.measure(1, 0) - with self.assertWarns(DeprecationWarning): - qc.break_loop().c_if(0, True) - - observable = SparsePauliOp("IZ") - - backend = AerSimulator() - backend.set_options(seed_simulator=15) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend, skip_transpilation=True) - estimator.set_transpile_options(seed_transpiler=15) - result = estimator.run(qc, observable).result() - self.assertAlmostEqual(result.values[0], 0, places=1) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/python/primitives/test_backend_sampler.py b/test/python/primitives/test_backend_sampler.py deleted file mode 100644 index 6acfa6443530..000000000000 --- a/test/python/primitives/test_backend_sampler.py +++ /dev/null @@ -1,496 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for BackendSampler.""" - -import math -import unittest -from multiprocessing import Manager -import numpy as np -from ddt import ddt - -from qiskit import QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.primitives import BackendSampler, SamplerResult -from qiskit.providers import JobStatus -from qiskit.providers.backend_compat import BackendV2Converter -from qiskit.providers.basic_provider import BasicSimulator -from qiskit.providers.fake_provider import Fake7QPulseV1, GenericBackendV2 -from qiskit.transpiler import PassManager -from qiskit.utils import optionals -from test import QiskitTestCase # pylint: disable=wrong-import-order -from test import combine # pylint: disable=wrong-import-order -from test.python.transpiler._dummy_passes import DummyAP # pylint: disable=wrong-import-order - - -BACKENDS = [Fake7QPulseV1(), BackendV2Converter(Fake7QPulseV1())] - -BACKENDS_V1 = [Fake7QPulseV1()] -BACKENDS_V2 = [ - BackendV2Converter(Fake7QPulseV1()), -] -BACKENDS = BACKENDS_V1 + BACKENDS_V2 - - -class CallbackPass(DummyAP): - """A dummy analysis pass that calls a callback when executed""" - - def __init__(self, message, callback): - super().__init__() - self.message = message - self.callback = callback - - def run(self, dag): - self.callback(self.message) - - -@ddt -class TestBackendSampler(QiskitTestCase): - """Test BackendSampler""" - - def setUp(self): - super().setUp() - hadamard = QuantumCircuit(1, 1) - hadamard.h(0) - hadamard.measure(0, 0) - bell = QuantumCircuit(2) - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - self._circuit = [hadamard, bell] - self._target = [ - {0: 0.5, 1: 0.5}, - {0: 0.5, 3: 0.5, 1: 0, 2: 0}, - ] - self._pqc = RealAmplitudes(num_qubits=2, reps=2) - self._pqc.measure_all() - self._pqc2 = RealAmplitudes(num_qubits=2, reps=3) - self._pqc2.measure_all() - self._pqc_params = [[0.0] * 6, [1.0] * 6] - self._pqc_target = [{0: 1}, {0: 0.0148, 1: 0.3449, 2: 0.0531, 3: 0.5872}] - self._theta = [ - [0, 1, 1, 2, 3, 5], - [1, 2, 3, 4, 5, 6], - [0, 1, 2, 3, 4, 5, 6, 7], - ] - - def _generate_circuits_target(self, indices): - if isinstance(indices, list): - circuits = [self._circuit[j] for j in indices] - target = [self._target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return circuits, target - - def _generate_params_target(self, indices): - if isinstance(indices, int): - params = self._pqc_params[indices] - target = self._pqc_target[indices] - elif isinstance(indices, list): - params = [self._pqc_params[j] for j in indices] - target = [self._pqc_target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return params, target - - def _compare_probs(self, prob, target): - if not isinstance(prob, list): - prob = [prob] - if not isinstance(target, list): - target = [target] - self.assertEqual(len(prob), len(target)) - for p, targ in zip(prob, target): - for key, t_val in targ.items(): - if key in p: - self.assertAlmostEqual(p[key], t_val, delta=0.1) - else: - self.assertAlmostEqual(t_val, 0, delta=0.1) - - @combine(backend=BACKENDS) - def test_sampler_run(self, backend): - """Test Sampler.run().""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - job = sampler.run(circuits=[bell], shots=1000) - result = job.result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(result.quasi_dists[0].shots, 1000) - self.assertEqual(result.quasi_dists[0].stddev_upper_bound, math.sqrt(1 / 1000)) - self._compare_probs(result.quasi_dists, self._target[1]) - - @combine(backend=BACKENDS) - def test_sample_run_multiple_circuits(self, backend): - """Test Sampler.run() with multiple circuits.""" - # executes three Bell circuits - # Argument `parameters` is optional. - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([bell, bell, bell]).result() - self._compare_probs(result.quasi_dists[0], self._target[1]) - self._compare_probs(result.quasi_dists[1], self._target[1]) - self._compare_probs(result.quasi_dists[2], self._target[1]) - - @combine(backend=BACKENDS) - def test_sampler_run_with_parameterized_circuits(self, backend): - """Test Sampler.run() with parameterized circuits.""" - # parameterized circuit - - pqc = self._pqc - pqc2 = self._pqc2 - theta1, theta2, theta3 = self._theta - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([pqc, pqc, pqc2], [theta1, theta2, theta3]).result() - - # result of pqc(theta1) - prob1 = { - "00": 0.1309248462975777, - "01": 0.3608720796028448, - "10": 0.09324865232050054, - "11": 0.41495442177907715, - } - self.assertDictAlmostEqual(result.quasi_dists[0].binary_probabilities(), prob1, delta=0.1) - - # result of pqc(theta2) - prob2 = { - "00": 0.06282290651933871, - "01": 0.02877144385576705, - "10": 0.606654494132085, - "11": 0.3017511554928094, - } - self.assertDictAlmostEqual(result.quasi_dists[1].binary_probabilities(), prob2, delta=0.1) - - # result of pqc2(theta3) - prob3 = { - "00": 0.1880263994380416, - "01": 0.6881971261189544, - "10": 0.09326232720582443, - "11": 0.030514147237179892, - } - self.assertDictAlmostEqual(result.quasi_dists[2].binary_probabilities(), prob3, delta=0.1) - - @combine(backend=BACKENDS) - def test_run_1qubit(self, backend): - """test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - - @combine(backend=BACKENDS) - def test_run_2qubit(self, backend): - """test for 2-qubit cases""" - qc0 = QuantumCircuit(2) - qc0.measure_all() - qc1 = QuantumCircuit(2) - qc1.x(0) - qc1.measure_all() - qc2 = QuantumCircuit(2) - qc2.x(1) - qc2.measure_all() - qc3 = QuantumCircuit(2) - qc3.x([0, 1]) - qc3.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc0, qc1, qc2, qc3]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 4) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[2], {2: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[3], {3: 1}, 0.1) - - @combine(backend=BACKENDS) - def test_run_errors(self, backend): - """Test for errors""" - qc1 = QuantumCircuit(1) - qc1.measure_all() - qc2 = RealAmplitudes(num_qubits=1, reps=1) - qc2.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - with self.assertRaises(ValueError): - sampler.run([qc1], [[1e2]]).result() - with self.assertRaises(ValueError): - sampler.run([qc2], [[]]).result() - with self.assertRaises(ValueError): - sampler.run([qc2], [[1e2]]).result() - - @combine(backend=BACKENDS_V1) - def test_run_empty_parameter_v1(self, backend): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - with self.subTest("one circuit"): - with self.assertWarnsRegex( - DeprecationWarning, - expected_regex="The `transpile` function will " - "stop supporting inputs of type `BackendV1`", - ): - result = sampler.run([qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 1) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 2) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 2) - - @combine(backend=BACKENDS_V2) - def test_run_empty_parameter_v2(self, backend): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - with self.subTest("one circuit"): - result = sampler.run([qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 1) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 2) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 2) - - @combine(backend=BACKENDS) - def test_run_numpy_params(self, backend): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - qc.measure_all() - k = 5 - rng = np.random.default_rng(12) - params_array = rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - target = sampler.run([qc] * k, params_list).result() - - with self.subTest("ndarrary"): - result = sampler.run([qc] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictAlmostEqual(result.quasi_dists[i], target.quasi_dists[i], delta=0.1) - - with self.subTest("list of ndarray"): - result = sampler.run([qc] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictAlmostEqual(result.quasi_dists[i], target.quasi_dists[i], delta=0.1) - - @combine(backend=BACKENDS) - def test_run_with_shots_option(self, backend): - """test with shots option.""" - params, target = self._generate_params_target([1]) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run( - circuits=[self._pqc], parameter_values=params, shots=1024, seed=15 - ).result() - self._compare_probs(result.quasi_dists, target) - - @combine(backend=BACKENDS) - def test_primitive_job_status_done(self, backend): - """test primitive job's status""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - job = sampler.run(circuits=[bell]) - _ = job.result() - self.assertEqual(job.status(), JobStatus.DONE) - - def test_primitive_job_size_limit_backend_v2(self): - """Test primitive respects backend's job size limit.""" - - class FakeBackendLimitedCircuits(GenericBackendV2): - """Generic backend V2 with job size limit.""" - - @property - def max_circuits(self): - return 1 - - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=FakeBackendLimitedCircuits(num_qubits=5, seed=42)) - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - - def test_primitive_job_size_limit_backend_v1(self): - """Test primitive respects backend's job size limit.""" - backend = GenericBackendV2(7, basis_gates=["cx", "u1", "u2", "u3"], seed=42) - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - def test_circuit_with_dynamic_circuit(self): - """Test BackendSampler with QuantumCircuit with a dynamic circuit""" - from qiskit_aer import Aer - - qc = QuantumCircuit(2, 1) - - with qc.for_loop(range(5)): - qc.h(0) - qc.cx(0, 1) - qc.measure(0, 0) - with self.assertWarns(DeprecationWarning): - qc.break_loop().c_if(0, True) - - with self.assertWarns(DeprecationWarning): - backend = Aer.get_backend("aer_simulator") - sampler = BackendSampler(backend, skip_transpilation=True) - sampler.set_options(seed_simulator=15) - sampler.set_transpile_options(seed_transpiler=15) - result = sampler.run(qc).result() - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 0.5029296875, 1: 0.4970703125}) - - def test_sequential_run(self): - """Test sequential run.""" - backend = GenericBackendV2(7, basis_gates=["cx", "u1", "u2", "u3"], seed=42) - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc]).result() - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - result2 = sampler.run([qc2]).result() - self.assertDictAlmostEqual(result2.quasi_dists[0], {1: 1}, 0.1) - result3 = sampler.run([qc, qc2]).result() - self.assertDictAlmostEqual(result3.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result3.quasi_dists[1], {1: 1}, 0.1) - - def test_outcome_bitstring_size(self): - """Test that the result bitstrings are properly padded. - - E.g. measuring '0001' should not get truncated to '1'. - """ - qc = QuantumCircuit(4) - qc.x(0) - qc.measure_all() - - # We need a noise-free backend here (shot noise is fine) to ensure that - # the only bit string measured is "0001". With device noise, it could happen that - # strings with a leading 1 are measured and then the truncation cannot be tested. - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=BasicSimulator()) - - result = sampler.run(qc).result() - probs = result.quasi_dists[0].binary_probabilities() - - self.assertIn("0001", probs.keys()) - self.assertEqual(len(probs), 1) - - def test_bound_pass_manager(self): - """Test bound pass manager.""" - - with self.subTest("Test single circuit"): - messages = [] - - def callback(msg): - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - backend = GenericBackendV2(7, basis_gates=["cx", "u1", "u2", "u3"], seed=42) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend, bound_pass_manager=bound_pass) - _ = sampler.run([self._circuit[0]]).result() - expected = [ - "bound_pass_manager", - ] - self.assertEqual(messages, expected) - - with self.subTest("Test circuit batch"): - with Manager() as manager: - # The multiprocessing manager is used to share data - # between different processes. Pass Managers parallelize - # execution for batches of circuits, so this is necessary - # to keep track of the callback calls for num_circuits > 1 - messages = manager.list() - - def callback(msg): # pylint: disable=function-redefined - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - backend = GenericBackendV2( - 7, - basis_gates=["cx", "u1", "u2", "u3"], - seed=42, - ) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend, bound_pass_manager=bound_pass) - _ = sampler.run([self._circuit[0], self._circuit[0]]).result() - expected = [ - "bound_pass_manager", - "bound_pass_manager", - ] - self.assertEqual(list(messages), expected) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/python/primitives/test_estimator.py b/test/python/primitives/test_estimator.py deleted file mode 100644 index 2c251af65d1b..000000000000 --- a/test/python/primitives/test_estimator.py +++ /dev/null @@ -1,437 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2023. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for Estimator.""" - -import unittest -from test import QiskitTestCase - -import numpy as np -from ddt import data, ddt, unpack - -from qiskit.circuit import Parameter, QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.exceptions import QiskitError -from qiskit.primitives import Estimator, EstimatorResult -from qiskit.primitives.base import validation -from qiskit.primitives.utils import _observable_key -from qiskit.quantum_info import Pauli, SparsePauliOp - - -class TestEstimator(QiskitTestCase): - """Test Estimator""" - - def setUp(self): - super().setUp() - self.ansatz = RealAmplitudes(num_qubits=2, reps=2) - self.observable = SparsePauliOp.from_list( - [ - ("II", -1.052373245772859), - ("IZ", 0.39793742484318045), - ("ZI", -0.39793742484318045), - ("ZZ", -0.01128010425623538), - ("XX", 0.18093119978423156), - ] - ) - self.expvals = -1.0284380963435145, -1.284366511861733 - - self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) - self.params = tuple(psi.parameters for psi in self.psi) - self.hamiltonian = ( - SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), - SparsePauliOp.from_list([("IZ", 1)]), - SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), - ) - self.theta = ( - [0, 1, 1, 2, 3, 5], - [0, 1, 1, 2, 3, 5, 8, 13], - [1, 2, 3, 4, 5, 6], - ) - - def test_estimator_run(self): - """Test Estimator.run()""" - psi1, psi2 = self.psi - hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian - theta1, theta2, theta3 = self.theta - with self.assertWarns(DeprecationWarning): - estimator = Estimator() - - # Specify the circuit and observable by indices. - # calculate [ ] - job = estimator.run([psi1], [hamiltonian1], [theta1]) - result = job.result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1.5555572817900956]) - - # Objects can be passed instead of indices. - # Note that passing objects has an overhead - # since the corresponding indices need to be searched. - # User can append a circuit and observable. - # calculate [ ] - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([psi2], [hamiltonian1], [theta2]).result() - np.testing.assert_allclose(result2.values, [2.97797666]) - - # calculate [ , ] - with self.assertWarns(DeprecationWarning): - result3 = estimator.run( - [psi1, psi1], [hamiltonian2, hamiltonian3], [theta1] * 2 - ).result() - np.testing.assert_allclose(result3.values, [-0.551653, 0.07535239]) - - # calculate [ , - # , - # ] - with self.assertWarns(DeprecationWarning): - result4 = estimator.run( - [psi1, psi2, psi1], - [hamiltonian1, hamiltonian2, hamiltonian3], - [theta1, theta2, theta3], - ).result() - np.testing.assert_allclose(result4.values, [1.55555728, 0.17849238, -1.08766318]) - - def test_estiamtor_run_no_params(self): - """test for estimator without parameters""" - circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run([circuit], [self.observable]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.284366511861733]) - - def test_run_single_circuit_observable(self): - """Test for single circuit and single observable case.""" - with self.assertWarns(DeprecationWarning): - est = Estimator() - - with self.subTest("No parameter"): - qc = QuantumCircuit(1) - qc.x(0) - op = SparsePauliOp("Z") - param_vals = [None, [], [[]], np.array([]), np.array([[]]), [np.array([])]] - target = [-1] - for val in param_vals: - self.subTest(f"{val}") - with self.assertWarns(DeprecationWarning): - result = est.run(qc, op, val).result() - np.testing.assert_allclose(result.values, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("One parameter"): - param = Parameter("x") - qc = QuantumCircuit(1) - qc.ry(param, 0) - op = SparsePauliOp("Z") - param_vals = [ - [np.pi], - [[np.pi]], - np.array([np.pi]), - np.array([[np.pi]]), - [np.array([np.pi])], - ] - target = [-1] - for val in param_vals: - self.subTest(f"{val}") - with self.assertWarns(DeprecationWarning): - result = est.run(qc, op, val).result() - np.testing.assert_allclose(result.values, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("More than one parameter"): - qc = self.psi[0] - op = self.hamiltonian[0] - param_vals = [ - self.theta[0], - [self.theta[0]], - np.array(self.theta[0]), - np.array([self.theta[0]]), - [np.array(self.theta[0])], - ] - target = [1.5555572817900956] - for val in param_vals: - self.subTest(f"{val}") - with self.assertWarns(DeprecationWarning): - result = est.run(qc, op, val).result() - np.testing.assert_allclose(result.values, target) - self.assertEqual(len(result.metadata), 1) - - def test_run_1qubit(self): - """Test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(1) - qc2.x(0) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("Z", 1)]) - - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1]) - - def test_run_2qubits(self): - """Test for 2-qubit cases (to check endian)""" - qc = QuantumCircuit(2) - qc2 = QuantumCircuit(2) - qc2.x(0) - - op = SparsePauliOp.from_list([("II", 1)]) - op2 = SparsePauliOp.from_list([("ZI", 1)]) - op3 = SparsePauliOp.from_list([("IZ", 1)]) - - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1]) - - def test_run_errors(self): - """Test for errors""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(2) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("II", 1)]) - - with self.assertWarns(DeprecationWarning): - est = Estimator() - with self.assertRaises(ValueError): - est.run([qc], [op2], [[]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op], [[1e4]]).result() - with self.assertRaises(ValueError): - est.run([qc2], [op2], [[1, 2]]).result() - with self.assertRaises(ValueError): - est.run([qc, qc2], [op2], [[1]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op, op2], [[1]]).result() - - def test_run_numpy_params(self): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - rng = np.random.default_rng(12) - params_array = rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - estimator = Estimator() - target = estimator.run([qc] * k, [op] * k, params_list).result() - - with self.subTest("ndarrary"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values) - - with self.subTest("list of ndarray"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values) - - def test_run_with_shots_option(self): - """test with shots option.""" - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=1024, - seed=15, - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641]) - - def test_run_with_shots_option_none(self): - """test with shots=None option. Seed is ignored then.""" - with self.assertWarns(DeprecationWarning): - est = Estimator() - result_42 = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=None, - seed=42, - ).result() - result_15 = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=None, - seed=15, - ).result() - np.testing.assert_allclose(result_42.values, result_15.values) - - def test_options(self): - """Test for options""" - with self.subTest("init"): - with self.assertWarns(DeprecationWarning): - estimator = Estimator(options={"shots": 3000}) - self.assertEqual(estimator.options.get("shots"), 3000) - with self.subTest("set_options"): - estimator.set_options(shots=1024, seed=15) - self.assertEqual(estimator.options.get("shots"), 1024) - self.assertEqual(estimator.options.get("seed"), 15) - with self.subTest("run"): - with self.assertWarns(DeprecationWarning): - result = estimator.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641]) - - def test_negative_variance(self): - """Test for negative variance caused by numerical error.""" - qc = QuantumCircuit(1) - - with self.assertWarns(DeprecationWarning): - estimator = Estimator() - result = estimator.run(qc, 1e-4 * SparsePauliOp("I"), shots=1024).result() - self.assertEqual(result.values[0], 1e-4) - self.assertEqual(result.metadata[0]["variance"], 0.0) - - def test_different_circuits(self): - """Test collision of quantum observables.""" - - def get_op(i): - op = SparsePauliOp.from_list([("IXIX", i)]) - return op - - keys = [_observable_key(get_op(i)) for i in range(5)] - self.assertEqual(len(keys), len(set(keys))) - - def test_reset(self): - """Test for circuits with reset.""" - qc = QuantumCircuit(2) - qc.h(0) - qc.cx(0, 1) - qc.reset(0) - op = SparsePauliOp("ZI") - - seed = 12 - n = 1000 - with self.subTest("shots=None"): - with self.assertWarns(DeprecationWarning): - estimator = Estimator(options={"seed": seed}) - result = estimator.run([qc for _ in range(n)], [op] * n).result() - # expectation values should be stochastic due to reset for subsystems - np.testing.assert_allclose(result.values.mean(), 0, atol=1e-1) - - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([qc for _ in range(n)], [op] * n).result() - # expectation values should be reproducible due to seed - np.testing.assert_allclose(result.values, result2.values) - - with self.subTest("shots=10000"): - shots = 10000 - with self.assertWarns(DeprecationWarning): - estimator = Estimator(options={"seed": seed}) - result = estimator.run([qc for _ in range(n)], [op] * n, shots=shots).result() - # expectation values should be stochastic due to reset for subsystems - np.testing.assert_allclose(result.values.mean(), 0, atol=1e-1) - - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([qc for _ in range(n)], [op] * n, shots=shots).result() - # expectation values should be reproducible due to seed - np.testing.assert_allclose(result.values, result2.values) - - -@ddt -class TestObservableValidation(QiskitTestCase): - """Test observables validation logic.""" - - @data( - ("IXYZ", (SparsePauliOp("IXYZ"),)), - (Pauli("IXYZ"), (SparsePauliOp("IXYZ"),)), - (SparsePauliOp("IXYZ"), (SparsePauliOp("IXYZ"),)), - ( - ["IXYZ", "ZYXI"], - (SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")), - ), - ( - [Pauli("IXYZ"), Pauli("ZYXI")], - (SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")), - ), - ( - [SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")], - (SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")), - ), - ) - @unpack - def test_validate_observables(self, observables, expected): - """Test observables standardization.""" - with self.assertWarns(DeprecationWarning): - self.assertEqual(validation._validate_observables(observables), expected) - - @data(None, "ERROR") - def test_qiskit_error(self, observables): - """Test qiskit error if invalid input.""" - with self.assertRaises(QiskitError): - with self.assertWarns(DeprecationWarning): - validation._validate_observables(observables) - - @data((), []) - def test_value_error(self, observables): - """Test value error if no observables are provided.""" - with self.assertRaises(ValueError): - with self.assertWarns(DeprecationWarning): - validation._validate_observables(observables) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/python/primitives/test_sampler.py b/test/python/primitives/test_sampler.py deleted file mode 100644 index 58ab97689748..000000000000 --- a/test/python/primitives/test_sampler.py +++ /dev/null @@ -1,440 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2023. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for Sampler.""" - -import unittest -import numpy as np - -from qiskit import QuantumCircuit -from qiskit.circuit import Parameter -from qiskit.circuit.library import RealAmplitudes, UnitaryGate -from qiskit.primitives import Sampler, SamplerResult -from qiskit.providers import JobStatus -from test import QiskitTestCase # pylint: disable=wrong-import-order - - -class TestSampler(QiskitTestCase): - """Test Sampler""" - - def setUp(self): - super().setUp() - hadamard = QuantumCircuit(1, 1, name="Hadamard") - hadamard.h(0) - hadamard.measure(0, 0) - bell = QuantumCircuit(2, name="Bell") - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - self._circuit = [hadamard, bell] - self._target = [ - {0: 0.5, 1: 0.5}, - {0: 0.5, 3: 0.5, 1: 0, 2: 0}, - ] - self._pqc = RealAmplitudes(num_qubits=2, reps=2) - self._pqc.measure_all() - self._pqc2 = RealAmplitudes(num_qubits=2, reps=3) - self._pqc2.measure_all() - self._pqc_params = [[0.0] * 6, [1.0] * 6] - self._pqc_target = [{0: 1}, {0: 0.0148, 1: 0.3449, 2: 0.0531, 3: 0.5872}] - self._theta = [ - [0, 1, 1, 2, 3, 5], - [1, 2, 3, 4, 5, 6], - [0, 1, 2, 3, 4, 5, 6, 7], - ] - - def _generate_circuits_target(self, indices): - if isinstance(indices, list): - circuits = [self._circuit[j] for j in indices] - target = [self._target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return circuits, target - - def _generate_params_target(self, indices): - if isinstance(indices, int): - params = self._pqc_params[indices] - target = self._pqc_target[indices] - elif isinstance(indices, list): - params = [self._pqc_params[j] for j in indices] - target = [self._pqc_target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return params, target - - def _compare_probs(self, prob, target): - if not isinstance(prob, list): - prob = [prob] - if not isinstance(target, list): - target = [target] - self.assertEqual(len(prob), len(target)) - for p, targ in zip(prob, target): - for key, t_val in targ.items(): - if key in p: - self.assertAlmostEqual(p[key], t_val, places=1) - else: - self.assertAlmostEqual(t_val, 0, places=1) - - def test_sampler_run(self): - """Test Sampler.run().""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - job = sampler.run(circuits=[bell]) - result = job.result() - self.assertIsInstance(result, SamplerResult) - self._compare_probs(result.quasi_dists, self._target[1]) - - def test_sample_run_multiple_circuits(self): - """Test Sampler.run() with multiple circuits.""" - # executes three Bell circuits - # Argument `parameters` is optional. - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([bell, bell, bell]).result() - self._compare_probs(result.quasi_dists[0], self._target[1]) - self._compare_probs(result.quasi_dists[1], self._target[1]) - self._compare_probs(result.quasi_dists[2], self._target[1]) - - def test_sampler_run_with_parameterized_circuits(self): - """Test Sampler.run() with parameterized circuits.""" - # parameterized circuit - - pqc = self._pqc - pqc2 = self._pqc2 - theta1, theta2, theta3 = self._theta - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([pqc, pqc, pqc2], [theta1, theta2, theta3]).result() - - # result of pqc(theta1) - prob1 = { - "00": 0.1309248462975777, - "01": 0.3608720796028448, - "10": 0.09324865232050054, - "11": 0.41495442177907715, - } - self.assertDictAlmostEqual(result.quasi_dists[0].binary_probabilities(), prob1) - - # result of pqc(theta2) - prob2 = { - "00": 0.06282290651933871, - "01": 0.02877144385576705, - "10": 0.606654494132085, - "11": 0.3017511554928094, - } - self.assertDictAlmostEqual(result.quasi_dists[1].binary_probabilities(), prob2) - - # result of pqc2(theta3) - prob3 = { - "00": 0.1880263994380416, - "01": 0.6881971261189544, - "10": 0.09326232720582443, - "11": 0.030514147237179892, - } - self.assertDictAlmostEqual(result.quasi_dists[2].binary_probabilities(), prob3) - - def test_run_1qubit(self): - """test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - for i in range(2): - keys, values = zip(*sorted(result.quasi_dists[i].items())) - self.assertTupleEqual(keys, (i,)) - np.testing.assert_allclose(values, [1]) - - def test_run_2qubit(self): - """test for 2-qubit cases""" - qc0 = QuantumCircuit(2) - qc0.measure_all() - qc1 = QuantumCircuit(2) - qc1.x(0) - qc1.measure_all() - qc2 = QuantumCircuit(2) - qc2.x(1) - qc2.measure_all() - qc3 = QuantumCircuit(2) - qc3.x([0, 1]) - qc3.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([qc0, qc1, qc2, qc3]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 4) - - for i in range(4): - keys, values = zip(*sorted(result.quasi_dists[i].items())) - self.assertTupleEqual(keys, (i,)) - np.testing.assert_allclose(values, [1]) - - def test_run_single_circuit(self): - """Test for single circuit case.""" - - with self.subTest("No parameter"): - circuit = self._circuit[1] - target = self._target[1] - param_vals = [None, [], [[]], np.array([]), np.array([[]])] - for val in param_vals: - with self.subTest(f"{circuit.name} w/ {val}"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.assertWarns(DeprecationWarning): - result = sampler.run(circuit, val).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("One parameter"): - circuit = QuantumCircuit(1, 1, name="X gate") - param = Parameter("x") - circuit.ry(param, 0) - circuit.measure(0, 0) - target = [{1: 1}] - param_vals = [ - [np.pi], - [[np.pi]], - np.array([np.pi]), - np.array([[np.pi]]), - [np.array([np.pi])], - ] - for val in param_vals: - with self.subTest(f"{circuit.name} w/ {val}"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.assertWarns(DeprecationWarning): - result = sampler.run(circuit, val).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("More than one parameter"): - circuit = self._pqc - target = [self._pqc_target[0]] - param_vals = [ - self._pqc_params[0], - [self._pqc_params[0]], - np.array(self._pqc_params[0]), - np.array([self._pqc_params[0]]), - [np.array(self._pqc_params[0])], - ] - for val in param_vals: - with self.subTest(f"{circuit.name} w/ {val}"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.assertWarns(DeprecationWarning): - result = sampler.run(circuit, val).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(len(result.metadata), 1) - - def test_run_reverse_meas_order(self): - """test for sampler with reverse measurement order""" - x = Parameter("x") - y = Parameter("y") - - qc = QuantumCircuit(3, 3) - qc.rx(x, 0) - qc.rx(y, 1) - qc.x(2) - qc.measure(0, 2) - qc.measure(1, 1) - qc.measure(2, 0) - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([qc] * 2, [[0, 0], [np.pi / 2, 0]]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - # qc({x: 0, y: 0}) - keys, values = zip(*sorted(result.quasi_dists[0].items())) - self.assertTupleEqual(keys, (1,)) - np.testing.assert_allclose(values, [1]) - - # qc({x: pi/2, y: 0}) - keys, values = zip(*sorted(result.quasi_dists[1].items())) - self.assertTupleEqual(keys, (1, 5)) - np.testing.assert_allclose(values, [0.5, 0.5]) - - def test_run_errors(self): - """Test for errors with run method""" - qc1 = QuantumCircuit(1) - qc1.measure_all() - qc2 = RealAmplitudes(num_qubits=1, reps=1) - qc2.measure_all() - qc3 = QuantumCircuit(1) - qc4 = QuantumCircuit(1, 1) - qc5 = QuantumCircuit(1, 1) - with qc5.for_loop(range(5)): - qc5.h(0) - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.subTest("set parameter values to a non-parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc1], [[1e2]]) - with self.subTest("missing all parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2], [[]]) - with self.subTest("missing some parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2], [[1e2]]) - with self.subTest("too many parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2], [[1e2]] * 100) - with self.subTest("no classical bits"): - with self.assertRaises(ValueError): - _ = sampler.run([qc3], [[]]) - with self.subTest("no measurement"): - with self.assertRaises(ValueError): - _ = sampler.run([qc4], [[]]) - with self.subTest("no measurement in control flow"): - with self.assertRaises(ValueError): - _ = sampler.run([qc5], [[]]) - - def test_run_empty_parameter(self): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.subTest("one circuit"): - with self.assertWarns(DeprecationWarning): - result = sampler.run([qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 1) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictEqual(quasi_dist, {0: 1.0}) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 2) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictEqual(quasi_dist, {0: 1.0}) - self.assertEqual(len(result.metadata), 2) - - def test_run_numpy_params(self): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - qc.measure_all() - k = 5 - rng = np.random.default_rng(12) - params_array = rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - target = sampler.run([qc] * k, params_list).result() - - with self.subTest("ndarrary"): - result = sampler.run([qc] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictEqual(result.quasi_dists[i], target.quasi_dists[i]) - - with self.subTest("list of ndarray"): - result = sampler.run([qc] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictEqual(result.quasi_dists[i], target.quasi_dists[i]) - - def test_run_with_shots_option(self): - """test with shots option.""" - params, target = self._generate_params_target([1]) - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run( - circuits=[self._pqc], parameter_values=params, shots=1024, seed=15 - ).result() - self._compare_probs(result.quasi_dists, target) - - def test_run_with_shots_option_none(self): - """test with shots=None option. Seed is ignored then.""" - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result_42 = sampler.run( - [self._pqc], parameter_values=[[0, 1, 1, 2, 3, 5]], shots=None, seed=42 - ).result() - result_15 = sampler.run( - [self._pqc], parameter_values=[[0, 1, 1, 2, 3, 5]], shots=None, seed=15 - ).result() - self.assertDictAlmostEqual(result_42.quasi_dists, result_15.quasi_dists) - - def test_run_shots_result_size(self): - """test with shots option to validate the result size""" - n = 10 - shots = 100 - qc = QuantumCircuit(n) - qc.h(range(n)) - qc.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run(qc, [], shots=shots, seed=42).result() - self.assertEqual(len(result.quasi_dists), 1) - self.assertLessEqual(len(result.quasi_dists[0]), shots) - self.assertAlmostEqual(sum(result.quasi_dists[0].values()), 1.0) - - def test_primitive_job_status_done(self): - """test primitive job's status""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - job = sampler.run(circuits=[bell]) - _ = job.result() - self.assertEqual(job.status(), JobStatus.DONE) - - def test_options(self): - """Test for options""" - with self.subTest("init"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler(options={"shots": 3000}) - self.assertEqual(sampler.options.get("shots"), 3000) - with self.subTest("set_options"): - sampler.set_options(shots=1024, seed=15) - self.assertEqual(sampler.options.get("shots"), 1024) - self.assertEqual(sampler.options.get("seed"), 15) - with self.subTest("run"): - params, target = self._generate_params_target([1]) - with self.assertWarns(DeprecationWarning): - result = sampler.run([self._pqc], parameter_values=params).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(result.quasi_dists[0].shots, 1024) - - def test_circuit_with_unitary(self): - """Test for circuit with unitary gate.""" - gate = UnitaryGate(np.eye(2)) - - circuit = QuantumCircuit(1) - circuit.append(gate, [0]) - circuit.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - sampler_result = sampler.run([circuit]).result() - self.assertDictAlmostEqual(sampler_result.quasi_dists[0], {0: 1, 1: 0}) - - -if __name__ == "__main__": - unittest.main()