diff --git a/qiskit/primitives/__init__.py b/qiskit/primitives/__init__.py index c4fb9855115f..597d84b832fe 100644 --- a/qiskit/primitives/__init__.py +++ b/qiskit/primitives/__init__.py @@ -437,8 +437,6 @@ BaseEstimator BaseEstimatorV1 - Estimator - BackendEstimator EstimatorResult @@ -450,14 +448,10 @@ BaseSampler BaseSamplerV1 - Sampler - BackendSampler SamplerResult """ -from .backend_estimator import BackendEstimator -from .backend_sampler import BackendSampler from .base import ( BaseEstimator, BaseEstimatorV1, @@ -480,9 +474,7 @@ ObservableLike, ObservablesArrayLike, ) -from .estimator import Estimator from .primitive_job import BasePrimitiveJob, PrimitiveJob -from .sampler import Sampler from .statevector_estimator import StatevectorEstimator from .statevector_sampler import StatevectorSampler from .backend_estimator_v2 import BackendEstimatorV2 diff --git a/qiskit/primitives/backend_estimator.py b/qiskit/primitives/backend_estimator.py deleted file mode 100644 index 5f9f8d20c75e..000000000000 --- a/qiskit/primitives/backend_estimator.py +++ /dev/null @@ -1,486 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Estimator V1 implementation for an arbitrary Backend object.""" - -from __future__ import annotations - -from collections.abc import Sequence -from itertools import accumulate - -import numpy as np - -from qiskit.circuit import ClassicalRegister, QuantumCircuit, QuantumRegister -from qiskit.compiler import transpile -from qiskit.exceptions import QiskitError -from qiskit.providers import BackendV1, BackendV2, Options -from qiskit.quantum_info import Pauli, PauliList -from qiskit.quantum_info.operators.base_operator import BaseOperator -from qiskit.result import Counts, Result -from qiskit.transpiler import CouplingMap, PassManager -from qiskit.transpiler.passes import ( - ApplyLayout, - EnlargeWithAncilla, - FullAncillaAllocation, - Optimize1qGatesDecomposition, - SetLayout, -) -from qiskit.utils.deprecation import deprecate_func - -from .base import BaseEstimator, EstimatorResult -from .primitive_job import PrimitiveJob -from .utils import _circuit_key, _observable_key, init_observable - - -def _run_circuits( - circuits: QuantumCircuit | list[QuantumCircuit], - backend: BackendV1 | BackendV2, - clear_metadata: bool = True, - **run_options, -) -> tuple[list[Result], list[dict]]: - """Remove metadata of circuits and run the circuits on a backend. - Args: - circuits: The circuits - backend: The backend - clear_metadata: Clear circuit metadata before passing to backend.run if - True. - **run_options: run_options - Returns: - The result and the metadata of the circuits - """ - if isinstance(circuits, QuantumCircuit): - circuits = [circuits] - metadata = [] - for circ in circuits: - metadata.append(circ.metadata) - if clear_metadata: - circ.metadata = {} - if isinstance(backend, BackendV1): - max_circuits = getattr(backend.configuration(), "max_experiments", None) - elif isinstance(backend, BackendV2): - max_circuits = backend.max_circuits - else: - raise RuntimeError("Backend version not supported") - if max_circuits: - jobs = [ - backend.run(circuits[pos : pos + max_circuits], **run_options) - for pos in range(0, len(circuits), max_circuits) - ] - result = [x.result() for x in jobs] - else: - result = [backend.run(circuits, **run_options).result()] - return result, metadata - - -def _prepare_counts(results: list[Result]): - counts = [] - for res in results: - count = res.get_counts() - if not isinstance(count, list): - count = [count] - counts.extend(count) - return counts - - -class BackendEstimator(BaseEstimator[PrimitiveJob[EstimatorResult]]): - """Evaluates expectation value using Pauli rotation gates. - - The :class:`~.BackendEstimator` class is a generic implementation of the - :class:`~.BaseEstimator` (V1) interface that is used to wrap a :class:`~.BackendV2` - (or :class:`~.BackendV1`) object in the :class:`~.BaseEstimator` V1 API. It - facilitates using backends that do not provide a native - :class:`~.BaseEstimator` V1 implementation in places that work with - :class:`~.BaseEstimator` V1. - However, if you're using a provider that has a native implementation of - :class:`~.BaseEstimatorV1` ( :class:`~.BaseEstimator`) or - :class:`~.BaseEstimatorV2`, it is a better - choice to leverage that native implementation as it will likely include - additional optimizations and be a more efficient implementation. - The generic nature of this class precludes doing any provider- or - backend-specific optimizations. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseEstimatorV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `BackendEstimator` class is `BackendEstimatorV2`.", - ) - def __init__( - self, - backend: BackendV1 | BackendV2, - options: dict | None = None, - abelian_grouping: bool = True, - bound_pass_manager: PassManager | None = None, - skip_transpilation: bool = False, - ): - """Initialize a new BackendEstimator (V1) instance - - Args: - backend: (required) the backend to run the primitive on - options: Default options. - abelian_grouping: Whether the observable should be grouped into - commuting - bound_pass_manager: An optional pass manager to run after - parameter binding. - skip_transpilation: If this is set to True the internal compilation - of the input circuits is skipped and the circuit objects - will be directly executed when this object is called. - """ - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._observables = [] - - self._abelian_grouping = abelian_grouping - - self._backend = backend - - self._transpile_options = Options() - self._bound_pass_manager = bound_pass_manager - - self._preprocessed_circuits: list[tuple[QuantumCircuit, list[QuantumCircuit]]] | None = None - self._transpiled_circuits: list[QuantumCircuit] | None = None - - self._grouping = list(zip(range(len(self._circuits)), range(len(self._observables)))) - self._skip_transpilation = skip_transpilation - - self._circuit_ids = {} - self._observable_ids = {} - - @property - def transpile_options(self) -> Options: - """Return the transpiler options for transpiling the circuits.""" - return self._transpile_options - - def set_transpile_options(self, **fields): - """Set the transpiler options for transpiler. - Args: - **fields: The fields to update the options - """ - self._transpiled_circuits = None - self._transpile_options.update_options(**fields) - - @property - def preprocessed_circuits( - self, - ) -> list[tuple[QuantumCircuit, list[QuantumCircuit]]]: - """ - Transpiled quantum circuits produced by preprocessing - Returns: - List of the transpiled quantum circuit - """ - self._preprocessed_circuits = self._preprocessing() - return self._preprocessed_circuits - - @property - def transpiled_circuits(self) -> list[QuantumCircuit]: - """ - Transpiled quantum circuits. - Returns: - List of the transpiled quantum circuit - Raises: - QiskitError: if the instance has been closed. - """ - self._transpile() - return self._transpiled_circuits - - @property - def backend(self) -> BackendV1 | BackendV2: - """ - Returns: - The backend which this estimator object based on - """ - return self._backend - - def _transpile(self): - """Split Transpile""" - self._transpiled_circuits = [] - for common_circuit, diff_circuits in self.preprocessed_circuits: - # 1. transpile a common circuit - if self._skip_transpilation: - transpiled_circuit = common_circuit.copy() - final_index_layout = list(range(common_circuit.num_qubits)) - else: - transpiled_circuit = transpile( # pylint:disable=unexpected-keyword-arg - common_circuit, self.backend, **self.transpile_options.__dict__ - ) - if transpiled_circuit.layout is not None: - final_index_layout = transpiled_circuit.layout.final_index_layout() - else: - final_index_layout = list(range(transpiled_circuit.num_qubits)) - - # 2. transpile diff circuits - passmanager = _passmanager_for_measurement_circuits(final_index_layout, self.backend) - diff_circuits = passmanager.run(diff_circuits) - # 3. combine - transpiled_circuits = [] - for diff_circuit in diff_circuits: - transpiled_circuit_copy = transpiled_circuit.copy() - # diff_circuit is supposed to have a classical register whose name is different from - # those of the transpiled_circuit - clbits = diff_circuit.cregs[0] - for creg in transpiled_circuit_copy.cregs: - if clbits.name == creg.name: - raise QiskitError( - "Classical register for measurements conflict with those of the input " - f"circuit: {clbits}. " - "Recommended to avoid register names starting with '__'." - ) - transpiled_circuit_copy.add_register(clbits) - transpiled_circuit_copy.compose(diff_circuit, clbits=clbits, inplace=True) - transpiled_circuit_copy.metadata = diff_circuit.metadata - transpiled_circuits.append(transpiled_circuit_copy) - self._transpiled_circuits += transpiled_circuits - - def _call( - self, - circuits: Sequence[int], - observables: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> EstimatorResult: - - # Transpile - self._grouping = list(zip(circuits, observables)) - transpiled_circuits = self.transpiled_circuits - num_observables = [len(m) for (_, m) in self.preprocessed_circuits] - accum = [0] + list(accumulate(num_observables)) - - # Bind parameters - parameter_dicts = [ - dict(zip(self._parameters[i], value)) for i, value in zip(circuits, parameter_values) - ] - bound_circuits = [ - ( - transpiled_circuits[circuit_index] - if len(p) == 0 - else transpiled_circuits[circuit_index].assign_parameters(p) - ) - for i, (p, n) in enumerate(zip(parameter_dicts, num_observables)) - for circuit_index in range(accum[i], accum[i] + n) - ] - bound_circuits = self._bound_pass_manager_run(bound_circuits) - - # Run - result, metadata = _run_circuits(bound_circuits, self._backend, **run_options) - - return self._postprocessing(result, accum, metadata) - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - observables: tuple[BaseOperator, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - index = self._circuit_ids.get(_circuit_key(circuit)) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[_circuit_key(circuit)] = len(self._circuits) - self._circuits.append(circuit) - self._parameters.append(circuit.parameters) - observable_indices = [] - for observable in observables: - observable = init_observable(observable) - index = self._observable_ids.get(_observable_key(observable)) - if index is not None: - observable_indices.append(index) - else: - observable_indices.append(len(self._observables)) - self._observable_ids[_observable_key(observable)] = len(self._observables) - self._observables.append(observable) - job = PrimitiveJob( - self._call, circuit_indices, observable_indices, parameter_values, **run_options - ) - job._submit() - return job - - @staticmethod - def _measurement_circuit(num_qubits: int, pauli: Pauli): - # Note: if pauli is I for all qubits, this function generates a circuit to measure only - # the first qubit. - # Although such an operator can be optimized out by interpreting it as a constant (1), - # this optimization requires changes in various methods. So it is left as future work. - qubit_indices = np.arange(pauli.num_qubits)[pauli.z | pauli.x] - if not np.any(qubit_indices): - qubit_indices = [0] - meas_circuit = QuantumCircuit( - QuantumRegister(num_qubits, "q"), ClassicalRegister(len(qubit_indices), f"__c_{pauli}") - ) - for clbit, i in enumerate(qubit_indices): - if pauli.x[i]: - if pauli.z[i]: - meas_circuit.sdg(i) - meas_circuit.h(i) - meas_circuit.measure(i, clbit) - return meas_circuit, qubit_indices - - def _preprocessing(self) -> list[tuple[QuantumCircuit, list[QuantumCircuit]]]: - """ - Preprocessing for evaluation of expectation value using pauli rotation gates. - """ - preprocessed_circuits = [] - for group in self._grouping: - circuit = self._circuits[group[0]] - observable = self._observables[group[1]] - diff_circuits: list[QuantumCircuit] = [] - if self._abelian_grouping: - for obs in observable.group_commuting(qubit_wise=True): - basis = Pauli( - (np.logical_or.reduce(obs.paulis.z), np.logical_or.reduce(obs.paulis.x)) - ) - meas_circuit, indices = self._measurement_circuit(circuit.num_qubits, basis) - paulis = PauliList.from_symplectic( - obs.paulis.z[:, indices], - obs.paulis.x[:, indices], - obs.paulis.phase, - ) - meas_circuit.metadata = { - "paulis": paulis, - "coeffs": np.real_if_close(obs.coeffs), - } - diff_circuits.append(meas_circuit) - else: - for basis, obs in zip(observable.paulis, observable): - meas_circuit, indices = self._measurement_circuit(circuit.num_qubits, basis) - paulis = PauliList.from_symplectic( - obs.paulis.z[:, indices], - obs.paulis.x[:, indices], - obs.paulis.phase, - ) - meas_circuit.metadata = { - "paulis": paulis, - "coeffs": np.real_if_close(obs.coeffs), - } - diff_circuits.append(meas_circuit) - - preprocessed_circuits.append((circuit.copy(), diff_circuits)) - return preprocessed_circuits - - def _postprocessing( - self, result: list[Result], accum: list[int], metadata: list[dict] - ) -> EstimatorResult: - """ - Postprocessing for evaluation of expectation value using pauli rotation gates. - """ - counts = _prepare_counts(result) - expval_list = [] - var_list = [] - shots_list = [] - - for i, j in zip(accum, accum[1:]): - - combined_expval = 0.0 - combined_var = 0.0 - - for k in range(i, j): - meta = metadata[k] - paulis = meta["paulis"] - coeffs = meta["coeffs"] - - count = counts[k] - - expvals, variances = _pauli_expval_with_variance(count, paulis) - - # Accumulate - combined_expval += np.dot(expvals, coeffs) - combined_var += np.dot(variances, coeffs**2) - - expval_list.append(combined_expval) - var_list.append(combined_var) - shots_list.append(sum(counts[i].values())) - - metadata = [{"variance": var, "shots": shots} for var, shots in zip(var_list, shots_list)] - - return EstimatorResult(np.real_if_close(expval_list), metadata) - - def _bound_pass_manager_run(self, circuits): - if self._bound_pass_manager is None: - return circuits - else: - output = self._bound_pass_manager.run(circuits) - if not isinstance(output, list): - output = [output] - return output - - -def _paulis2inds(paulis: PauliList) -> list[int]: - """Convert PauliList to diagonal integers. - These are integer representations of the binary string with a - 1 where there are Paulis, and 0 where there are identities. - """ - # Treat Z, X, Y the same - nonid = paulis.z | paulis.x - - # bits are packed into uint8 in little endian - # e.g., i-th bit corresponds to coefficient 2^i - packed_vals = np.packbits(nonid, axis=1, bitorder="little") - power_uint8 = 1 << (8 * np.arange(packed_vals.shape[1], dtype=object)) - inds = packed_vals @ power_uint8 - return inds.tolist() - - -def _parity(integer: int) -> int: - """Return the parity of an integer""" - return bin(integer).count("1") % 2 - - -def _pauli_expval_with_variance(counts: Counts, paulis: PauliList) -> tuple[np.ndarray, np.ndarray]: - """Return array of expval and variance pairs for input Paulis. - Note: All non-identity Pauli's are treated as Z-paulis, assuming - that basis rotations have been applied to convert them to the - diagonal basis. - """ - # Diag indices - size = len(paulis) - diag_inds = _paulis2inds(paulis) - - expvals = np.zeros(size, dtype=float) - denom = 0 # Total shots for counts dict - for bin_outcome, freq in counts.items(): - split_outcome = bin_outcome.split(" ", 1)[0] if " " in bin_outcome else bin_outcome - outcome = int(split_outcome, 2) - denom += freq - for k in range(size): - coeff = (-1) ** _parity(diag_inds[k] & outcome) - expvals[k] += freq * coeff - - # Divide by total shots - expvals /= denom - - # Compute variance - variances = 1 - expvals**2 - return expvals, variances - - -def _passmanager_for_measurement_circuits(layout, backend) -> PassManager: - passmanager = PassManager([SetLayout(layout)]) - if isinstance(backend, BackendV2): - opt1q = Optimize1qGatesDecomposition(target=backend.target) - else: - opt1q = Optimize1qGatesDecomposition(basis=backend.configuration().basis_gates) - passmanager.append(opt1q) - if isinstance(backend, BackendV2) and isinstance(backend.coupling_map, CouplingMap): - coupling_map = backend.coupling_map - passmanager.append(FullAncillaAllocation(coupling_map)) - passmanager.append(EnlargeWithAncilla()) - elif isinstance(backend, BackendV1) and backend.configuration().coupling_map is not None: - coupling_map = CouplingMap(backend.configuration().coupling_map) - passmanager.append(FullAncillaAllocation(coupling_map)) - passmanager.append(EnlargeWithAncilla()) - passmanager.append(ApplyLayout()) - return passmanager diff --git a/qiskit/primitives/backend_sampler.py b/qiskit/primitives/backend_sampler.py deleted file mode 100644 index 905fa2371542..000000000000 --- a/qiskit/primitives/backend_sampler.py +++ /dev/null @@ -1,222 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Sampler V1 implementation for an arbitrary Backend object.""" - -from __future__ import annotations - -import math -from collections.abc import Sequence -from typing import Any - -from qiskit.circuit.quantumcircuit import QuantumCircuit -from qiskit.providers.backend import BackendV1, BackendV2 -from qiskit.providers.options import Options -from qiskit.result import QuasiDistribution, Result -from qiskit.transpiler.passmanager import PassManager -from qiskit.utils.deprecation import deprecate_func - -from .backend_estimator import _prepare_counts, _run_circuits -from .base import BaseSampler, SamplerResult -from .primitive_job import PrimitiveJob -from .utils import _circuit_key - - -class BackendSampler(BaseSampler[PrimitiveJob[SamplerResult]]): - """A :class:`~.BaseSampler` (V1) implementation that provides a wrapper for - leveraging the Sampler V1 interface from any backend. - - This class provides a sampler interface from any backend and doesn't do - any measurement mitigation, it just computes the probability distribution - from the counts. It facilitates using backends that do not provide a - native :class:`~.BaseSampler` V1 implementation in places that work with - :class:`~.BaseSampler` V1. - However, if you're using a provider that has a native implementation of - :class:`~.BaseSamplerV1` ( :class:`~.BaseSampler`) or - :class:`~.BaseESamplerV2`, it is a better - choice to leverage that native implementation as it will likely include - additional optimizations and be a more efficient implementation. - The generic nature of this class precludes doing any provider- or - backend-specific optimizations. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseSamplerV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `BackendSampler` class is `BackendSamplerV2`.", - ) - def __init__( - self, - backend: BackendV1 | BackendV2, - options: dict | None = None, - bound_pass_manager: PassManager | None = None, - skip_transpilation: bool = False, - ): - """Initialize a new BackendSampler (V1) instance - - Args: - backend: (required) the backend to run the sampler primitive on - options: Default options. - bound_pass_manager: An optional pass manager to run after - parameter binding. - skip_transpilation: If this is set to True the internal compilation - of the input circuits is skipped and the circuit objects - will be directly executed when this objected is called. - Raises: - ValueError: If backend is not provided - """ - - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._backend = backend - self._transpile_options = Options() - self._bound_pass_manager = bound_pass_manager - self._preprocessed_circuits: list[QuantumCircuit] | None = None - self._transpiled_circuits: list[QuantumCircuit] = [] - self._skip_transpilation = skip_transpilation - self._circuit_ids = {} - - @property - def preprocessed_circuits(self) -> list[QuantumCircuit]: - """ - Preprocessed quantum circuits produced by preprocessing - Returns: - List of the transpiled quantum circuit - Raises: - QiskitError: if the instance has been closed. - """ - return list(self._circuits) - - @property - def transpiled_circuits(self) -> list[QuantumCircuit]: - """ - Transpiled quantum circuits. - Returns: - List of the transpiled quantum circuit - Raises: - QiskitError: if the instance has been closed. - """ - if self._skip_transpilation: - self._transpiled_circuits = list(self._circuits) - elif len(self._transpiled_circuits) < len(self._circuits): - # transpile only circuits that are not transpiled yet - self._transpile() - return self._transpiled_circuits - - @property - def backend(self) -> BackendV1 | BackendV2: - """ - Returns: - The backend which this sampler object based on - """ - return self._backend - - @property - def transpile_options(self) -> Options: - """Return the transpiler options for transpiling the circuits.""" - return self._transpile_options - - def set_transpile_options(self, **fields): - """Set the transpiler options for transpiler. - Args: - **fields: The fields to update the options. - Returns: - self. - Raises: - QiskitError: if the instance has been closed. - """ - self._transpile_options.update_options(**fields) - - def _call( - self, - circuits: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> SamplerResult: - - # This line does the actual transpilation - transpiled_circuits = self.transpiled_circuits - bound_circuits = [ - ( - transpiled_circuits[i] - if len(value) == 0 - else transpiled_circuits[i].assign_parameters( - (dict(zip(self._parameters[i], value))) - ) - ) - for i, value in zip(circuits, parameter_values) - ] - bound_circuits = self._bound_pass_manager_run(bound_circuits) - # Run - result, _metadata = _run_circuits(bound_circuits, self._backend, **run_options) - return self._postprocessing(result, bound_circuits) - - def _postprocessing( - self, result: list[Result], circuits: list[QuantumCircuit] - ) -> SamplerResult: - counts = _prepare_counts(result) - shots = sum(counts[0].values()) - - probabilities = [] - metadata: list[dict[str, Any]] = [{} for _ in range(len(circuits))] - for count in counts: - prob_dist = {k: v / shots for k, v in count.items()} - probabilities.append( - QuasiDistribution(prob_dist, shots=shots, stddev_upper_bound=math.sqrt(1 / shots)) - ) - for metadatum in metadata: - metadatum["shots"] = shots - - return SamplerResult(probabilities, metadata) - - def _transpile(self): - from qiskit.compiler import transpile - - start = len(self._transpiled_circuits) - self._transpiled_circuits.extend( - transpile( # pylint:disable=unexpected-keyword-arg - self.preprocessed_circuits[start:], - self.backend, - **self.transpile_options.__dict__, - ), - ) - - def _bound_pass_manager_run(self, circuits): - if self._bound_pass_manager is None: - return circuits - else: - output = self._bound_pass_manager.run(circuits) - if not isinstance(output, list): - output = [output] - return output - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - index = self._circuit_ids.get(_circuit_key(circuit)) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[_circuit_key(circuit)] = len(self._circuits) - self._circuits.append(circuit) - self._parameters.append(circuit.parameters) - job = PrimitiveJob(self._call, circuit_indices, parameter_values, **run_options) - job._submit() - return job diff --git a/qiskit/primitives/estimator.py b/qiskit/primitives/estimator.py deleted file mode 100644 index d8e0a00f45e1..000000000000 --- a/qiskit/primitives/estimator.py +++ /dev/null @@ -1,172 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -""" -Estimator V1 reference implementation -""" - -from __future__ import annotations - -from collections.abc import Sequence -from typing import Any - -import numpy as np - -from qiskit.circuit import QuantumCircuit -from qiskit.exceptions import QiskitError -from qiskit.quantum_info.operators.base_operator import BaseOperator -from qiskit.utils.deprecation import deprecate_func - -from .base import BaseEstimator, EstimatorResult -from .primitive_job import PrimitiveJob -from .utils import ( - _circuit_key, - _observable_key, - _statevector_from_circuit, - init_observable, -) - - -class Estimator(BaseEstimator[PrimitiveJob[EstimatorResult]]): - """ - Reference implementation of :class:`BaseEstimator` (V1). - - :Run Options: - - - **shots** (None or int) -- - The number of shots. If None, it calculates the expectation values - with full state vector simulation. - Otherwise, it samples from normal distributions with standard errors as standard - deviations using normal distribution approximation. - - - **seed** (np.random.Generator or int) -- - Set a fixed seed or generator for the normal distribution. If shots is None, - this option is ignored. - - .. note:: - The result of this class is exact if the circuit contains only unitary operations. - On the other hand, the result could be stochastic if the circuit contains a non-unitary - operation such as a reset for a some subsystems. - The stochastic result can be made reproducible by setting ``seed``, e.g., - ``Estimator(options={"seed":123})``. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseEstimatorV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `Estimator` class is `StatevectorEstimator`.", - ) - def __init__(self, *, options: dict | None = None): - """ - Args: - options: Default options. - - Raises: - QiskitError: if some classical bits are not used for measurements. - """ - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._observables = [] - self._circuit_ids = {} - self._observable_ids = {} - - def _call( - self, - circuits: Sequence[int], - observables: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> EstimatorResult: - shots = run_options.pop("shots", None) - seed = run_options.pop("seed", None) - if seed is None: - rng = np.random.default_rng() - elif isinstance(seed, np.random.Generator): - rng = seed - else: - rng = np.random.default_rng(seed) - - # Initialize metadata - metadata: list[dict[str, Any]] = [{} for _ in range(len(circuits))] - - bound_circuits = [] - for i, value in zip(circuits, parameter_values): - if len(value) != len(self._parameters[i]): - raise QiskitError( - f"The number of values ({len(value)}) does not match " - f"the number of parameters ({len(self._parameters[i])})." - ) - bound_circuits.append( - self._circuits[i] - if len(value) == 0 - else self._circuits[i].assign_parameters(dict(zip(self._parameters[i], value))) - ) - sorted_observables = [self._observables[i] for i in observables] - expectation_values = [] - for circ, obs, metadatum in zip(bound_circuits, sorted_observables, metadata): - if circ.num_qubits != obs.num_qubits: - raise QiskitError( - f"The number of qubits of a circuit ({circ.num_qubits}) does not match " - f"the number of qubits of a observable ({obs.num_qubits})." - ) - final_state = _statevector_from_circuit(circ, rng) - expectation_value = final_state.expectation_value(obs) - if shots is None: - expectation_values.append(expectation_value) - else: - expectation_value = np.real_if_close(expectation_value) - sq_obs = (obs @ obs).simplify(atol=0) - sq_exp_val = np.real_if_close(final_state.expectation_value(sq_obs)) - variance = sq_exp_val - expectation_value**2 - variance = max(variance, 0) - standard_error = np.sqrt(variance / shots) - expectation_value_with_error = rng.normal(expectation_value, standard_error) - expectation_values.append(expectation_value_with_error) - metadatum["variance"] = variance - metadatum["shots"] = shots - - return EstimatorResult(np.real_if_close(expectation_values), metadata) - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - observables: tuple[BaseOperator, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - key = _circuit_key(circuit) - index = self._circuit_ids.get(key) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[key] = len(self._circuits) - self._circuits.append(circuit) - self._parameters.append(circuit.parameters) - observable_indices = [] - for observable in observables: - observable = init_observable(observable) - index = self._observable_ids.get(_observable_key(observable)) - if index is not None: - observable_indices.append(index) - else: - observable_indices.append(len(self._observables)) - self._observable_ids[_observable_key(observable)] = len(self._observables) - self._observables.append(observable) - job = PrimitiveJob( - self._call, circuit_indices, observable_indices, parameter_values, **run_options - ) - job._submit() - return job diff --git a/qiskit/primitives/sampler.py b/qiskit/primitives/sampler.py deleted file mode 100644 index 9155e50ad2ce..000000000000 --- a/qiskit/primitives/sampler.py +++ /dev/null @@ -1,162 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. -""" -Sampler V1 reference implementation -""" - -from __future__ import annotations - -from collections.abc import Sequence -from typing import Any - -import numpy as np - -from qiskit.circuit import QuantumCircuit -from qiskit.exceptions import QiskitError -from qiskit.quantum_info import Statevector -from qiskit.result import QuasiDistribution -from qiskit.utils.deprecation import deprecate_func - -from .base import BaseSampler, SamplerResult -from .primitive_job import PrimitiveJob -from .utils import ( - _circuit_key, - bound_circuit_to_instruction, - final_measurement_mapping, - init_circuit, -) - - -class Sampler(BaseSampler[PrimitiveJob[SamplerResult]]): - """ - Sampler V1 class. - - :class:`~Sampler` is a reference implementation of :class:`~BaseSampler` (V1). - - :Run Options: - - - **shots** (None or int) -- - The number of shots. If None, it calculates the probabilities. - Otherwise, it samples from multinomial distributions. - - - **seed** (np.random.Generator or int) -- - Set a fixed seed or generator for the multinomial distribution. If shots is None, this - option is ignored. - """ - - @deprecate_func( - since="1.2", - additional_msg="All implementations of the `BaseSamplerV1` interface " - "have been deprecated in favor of their V2 counterparts. " - "The V2 alternative for the `Sampler` class is `StatevectorSampler`.", - ) - def __init__(self, *, options: dict | None = None): - """ - Args: - options: Default options. - - Raises: - QiskitError: if some classical bits are not used for measurements. - """ - super().__init__(options=options) - self._circuits = [] - self._parameters = [] - self._qargs_list = [] - self._circuit_ids = {} - - def _call( - self, - circuits: Sequence[int], - parameter_values: Sequence[Sequence[float]], - **run_options, - ) -> SamplerResult: - shots = run_options.pop("shots", None) - seed = run_options.pop("seed", None) - if seed is None: - rng = np.random.default_rng() - elif isinstance(seed, np.random.Generator): - rng = seed - else: - rng = np.random.default_rng(seed) - - # Initialize metadata - metadata: list[dict[str, Any]] = [{} for _ in range(len(circuits))] - - bound_circuits = [] - qargs_list = [] - for i, value in zip(circuits, parameter_values): - if len(value) != len(self._parameters[i]): - raise QiskitError( - f"The number of values ({len(value)}) does not match " - f"the number of parameters ({len(self._parameters[i])})." - ) - bound_circuits.append( - self._circuits[i] - if len(value) == 0 - else self._circuits[i].assign_parameters(dict(zip(self._parameters[i], value))) - ) - qargs_list.append(self._qargs_list[i]) - probabilities = [ - Statevector(bound_circuit_to_instruction(circ)).probabilities_dict( - qargs=qargs, decimals=16 - ) - for circ, qargs in zip(bound_circuits, qargs_list) - ] - if shots is not None: - for i, prob_dict in enumerate(probabilities): - counts = rng.multinomial(shots, np.fromiter(prob_dict.values(), dtype=float)) - probabilities[i] = { - key: count / shots for key, count in zip(prob_dict.keys(), counts) if count > 0 - } - for metadatum in metadata: - metadatum["shots"] = shots - quasis = [QuasiDistribution(p, shots=shots) for p in probabilities] - - return SamplerResult(quasis, metadata) - - def _run( - self, - circuits: tuple[QuantumCircuit, ...], - parameter_values: tuple[tuple[float, ...], ...], - **run_options, - ): - circuit_indices = [] - for circuit in circuits: - key = _circuit_key(circuit) - index = self._circuit_ids.get(key) - if index is not None: - circuit_indices.append(index) - else: - circuit_indices.append(len(self._circuits)) - self._circuit_ids[key] = len(self._circuits) - circuit, qargs = self._preprocess_circuit(circuit) - self._circuits.append(circuit) - self._qargs_list.append(qargs) - self._parameters.append(circuit.parameters) - job = PrimitiveJob(self._call, circuit_indices, parameter_values, **run_options) - job._submit() - return job - - @staticmethod - def _preprocess_circuit(circuit: QuantumCircuit): - circuit = init_circuit(circuit) - q_c_mapping = final_measurement_mapping(circuit) - if set(range(circuit.num_clbits)) != set(q_c_mapping.values()): - raise QiskitError( - "Some classical bits are not used for measurements." - f" the number of classical bits ({circuit.num_clbits})," - f" the used classical bits ({set(q_c_mapping.values())})." - ) - c_q_mapping = sorted((c, q) for q, c in q_c_mapping.items()) - qargs = [q for _, q in c_q_mapping] - circuit = circuit.remove_final_measurements(inplace=False) - return circuit, qargs diff --git a/releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml b/releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml new file mode 100644 index 000000000000..77bfff966bc7 --- /dev/null +++ b/releasenotes/notes/remove-deprecated-primitives-c200e72332c1bc16.yaml @@ -0,0 +1,16 @@ +--- +upgrade_primitives: + - | + The deprecated primitive V1 implementations and V1-exclusive non-versioned + type aliases are now removed in favor of their V2 counterparts. The + following classes have been removed: + + * ``Estimator``, replaced with :class:`.StatevectorEstimator` + * ``Sampler``, replaced with :class:`.StatevectorSampler` + * ``BackendEstimator``, replaced with :class:`.BackendEstimatorV2` + * ``BackendSampler``, replaced with :class:`.BackendSamplerV2` + + The follwoing type aliases were removed as well: + + * ``BaseEstimator`` + * ``BaseSampler`` diff --git a/test/python/primitives/test_backend_estimator.py b/test/python/primitives/test_backend_estimator.py deleted file mode 100644 index 49d5efa2d782..000000000000 --- a/test/python/primitives/test_backend_estimator.py +++ /dev/null @@ -1,511 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for BackendEstimator.""" - -import unittest -from unittest.mock import patch -from multiprocessing import Manager -import numpy as np -from ddt import ddt - -from qiskit.circuit import QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.primitives import BackendEstimator, EstimatorResult -from qiskit.providers.fake_provider import Fake7QPulseV1, GenericBackendV2 -from qiskit.providers.backend_compat import BackendV2Converter -from qiskit.quantum_info import SparsePauliOp -from qiskit.transpiler import PassManager -from qiskit.utils import optionals -from test import QiskitTestCase # pylint: disable=wrong-import-order -from test import combine # pylint: disable=wrong-import-order -from test.python.transpiler._dummy_passes import DummyAP # pylint: disable=wrong-import-order - - -BACKENDS = [ - Fake7QPulseV1(), - BackendV2Converter(Fake7QPulseV1()), - GenericBackendV2(num_qubits=5, seed=42), -] - - -class CallbackPass(DummyAP): - """A dummy analysis pass that calls a callback when executed""" - - def __init__(self, message, callback): - super().__init__() - self.message = message - self.callback = callback - - def run(self, dag): - self.callback(self.message) - - -@ddt -class TestBackendEstimator(QiskitTestCase): - """Test Estimator""" - - def setUp(self): - super().setUp() - self._rng = np.random.default_rng(12) - self.ansatz = RealAmplitudes(num_qubits=2, reps=2) - self.observable = SparsePauliOp.from_list( - [ - ("II", -1.052373245772859), - ("IZ", 0.39793742484318045), - ("ZI", -0.39793742484318045), - ("ZZ", -0.01128010425623538), - ("XX", 0.18093119978423156), - ] - ) - self.expvals = -1.0284380963435145, -1.284366511861733 - - self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) - self.params = tuple(psi.parameters for psi in self.psi) - self.hamiltonian = ( - SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), - SparsePauliOp.from_list([("IZ", 1)]), - SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), - ) - self.theta = ( - [0, 1, 1, 2, 3, 5], - [0, 1, 1, 2, 3, 5, 8, 13], - [1, 2, 3, 4, 5, 6], - ) - - @combine(backend=BACKENDS) - def test_estimator_run(self, backend): - """Test Estimator.run()""" - psi1, psi2 = self.psi - hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian - theta1, theta2, theta3 = self.theta - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - - # Specify the circuit and observable by indices. - # calculate [ ] - with self.assertWarns(DeprecationWarning): - job = estimator.run([psi1], [hamiltonian1], [theta1]) - result = job.result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1.5555572817900956], rtol=0.5, atol=0.2) - - # Objects can be passed instead of indices. - # Note that passing objects has an overhead - # since the corresponding indices need to be searched. - # User can append a circuit and observable. - # calculate [ ] - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([psi2], [hamiltonian1], [theta2]).result() - np.testing.assert_allclose(result2.values, [2.97797666], rtol=0.5, atol=0.2) - - # calculate [ , ] - with self.assertWarns(DeprecationWarning): - result3 = estimator.run( - [psi1, psi1], [hamiltonian2, hamiltonian3], [theta1] * 2 - ).result() - np.testing.assert_allclose(result3.values, [-0.551653, 0.07535239], rtol=0.5, atol=0.2) - - # calculate [ , - # , - # ] - with self.assertWarns(DeprecationWarning): - result4 = estimator.run( - [psi1, psi2, psi1], - [hamiltonian1, hamiltonian2, hamiltonian3], - [theta1, theta2, theta3], - ).result() - np.testing.assert_allclose( - result4.values, [1.55555728, 0.17849238, -1.08766318], rtol=0.5, atol=0.2 - ) - - @combine(backend=BACKENDS) - def test_estimator_run_no_params(self, backend): - """test for estimator without parameters""" - circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - est.set_options(seed_simulator=123) - result = est.run([circuit], [self.observable]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.284366511861733], rtol=0.05) - - @combine(backend=BACKENDS, creg=[True, False]) - def test_run_1qubit(self, backend, creg): - """Test for 1-qubit cases""" - qc = QuantumCircuit(1, 1) if creg else QuantumCircuit(1) - qc2 = QuantumCircuit(1, 1) if creg else QuantumCircuit(1) - qc2.x(0) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("Z", 1)]) - - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - est.set_options(seed_simulator=123) - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1], rtol=0.1) - - @combine(backend=BACKENDS, creg=[True, False]) - def test_run_2qubits(self, backend, creg): - """Test for 2-qubit cases (to check endian)""" - backend.set_options(seed_simulator=123) - qc = QuantumCircuit(2, 1) if creg else QuantumCircuit(2) - qc2 = QuantumCircuit(2, 1) if creg else QuantumCircuit(2, 1) - qc2.x(0) - - op = SparsePauliOp.from_list([("II", 1)]) - op2 = SparsePauliOp.from_list([("ZI", 1)]) - op3 = SparsePauliOp.from_list([("IZ", 1)]) - - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1], rtol=0.1) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1], rtol=0.1) - - @combine(backend=BACKENDS) - def test_run_errors(self, backend): - """Test for errors""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(2) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("II", 1)]) - - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - est.set_options(seed_simulator=123) - with self.assertRaises(ValueError): - est.run([qc], [op2], [[]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op], [[1e4]]).result() - with self.assertRaises(ValueError): - est.run([qc2], [op2], [[1, 2]]).result() - with self.assertRaises(ValueError): - est.run([qc, qc2], [op2], [[1]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op, op2], [[1]]).result() - - @combine(backend=BACKENDS) - def test_run_numpy_params(self, backend): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - - target = estimator.run([qc] * k, [op] * k, params_list).result() - - with self.subTest("ndarrary"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - with self.subTest("list of ndarray"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - @combine(backend=BACKENDS) - def test_run_with_shots_option(self, backend): - """test with shots option.""" - with self.assertWarns(DeprecationWarning): - est = BackendEstimator(backend=backend) - result = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=1024, - seed_simulator=15, - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641], rtol=0.1) - - @combine(backend=BACKENDS) - def test_options(self, backend): - """Test for options""" - with self.subTest("init"): - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend, options={"shots": 3000}) - self.assertEqual(estimator.options.get("shots"), 3000) - with self.subTest("set_options"): - estimator.set_options(shots=1024, seed_simulator=15) - self.assertEqual(estimator.options.get("shots"), 1024) - self.assertEqual(estimator.options.get("seed_simulator"), 15) - with self.subTest("run"): - with self.assertWarns(DeprecationWarning): - result = estimator.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641], rtol=0.1) - - def test_job_size_limit_v2(self): - """Test BackendEstimator respects job size limit""" - - class FakeBackendLimitedCircuits(GenericBackendV2): - """Generic backend V2 with job size limit.""" - - @property - def max_circuits(self): - return 1 - - backend = FakeBackendLimitedCircuits(num_qubits=5, seed=42) - backend.set_options(seed_simulator=123) - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - with patch.object(backend, "run") as run_mock: - with self.assertWarns(DeprecationWarning): - estimator.run([qc] * k, [op] * k, params_list).result() - self.assertEqual(run_mock.call_count, 10) - - def test_job_size_limit_v1(self): - """Test BackendEstimator respects job size limit - REMOVE ONCE Fake7QPulseV1 GETS REMOVED""" - with self.assertWarns(DeprecationWarning): - backend = Fake7QPulseV1() - config = backend.configuration() - config.max_experiments = 1 - backend._configuration = config - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - with patch.object(backend, "run") as run_mock: - with self.assertWarns(DeprecationWarning): - estimator.run([qc] * k, [op] * k, params_list).result() - self.assertEqual(run_mock.call_count, 10) - - def test_no_max_circuits(self): - """Test BackendEstimator works with BackendV1 and no max_experiments set. - REMOVE ONCE Fake7QPulseV1 GETS REMOVED""" - with self.assertWarns(DeprecationWarning): - backend = Fake7QPulseV1() - config = backend.configuration() - del config.max_experiments - backend._configuration = config - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - params_array = self._rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend=backend) - estimator.set_options(seed_simulator=123) - target = estimator.run([qc] * k, [op] * k, params_list).result() - with self.subTest("ndarrary"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - with self.subTest("list of ndarray"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values, rtol=0.2, atol=0.2) - - def test_bound_pass_manager(self): - """Test bound pass manager.""" - - qc = QuantumCircuit(2) - op = SparsePauliOp.from_list([("II", 1)]) - - with self.subTest("Test single circuit"): - messages = [] - - def callback(msg): - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator( - backend=GenericBackendV2(num_qubits=5, seed=42), bound_pass_manager=bound_pass - ) - _ = estimator.run(qc, op).result() - expected = [ - "bound_pass_manager", - ] - self.assertEqual(messages, expected) - - with self.subTest("Test circuit batch"): - with Manager() as manager: - # The multiprocessing manager is used to share data - # between different processes. Pass Managers parallelize - # execution for batches of circuits, so this is necessary - # to keep track of the callback calls for num_circuits > 1 - messages = manager.list() - - def callback(msg): # pylint: disable=function-redefined - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator( - backend=GenericBackendV2(num_qubits=5, seed=42), - bound_pass_manager=bound_pass, - ) - _ = estimator.run([qc, qc], [op, op]).result() - expected = [ - "bound_pass_manager", - "bound_pass_manager", - ] - self.assertEqual(list(messages), expected) - - @combine(backend=BACKENDS) - def test_layout(self, backend): - """Test layout for split transpilation.""" - with self.subTest("initial layout test"): - qc = QuantumCircuit(3) - qc.x(0) - qc.cx(0, 1) - qc.cx(0, 2) - op = SparsePauliOp("IZI") - backend.set_options(seed_simulator=15) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend) - estimator.set_transpile_options(seed_transpiler=15, optimization_level=1) - value = estimator.run(qc, op, shots=10000).result().values[0] - if optionals.HAS_AER: - ref_value = -0.9954 if isinstance(backend, GenericBackendV2) else -0.934 - else: - ref_value = -1 - self.assertEqual(value, ref_value) - - with self.subTest("final layout test"): - qc = QuantumCircuit(3) - qc.x(0) - qc.cx(0, 1) - qc.cx(0, 2) - op = SparsePauliOp("IZI") - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend) - estimator.set_transpile_options( - initial_layout=[0, 1, 2], seed_transpiler=15, optimization_level=1 - ) - estimator.set_options(seed_simulator=15) - value = estimator.run(qc, op, shots=10000).result().values[0] - if optionals.HAS_AER: - ref_value = -0.9954 if isinstance(backend, GenericBackendV2) else -0.8902 - else: - ref_value = -1 - self.assertEqual(value, ref_value) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - def test_circuit_with_measurement(self): - """Test estimator with a dynamic circuit""" - from qiskit_aer import AerSimulator - - bell = QuantumCircuit(2) - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - observable = SparsePauliOp("ZZ") - - backend = AerSimulator() - backend.set_options(seed_simulator=15) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend, skip_transpilation=True) - estimator.set_transpile_options(seed_transpiler=15) - result = estimator.run(bell, observable).result() - self.assertAlmostEqual(result.values[0], 1, places=1) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - def test_dynamic_circuit(self): - """Test estimator with a dynamic circuit""" - from qiskit_aer import AerSimulator - - qc = QuantumCircuit(2, 1) - with qc.for_loop(range(5)): - qc.h(0) - qc.cx(0, 1) - qc.measure(1, 0) - with self.assertWarns(DeprecationWarning): - qc.break_loop().c_if(0, True) - - observable = SparsePauliOp("IZ") - - backend = AerSimulator() - backend.set_options(seed_simulator=15) - with self.assertWarns(DeprecationWarning): - estimator = BackendEstimator(backend, skip_transpilation=True) - estimator.set_transpile_options(seed_transpiler=15) - result = estimator.run(qc, observable).result() - self.assertAlmostEqual(result.values[0], 0, places=1) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/python/primitives/test_backend_sampler.py b/test/python/primitives/test_backend_sampler.py deleted file mode 100644 index 6acfa6443530..000000000000 --- a/test/python/primitives/test_backend_sampler.py +++ /dev/null @@ -1,496 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2024. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for BackendSampler.""" - -import math -import unittest -from multiprocessing import Manager -import numpy as np -from ddt import ddt - -from qiskit import QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.primitives import BackendSampler, SamplerResult -from qiskit.providers import JobStatus -from qiskit.providers.backend_compat import BackendV2Converter -from qiskit.providers.basic_provider import BasicSimulator -from qiskit.providers.fake_provider import Fake7QPulseV1, GenericBackendV2 -from qiskit.transpiler import PassManager -from qiskit.utils import optionals -from test import QiskitTestCase # pylint: disable=wrong-import-order -from test import combine # pylint: disable=wrong-import-order -from test.python.transpiler._dummy_passes import DummyAP # pylint: disable=wrong-import-order - - -BACKENDS = [Fake7QPulseV1(), BackendV2Converter(Fake7QPulseV1())] - -BACKENDS_V1 = [Fake7QPulseV1()] -BACKENDS_V2 = [ - BackendV2Converter(Fake7QPulseV1()), -] -BACKENDS = BACKENDS_V1 + BACKENDS_V2 - - -class CallbackPass(DummyAP): - """A dummy analysis pass that calls a callback when executed""" - - def __init__(self, message, callback): - super().__init__() - self.message = message - self.callback = callback - - def run(self, dag): - self.callback(self.message) - - -@ddt -class TestBackendSampler(QiskitTestCase): - """Test BackendSampler""" - - def setUp(self): - super().setUp() - hadamard = QuantumCircuit(1, 1) - hadamard.h(0) - hadamard.measure(0, 0) - bell = QuantumCircuit(2) - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - self._circuit = [hadamard, bell] - self._target = [ - {0: 0.5, 1: 0.5}, - {0: 0.5, 3: 0.5, 1: 0, 2: 0}, - ] - self._pqc = RealAmplitudes(num_qubits=2, reps=2) - self._pqc.measure_all() - self._pqc2 = RealAmplitudes(num_qubits=2, reps=3) - self._pqc2.measure_all() - self._pqc_params = [[0.0] * 6, [1.0] * 6] - self._pqc_target = [{0: 1}, {0: 0.0148, 1: 0.3449, 2: 0.0531, 3: 0.5872}] - self._theta = [ - [0, 1, 1, 2, 3, 5], - [1, 2, 3, 4, 5, 6], - [0, 1, 2, 3, 4, 5, 6, 7], - ] - - def _generate_circuits_target(self, indices): - if isinstance(indices, list): - circuits = [self._circuit[j] for j in indices] - target = [self._target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return circuits, target - - def _generate_params_target(self, indices): - if isinstance(indices, int): - params = self._pqc_params[indices] - target = self._pqc_target[indices] - elif isinstance(indices, list): - params = [self._pqc_params[j] for j in indices] - target = [self._pqc_target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return params, target - - def _compare_probs(self, prob, target): - if not isinstance(prob, list): - prob = [prob] - if not isinstance(target, list): - target = [target] - self.assertEqual(len(prob), len(target)) - for p, targ in zip(prob, target): - for key, t_val in targ.items(): - if key in p: - self.assertAlmostEqual(p[key], t_val, delta=0.1) - else: - self.assertAlmostEqual(t_val, 0, delta=0.1) - - @combine(backend=BACKENDS) - def test_sampler_run(self, backend): - """Test Sampler.run().""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - job = sampler.run(circuits=[bell], shots=1000) - result = job.result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(result.quasi_dists[0].shots, 1000) - self.assertEqual(result.quasi_dists[0].stddev_upper_bound, math.sqrt(1 / 1000)) - self._compare_probs(result.quasi_dists, self._target[1]) - - @combine(backend=BACKENDS) - def test_sample_run_multiple_circuits(self, backend): - """Test Sampler.run() with multiple circuits.""" - # executes three Bell circuits - # Argument `parameters` is optional. - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([bell, bell, bell]).result() - self._compare_probs(result.quasi_dists[0], self._target[1]) - self._compare_probs(result.quasi_dists[1], self._target[1]) - self._compare_probs(result.quasi_dists[2], self._target[1]) - - @combine(backend=BACKENDS) - def test_sampler_run_with_parameterized_circuits(self, backend): - """Test Sampler.run() with parameterized circuits.""" - # parameterized circuit - - pqc = self._pqc - pqc2 = self._pqc2 - theta1, theta2, theta3 = self._theta - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([pqc, pqc, pqc2], [theta1, theta2, theta3]).result() - - # result of pqc(theta1) - prob1 = { - "00": 0.1309248462975777, - "01": 0.3608720796028448, - "10": 0.09324865232050054, - "11": 0.41495442177907715, - } - self.assertDictAlmostEqual(result.quasi_dists[0].binary_probabilities(), prob1, delta=0.1) - - # result of pqc(theta2) - prob2 = { - "00": 0.06282290651933871, - "01": 0.02877144385576705, - "10": 0.606654494132085, - "11": 0.3017511554928094, - } - self.assertDictAlmostEqual(result.quasi_dists[1].binary_probabilities(), prob2, delta=0.1) - - # result of pqc2(theta3) - prob3 = { - "00": 0.1880263994380416, - "01": 0.6881971261189544, - "10": 0.09326232720582443, - "11": 0.030514147237179892, - } - self.assertDictAlmostEqual(result.quasi_dists[2].binary_probabilities(), prob3, delta=0.1) - - @combine(backend=BACKENDS) - def test_run_1qubit(self, backend): - """test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - - @combine(backend=BACKENDS) - def test_run_2qubit(self, backend): - """test for 2-qubit cases""" - qc0 = QuantumCircuit(2) - qc0.measure_all() - qc1 = QuantumCircuit(2) - qc1.x(0) - qc1.measure_all() - qc2 = QuantumCircuit(2) - qc2.x(1) - qc2.measure_all() - qc3 = QuantumCircuit(2) - qc3.x([0, 1]) - qc3.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc0, qc1, qc2, qc3]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 4) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[2], {2: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[3], {3: 1}, 0.1) - - @combine(backend=BACKENDS) - def test_run_errors(self, backend): - """Test for errors""" - qc1 = QuantumCircuit(1) - qc1.measure_all() - qc2 = RealAmplitudes(num_qubits=1, reps=1) - qc2.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - with self.assertRaises(ValueError): - sampler.run([qc1], [[1e2]]).result() - with self.assertRaises(ValueError): - sampler.run([qc2], [[]]).result() - with self.assertRaises(ValueError): - sampler.run([qc2], [[1e2]]).result() - - @combine(backend=BACKENDS_V1) - def test_run_empty_parameter_v1(self, backend): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - with self.subTest("one circuit"): - with self.assertWarnsRegex( - DeprecationWarning, - expected_regex="The `transpile` function will " - "stop supporting inputs of type `BackendV1`", - ): - result = sampler.run([qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 1) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 2) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 2) - - @combine(backend=BACKENDS_V2) - def test_run_empty_parameter_v2(self, backend): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - with self.subTest("one circuit"): - result = sampler.run([qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 1) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 2) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictAlmostEqual(quasi_dist, {0: 1.0}, delta=0.1) - self.assertEqual(len(result.metadata), 2) - - @combine(backend=BACKENDS) - def test_run_numpy_params(self, backend): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - qc.measure_all() - k = 5 - rng = np.random.default_rng(12) - params_array = rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - target = sampler.run([qc] * k, params_list).result() - - with self.subTest("ndarrary"): - result = sampler.run([qc] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictAlmostEqual(result.quasi_dists[i], target.quasi_dists[i], delta=0.1) - - with self.subTest("list of ndarray"): - result = sampler.run([qc] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictAlmostEqual(result.quasi_dists[i], target.quasi_dists[i], delta=0.1) - - @combine(backend=BACKENDS) - def test_run_with_shots_option(self, backend): - """test with shots option.""" - params, target = self._generate_params_target([1]) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run( - circuits=[self._pqc], parameter_values=params, shots=1024, seed=15 - ).result() - self._compare_probs(result.quasi_dists, target) - - @combine(backend=BACKENDS) - def test_primitive_job_status_done(self, backend): - """test primitive job's status""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - job = sampler.run(circuits=[bell]) - _ = job.result() - self.assertEqual(job.status(), JobStatus.DONE) - - def test_primitive_job_size_limit_backend_v2(self): - """Test primitive respects backend's job size limit.""" - - class FakeBackendLimitedCircuits(GenericBackendV2): - """Generic backend V2 with job size limit.""" - - @property - def max_circuits(self): - return 1 - - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=FakeBackendLimitedCircuits(num_qubits=5, seed=42)) - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - - def test_primitive_job_size_limit_backend_v1(self): - """Test primitive respects backend's job size limit.""" - backend = GenericBackendV2(7, basis_gates=["cx", "u1", "u2", "u3"], seed=42) - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result.quasi_dists[1], {1: 1}, 0.1) - - @unittest.skipUnless(optionals.HAS_AER, "qiskit-aer is required to run this test") - def test_circuit_with_dynamic_circuit(self): - """Test BackendSampler with QuantumCircuit with a dynamic circuit""" - from qiskit_aer import Aer - - qc = QuantumCircuit(2, 1) - - with qc.for_loop(range(5)): - qc.h(0) - qc.cx(0, 1) - qc.measure(0, 0) - with self.assertWarns(DeprecationWarning): - qc.break_loop().c_if(0, True) - - with self.assertWarns(DeprecationWarning): - backend = Aer.get_backend("aer_simulator") - sampler = BackendSampler(backend, skip_transpilation=True) - sampler.set_options(seed_simulator=15) - sampler.set_transpile_options(seed_transpiler=15) - result = sampler.run(qc).result() - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 0.5029296875, 1: 0.4970703125}) - - def test_sequential_run(self): - """Test sequential run.""" - backend = GenericBackendV2(7, basis_gates=["cx", "u1", "u2", "u3"], seed=42) - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend) - result = sampler.run([qc]).result() - self.assertDictAlmostEqual(result.quasi_dists[0], {0: 1}, 0.1) - result2 = sampler.run([qc2]).result() - self.assertDictAlmostEqual(result2.quasi_dists[0], {1: 1}, 0.1) - result3 = sampler.run([qc, qc2]).result() - self.assertDictAlmostEqual(result3.quasi_dists[0], {0: 1}, 0.1) - self.assertDictAlmostEqual(result3.quasi_dists[1], {1: 1}, 0.1) - - def test_outcome_bitstring_size(self): - """Test that the result bitstrings are properly padded. - - E.g. measuring '0001' should not get truncated to '1'. - """ - qc = QuantumCircuit(4) - qc.x(0) - qc.measure_all() - - # We need a noise-free backend here (shot noise is fine) to ensure that - # the only bit string measured is "0001". With device noise, it could happen that - # strings with a leading 1 are measured and then the truncation cannot be tested. - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=BasicSimulator()) - - result = sampler.run(qc).result() - probs = result.quasi_dists[0].binary_probabilities() - - self.assertIn("0001", probs.keys()) - self.assertEqual(len(probs), 1) - - def test_bound_pass_manager(self): - """Test bound pass manager.""" - - with self.subTest("Test single circuit"): - messages = [] - - def callback(msg): - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - backend = GenericBackendV2(7, basis_gates=["cx", "u1", "u2", "u3"], seed=42) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend, bound_pass_manager=bound_pass) - _ = sampler.run([self._circuit[0]]).result() - expected = [ - "bound_pass_manager", - ] - self.assertEqual(messages, expected) - - with self.subTest("Test circuit batch"): - with Manager() as manager: - # The multiprocessing manager is used to share data - # between different processes. Pass Managers parallelize - # execution for batches of circuits, so this is necessary - # to keep track of the callback calls for num_circuits > 1 - messages = manager.list() - - def callback(msg): # pylint: disable=function-redefined - messages.append(msg) - - bound_counter = CallbackPass("bound_pass_manager", callback) - bound_pass = PassManager(bound_counter) - backend = GenericBackendV2( - 7, - basis_gates=["cx", "u1", "u2", "u3"], - seed=42, - ) - with self.assertWarns(DeprecationWarning): - sampler = BackendSampler(backend=backend, bound_pass_manager=bound_pass) - _ = sampler.run([self._circuit[0], self._circuit[0]]).result() - expected = [ - "bound_pass_manager", - "bound_pass_manager", - ] - self.assertEqual(list(messages), expected) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/python/primitives/test_estimator.py b/test/python/primitives/test_estimator.py deleted file mode 100644 index 2c251af65d1b..000000000000 --- a/test/python/primitives/test_estimator.py +++ /dev/null @@ -1,437 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2023. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for Estimator.""" - -import unittest -from test import QiskitTestCase - -import numpy as np -from ddt import data, ddt, unpack - -from qiskit.circuit import Parameter, QuantumCircuit -from qiskit.circuit.library import RealAmplitudes -from qiskit.exceptions import QiskitError -from qiskit.primitives import Estimator, EstimatorResult -from qiskit.primitives.base import validation -from qiskit.primitives.utils import _observable_key -from qiskit.quantum_info import Pauli, SparsePauliOp - - -class TestEstimator(QiskitTestCase): - """Test Estimator""" - - def setUp(self): - super().setUp() - self.ansatz = RealAmplitudes(num_qubits=2, reps=2) - self.observable = SparsePauliOp.from_list( - [ - ("II", -1.052373245772859), - ("IZ", 0.39793742484318045), - ("ZI", -0.39793742484318045), - ("ZZ", -0.01128010425623538), - ("XX", 0.18093119978423156), - ] - ) - self.expvals = -1.0284380963435145, -1.284366511861733 - - self.psi = (RealAmplitudes(num_qubits=2, reps=2), RealAmplitudes(num_qubits=2, reps=3)) - self.params = tuple(psi.parameters for psi in self.psi) - self.hamiltonian = ( - SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)]), - SparsePauliOp.from_list([("IZ", 1)]), - SparsePauliOp.from_list([("ZI", 1), ("ZZ", 1)]), - ) - self.theta = ( - [0, 1, 1, 2, 3, 5], - [0, 1, 1, 2, 3, 5, 8, 13], - [1, 2, 3, 4, 5, 6], - ) - - def test_estimator_run(self): - """Test Estimator.run()""" - psi1, psi2 = self.psi - hamiltonian1, hamiltonian2, hamiltonian3 = self.hamiltonian - theta1, theta2, theta3 = self.theta - with self.assertWarns(DeprecationWarning): - estimator = Estimator() - - # Specify the circuit and observable by indices. - # calculate [ ] - job = estimator.run([psi1], [hamiltonian1], [theta1]) - result = job.result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1.5555572817900956]) - - # Objects can be passed instead of indices. - # Note that passing objects has an overhead - # since the corresponding indices need to be searched. - # User can append a circuit and observable. - # calculate [ ] - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([psi2], [hamiltonian1], [theta2]).result() - np.testing.assert_allclose(result2.values, [2.97797666]) - - # calculate [ , ] - with self.assertWarns(DeprecationWarning): - result3 = estimator.run( - [psi1, psi1], [hamiltonian2, hamiltonian3], [theta1] * 2 - ).result() - np.testing.assert_allclose(result3.values, [-0.551653, 0.07535239]) - - # calculate [ , - # , - # ] - with self.assertWarns(DeprecationWarning): - result4 = estimator.run( - [psi1, psi2, psi1], - [hamiltonian1, hamiltonian2, hamiltonian3], - [theta1, theta2, theta3], - ).result() - np.testing.assert_allclose(result4.values, [1.55555728, 0.17849238, -1.08766318]) - - def test_estiamtor_run_no_params(self): - """test for estimator without parameters""" - circuit = self.ansatz.assign_parameters([0, 1, 1, 2, 3, 5]) - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run([circuit], [self.observable]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.284366511861733]) - - def test_run_single_circuit_observable(self): - """Test for single circuit and single observable case.""" - with self.assertWarns(DeprecationWarning): - est = Estimator() - - with self.subTest("No parameter"): - qc = QuantumCircuit(1) - qc.x(0) - op = SparsePauliOp("Z") - param_vals = [None, [], [[]], np.array([]), np.array([[]]), [np.array([])]] - target = [-1] - for val in param_vals: - self.subTest(f"{val}") - with self.assertWarns(DeprecationWarning): - result = est.run(qc, op, val).result() - np.testing.assert_allclose(result.values, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("One parameter"): - param = Parameter("x") - qc = QuantumCircuit(1) - qc.ry(param, 0) - op = SparsePauliOp("Z") - param_vals = [ - [np.pi], - [[np.pi]], - np.array([np.pi]), - np.array([[np.pi]]), - [np.array([np.pi])], - ] - target = [-1] - for val in param_vals: - self.subTest(f"{val}") - with self.assertWarns(DeprecationWarning): - result = est.run(qc, op, val).result() - np.testing.assert_allclose(result.values, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("More than one parameter"): - qc = self.psi[0] - op = self.hamiltonian[0] - param_vals = [ - self.theta[0], - [self.theta[0]], - np.array(self.theta[0]), - np.array([self.theta[0]]), - [np.array(self.theta[0])], - ] - target = [1.5555572817900956] - for val in param_vals: - self.subTest(f"{val}") - with self.assertWarns(DeprecationWarning): - result = est.run(qc, op, val).result() - np.testing.assert_allclose(result.values, target) - self.assertEqual(len(result.metadata), 1) - - def test_run_1qubit(self): - """Test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(1) - qc2.x(0) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("Z", 1)]) - - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1]) - - def test_run_2qubits(self): - """Test for 2-qubit cases (to check endian)""" - qc = QuantumCircuit(2) - qc2 = QuantumCircuit(2) - qc2.x(0) - - op = SparsePauliOp.from_list([("II", 1)]) - op2 = SparsePauliOp.from_list([("ZI", 1)]) - op3 = SparsePauliOp.from_list([("IZ", 1)]) - - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run([qc], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op2], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [1]) - - with self.assertWarns(DeprecationWarning): - result = est.run([qc2], [op3], [[]]).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1]) - - def test_run_errors(self): - """Test for errors""" - qc = QuantumCircuit(1) - qc2 = QuantumCircuit(2) - - op = SparsePauliOp.from_list([("I", 1)]) - op2 = SparsePauliOp.from_list([("II", 1)]) - - with self.assertWarns(DeprecationWarning): - est = Estimator() - with self.assertRaises(ValueError): - est.run([qc], [op2], [[]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op], [[1e4]]).result() - with self.assertRaises(ValueError): - est.run([qc2], [op2], [[1, 2]]).result() - with self.assertRaises(ValueError): - est.run([qc, qc2], [op2], [[1]]).result() - with self.assertRaises(ValueError): - est.run([qc], [op, op2], [[1]]).result() - - def test_run_numpy_params(self): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - op = SparsePauliOp.from_list([("IZ", 1), ("XI", 2), ("ZY", -1)]) - k = 5 - rng = np.random.default_rng(12) - params_array = rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - estimator = Estimator() - target = estimator.run([qc] * k, [op] * k, params_list).result() - - with self.subTest("ndarrary"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values) - - with self.subTest("list of ndarray"): - with self.assertWarns(DeprecationWarning): - result = estimator.run([qc] * k, [op] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - np.testing.assert_allclose(result.values, target.values) - - def test_run_with_shots_option(self): - """test with shots option.""" - with self.assertWarns(DeprecationWarning): - est = Estimator() - result = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=1024, - seed=15, - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641]) - - def test_run_with_shots_option_none(self): - """test with shots=None option. Seed is ignored then.""" - with self.assertWarns(DeprecationWarning): - est = Estimator() - result_42 = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=None, - seed=42, - ).result() - result_15 = est.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - shots=None, - seed=15, - ).result() - np.testing.assert_allclose(result_42.values, result_15.values) - - def test_options(self): - """Test for options""" - with self.subTest("init"): - with self.assertWarns(DeprecationWarning): - estimator = Estimator(options={"shots": 3000}) - self.assertEqual(estimator.options.get("shots"), 3000) - with self.subTest("set_options"): - estimator.set_options(shots=1024, seed=15) - self.assertEqual(estimator.options.get("shots"), 1024) - self.assertEqual(estimator.options.get("seed"), 15) - with self.subTest("run"): - with self.assertWarns(DeprecationWarning): - result = estimator.run( - [self.ansatz], - [self.observable], - parameter_values=[[0, 1, 1, 2, 3, 5]], - ).result() - self.assertIsInstance(result, EstimatorResult) - np.testing.assert_allclose(result.values, [-1.307397243478641]) - - def test_negative_variance(self): - """Test for negative variance caused by numerical error.""" - qc = QuantumCircuit(1) - - with self.assertWarns(DeprecationWarning): - estimator = Estimator() - result = estimator.run(qc, 1e-4 * SparsePauliOp("I"), shots=1024).result() - self.assertEqual(result.values[0], 1e-4) - self.assertEqual(result.metadata[0]["variance"], 0.0) - - def test_different_circuits(self): - """Test collision of quantum observables.""" - - def get_op(i): - op = SparsePauliOp.from_list([("IXIX", i)]) - return op - - keys = [_observable_key(get_op(i)) for i in range(5)] - self.assertEqual(len(keys), len(set(keys))) - - def test_reset(self): - """Test for circuits with reset.""" - qc = QuantumCircuit(2) - qc.h(0) - qc.cx(0, 1) - qc.reset(0) - op = SparsePauliOp("ZI") - - seed = 12 - n = 1000 - with self.subTest("shots=None"): - with self.assertWarns(DeprecationWarning): - estimator = Estimator(options={"seed": seed}) - result = estimator.run([qc for _ in range(n)], [op] * n).result() - # expectation values should be stochastic due to reset for subsystems - np.testing.assert_allclose(result.values.mean(), 0, atol=1e-1) - - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([qc for _ in range(n)], [op] * n).result() - # expectation values should be reproducible due to seed - np.testing.assert_allclose(result.values, result2.values) - - with self.subTest("shots=10000"): - shots = 10000 - with self.assertWarns(DeprecationWarning): - estimator = Estimator(options={"seed": seed}) - result = estimator.run([qc for _ in range(n)], [op] * n, shots=shots).result() - # expectation values should be stochastic due to reset for subsystems - np.testing.assert_allclose(result.values.mean(), 0, atol=1e-1) - - with self.assertWarns(DeprecationWarning): - result2 = estimator.run([qc for _ in range(n)], [op] * n, shots=shots).result() - # expectation values should be reproducible due to seed - np.testing.assert_allclose(result.values, result2.values) - - -@ddt -class TestObservableValidation(QiskitTestCase): - """Test observables validation logic.""" - - @data( - ("IXYZ", (SparsePauliOp("IXYZ"),)), - (Pauli("IXYZ"), (SparsePauliOp("IXYZ"),)), - (SparsePauliOp("IXYZ"), (SparsePauliOp("IXYZ"),)), - ( - ["IXYZ", "ZYXI"], - (SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")), - ), - ( - [Pauli("IXYZ"), Pauli("ZYXI")], - (SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")), - ), - ( - [SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")], - (SparsePauliOp("IXYZ"), SparsePauliOp("ZYXI")), - ), - ) - @unpack - def test_validate_observables(self, observables, expected): - """Test observables standardization.""" - with self.assertWarns(DeprecationWarning): - self.assertEqual(validation._validate_observables(observables), expected) - - @data(None, "ERROR") - def test_qiskit_error(self, observables): - """Test qiskit error if invalid input.""" - with self.assertRaises(QiskitError): - with self.assertWarns(DeprecationWarning): - validation._validate_observables(observables) - - @data((), []) - def test_value_error(self, observables): - """Test value error if no observables are provided.""" - with self.assertRaises(ValueError): - with self.assertWarns(DeprecationWarning): - validation._validate_observables(observables) - - -if __name__ == "__main__": - unittest.main() diff --git a/test/python/primitives/test_sampler.py b/test/python/primitives/test_sampler.py deleted file mode 100644 index 58ab97689748..000000000000 --- a/test/python/primitives/test_sampler.py +++ /dev/null @@ -1,440 +0,0 @@ -# This code is part of Qiskit. -# -# (C) Copyright IBM 2022, 2023. -# -# This code is licensed under the Apache License, Version 2.0. You may -# obtain a copy of this license in the LICENSE.txt file in the root directory -# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0. -# -# Any modifications or derivative works of this code must retain this -# copyright notice, and modified files need to carry a notice indicating -# that they have been altered from the originals. - -"""Tests for Sampler.""" - -import unittest -import numpy as np - -from qiskit import QuantumCircuit -from qiskit.circuit import Parameter -from qiskit.circuit.library import RealAmplitudes, UnitaryGate -from qiskit.primitives import Sampler, SamplerResult -from qiskit.providers import JobStatus -from test import QiskitTestCase # pylint: disable=wrong-import-order - - -class TestSampler(QiskitTestCase): - """Test Sampler""" - - def setUp(self): - super().setUp() - hadamard = QuantumCircuit(1, 1, name="Hadamard") - hadamard.h(0) - hadamard.measure(0, 0) - bell = QuantumCircuit(2, name="Bell") - bell.h(0) - bell.cx(0, 1) - bell.measure_all() - self._circuit = [hadamard, bell] - self._target = [ - {0: 0.5, 1: 0.5}, - {0: 0.5, 3: 0.5, 1: 0, 2: 0}, - ] - self._pqc = RealAmplitudes(num_qubits=2, reps=2) - self._pqc.measure_all() - self._pqc2 = RealAmplitudes(num_qubits=2, reps=3) - self._pqc2.measure_all() - self._pqc_params = [[0.0] * 6, [1.0] * 6] - self._pqc_target = [{0: 1}, {0: 0.0148, 1: 0.3449, 2: 0.0531, 3: 0.5872}] - self._theta = [ - [0, 1, 1, 2, 3, 5], - [1, 2, 3, 4, 5, 6], - [0, 1, 2, 3, 4, 5, 6, 7], - ] - - def _generate_circuits_target(self, indices): - if isinstance(indices, list): - circuits = [self._circuit[j] for j in indices] - target = [self._target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return circuits, target - - def _generate_params_target(self, indices): - if isinstance(indices, int): - params = self._pqc_params[indices] - target = self._pqc_target[indices] - elif isinstance(indices, list): - params = [self._pqc_params[j] for j in indices] - target = [self._pqc_target[j] for j in indices] - else: - raise ValueError(f"invalid index {indices}") - return params, target - - def _compare_probs(self, prob, target): - if not isinstance(prob, list): - prob = [prob] - if not isinstance(target, list): - target = [target] - self.assertEqual(len(prob), len(target)) - for p, targ in zip(prob, target): - for key, t_val in targ.items(): - if key in p: - self.assertAlmostEqual(p[key], t_val, places=1) - else: - self.assertAlmostEqual(t_val, 0, places=1) - - def test_sampler_run(self): - """Test Sampler.run().""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - job = sampler.run(circuits=[bell]) - result = job.result() - self.assertIsInstance(result, SamplerResult) - self._compare_probs(result.quasi_dists, self._target[1]) - - def test_sample_run_multiple_circuits(self): - """Test Sampler.run() with multiple circuits.""" - # executes three Bell circuits - # Argument `parameters` is optional. - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([bell, bell, bell]).result() - self._compare_probs(result.quasi_dists[0], self._target[1]) - self._compare_probs(result.quasi_dists[1], self._target[1]) - self._compare_probs(result.quasi_dists[2], self._target[1]) - - def test_sampler_run_with_parameterized_circuits(self): - """Test Sampler.run() with parameterized circuits.""" - # parameterized circuit - - pqc = self._pqc - pqc2 = self._pqc2 - theta1, theta2, theta3 = self._theta - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([pqc, pqc, pqc2], [theta1, theta2, theta3]).result() - - # result of pqc(theta1) - prob1 = { - "00": 0.1309248462975777, - "01": 0.3608720796028448, - "10": 0.09324865232050054, - "11": 0.41495442177907715, - } - self.assertDictAlmostEqual(result.quasi_dists[0].binary_probabilities(), prob1) - - # result of pqc(theta2) - prob2 = { - "00": 0.06282290651933871, - "01": 0.02877144385576705, - "10": 0.606654494132085, - "11": 0.3017511554928094, - } - self.assertDictAlmostEqual(result.quasi_dists[1].binary_probabilities(), prob2) - - # result of pqc2(theta3) - prob3 = { - "00": 0.1880263994380416, - "01": 0.6881971261189544, - "10": 0.09326232720582443, - "11": 0.030514147237179892, - } - self.assertDictAlmostEqual(result.quasi_dists[2].binary_probabilities(), prob3) - - def test_run_1qubit(self): - """test for 1-qubit cases""" - qc = QuantumCircuit(1) - qc.measure_all() - qc2 = QuantumCircuit(1) - qc2.x(0) - qc2.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([qc, qc2]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - for i in range(2): - keys, values = zip(*sorted(result.quasi_dists[i].items())) - self.assertTupleEqual(keys, (i,)) - np.testing.assert_allclose(values, [1]) - - def test_run_2qubit(self): - """test for 2-qubit cases""" - qc0 = QuantumCircuit(2) - qc0.measure_all() - qc1 = QuantumCircuit(2) - qc1.x(0) - qc1.measure_all() - qc2 = QuantumCircuit(2) - qc2.x(1) - qc2.measure_all() - qc3 = QuantumCircuit(2) - qc3.x([0, 1]) - qc3.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([qc0, qc1, qc2, qc3]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 4) - - for i in range(4): - keys, values = zip(*sorted(result.quasi_dists[i].items())) - self.assertTupleEqual(keys, (i,)) - np.testing.assert_allclose(values, [1]) - - def test_run_single_circuit(self): - """Test for single circuit case.""" - - with self.subTest("No parameter"): - circuit = self._circuit[1] - target = self._target[1] - param_vals = [None, [], [[]], np.array([]), np.array([[]])] - for val in param_vals: - with self.subTest(f"{circuit.name} w/ {val}"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.assertWarns(DeprecationWarning): - result = sampler.run(circuit, val).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("One parameter"): - circuit = QuantumCircuit(1, 1, name="X gate") - param = Parameter("x") - circuit.ry(param, 0) - circuit.measure(0, 0) - target = [{1: 1}] - param_vals = [ - [np.pi], - [[np.pi]], - np.array([np.pi]), - np.array([[np.pi]]), - [np.array([np.pi])], - ] - for val in param_vals: - with self.subTest(f"{circuit.name} w/ {val}"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.assertWarns(DeprecationWarning): - result = sampler.run(circuit, val).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("More than one parameter"): - circuit = self._pqc - target = [self._pqc_target[0]] - param_vals = [ - self._pqc_params[0], - [self._pqc_params[0]], - np.array(self._pqc_params[0]), - np.array([self._pqc_params[0]]), - [np.array(self._pqc_params[0])], - ] - for val in param_vals: - with self.subTest(f"{circuit.name} w/ {val}"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.assertWarns(DeprecationWarning): - result = sampler.run(circuit, val).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(len(result.metadata), 1) - - def test_run_reverse_meas_order(self): - """test for sampler with reverse measurement order""" - x = Parameter("x") - y = Parameter("y") - - qc = QuantumCircuit(3, 3) - qc.rx(x, 0) - qc.rx(y, 1) - qc.x(2) - qc.measure(0, 2) - qc.measure(1, 1) - qc.measure(2, 0) - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run([qc] * 2, [[0, 0], [np.pi / 2, 0]]).result() - self.assertIsInstance(result, SamplerResult) - self.assertEqual(len(result.quasi_dists), 2) - - # qc({x: 0, y: 0}) - keys, values = zip(*sorted(result.quasi_dists[0].items())) - self.assertTupleEqual(keys, (1,)) - np.testing.assert_allclose(values, [1]) - - # qc({x: pi/2, y: 0}) - keys, values = zip(*sorted(result.quasi_dists[1].items())) - self.assertTupleEqual(keys, (1, 5)) - np.testing.assert_allclose(values, [0.5, 0.5]) - - def test_run_errors(self): - """Test for errors with run method""" - qc1 = QuantumCircuit(1) - qc1.measure_all() - qc2 = RealAmplitudes(num_qubits=1, reps=1) - qc2.measure_all() - qc3 = QuantumCircuit(1) - qc4 = QuantumCircuit(1, 1) - qc5 = QuantumCircuit(1, 1) - with qc5.for_loop(range(5)): - qc5.h(0) - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.subTest("set parameter values to a non-parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc1], [[1e2]]) - with self.subTest("missing all parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2], [[]]) - with self.subTest("missing some parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2], [[1e2]]) - with self.subTest("too many parameter values for a parameterized circuit"): - with self.assertRaises(ValueError): - _ = sampler.run([qc2], [[1e2]] * 100) - with self.subTest("no classical bits"): - with self.assertRaises(ValueError): - _ = sampler.run([qc3], [[]]) - with self.subTest("no measurement"): - with self.assertRaises(ValueError): - _ = sampler.run([qc4], [[]]) - with self.subTest("no measurement in control flow"): - with self.assertRaises(ValueError): - _ = sampler.run([qc5], [[]]) - - def test_run_empty_parameter(self): - """Test for empty parameter""" - n = 5 - qc = QuantumCircuit(n, n - 1) - qc.measure(range(n - 1), range(n - 1)) - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - with self.subTest("one circuit"): - with self.assertWarns(DeprecationWarning): - result = sampler.run([qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 1) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictEqual(quasi_dist, {0: 1.0}) - self.assertEqual(len(result.metadata), 1) - - with self.subTest("two circuits"): - result = sampler.run([qc, qc], shots=1000).result() - self.assertEqual(len(result.quasi_dists), 2) - for q_d in result.quasi_dists: - quasi_dist = {k: v for k, v in q_d.items() if v != 0.0} - self.assertDictEqual(quasi_dist, {0: 1.0}) - self.assertEqual(len(result.metadata), 2) - - def test_run_numpy_params(self): - """Test for numpy array as parameter values""" - qc = RealAmplitudes(num_qubits=2, reps=2) - qc.measure_all() - k = 5 - rng = np.random.default_rng(12) - params_array = rng.random((k, qc.num_parameters)) - params_list = params_array.tolist() - params_list_array = list(params_array) - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - target = sampler.run([qc] * k, params_list).result() - - with self.subTest("ndarrary"): - result = sampler.run([qc] * k, params_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictEqual(result.quasi_dists[i], target.quasi_dists[i]) - - with self.subTest("list of ndarray"): - result = sampler.run([qc] * k, params_list_array).result() - self.assertEqual(len(result.metadata), k) - for i in range(k): - self.assertDictEqual(result.quasi_dists[i], target.quasi_dists[i]) - - def test_run_with_shots_option(self): - """test with shots option.""" - params, target = self._generate_params_target([1]) - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run( - circuits=[self._pqc], parameter_values=params, shots=1024, seed=15 - ).result() - self._compare_probs(result.quasi_dists, target) - - def test_run_with_shots_option_none(self): - """test with shots=None option. Seed is ignored then.""" - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result_42 = sampler.run( - [self._pqc], parameter_values=[[0, 1, 1, 2, 3, 5]], shots=None, seed=42 - ).result() - result_15 = sampler.run( - [self._pqc], parameter_values=[[0, 1, 1, 2, 3, 5]], shots=None, seed=15 - ).result() - self.assertDictAlmostEqual(result_42.quasi_dists, result_15.quasi_dists) - - def test_run_shots_result_size(self): - """test with shots option to validate the result size""" - n = 10 - shots = 100 - qc = QuantumCircuit(n) - qc.h(range(n)) - qc.measure_all() - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - result = sampler.run(qc, [], shots=shots, seed=42).result() - self.assertEqual(len(result.quasi_dists), 1) - self.assertLessEqual(len(result.quasi_dists[0]), shots) - self.assertAlmostEqual(sum(result.quasi_dists[0].values()), 1.0) - - def test_primitive_job_status_done(self): - """test primitive job's status""" - bell = self._circuit[1] - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - job = sampler.run(circuits=[bell]) - _ = job.result() - self.assertEqual(job.status(), JobStatus.DONE) - - def test_options(self): - """Test for options""" - with self.subTest("init"): - with self.assertWarns(DeprecationWarning): - sampler = Sampler(options={"shots": 3000}) - self.assertEqual(sampler.options.get("shots"), 3000) - with self.subTest("set_options"): - sampler.set_options(shots=1024, seed=15) - self.assertEqual(sampler.options.get("shots"), 1024) - self.assertEqual(sampler.options.get("seed"), 15) - with self.subTest("run"): - params, target = self._generate_params_target([1]) - with self.assertWarns(DeprecationWarning): - result = sampler.run([self._pqc], parameter_values=params).result() - self._compare_probs(result.quasi_dists, target) - self.assertEqual(result.quasi_dists[0].shots, 1024) - - def test_circuit_with_unitary(self): - """Test for circuit with unitary gate.""" - gate = UnitaryGate(np.eye(2)) - - circuit = QuantumCircuit(1) - circuit.append(gate, [0]) - circuit.measure_all() - - with self.assertWarns(DeprecationWarning): - sampler = Sampler() - sampler_result = sampler.run([circuit]).result() - self.assertDictAlmostEqual(sampler_result.quasi_dists[0], {0: 1, 1: 0}) - - -if __name__ == "__main__": - unittest.main()