diff --git a/meteor/diffmaps.py b/meteor/diffmaps.py index 310c63a..db22df5 100644 --- a/meteor/diffmaps.py +++ b/meteor/diffmaps.py @@ -1,243 +1,138 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Sequence +from typing import Sequence import numpy as np +import reciprocalspaceship as rs -from .settings import TV_MAP_SAMPLING -from .utils import ( - complex_array_to_rs_dataseries, - compute_map_from_coefficients, - rs_dataseries_to_complex_array, -) +from .rsmap import Map, _assert_is_map +from .settings import MAP_SAMPLING +from .utils import filter_common_indices from .validate import ScalarMaximizer, negentropy -if TYPE_CHECKING: - import reciprocalspaceship as rs - DEFAULT_KPARAMS_TO_SCAN = np.linspace(0.0, 1.0, 101) -def compute_difference_map( - dataset: rs.DataSet, - *, - native_amplitudes_column: str, - native_phases_column: str, - native_uncertainty_column: str | None = None, - derivative_amplitudes_column: str, - derivative_phases_column: str | None = None, - derivative_uncertainty_column: str | None = None, - output_amplitudes_column: str = "DF", - output_phases_column: str = "DPHI", - output_uncertainties_column: str = "SIGDF", -) -> rs.DataSet: +def set_common_crystallographic_metadata(map1: Map, map2: Map, *, output: Map) -> None: + if hasattr(map1, "cell"): + if hasattr(map2, "cell") and (map1.cell != map2.cell): + msg = f"`map1.cell` {map1.cell} != `map2.cell` {map2.cell}" + raise AttributeError(msg) + output.cell = map1.cell + + if hasattr(map1, "spacegroup"): + if hasattr(map2, "spacegroup") and (map1.spacegroup != map2.spacegroup): + msg = f"`map1.spacegroup` {map1.spacegroup} != " + msg += f"`map2.spacegroup` {map2.spacegroup}" + raise AttributeError(msg) + output.spacegroup = map1.spacegroup + + +def compute_difference_map(derivative: Map, native: Map) -> Map: """ Computes amplitude and phase differences between native and derivative structure factor sets. + It converts the amplitude and phase pairs from both the native and derivative structure factor + sets into complex numbers, computes the difference, and then converts the result back + into amplitudes and phases. + + If uncertainty columns are provided for both native and derivative data, it also propagates the + uncertainty of the difference in amplitudes. + Parameters ---------- - dataset : rs.DataSet - The dataset containing the native and derivative structure factor data. - native_amplitudes_column : str - Column name for the native amplitudes. - native_phases_column : str - Column name for the native phases. - native_uncertainty_column : str, optional - Column name for the native uncertainties (optional). If provided, it will be used to compute - uncertainty for the difference map. - derivative_amplitudes_column : str - Column name for the derivative amplitudes. - derivative_phases_column : str, optional - Column name for the derivative phases. If not provided, native phases - will be used in its place. - derivative_uncertainty_column : str, optional - Column name for the derivative uncertainties (optional). If provided, it will be used to - compute uncertainty for the difference map. - output_amplitudes_column : str, optional - Column name for the output difference amplitudes. Default is "DF". - output_phases_column : str, optional - Column name for the output difference phases. Default is "DPHI". - output_uncertainties_column : str, optional - Column name for the output uncertainties. Default is "SIGDF". This will only be used if - both native and derivative uncertainties are provided. + derivative: Map + the derivative amplitudes, phases, uncertainties + native: Map + the native amplitudes, phases, uncertainties Returns ------- - rs.DataSet - A copy of the input dataset with added columns for the difference amplitudes, - phases, and uncertainties, if specified at input. - - Notes - ----- - This function computes the complex difference between native and derivative structure factors. - It converts the amplitude and phase pairs from both the native and derivative structure factor - sets into complex numbers, computes the difference, and then converts the result back - into amplitudes and phases. - - If uncertainty columns are provided for both native and derivative data, - it also propagates the uncertainty of the difference in amplitudes. + diffmap: Map + map corresponding to the complex difference (derivative - native) """ - dataset = dataset.copy() + _assert_is_map(derivative, require_uncertainties=False) + _assert_is_map(native, require_uncertainties=False) - # Convert native and derivative amplitude/phase pairs to complex arrays - native_complex = rs_dataseries_to_complex_array( - dataset[native_amplitudes_column], dataset[native_phases_column] - ) + derivative, native = filter_common_indices(derivative, native) # type: ignore[assignment] - if derivative_phases_column is not None: - derivative_complex = rs_dataseries_to_complex_array( - dataset[derivative_amplitudes_column], dataset[derivative_phases_column] - ) - else: - # If no derivative phases are provided, assume they are the same as native phases - derivative_complex = rs_dataseries_to_complex_array( - dataset[derivative_amplitudes_column], dataset[native_phases_column] - ) + delta_complex = derivative.complex_amplitudes - native.complex_amplitudes + delta = Map.from_structurefactor(delta_complex, index=native.index) - # compute complex differences & convert back to amplitude and phase DataSeries - delta_complex = derivative_complex - native_complex - delta_amplitudes, delta_phases = complex_array_to_rs_dataseries(delta_complex, dataset.index) + set_common_crystallographic_metadata(derivative, native, output=delta) - dataset[output_amplitudes_column] = delta_amplitudes - dataset[output_phases_column] = delta_phases - - if (derivative_uncertainty_column is not None) and (native_uncertainty_column is not None): - sigdelta_amplitudes = np.sqrt( - dataset[derivative_uncertainty_column] ** 2 + dataset[native_uncertainty_column] ** 2 - ) - dataset[output_uncertainties_column] = sigdelta_amplitudes + if derivative.has_uncertainties and native.has_uncertainties: + prop_uncertainties = np.sqrt(derivative.uncertainties**2 + native.uncertainties**2) + delta.set_uncertainties(prop_uncertainties) - return dataset + return delta -def compute_kweights( - delta_amplitudes: rs.DataSeries, - delta_uncertainties: rs.DataSeries, - k_parameter: float, -) -> rs.DataSeries: +def compute_kweights(difference_map: Map, *, k_parameter: float) -> rs.DataSeries: """ Compute weights for each structure factor based on DeltaF and its uncertainty. Parameters ---------- - delta_amplitudes: rs.DataSeries - The series representing the structure factor differences (DeltaF). - - sigdelta_amplitudes: rs.DataSeries - Representing the uncertainties (sigma) of the structure factor differences. - + difference_map: Map + A map of structure factor differences (DeltaF). k_parameter: float A scaling factor applied to the squared `df` values in the weight calculation. Returns ------- - rs.DataSeries: + weights: rs.DataSeries A series of computed weights, where higher uncertainties and larger differences lead to lower weights. """ - w = ( + _assert_is_map(difference_map, require_uncertainties=True) + + inverse_weights = ( 1 - + (delta_uncertainties**2 / (delta_uncertainties**2).mean()) - + k_parameter * (delta_amplitudes**2 / (delta_amplitudes**2).mean()) + + (difference_map.uncertainties**2 / (difference_map.uncertainties**2).mean()) + + k_parameter * (difference_map.amplitudes**2 / (difference_map.amplitudes**2).mean()) ) - return 1.0 / w + return 1.0 / inverse_weights -def compute_kweighted_difference_map( - dataset: rs.DataSet, - *, - k_parameter: float, - native_amplitudes_column: str, - native_phases_column: str, - native_uncertainty_column: str, - derivative_amplitudes_column: str, - derivative_phases_column: str | None = None, - derivative_uncertainty_column: str, - output_amplitudes_column: str = "DF_KWeighted", - output_phases_column: str = "DPHI_KWeighted", - output_uncertainties_column: str = "SIGDF_KWeighted", -) -> rs.DataSet: +def compute_kweighted_difference_map(derivative: Map, native: Map, *, k_parameter: float) -> Map: """ - Compute k-weighted differences between native and derivative structure factor datasets. + Compute k-weighted derivative - native structure factor map. + + This function first computes the standard difference map using `compute_difference_map`. + Then, it applies k-weighting to the amplitude differences based on the provided `k_parameter`. + + Assumes amplitudes have already been scaled prior to invoking this function. Parameters ---------- - dataset : rs.DataSet - The dataset containing native and derivative structure factor data. - k_parameter : float - Weighting factor applied to the amplitude differences. - native_amplitudes_column : str - Column name for native amplitudes. - native_phases_column : str - Column name for native phases. - native_uncertainty_column : str - Column name for native uncertainties. - derivative_amplitudes_column : str - Column name for derivative amplitudes. - derivative_phases_column : str, optional - Column name for derivative phases. If not provided, native phases will be used. - derivative_uncertainty_column : str - Column name for derivative uncertainties. - output_amplitudes_column : str, optional - Column name for k-weighted amplitude differences. Default is "DF_KWeighted". - output_phases_column : str, optional - Column name for k-weighted phase differences. Default is "DPHI_KWeighted". - output_uncertainties_column : str, optional - Column name for uncertainties in the k-weighted differences. Default is "SIGDF_KWeighted". + derivative: Map + the derivative amplitudes, phases, uncertainties + native: Map + the native amplitudes, phases, uncertainties Returns ------- - rs.DataSet - A dataset containing the k-weighted amplitude and phase differences, with uncertainties. - - Notes - ----- - This function first computes the standard difference map using `compute_difference_map`. - Then, it applies k-weighting to the amplitude differences based on the provided `k_parameter`. - Assumes amplitudes have already been scaled prior to invoking this function. + diffmap: Map + the k-weighted difference map """ - # this label is only used internally in this function - diffmap_amplitudes = "__INTERNAL_DF_LABEL" - - diffmap_dataset = compute_difference_map( - dataset=dataset, - native_amplitudes_column=native_amplitudes_column, - native_phases_column=native_phases_column, - native_uncertainty_column=native_uncertainty_column, - derivative_amplitudes_column=derivative_amplitudes_column, - derivative_phases_column=derivative_phases_column, - derivative_uncertainty_column=derivative_uncertainty_column, - output_amplitudes_column=diffmap_amplitudes, - output_phases_column=output_phases_column, - output_uncertainties_column=output_uncertainties_column, - ) + # require uncertainties at the beginning + _assert_is_map(derivative, require_uncertainties=True) + _assert_is_map(native, require_uncertainties=True) - weights = compute_kweights( - diffmap_dataset[diffmap_amplitudes], - diffmap_dataset[output_uncertainties_column], - k_parameter, - ) + difference_map = compute_difference_map(derivative, native) + weights = compute_kweights(difference_map, k_parameter=k_parameter) - output_ds = dataset.copy() - output_ds[output_amplitudes_column] = diffmap_dataset[diffmap_amplitudes] * weights - output_ds[output_phases_column] = diffmap_dataset[output_phases_column] - output_ds[output_uncertainties_column] = diffmap_dataset[output_uncertainties_column] + difference_map.amplitudes *= weights + difference_map.uncertainties *= weights - return output_ds + return difference_map def max_negentropy_kweighted_difference_map( - dataset: rs.DataSet, + derivative: Map, + native: Map, *, - native_amplitudes_column: str, - native_phases_column: str, - native_uncertainty_column: str, - derivative_amplitudes_column: str, - derivative_phases_column: str | None = None, - derivative_uncertainty_column: str, - output_amplitudes_column: str = "DF_KWeighted", - output_phases_column: str = "DPHI_KWeighted", - output_uncertainties_column: str = "SIGDF_KWeighted", k_parameter_values_to_scan: np.ndarray | Sequence[float] = DEFAULT_KPARAMS_TO_SCAN, ) -> rs.DataSet: """ @@ -249,20 +144,10 @@ def max_negentropy_kweighted_difference_map( Parameters ---------- - dataset : rs.DataSet - The input dataset containing columns for native and derivative amplitudes/phases. - native_amplitudes_column : str - Column label for native amplitudes in the dataset. - derivative_amplitudes_column : str - Column label for derivative amplitudes in the dataset. - native_phases_column : str, optional - Column label for native phases. - derivative_phases_column : str, optional - Column label for derivative phases, by default None. - uncertainty_native_column : str - Column label for uncertainties of native amplitudes. - uncertainty_deriv_column : str - Column label for uncertainties of derivative amplitudes. + derivative: Map + the derivative amplitudes, phases, uncertainties + native: Map + the native amplitudes, phases, uncertainties k_parameter_values_to_scan : np.ndarray | Sequence[float] The values to scan to optimize the k-weighting parameter, by default is 0.00, 0.01 ... 1.00 @@ -274,51 +159,27 @@ def max_negentropy_kweighted_difference_map( opt_k_parameter: float optimized k-weighting parameter """ + _assert_is_map(derivative, require_uncertainties=True) + _assert_is_map(native, require_uncertainties=True) def negentropy_objective(k_parameter: float) -> float: kweighted_dataset = compute_kweighted_difference_map( - dataset, + derivative, + native, k_parameter=k_parameter, - native_amplitudes_column=native_amplitudes_column, - native_phases_column=native_phases_column, - native_uncertainty_column=native_uncertainty_column, - derivative_amplitudes_column=derivative_amplitudes_column, - derivative_phases_column=derivative_phases_column, - derivative_uncertainty_column=derivative_uncertainty_column, - output_amplitudes_column=output_amplitudes_column, - output_phases_column=output_phases_column, - output_uncertainties_column=output_uncertainties_column, ) - - k_weighted_map = compute_map_from_coefficients( - map_coefficients=kweighted_dataset, - amplitude_label=output_amplitudes_column, - phase_label=output_phases_column, - map_sampling=TV_MAP_SAMPLING, - ) - + k_weighted_map = kweighted_dataset.to_ccp4_map(map_sampling=MAP_SAMPLING) k_weighted_map_array = np.array(k_weighted_map.grid) - return negentropy(k_weighted_map_array) - # optimize k_parameter using negentropy objective maximizer = ScalarMaximizer(objective=negentropy_objective) maximizer.optimize_over_explicit_values(arguments_to_scan=k_parameter_values_to_scan) - opt_k_parameter = maximizer.argument_optimum kweighted_dataset = compute_kweighted_difference_map( - dataset, + derivative, + native, k_parameter=opt_k_parameter, - native_amplitudes_column=native_amplitudes_column, - native_phases_column=native_phases_column, - native_uncertainty_column=native_uncertainty_column, - derivative_amplitudes_column=derivative_amplitudes_column, - derivative_phases_column=derivative_phases_column, - derivative_uncertainty_column=derivative_uncertainty_column, - output_amplitudes_column=output_amplitudes_column, - output_phases_column=output_phases_column, - output_uncertainties_column=output_uncertainties_column, ) return kweighted_dataset, opt_k_parameter diff --git a/meteor/iterative.py b/meteor/iterative.py index 7bac182..c58be19 100644 --- a/meteor/iterative.py +++ b/meteor/iterative.py @@ -4,16 +4,16 @@ import numpy as np import pandas as pd -import reciprocalspaceship as rs +from .rsmap import Map from .tv import TvDenoiseResult, tv_denoise_difference_map from .utils import ( average_phase_diff_in_degrees, - canonicalize_amplitudes, complex_array_to_rs_dataseries, - rs_dataseries_to_complex_array, ) +DEFAULT_TV_WEIGHTS_TO_SCAN = [0.001, 0.01, 0.1, 1.0] + def _project_derivative_on_experimental_set( *, @@ -68,7 +68,7 @@ def _complex_derivative_from_iterative_tv( Parameters ---------- - complex_native: np.ndarray + native: np.ndarray The complex native structure factors, usually experimental amplitudes and calculated phases initial_complex_derivative : np.ndarray @@ -82,10 +82,10 @@ def _complex_derivative_from_iterative_tv( convergance_tolerance: float If the change in the estimated derivative SFs drops below this value (phase, per-component) - then return + then return. Default 1e-4. max_iterations: int - If this number of iterations is reached, stop early. + If this number of iterations is reached, stop early. Default 1000. Returns ------- @@ -125,7 +125,7 @@ def _complex_derivative_from_iterative_tv( "tv_weight": tv_metadata.optimal_lambda, "negentropy_after_tv": tv_metadata.optimal_negentropy, "average_phase_change": phase_change, - } + }, ) if num_iterations > max_iterations: @@ -135,16 +135,13 @@ def _complex_derivative_from_iterative_tv( def iterative_tv_phase_retrieval( - input_dataset: rs.DataSet, + initial_derivative: Map, + native: Map, *, - native_amplitude_column: str = "F", - derivative_amplitude_column: str = "Fh", - calculated_phase_column: str = "PHIC", - output_derivative_phase_column: str = "PHICh", - convergence_tolerance: float = 1e-3, - max_iterations: int = 100, - tv_weights_to_scan: list[float] | None = None, -) -> tuple[rs.DataSet, pd.DataFrame]: + convergence_tolerance: float = 1e-4, + max_iterations: int = 1000, + tv_weights_to_scan: list[float] = DEFAULT_TV_WEIGHTS_TO_SCAN, +) -> tuple[Map, pd.DataFrame]: """ Here is a brief pseudocode sketch of the alogrithm. Structure factors F below are complex unless explicitly annotated |*|. @@ -168,32 +165,18 @@ def iterative_tv_phase_retrieval( Parameters ---------- - input_dataset : rs.DataSet - The input dataset containing the native and derivative amplitude columns, as well as - the calculated phase column. - - native_amplitude_column : str, optional - Column name in `input_dataset` representing the amplitudes of the native (dark) structure - factors, by default "F". + initial_derivative: Map + the derivative amplitudes, and initial guess for the phases - derivative_amplitude_column : str, optional - Column name in `input_dataset` representing the amplitudes of the derivative (light) - structure factors, by default "Fh". - - calculated_phase_column : str, optional - Column name in `input_dataset` representing the phases of the native (dark) structure - factors, by default "PHIC". - - output_derivative_phase_column : str, optional - Column name where the estimated derivative phases will be stored in the output dataset, - by default "PHICh". + native: Map + the native amplitudes, phases convergance_tolerance: float If the change in the estimated derivative SFs drops below this value (phase, per-component) - then return + then return. Default 1e-4. max_iterations: int - If this number of iterations is reached, stop early. + If this number of iterations is reached, stop early. Default 1000. tv_weights_to_scan : list[float], optional A list of TV regularization weights (λ values) to be scanned for optimal results, @@ -201,7 +184,7 @@ def iterative_tv_phase_retrieval( Returns ------- - output_dataset: rs.DataSet + output_map: Map The estimated derivative phases, along with the input amplitudes and input computed phases. metadata: pd.DataFrame @@ -209,66 +192,37 @@ def iterative_tv_phase_retrieval( the tv_weight used, the negentropy (after the TV step), and the average phase change in degrees. """ + # clean TV denoising interface that is crystallographically intelligent # maintains state for the HKL index, spacegroup, and cell information - if tv_weights_to_scan is None: - tv_weights_to_scan = [0.001, 0.01, 0.1, 1.0] - def tv_denoise_closure(difference: np.ndarray) -> tuple[np.ndarray, TvDenoiseResult]: - delta_amp, delta_phase = complex_array_to_rs_dataseries( - difference, index=input_dataset.index - ) - - # these two names are only used inside this closure - delta_amp.name = "DF_for_tv_closure" - delta_phase.name = "DPHI_for_tv_closure" - - diffmap = rs.concat([delta_amp, delta_phase], axis=1) - diffmap.cell = input_dataset.cell - diffmap.spacegroup = input_dataset.spacegroup + diffmap = Map.from_structurefactor(difference, index=native.index) + diffmap.cell = native.cell + diffmap.spacegroup = native.spacegroup - denoised_map_coefficients, tv_metadata = tv_denoise_difference_map( + denoised_map, tv_metadata = tv_denoise_difference_map( diffmap, - difference_map_amplitude_column=delta_amp.name, - difference_map_phase_column=delta_phase.name, lambda_values_to_scan=tv_weights_to_scan, full_output=True, ) - denoised_difference = rs_dataseries_to_complex_array( - denoised_map_coefficients[delta_amp.name], denoised_map_coefficients[delta_phase.name] - ) - - return denoised_difference, tv_metadata - - # convert the native and derivative datasets to complex arrays - native = rs_dataseries_to_complex_array( - input_dataset[native_amplitude_column], input_dataset[calculated_phase_column] - ) - initial_derivative = rs_dataseries_to_complex_array( - input_dataset[derivative_amplitude_column], input_dataset[calculated_phase_column] - ) + return denoised_map.complex_amplitudes, tv_metadata # estimate the derivative phases using the iterative TV algorithm it_tv_complex_derivative, metadata = _complex_derivative_from_iterative_tv( - native=native, - initial_derivative=initial_derivative, + native=native.complex_amplitudes, + initial_derivative=initial_derivative.complex_amplitudes, tv_denoise_function=tv_denoise_closure, convergence_tolerance=convergence_tolerance, max_iterations=max_iterations, ) _, derivative_phases = complex_array_to_rs_dataseries( - it_tv_complex_derivative, input_dataset.index + it_tv_complex_derivative, + index=initial_derivative.index, ) # combine the determined derivative phases with the input to generate a complete output - output_dataset = input_dataset.copy() - output_dataset[output_derivative_phase_column] = derivative_phases.astype(rs.PhaseDtype()) - canonicalize_amplitudes( - output_dataset, - amplitude_label=derivative_amplitude_column, - phase_label=output_derivative_phase_column, - inplace=True, - ) + output_dataset = initial_derivative.copy() + output_dataset.phases = derivative_phases return output_dataset, metadata diff --git a/meteor/rsmap.py b/meteor/rsmap.py new file mode 100644 index 0000000..3d3d358 --- /dev/null +++ b/meteor/rsmap.py @@ -0,0 +1,371 @@ +from __future__ import annotations + +from pathlib import Path +from typing import Any, ClassVar + +import gemmi +import numpy as np +import pandas as pd +import reciprocalspaceship as rs + +from .settings import GEMMI_HIGH_RESOLUTION_BUFFER +from .utils import ( + canonicalize_amplitudes, + complex_array_to_rs_dataseries, +) + + +class MissingUncertaintiesError(AttributeError): ... + + +class MapMutabilityError(RuntimeError): ... + + +def _assert_is_map(obj: Any, *, require_uncertainties: bool) -> None: + if not isinstance(obj, Map): + msg = f"expected {obj} to be a rsmap.Map, got {type(obj)}" + raise TypeError(msg) + if require_uncertainties and (not obj.has_uncertainties): + msg = f"{obj} Map missing required uncertainty column" + raise MissingUncertaintiesError(msg) + + +class Map(rs.DataSet): + """ + A high-level interface for a crystallographic map of any kind. + + Specifically, this class is based on a reciprocalspaceship `DataSet` (ie. a + crystographically-aware pandas `DataFrame`), but is restricted to three and only three + special columns corresponding to: + + - (real) `amplitudes` + - `phases` + - `uncertainties`, ie the standard deviation for a Gaussian around the amplitude mean + + In addition, the class maintains an `index` of Miller indices, as well as the crystallographic + metadata supported by `rs.DataSet`, most notably a `cell` and `spacegroup`. + + These structured data enable this class to perform some routine map-based caluclations, such as + + - computing a real-space map, or computing map coefficients from a map + - converting between complex Cartesian and polar (amplitude/phase) structure factors + - reading and writing mtz and ccp4 map files + + all in a way that automatically facilitates the bookkeeping tasks normally associated with + these relatively simple operations. + """ + + # these columns are always allowed + _allowed_columns: ClassVar[list[str]] = ["H", "K", "L"] + + # in addition, __init__ specifies 3 columns special that can be named dynamically to support: + # amplitudes, phases, uncertainties; all other columns are forbidden + + def __init__( + self, + data: dict | pd.DataFrame | rs.DataSet, + *, + amplitude_column: str = "F", + phase_column: str = "PHI", + uncertainty_column: str | None = "SIGF", + **kwargs, + ) -> None: + super().__init__(data=data, **kwargs) + + for column in [amplitude_column, phase_column]: + if column not in self.columns: + msg = "amplitude and phase columns must be in input `data`... " + msg += f"looking for {amplitude_column} and {phase_column}, got {self.columns}" + raise KeyError(msg) + + columns_to_keep = [amplitude_column, phase_column, *self._allowed_columns] + + self._amplitude_column = amplitude_column + self._phase_column = phase_column + self._uncertainty_column = uncertainty_column + if uncertainty_column and (uncertainty_column in self.columns): + columns_to_keep.append(uncertainty_column) + + # this feels dangerous, but I cannot find a better way | author: @tjlane + excess_columns = set(self.columns) - set(columns_to_keep) + for column in excess_columns: + del self[column] + + # ensure types correct + self.amplitudes = self._verify_amplitude_type(self.amplitudes, fix=True) + self.phases = self._verify_phase_type(self.phases, fix=True) + if self.has_uncertainties: + self.uncertainties = self._verify_uncertainty_type(self.uncertainties, fix=True) + + @property + def _constructor(self): + return Map + + @property + def _constructor_sliced(self): + return rs.DataSeries + + def _verify_type( + self, + name: str, + allowed_types: list[type], + dataseries: rs.DataSeries, + *, + fix: bool, + cast_fix_to: type, + ) -> rs.DataSeries: + if dataseries.dtype not in allowed_types: + if fix: + return dataseries.astype(cast_fix_to) + msg = f"dtype for passed {name} not allowed, got: {dataseries.dtype} allow {allowed_types}" + raise AssertionError(msg) + return dataseries + + def _verify_amplitude_type( + self, + dataseries: rs.DataSeries, + *, + fix: bool = True, + ) -> rs.DataSeries: + name = "amplitude" + amplitude_dtypes = [ + rs.StructureFactorAmplitudeDtype(), + rs.FriedelStructureFactorAmplitudeDtype(), + rs.NormalizedStructureFactorAmplitudeDtype(), + rs.AnomalousDifferenceDtype(), + ] + return self._verify_type( + name, + amplitude_dtypes, + dataseries, + fix=fix, + cast_fix_to=rs.StructureFactorAmplitudeDtype(), + ) + + def _verify_phase_type(self, dataseries: rs.DataSeries, *, fix: bool = True) -> rs.DataSeries: + name = "phase" + phase_dtypes = [rs.PhaseDtype()] + return self._verify_type( + name, phase_dtypes, dataseries, fix=fix, cast_fix_to=rs.PhaseDtype() + ) + + def _verify_uncertainty_type( + self, + dataseries: rs.DataSeries, + *, + fix: bool = True, + ) -> rs.DataSeries: + name = "uncertainties" + uncertainty_dtypes = [ + rs.StandardDeviationDtype(), + rs.StandardDeviationFriedelIDtype(), + rs.StandardDeviationFriedelSFDtype(), + ] + return self._verify_type( + name, uncertainty_dtypes, dataseries, fix=fix, cast_fix_to=rs.StandardDeviationDtype() + ) + + def __setitem__(self, key: str, value) -> None: + allowed = list(self.columns) + self._allowed_columns + if key not in allowed: + msg = "column assignment not allowed for Map objects" + raise MapMutabilityError(msg) + super().__setitem__(key, value) + + def insert(self, loc: int, column: str, value: Any, *, allow_duplicates: bool = False) -> None: + if column in self._allowed_columns: + super().insert(loc, column, value, allow_duplicates=allow_duplicates) + else: + msg = "general column assignment not allowed for Map objects" + msg += f"special columns allowed: {self._allowed_columns}; " + msg += "see also Map.set_uncertainties(...)" + raise MapMutabilityError(msg) + + def drop(self, labels=None, *, axis=0, columns=None, inplace=False, **kwargs): + if (axis == 1) or (columns is not None): + msg = "columns are fixed for Map objects" + raise MapMutabilityError(msg) + return super().drop(labels=labels, axis=axis, columns=columns, inplace=inplace, **kwargs) + + def get_hkls(self) -> np.ndarray: + # overwrite rs implt'n, return w/o modifying self -> same behavior, under testing - @tjlane + return self.index.to_frame().to_numpy(dtype=np.int32) + + def compute_dHKL(self) -> rs.DataSeries: # noqa: N802, caps from reciprocalspaceship + # rs adds a "dHKL" column to the DataFrame + # that could be enabled by adding "dHKL" to _allowed_columns - @tjlane + d_hkl = self.cell.calculate_d_array(self.get_hkls()) + return rs.DataSeries(d_hkl, dtype="R", index=self.index) + + @property + def resolution_limits(self) -> tuple[float, float]: + d_hkl = self.compute_dHKL() + return np.max(d_hkl), np.min(d_hkl) + + @property + def amplitudes(self) -> rs.DataSeries: + return self[self._amplitude_column] + + @amplitudes.setter + def amplitudes(self, values: rs.DataSeries) -> None: + values = self._verify_amplitude_type(values) + self[self._amplitude_column] = values + + @property + def phases(self) -> rs.DataSeries: + return self[self._phase_column] + + @phases.setter + def phases(self, values: rs.DataSeries) -> None: + values = self._verify_phase_type(values) + self[self._phase_column] = values + + @property + def has_uncertainties(self) -> bool: + return self._uncertainty_column in self.columns + + @property + def uncertainties(self) -> rs.DataSeries: + if self.has_uncertainties: + return self[self._uncertainty_column] + msg = "uncertainties not set for Map object" + raise AttributeError(msg) + + @uncertainties.setter + def uncertainties(self, values: rs.DataSeries) -> None: + if self.has_uncertainties: + values = self._verify_uncertainty_type(values) + self[self._uncertainty_column] = values # type: ignore[index] + else: + msg = "uncertainties unset, and Pandas forbids assignment via attributes; " + msg += "to initialize, use Map.set_uncertainties(...)" + raise AttributeError(msg) + + def set_uncertainties(self, values: rs.DataSeries, column_name: str = "SIGF") -> None: + values = self._verify_uncertainty_type(values) + + if self.has_uncertainties: + self.uncertainties = values + else: + # otherwise, create a new column + self._uncertainty_column = column_name + position = len(self.columns) + if position != 2: # noqa: PLR2004, should be 2: just amplitudes & phases + msg = "Misconfigured columns" + raise RuntimeError(msg) + super().insert(position, self._uncertainty_column, values, allow_duplicates=False) + + @property + def complex_amplitudes(self) -> np.ndarray: + return self.amplitudes.to_numpy() * np.exp(1j * np.deg2rad(self.phases.to_numpy())) + + def canonicalize_amplitudes(self): + canonicalize_amplitudes( + self, + amplitude_label=self._amplitude_column, + phase_label=self._phase_column, + inplace=True, + ) + + def to_structurefactor(self) -> rs.DataSeries: + return super().to_structurefactor(self._amplitude_column, self._phase_column) + + @classmethod + def from_structurefactor( + cls, + complex_structurefactor: np.ndarray | rs.DataSeries, + *, + index: pd.Index, + cell: Any = None, + spacegroup: Any = None, + ) -> Map: + # 1. `rs.DataSet.from_structurefactor` exists, but it operates on a column that's already + # part of the dataset; having such a (redundant) column is forbidden by `Map` + # 2. recprocalspaceship has a `from_structurefactor` method, but it is occasionally + # mangling indices for me when the input is a numpy array, as of 16 OCT 24 - @tjlane + amplitudes, phases = complex_array_to_rs_dataseries(complex_structurefactor, index=index) + dataset = rs.DataSet( + rs.concat([amplitudes.rename("F"), phases.rename("PHI")], axis=1), + index=index, + cell=cell, + spacegroup=spacegroup, + ) + return cls(dataset) + + @classmethod + def from_gemmi( + cls, + gemmi_mtz: gemmi.Mtz, + *, + amplitude_column: str = "F", + phase_column: str = "PHI", + uncertainty_column: str | None = "SIGF", + ) -> Map: + return cls( + rs.DataSet(gemmi_mtz), + amplitude_column=amplitude_column, + phase_column=phase_column, + uncertainty_column=uncertainty_column, + ) + + def to_ccp4_map(self, *, map_sampling: int) -> gemmi.Ccp4Map: + map_coefficients_gemmi_format = self.to_gemmi() + ccp4_map = gemmi.Ccp4Map() + ccp4_map.grid = map_coefficients_gemmi_format.transform_f_phi_to_map( + self._amplitude_column, + self._phase_column, + sample_rate=map_sampling, + ) + ccp4_map.update_ccp4_header() + return ccp4_map + + @classmethod + def from_ccp4_map( + cls, + ccp4_map: gemmi.Ccp4Map, + *, + high_resolution_limit: float, + amplitude_column: str = "F", + phase_column: str = "PHI", + ) -> Map: + # to ensure we include the final shell of reflections, add a small buffer to the resolution + gemmi_structure_factors = gemmi.transform_map_to_f_phi(ccp4_map.grid, half_l=False) + data = gemmi_structure_factors.prepare_asu_data( + dmin=high_resolution_limit - GEMMI_HIGH_RESOLUTION_BUFFER, + with_sys_abs=True, + ) + + mtz = gemmi.Mtz(with_base=True) + mtz.spacegroup = gemmi_structure_factors.spacegroup + mtz.set_cell_for_all(gemmi_structure_factors.unit_cell) + mtz.add_dataset("FromMap") + + mtz.add_column(amplitude_column, "F") + mtz.add_column(phase_column, "P") + + mtz.set_data(data) + mtz.switch_to_asu_hkl() + dataset = super().from_gemmi(mtz) + + return cls(dataset, amplitude_column=amplitude_column, phase_column=phase_column) + + def write_mtz(self, file_path: str | Path) -> None: + path_cast_to_str = str(file_path) + super().write_mtz(path_cast_to_str) + + @classmethod + def read_mtz_file( + cls, + file_path: str | Path, + *, + amplitude_column: str = "F", + phase_column: str = "PHI", + uncertainty_column: str | None = "SIGF", + ) -> Map: + gemmi_mtz = gemmi.read_mtz_file(str(file_path)) + return cls.from_gemmi( + gemmi_mtz, + amplitude_column=amplitude_column, + phase_column=phase_column, + uncertainty_column=uncertainty_column, + ) diff --git a/meteor/scale.py b/meteor/scale.py index 208b98f..96f090d 100644 --- a/meteor/scale.py +++ b/meteor/scale.py @@ -1,14 +1,12 @@ from __future__ import annotations -from typing import TYPE_CHECKING, Tuple +from typing import Tuple import numpy as np +import pandas as pd import reciprocalspaceship as rs import scipy.optimize as opt -if TYPE_CHECKING: - import pandas as pd - DTYPES_TO_SCALE = ( rs.AnomalousDifferenceDtype, rs.FriedelIntensityDtype, @@ -28,7 +26,8 @@ def _compute_anisotropic_scale_factors( - miller_indices: pd.Index, anisotropic_scale_parameters: ScaleParameters + miller_indices: pd.Index, + anisotropic_scale_parameters: ScaleParameters, ) -> np.ndarray: miller_indices_as_array = np.array(list(miller_indices)) squared_miller_indices = np.square(miller_indices_as_array) @@ -114,7 +113,7 @@ def compute_scale_factors( uncertainty_weights = np.sqrt( np.square(reference_uncertainties.loc[common_miller_indices]) - + np.square(to_scale_uncertainties.loc[common_miller_indices]) + + np.square(to_scale_uncertainties.loc[common_miller_indices]), ) else: @@ -125,7 +124,8 @@ def compute_scale_factors( def compute_residuals(scaling_parameters: ScaleParameters) -> np.ndarray: scale_factors = _compute_anisotropic_scale_factors( - common_miller_indices, scaling_parameters + common_miller_indices, + scaling_parameters, ) return uncertainty_weights * ( scale_factors * common_values_to_scale - common_reference_values @@ -137,7 +137,8 @@ def compute_residuals(scaling_parameters: ScaleParameters) -> np.ndarray: # now be sure to compute the scale factors for all miller indices in `values_to_scale` optimized_scale_factors = _compute_anisotropic_scale_factors( - values_to_scale.index, optimized_parameters + values_to_scale.index, + optimized_parameters, ) if len(optimized_scale_factors) != len(values_to_scale): diff --git a/meteor/settings.py b/meteor/settings.py index 12ef5d0..c8b908b 100644 --- a/meteor/settings.py +++ b/meteor/settings.py @@ -3,4 +3,5 @@ TV_LAMBDA_RANGE: tuple[float, float] = (1e-3, 1.0) TV_STOP_TOLERANCE: float = 0.00000005 TV_MAX_NUM_ITER: int = 50 -TV_MAP_SAMPLING: int = 3 +MAP_SAMPLING: int = 3 +GEMMI_HIGH_RESOLUTION_BUFFER = 1e-6 diff --git a/meteor/testing.py b/meteor/testing.py index b6ae99d..f91b899 100644 --- a/meteor/testing.py +++ b/meteor/testing.py @@ -1,6 +1,17 @@ +from __future__ import annotations + +from dataclasses import dataclass + import numpy as np +@dataclass +class MapColumns: + amplitude: str + phase: str + uncertainty: str | None = None + + def assert_phases_allclose(array1: np.ndarray, array2: np.ndarray, atol=1e-3): diff = array2 - array1 diff = (diff + 180) % 360 - 180 diff --git a/meteor/tv.py b/meteor/tv.py index 5730732..a3196c4 100644 --- a/meteor/tv.py +++ b/meteor/tv.py @@ -1,28 +1,23 @@ from __future__ import annotations from dataclasses import dataclass, field -from typing import TYPE_CHECKING, Literal, Sequence, overload +from typing import Literal, Sequence, overload import numpy as np from skimage.restoration import denoise_tv_chambolle +from .rsmap import Map from .settings import ( + MAP_SAMPLING, TV_LAMBDA_RANGE, - TV_MAP_SAMPLING, TV_MAX_NUM_ITER, TV_STOP_TOLERANCE, ) from .utils import ( - compute_coefficients_from_map, - compute_map_from_coefficients, numpy_array_to_map, - resolution_limits, ) from .validate import ScalarMaximizer, negentropy -if TYPE_CHECKING: - import reciprocalspaceship as rs - @dataclass class TvDenoiseResult: @@ -44,34 +39,28 @@ def _tv_denoise_array(*, map_as_array: np.ndarray, weight: float) -> np.ndarray: @overload def tv_denoise_difference_map( - difference_map_coefficients: rs.DataSet, + difference_map: Map, *, full_output: Literal[False], - difference_map_amplitude_column: str = "DF", - difference_map_phase_column: str = "PHIC", lambda_values_to_scan: Sequence[float] | np.ndarray | None = None, -) -> rs.DataSet: ... +) -> Map: ... @overload def tv_denoise_difference_map( - difference_map_coefficients: rs.DataSet, + difference_map: Map, *, full_output: Literal[True], - difference_map_amplitude_column: str = "DF", - difference_map_phase_column: str = "PHIC", lambda_values_to_scan: Sequence[float] | np.ndarray | None = None, -) -> tuple[rs.DataSet, TvDenoiseResult]: ... +) -> tuple[Map, TvDenoiseResult]: ... def tv_denoise_difference_map( - difference_map_coefficients: rs.DataSet, + difference_map: Map, *, full_output: bool = False, - difference_map_amplitude_column: str = "DF", - difference_map_phase_column: str = "PHIC", lambda_values_to_scan: Sequence[float] | np.ndarray | None = None, -) -> rs.DataSet | tuple[rs.DataSet, TvDenoiseResult]: +) -> Map | tuple[Map, TvDenoiseResult]: """Single-pass TV denoising of a difference map. Automatically selects the optimal level of regularization (the TV lambda parameter) by @@ -85,19 +74,13 @@ def tv_denoise_difference_map( Parameters ---------- - difference_map_coefficients : rs.DataSet + difference_map : Map The input dataset containing the difference map coefficients (amplitude and phase) that will be used to compute the difference map. full_output : bool, optional If `True`, the function returns both the denoised map coefficients and a `TvDenoiseResult` object containing the optimal lambda and the associated negentropy. If `False`, only the denoised map coefficients are returned. Default is `False`. - difference_map_amplitude_column : str, optional - The column name in `difference_map_coefficients` that contains the amplitude values for - the difference map. Default is "DF". - difference_map_phase_column : str, optional - The column name in `difference_map_coefficients` that contains the phase values for the - difference map. Default is "PHIC". lambda_values_to_scan : Sequence[float] | None, optional A sequence of lambda values to explicitly scan for determining the optimal value. If `None`, the function uses the golden-section search method to determine the optimal @@ -105,10 +88,10 @@ def tv_denoise_difference_map( Returns ------- - rs.DataSet | tuple[rs.DataSet, TvDenoiseResult] - If `full_output` is `False`, returns a `rs.DataSet`, the denoised map coefficients. + Map | tuple[Map, TvDenoiseResult] + If `full_output` is `False`, returns a `Map`, the denoised map coefficients. If `full_output` is `True`, returns a tuple containing: - - `rs.DataSet`: The denoised map coefficients. + - `Map`: The denoised map coefficients. - `TvDenoiseResult`: An object w/ the optimal lambda and the corresponding negentropy. Raises @@ -126,21 +109,16 @@ def tv_denoise_difference_map( Example ------- - >>> coefficients = rs.read_mtz("./path/to/difference_map.mtz") # load dataset + >>> coefficients = Map.read_mtz("./path/to/difference_map.mtz", ...) # load dataset >>> denoised_map, result = tv_denoise_difference_map(coefficients, full_output=True) >>> print(f"Optimal Lambda: {result.optimal_lambda}, Negentropy: {result.optimal_negentropy}") """ - difference_map = compute_map_from_coefficients( - map_coefficients=difference_map_coefficients, - amplitude_label=difference_map_amplitude_column, - phase_label=difference_map_phase_column, - map_sampling=TV_MAP_SAMPLING, - ) - difference_map_as_array = np.array(difference_map.grid) + realspace_map = difference_map.to_ccp4_map(map_sampling=MAP_SAMPLING) + realspace_map_array = np.array(realspace_map.grid) def negentropy_objective(tv_lambda: float): - denoised_map = _tv_denoise_array(map_as_array=difference_map_as_array, weight=tv_lambda) - return negentropy(denoised_map.flatten()) + denoised_map = _tv_denoise_array(map_as_array=realspace_map_array, weight=tv_lambda) + return negentropy(denoised_map) maximizer = ScalarMaximizer(objective=negentropy_objective) if lambda_values_to_scan is not None: @@ -149,27 +127,26 @@ def negentropy_objective(tv_lambda: float): maximizer.optimize_with_golden_algorithm(bracket=TV_LAMBDA_RANGE) # denoise using the optimized parameters and convert to an rs.DataSet - final_map = _tv_denoise_array( - map_as_array=difference_map_as_array, weight=maximizer.argument_optimum + final_realspace_map_as_array = _tv_denoise_array( + map_as_array=realspace_map_array, + weight=maximizer.argument_optimum, ) - final_map_as_ccp4 = numpy_array_to_map( - final_map, - spacegroup=difference_map_coefficients.spacegroup, - cell=difference_map_coefficients.cell, + final_realspace_map_as_ccp4 = numpy_array_to_map( + final_realspace_map_as_array, + spacegroup=difference_map.spacegroup, + cell=difference_map.cell, ) - _, dmin = resolution_limits(difference_map_coefficients) - final_map_coefficients = compute_coefficients_from_map( - ccp4_map=final_map_as_ccp4, - high_resolution_limit=dmin, - amplitude_label=difference_map_amplitude_column, - phase_label=difference_map_phase_column, + + final_map = Map.from_ccp4_map( + ccp4_map=final_realspace_map_as_ccp4, + high_resolution_limit=difference_map.resolution_limits[1], ) - # sometimes `compute_coefficients_from_map` adds reflections -- systematic absences or + # sometimes `from_ccp4_map` adds reflections -- systematic absences or # reflections just beyond the resolution limt; remove those - extra_indices = final_map_coefficients.index.difference(difference_map_coefficients.index) - final_map_coefficients = final_map_coefficients.drop(extra_indices) - sym_diff = difference_map_coefficients.index.symmetric_difference(final_map_coefficients.index) + extra_indices = final_map.index.difference(difference_map.index) + final_map.drop(extra_indices, axis=0, inplace=True) + sym_diff = difference_map.index.symmetric_difference(final_map.index) if len(sym_diff) > 0: msg = "something went wrong, input and output coefficients do not have identical indices" raise IndexError(msg) @@ -178,9 +155,9 @@ def negentropy_objective(tv_lambda: float): tv_result = TvDenoiseResult( optimal_lambda=maximizer.argument_optimum, optimal_negentropy=maximizer.objective_maximum, - map_sampling_used_for_tv=TV_MAP_SAMPLING, + map_sampling_used_for_tv=MAP_SAMPLING, lambdas_scanned=maximizer.values_evaluated, ) - return final_map_coefficients, tv_result + return final_map, tv_result - return final_map_coefficients + return final_map diff --git a/meteor/utils.py b/meteor/utils.py index 21f598e..2afdbf7 100644 --- a/meteor/utils.py +++ b/meteor/utils.py @@ -1,32 +1,25 @@ from __future__ import annotations -from dataclasses import dataclass -from typing import TYPE_CHECKING, Literal, overload +from typing import Literal, overload import gemmi import numpy as np import reciprocalspaceship as rs -from pandas.testing import assert_index_equal - -if TYPE_CHECKING: - import pandas as pd - -GEMMI_HIGH_RESOLUTION_BUFFER = 1e-6 +from pandas import DataFrame, Index +from reciprocalspaceship.utils import canonicalize_phases class ShapeMismatchError(Exception): ... -@dataclass -class MapColumns: - amplitude: str - phase: str - uncertainty: str | None = None - - -def resolution_limits(dataset: rs.DataSet) -> tuple[float, float]: - d_hkl = dataset.compute_dHKL()["dHKL"] - return d_hkl.max(), d_hkl.min() +def filter_common_indices(df1: DataFrame, df2: DataFrame) -> tuple[DataFrame, DataFrame]: + common_indices = df1.index.intersection(df2.index) + df1_common = df1.loc[common_indices].copy() + df2_common = df2.loc[common_indices].copy() + if len(df1_common) == 0 or len(df2_common) == 0: + msg = "cannot find any HKL incdices in common between `df1` and `df2`" + raise IndexError(msg) + return df1_common, df2_common def cut_resolution( @@ -35,7 +28,7 @@ def cut_resolution( dmax_limit: float | None = None, dmin_limit: float | None = None, ) -> rs.DataSet: - d_hkl = dataset.compute_dHKL()["dHKL"] + d_hkl = dataset.compute_dHKL() if dmax_limit: dataset = dataset.loc[(d_hkl <= dmax_limit)] if dmin_limit: @@ -77,7 +70,7 @@ def canonicalize_amplitudes( dataset[amplitude_label] = np.abs(dataset[amplitude_label]) dataset.loc[negative_amplitude_indices, phase_label] += 180.0 - dataset.canonicalize_phases(inplace=True) + dataset[phase_label] = canonicalize_phases(dataset[phase_label]) if not inplace: return dataset @@ -95,42 +88,10 @@ def average_phase_diff_in_degrees(array1: np.ndarray, array2: np.ndarray) -> flo return np.sum(np.abs(diff)) / float(np.prod(array1.shape)) -def rs_dataseries_to_complex_array(amplitudes: rs.DataSeries, phases: rs.DataSeries) -> np.ndarray: - """ - Convert structure factors from polar (amplitude/phase) to Cartisian (x + iy). - - Parameters - ---------- - amplitudes: DataSeries - with StructureFactorAmplitudeDtype - phases: DataSeries - with PhaseDtype - - Returns - ------- - complex_structure_factors: np.ndarray - with dtype complex128 - - Raises - ------ - ValueError - if the indices of `amplitudes` and `phases` do not match - """ - try: - assert_index_equal(amplitudes.index, phases.index) - except AssertionError as exptn: - msg = ( - "Indices for `amplitudes` and `phases` don't match. To safely cast to a single complex", - " array, pass DataSeries with a common set of indices. One possible way: ", - "Series.align(other, join='inner', axis=0).", - ) - raise ShapeMismatchError(msg) from exptn - return amplitudes.to_numpy() * np.exp(1j * np.deg2rad(phases.to_numpy())) - - def complex_array_to_rs_dataseries( complex_structure_factors: np.ndarray, - index: pd.Index, + *, + index: Index, ) -> tuple[rs.DataSeries, rs.DataSeries]: """ Convert an array of complex structure factors into two reciprocalspaceship DataSeries, one @@ -154,6 +115,12 @@ def complex_array_to_rs_dataseries( ------ ValueError if `complex_structure_factors and `index` do not have the same shape + + See Also + -------- + `reciprocalspaceship/utils/structurefactors.from_structurefactor(...)` + An equivalent function, that does not require the index and does less index/data + checking. """ if complex_structure_factors.shape != index.shape: msg = ( @@ -162,11 +129,19 @@ def complex_array_to_rs_dataseries( ) raise ShapeMismatchError(msg) - amplitudes = rs.DataSeries(np.abs(complex_structure_factors), index=index) - amplitudes = amplitudes.astype(rs.StructureFactorAmplitudeDtype()) + amplitudes = rs.DataSeries( + np.abs(complex_structure_factors), + index=index, + dtype=rs.StructureFactorAmplitudeDtype(), + name="F", + ) - phases = rs.DataSeries(np.rad2deg(np.angle(complex_structure_factors)), index=index) - phases = phases.astype(rs.PhaseDtype()) + phases = rs.DataSeries( + np.angle(complex_structure_factors, deg=True), + index=index, + dtype=rs.PhaseDtype(), + name="PHI", + ) return amplitudes, phases @@ -190,47 +165,3 @@ def numpy_array_to_map( ccp4_map.grid.spacegroup = spacegroup return ccp4_map - - -def compute_map_from_coefficients( - *, - map_coefficients: rs.DataSet, - amplitude_label: str, - phase_label: str, - map_sampling: int, -) -> gemmi.Ccp4Map: - map_coefficients_gemmi_format = map_coefficients.to_gemmi() - ccp4_map = gemmi.Ccp4Map() - ccp4_map.grid = map_coefficients_gemmi_format.transform_f_phi_to_map( - amplitude_label, phase_label, sample_rate=map_sampling - ) - ccp4_map.update_ccp4_header() - - return ccp4_map - - -def compute_coefficients_from_map( - *, - ccp4_map: gemmi.Ccp4Map, - high_resolution_limit: float, - amplitude_label: str, - phase_label: str, -) -> rs.DataSet: - # to ensure we include the final shell of reflections, add a small buffer to the resolution - - gemmi_structure_factors = gemmi.transform_map_to_f_phi(ccp4_map.grid, half_l=False) - data = gemmi_structure_factors.prepare_asu_data( - dmin=high_resolution_limit - GEMMI_HIGH_RESOLUTION_BUFFER, with_sys_abs=True - ) - - mtz = gemmi.Mtz(with_base=True) - mtz.spacegroup = gemmi_structure_factors.spacegroup - mtz.set_cell_for_all(gemmi_structure_factors.unit_cell) - mtz.add_dataset("FromMap") - mtz.add_column(amplitude_label, "F") - mtz.add_column(phase_label, "P") - - mtz.set_data(data) - mtz.switch_to_asu_hkl() - - return rs.DataSet.from_gemmi(mtz) diff --git a/meteor/validate.py b/meteor/validate.py index 84147ee..3b42cc0 100644 --- a/meteor/validate.py +++ b/meteor/validate.py @@ -117,7 +117,10 @@ def optimize_over_explicit_values(self, *, arguments_to_scan: Sequence[float] | self.values_evaluated.add(argument_test_value) def optimize_with_golden_algorithm( - self, *, bracket: tuple[float, float], tolerance: float = 0.001 + self, + *, + bracket: tuple[float, float], + tolerance: float = 0.001, ): """ Uses the golden-section search algorithm to maximize the objective function within a given @@ -141,7 +144,10 @@ def _objective_with_value_tracking(argument_test_value): return -self.objective(argument_test_value) # negative: we want max optimizer_result = minimize_scalar( - _objective_with_value_tracking, bracket=bracket, method="golden", tol=tolerance + _objective_with_value_tracking, + bracket=bracket, + method="golden", + tol=tolerance, ) if not optimizer_result.success: msg = "Golden minimization failed" diff --git a/pyproject.toml b/pyproject.toml index cb8807c..cb5ebdb 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -38,37 +38,35 @@ lint.select = [ ] lint.ignore = [ #### modules - "ANN", # flake8-annotations - "COM", # flake8-commas - "C90", # mccabe complexity - "DJ", # django - "EXE", # flake8-executable - "T10", # debugger - "TID", # flake8-tidy-imports + "ANN", # flake8-annotations: ignore annotation-related errors + "C90", # mccabe complexity: ignore code complexity checks + "DJ", # django: ignore Django-specific linting rules + "EXE", # flake8-executable: ignore file execution permission errors + "T10", # debugger: ignore debugger-related code (e.g., `pdb`) #### specific rules - "D100", # ignore missing docs - "D101", - "D102", - "D103", - "D104", - "D105", - "D106", - "D107", - "D200", - "D205", - "D212", - "D400", - "D401", - "D415", - "E402", # false positives for local imports - "E501", # line too long - "TRY003", # external messages in exceptions are too verbose - "TD002", - "TD003", - "FIX002", # too verbose descriptions of todos - "ISC001", - "PLR0913", # TEMPORARYLY DISABLED too many arguments in function definition + "D100", # ignore missing docstrings in module + "D101", # ignore missing docstrings in class + "D102", # ignore missing docstrings in public method + "D103", # ignore missing docstrings in function + "D104", # ignore missing docstrings in package + "D105", # ignore missing docstrings in magic methods (e.g., __init__) + "D106", # ignore missing docstrings in public nested class + "D107", # ignore missing docstrings in __init__ method + "D205", # ignore failure to separate summary line from description in docstring + "D212", # ignore multiline docstring summary errors + "D400", # periods in docstrings + "D401", # ignore docstring should be in imperative mood + "E501", # ignore line too long (over 79 characters) + "TRY003", # ignore external messages in exceptions being too verbose + "TD002", # to do authorship + "TD003", # to do issue + "PD002", # allow inplace modifications in Pandas operations + "TCH001", # type checking blocks + "TCH002", # type checking blocks + "TCH003", # type checking blocks + "COM812", # missing trailing comma, conflicts + "ISC001", # string line concat, conflicts ] exclude = [ "build/", @@ -86,6 +84,7 @@ exclude = [ "ARG", # unused function args -> fixtures "PLR2004", # magic value used in comparison "FBT001", # allow positional bools as function args + "SLF001", # access private methods ] [tool.ruff.lint.pydocstyle] diff --git a/test/unit/conftest.py b/test/unit/conftest.py index 9bd3bcd..1b95406 100644 --- a/test/unit/conftest.py +++ b/test/unit/conftest.py @@ -5,12 +5,9 @@ import pytest import reciprocalspaceship as rs -from meteor.utils import ( - MapColumns, - canonicalize_amplitudes, - compute_coefficients_from_map, - numpy_array_to_map, -) +from meteor.rsmap import Map +from meteor.testing import MapColumns +from meteor.utils import numpy_array_to_map RESOLUTION = 1.0 UNIT_CELL = gemmi.UnitCell(a=10.0, b=10.0, c=10.0, alpha=90, beta=90, gamma=90) @@ -25,26 +22,17 @@ def test_map_columns() -> MapColumns: return MapColumns( amplitude="F", - phase="PHIC", + phase="PHI", uncertainty="SIGF", ) -@pytest.fixture -def test_diffmap_columns() -> MapColumns: - return MapColumns( - amplitude="DF", - phase="PHIC", - uncertainty="SIGDF", - ) - - def single_carbon_density( carbon_position: tuple[float, float, float], space_group: gemmi.SpaceGroup, unit_cell: gemmi.UnitCell, d_min: float, -) -> gemmi.FloatGrid: +) -> gemmi.Ccp4Map: model = gemmi.Model("single_atom") chain = gemmi.Chain("A") @@ -71,49 +59,51 @@ def single_carbon_density( density_map.grid.setup_from(structure) density_map.put_model_density_on_grid(structure[0]) - return density_map.grid + ccp4_map = gemmi.Ccp4Map() + ccp4_map.grid = density_map.grid + ccp4_map.update_ccp4_header() + + return ccp4_map -def single_atom_map_coefficients(*, noise_sigma: float, labels: MapColumns) -> rs.DataSet: - density = np.array(single_carbon_density(CARBON1_POSITION, SPACE_GROUP, UNIT_CELL, RESOLUTION)) - grid_values = np.array(density) + noise_sigma * NP_RNG.normal(size=density.shape) +def single_atom_map_coefficients(*, noise_sigma: float) -> Map: + density_map = single_carbon_density(CARBON1_POSITION, SPACE_GROUP, UNIT_CELL, RESOLUTION) + density_array = np.array(density_map.grid) + grid_values = density_array + noise_sigma * NP_RNG.normal(size=density_array.shape) ccp4_map = numpy_array_to_map(grid_values, spacegroup=SPACE_GROUP, cell=UNIT_CELL) - map_coefficients = compute_coefficients_from_map( - ccp4_map=ccp4_map, - high_resolution_limit=RESOLUTION, - amplitude_label=labels.amplitude, - phase_label=labels.phase, - ) + map_coefficients = Map.from_ccp4_map(ccp4_map=ccp4_map, high_resolution_limit=RESOLUTION) - uncertainties = noise_sigma * np.ones_like(map_coefficients[labels.amplitude]) + uncertainties = noise_sigma * np.ones_like(map_coefficients.phases) uncertainties = rs.DataSeries(uncertainties, index=map_coefficients.index) - map_coefficients[labels.uncertainty] = uncertainties.astype(rs.StandardDeviationDtype()) + map_coefficients.set_uncertainties(uncertainties) return map_coefficients @pytest.fixture -def noise_free_map(test_map_columns: MapColumns) -> rs.DataSet: - return single_atom_map_coefficients(noise_sigma=0.0, labels=test_map_columns) +def ccp4_map() -> gemmi.Ccp4Map: + return single_carbon_density(CARBON1_POSITION, SPACE_GROUP, UNIT_CELL, RESOLUTION) @pytest.fixture -def noisy_map(test_map_columns: MapColumns) -> rs.DataSet: - return single_atom_map_coefficients(noise_sigma=0.03, labels=test_map_columns) +def noise_free_map() -> Map: + return single_atom_map_coefficients(noise_sigma=0.0) @pytest.fixture -def very_noisy_map(test_map_columns: MapColumns) -> rs.DataSet: - return single_atom_map_coefficients(noise_sigma=1.0, labels=test_map_columns) +def noisy_map() -> Map: + return single_atom_map_coefficients(noise_sigma=0.03) @pytest.fixture -def random_difference_map(test_diffmap_columns: MapColumns) -> rs.DataSet: - resolution = 1.0 - cell = gemmi.UnitCell(10.0, 10.0, 10.0, 90.0, 90.0, 90.0) - space_group = gemmi.SpaceGroup(1) - hall = rs.utils.generate_reciprocal_asu(cell, space_group, resolution, anomalous=False) +def very_noisy_map() -> Map: + return single_atom_map_coefficients(noise_sigma=1.0) + + +@pytest.fixture +def random_difference_map(test_map_columns: MapColumns) -> Map: + hall = rs.utils.generate_reciprocal_asu(UNIT_CELL, SPACE_GROUP, RESOLUTION, anomalous=False) sigma = 1.0 h, k, l = hall.T # noqa: E741 @@ -124,25 +114,23 @@ def random_difference_map(test_diffmap_columns: MapColumns) -> rs.DataSet: "H": h, "K": k, "L": l, - test_diffmap_columns.amplitude: sigma * NP_RNG.normal(size=number_of_reflections), - test_diffmap_columns.phase: NP_RNG.uniform(-180, 180, size=number_of_reflections), + test_map_columns.amplitude: sigma * NP_RNG.normal(size=number_of_reflections), + test_map_columns.phase: NP_RNG.uniform(-180, 180, size=number_of_reflections), }, - spacegroup=space_group, - cell=cell, + spacegroup=SPACE_GROUP, + cell=UNIT_CELL, ).infer_mtz_dtypes() ds = ds.set_index(["H", "K", "L"]) - ds[test_diffmap_columns.amplitude] = ds[test_diffmap_columns.amplitude].astype("SFAmplitude") + ds[test_map_columns.amplitude] = ds[test_map_columns.amplitude].astype("SFAmplitude") - canonicalize_amplitudes( - ds, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, - inplace=True, - ) - - uncertainties = sigma * np.ones_like(ds[test_diffmap_columns.amplitude]) + uncertainties = sigma * np.ones_like(ds[test_map_columns.amplitude]) uncertainties = rs.DataSeries(uncertainties, index=ds.index) - ds[test_diffmap_columns.uncertainty] = uncertainties.astype(rs.StandardDeviationDtype()) + ds[test_map_columns.uncertainty] = uncertainties.astype(rs.StandardDeviationDtype()) - return ds + return Map( + ds, + amplitude_column=test_map_columns.amplitude, + phase_column=test_map_columns.phase, + uncertainty_column=test_map_columns.uncertainty, + ) diff --git a/test/unit/test_diffmaps.py b/test/unit/test_diffmaps.py index 5e9adb2..30163a3 100644 --- a/test/unit/test_diffmaps.py +++ b/test/unit/test_diffmaps.py @@ -1,147 +1,147 @@ +from typing import Callable + import numpy as np import pandas as pd import pytest import reciprocalspaceship as rs +from numpy.testing import assert_almost_equal from meteor.diffmaps import ( compute_difference_map, compute_kweighted_difference_map, compute_kweights, max_negentropy_kweighted_difference_map, + set_common_crystallographic_metadata, ) -from meteor.utils import MapColumns, compute_map_from_coefficients +from meteor.rsmap import Map from meteor.validate import negentropy @pytest.fixture -def dummy_dataset() -> rs.DataSet: +def dummy_derivative() -> Map: + # note, index HKL = (5, 5, 5) is unique to this Map, should be ignored downstream + index = pd.MultiIndex.from_arrays([[1, 1, 5], [1, 2, 5], [1, 3, 5]], names=("H", "K", "L")) + derivative = { + "F": np.array([2.0, 3.0, 1.0]), + "PHI": np.array([180.0, 0.0, 1.0]), + "SIGF": np.array([0.5, 0.5, 1.0]), + } + return Map(derivative, index=index).infer_mtz_dtypes() + + +@pytest.fixture +def dummy_native() -> Map: index = pd.MultiIndex.from_arrays([[1, 1], [1, 2], [1, 3]], names=("H", "K", "L")) - data = { - "NativeAmplitudes": np.array([1.0, 2.0]), - "DerivativeAmplitudes": np.array([2.0, 3.0]), - "NativePhases": np.array([0.0, 180.0]), # in degrees - "DerivativePhases": np.array([180.0, 0.0]), - "SigFNative": np.array([0.5, 0.5]), - "SigFDeriv": np.array([0.5, 0.5]), + native = { + "F": np.array([1.0, 2.0]), + "PHI": np.array([0.0, 180.0]), + "SIGF": np.array([0.5, 0.5]), } - return rs.DataSet(data, index=index) - - -def test_compute_difference_map_smoke(dummy_dataset: rs.DataSet) -> None: - result = compute_difference_map( - dataset=dummy_dataset, - native_amplitudes_column="NativeAmplitudes", - derivative_amplitudes_column="DerivativeAmplitudes", - native_phases_column="NativePhases", - derivative_phases_column="DerivativePhases", - ) - assert isinstance(result, rs.DataSet) - assert "DF" in result.columns - assert "DPHI" in result.columns - - -def test_compute_kweights_smoke(dummy_dataset: rs.DataSet) -> None: - deltaf = dummy_dataset["NativeAmplitudes"] - sigdeltaf = dummy_dataset["SigFNative"] - result = compute_kweights(deltaf, sigdeltaf, k_parameter=0.5) - assert isinstance(result, rs.DataSeries) - - -def test_compute_kweighted_difference_map_smoke(dummy_dataset: rs.DataSet) -> None: - result = compute_kweighted_difference_map( - dataset=dummy_dataset, - k_parameter=0.5, - native_amplitudes_column="NativeAmplitudes", - native_phases_column="NativePhases", - native_uncertainty_column="SigFNative", - derivative_amplitudes_column="DerivativeAmplitudes", - derivative_phases_column="DerivativePhases", - derivative_uncertainty_column="SigFDeriv", - ) - assert isinstance(result, rs.DataSet) - assert "DF_KWeighted" in result.columns - - -def test_compute_difference_map_vs_analytical(dummy_dataset: rs.DataSet) -> None: + return Map(native, index=index).infer_mtz_dtypes() + + +def test_set_common_crystallographic_metadata(dummy_native: Map) -> None: + dummy_native.cell = (10.0, 10.0, 10.0, 90.0, 90.0, 90.0) + dummy_native.spacegroup = 1 + map1 = dummy_native + map2 = dummy_native.copy() + output = dummy_native.copy() + + # ensure things are copied when missing initially + del output._cell + del output._spacegroup + assert not hasattr(output, "cell") + assert not hasattr(output, "spacegroup") + + set_common_crystallographic_metadata(map1, map2, output=output) + + assert output.cell == map1.cell + assert output.spacegroup == map2.spacegroup + + map2.spacegroup = 19 + with pytest.raises(AttributeError): + set_common_crystallographic_metadata(map1, map2, output=output) + + map2.spacegroup = 1 + set_common_crystallographic_metadata(map1, map2, output=output) + + dummy_native.cell = (15.0, 15.0, 15.0, 90.0, 90.0, 90.0) + with pytest.raises(AttributeError): + set_common_crystallographic_metadata(map1, map2, output=output) + + +def test_compute_difference_map_vs_analytical(dummy_derivative: Map, dummy_native: Map) -> None: # Manually calculated expected amplitude and phase differences expected_amplitudes = np.array([3.0, 5.0]) - expected_phases = np.array([180.0, 0.0]) + expected_phases = np.array([-180.0, 0.0]) + assert isinstance(dummy_native, Map) + assert isinstance(dummy_derivative, Map) + + result = compute_difference_map(dummy_derivative, dummy_native) + assert_almost_equal(result.amplitudes, expected_amplitudes, decimal=4) + assert_almost_equal(result.phases, expected_phases, decimal=4) + + +@pytest.mark.parametrize( + "diffmap_fxn", + [ + compute_difference_map, + # lambdas to make the call signatures for these functions match `compute_difference_map` + lambda d, n: compute_kweighted_difference_map(d, n, k_parameter=0.5), + lambda d, n: max_negentropy_kweighted_difference_map(d, n)[0], + ], +) +def test_cell_spacegroup_propogation( + diffmap_fxn: Callable, + dummy_derivative: Map, + dummy_native: Map, +) -> None: + dummy_derivative.cell = (10.0, 10.0, 10.0, 90.0, 90.0, 90.0) + dummy_derivative.spacegroup = 1 # will cast to gemmi.SpaceGroup + dummy_native.cell = (10.0, 10.0, 10.0, 90.0, 90.0, 90.0) + dummy_native.spacegroup = 1 - # Run the function - result = compute_difference_map( - dataset=dummy_dataset, - native_amplitudes_column="NativeAmplitudes", - derivative_amplitudes_column="DerivativeAmplitudes", - native_phases_column="NativePhases", - derivative_phases_column="DerivativePhases", - ) + result = diffmap_fxn(dummy_derivative, dummy_native) + assert result.cell == dummy_native.cell + assert result.spacegroup == dummy_native.spacegroup - np.testing.assert_almost_equal(result["DF"].values, expected_amplitudes) - np.testing.assert_almost_equal(result["DPHI"].values, expected_phases) + dummy_native.spacegroup = 19 + with pytest.raises(AttributeError): + result = diffmap_fxn(dummy_derivative, dummy_native) + + dummy_native.spacegroup = 1 + _ = compute_difference_map(dummy_derivative, dummy_native) + dummy_native.cell = (15.0, 15.0, 15.0, 90.0, 90.0, 90.0) + with pytest.raises(AttributeError): + result = diffmap_fxn(dummy_derivative, dummy_native) def test_compute_kweights_vs_analytical() -> None: deltaf = rs.DataSeries([2.0, 3.0, 4.0]) + phi = rs.DataSeries([0.0, 0.0, 0.0]) sigdeltaf = rs.DataSeries([1.0, 1.0, 1.0]) k_parameter = 0.5 + diffmap = Map.from_dict({"F": deltaf, "PHI": phi, "SIGF": sigdeltaf}) expected_weights = np.array([0.453, 0.406, 0.354]) - result = compute_kweights(deltaf, sigdeltaf, k_parameter) - np.testing.assert_almost_equal(result.values, expected_weights, decimal=3) + + result = compute_kweights(diffmap, k_parameter=k_parameter) + assert_almost_equal(result.values, expected_weights, decimal=3) def test_compute_kweighted_difference_map_vs_analytical( - dummy_dataset: rs.DataSet, + dummy_derivative: Map, + dummy_native: Map, ) -> None: - result = compute_kweighted_difference_map( - dataset=dummy_dataset, - k_parameter=0.5, - native_amplitudes_column="NativeAmplitudes", - native_phases_column="NativePhases", - derivative_amplitudes_column="DerivativeAmplitudes", - derivative_phases_column="DerivativePhases", - native_uncertainty_column="SigFNative", - derivative_uncertainty_column="SigFDeriv", - ) - - # expected weighted amplitudes calculated by hand - expected_weighted_amplitudes = np.array([1.3247, 1.8280]) - - np.testing.assert_almost_equal( - result["DF_KWeighted"].values, expected_weighted_amplitudes, decimal=4 - ) - - -def test_kweight_optimization( - test_map_columns: MapColumns, noise_free_map: rs.DataSet, noisy_map: rs.DataSet -) -> None: - noisy_map_columns = { - test_map_columns.amplitude: "F_noisy", - test_map_columns.phase: "PHIC_noisy", - test_map_columns.uncertainty: "SIGF_noisy", - } + kwt_diffmap = compute_kweighted_difference_map(dummy_derivative, dummy_native, k_parameter=0.5) + expected_weighted_amplitudes = np.array([1.3247, 1.8280]) # calculated by hand + expected_weighted_uncertainties = np.array([0.3122, 0.2585]) + assert_almost_equal(kwt_diffmap.amplitudes, expected_weighted_amplitudes, decimal=4) + assert_almost_equal(kwt_diffmap.uncertainties, expected_weighted_uncertainties, decimal=4) - combined_dataset = rs.concat( - [ - noise_free_map, - noisy_map.rename(columns=noisy_map_columns), - ], - axis=1, - ) - - if not isinstance(test_map_columns.uncertainty, str): - msg = "test_map_columns.uncertainty undefined" - raise TypeError(msg) - - _, max_negent_kweight = max_negentropy_kweighted_difference_map( - combined_dataset, - native_amplitudes_column=test_map_columns.amplitude, - native_phases_column=test_map_columns.phase, - native_uncertainty_column=test_map_columns.uncertainty, - derivative_amplitudes_column=noisy_map_columns[test_map_columns.amplitude], - derivative_phases_column=noisy_map_columns[test_map_columns.phase], - derivative_uncertainty_column=noisy_map_columns[test_map_columns.uncertainty], - ) + +def test_kweight_optimization(noise_free_map: rs.DataSet, noisy_map: rs.DataSet) -> None: + _, max_negent_kweight = max_negentropy_kweighted_difference_map(noisy_map, noise_free_map) epsilon = 0.01 k_parameters_to_scan = [ @@ -150,30 +150,13 @@ def test_kweight_optimization( max(0.0, min(1.0, max_negent_kweight + epsilon)), ] negentropies = [] - - if not isinstance(test_map_columns.uncertainty, str): - msg = "test_map_columns.uncertainty undefined" - raise TypeError(msg) - for k_parameter in k_parameters_to_scan: kweighted_diffmap = compute_kweighted_difference_map( - dataset=combined_dataset, + noisy_map, + noise_free_map, k_parameter=k_parameter, - native_amplitudes_column=test_map_columns.amplitude, - native_phases_column=test_map_columns.phase, - native_uncertainty_column=test_map_columns.uncertainty, - derivative_amplitudes_column=noisy_map_columns[test_map_columns.amplitude], - derivative_phases_column=noisy_map_columns[test_map_columns.phase], - derivative_uncertainty_column=noisy_map_columns[test_map_columns.uncertainty], ) - - realspace_map = compute_map_from_coefficients( - map_coefficients=kweighted_diffmap, - amplitude_label="DF_KWeighted", - phase_label="DPHI_KWeighted", - map_sampling=3, - ) - + realspace_map = kweighted_diffmap.to_ccp4_map(map_sampling=3) map_negentropy = negentropy(np.array(realspace_map.grid)) negentropies.append(map_negentropy) diff --git a/test/unit/test_iterative.py b/test/unit/test_iterative.py index 7c3fd4f..7607855 100644 --- a/test/unit/test_iterative.py +++ b/test/unit/test_iterative.py @@ -1,12 +1,9 @@ from __future__ import annotations -from typing import TYPE_CHECKING - +import gemmi import numpy as np import pandas as pd -import pandas.testing as pdt import pytest -import reciprocalspaceship as rs from skimage.data import binary_blobs from skimage.restoration import denoise_tv_chambolle @@ -15,15 +12,10 @@ _project_derivative_on_experimental_set, iterative_tv_phase_retrieval, ) -from meteor.testing import assert_phases_allclose +from meteor.rsmap import Map from meteor.tv import TvDenoiseResult -from meteor.utils import MapColumns, compute_map_from_coefficients from meteor.validate import negentropy -if TYPE_CHECKING: - import gemmi - - NP_RNG = np.random.default_rng() @@ -62,7 +54,9 @@ def test_projected_derivative(scalar: float) -> None: scaled_native = scalar * native scaled_difference = scalar * difference proj_derivative = _project_derivative_on_experimental_set( - native=scaled_native, derivative_amplitudes=np.abs(derivative), difference=scaled_difference + native=scaled_native, + derivative_amplitudes=np.abs(derivative), + difference=scaled_difference, ) np.testing.assert_allclose(proj_derivative, derivative) @@ -91,67 +85,27 @@ def test_complex_derivative_from_iterative_tv() -> None: assert 1.05 * noisy_error < denoised_error -def test_iterative_tv( - noise_free_map: rs.DataSet, very_noisy_map: rs.DataSet, test_map_columns: MapColumns -) -> None: +def test_iterative_tv(noise_free_map: Map, very_noisy_map: Map) -> None: # the test case is the denoising of a difference: between a noisy map and its noise-free origin # such a diffmap is ideally totally flat, so should have very low TV - noisy_map_columns = MapColumns( - amplitude="F_noisy", phase="PHIC_noisy", uncertainty="SIGF_noisy" - ) - - noisy_column_renaming = { - test_map_columns.amplitude: noisy_map_columns.amplitude, - test_map_columns.phase: noisy_map_columns.phase, - test_map_columns.uncertainty: noisy_map_columns.uncertainty, - } - - noisy_and_noise_free = rs.concat( - [noise_free_map, very_noisy_map.rename(columns=noisy_column_renaming)], axis=1 - ) - - result, metadata = iterative_tv_phase_retrieval( - noisy_and_noise_free, - native_amplitude_column=test_map_columns.amplitude, - calculated_phase_column=test_map_columns.phase, - derivative_amplitude_column=noisy_map_columns.amplitude, - output_derivative_phase_column="PHIC_denoised", + denoised_map, metadata = iterative_tv_phase_retrieval( + very_noisy_map, + noise_free_map, tv_weights_to_scan=[0.01, 0.1, 1.0], max_iterations=100, convergence_tolerance=0.01, ) - # make sure output columns that should not be altered are in fact the same - assert_phases_allclose( - result[test_map_columns.phase], noisy_and_noise_free[test_map_columns.phase] - ) - for label in [test_map_columns.amplitude, noisy_map_columns.amplitude]: - pdt.assert_series_equal(result[label], noisy_and_noise_free[label]) - # make sure metadata exists assert isinstance(metadata, pd.DataFrame) # test correctness by comparing denoised dataset to noise-free map_sampling = 3 - noise_free_density = compute_map_from_coefficients( - map_coefficients=noisy_and_noise_free, - amplitude_label=test_map_columns.amplitude, - phase_label=test_map_columns.phase, - map_sampling=map_sampling, - ) - noisy_density = compute_map_from_coefficients( - map_coefficients=noisy_and_noise_free, - amplitude_label=noisy_map_columns.amplitude, - phase_label=noisy_map_columns.phase, - map_sampling=map_sampling, - ) - denoised_density = compute_map_from_coefficients( - map_coefficients=result, - amplitude_label=noisy_map_columns.amplitude, - phase_label="PHIC_denoised", - map_sampling=map_sampling, - ) + noise_free_density = noise_free_map.to_ccp4_map(map_sampling=map_sampling) + noisy_density = very_noisy_map.to_ccp4_map(map_sampling=3) + denoised_density = denoised_map.to_ccp4_map(map_sampling=3) + noisy_error = map_norm(noisy_density, noise_free_density) denoised_error = map_norm(denoised_density, noise_free_density) diff --git a/test/unit/test_rsmap.py b/test/unit/test_rsmap.py new file mode 100644 index 0000000..babb1ea --- /dev/null +++ b/test/unit/test_rsmap.py @@ -0,0 +1,305 @@ +from pathlib import Path + +import gemmi +import numpy as np +import pandas as pd +import pytest +import reciprocalspaceship as rs + +from meteor.rsmap import Map, MapMutabilityError, MissingUncertaintiesError, _assert_is_map +from meteor.testing import assert_phases_allclose +from meteor.utils import filter_common_indices + + +def test_assert_is_map(noise_free_map: Map) -> None: + _assert_is_map(noise_free_map, require_uncertainties=True) # should work + + del noise_free_map["SIGF"] + _assert_is_map(noise_free_map, require_uncertainties=False) # should work + with pytest.raises(MissingUncertaintiesError): + _assert_is_map(noise_free_map, require_uncertainties=True) + + not_a_map = rs.DataSet(noise_free_map) + with pytest.raises(TypeError): + _assert_is_map(not_a_map, require_uncertainties=False) + + +def test_initialization_leaves_input_unmodified(noise_free_map: Map) -> None: + dataset = rs.DataSet(noise_free_map).copy() + assert not isinstance(dataset, Map) + + dataset["new_column"] = dataset["F"].copy() + new_map = Map(dataset) + assert "new_column" in dataset.columns + assert "new_column" not in new_map.columns + + +def test_amplitude_and_phase_required(noise_free_map: Map) -> None: + ds = rs.DataSet(noise_free_map) + Map(ds) # should be no problem + + with pytest.raises(KeyError): + Map(ds, phase_column="does_not_exist") + + del ds["F"] + with pytest.raises(KeyError): + Map(ds) + + +def test_reset_index(noise_free_map: Map) -> None: + modmap = noise_free_map.reset_index() + assert len(modmap.columns) == len(noise_free_map.columns) + 3 + + +def test_copy(noise_free_map: Map) -> None: + copy_map = noise_free_map.copy() + assert isinstance(copy_map, Map) + pd.testing.assert_frame_equal(copy_map, noise_free_map) + + +def test_filter_common_indices_with_maps(noise_free_map: Map) -> None: + m1 = noise_free_map + m2 = noise_free_map.copy() + m2.drop([m1.index[0]], axis=0, inplace=True) # remove an index + assert len(m1) != len(m2) + f1, f2 = filter_common_indices(m1, m2) + pd.testing.assert_index_equal(f1.index, f2.index) + assert len(f1.columns) == 3 + assert len(f2.columns) == 3 + + +def test_verify_type(noise_free_map: Map) -> None: + dataseries = rs.DataSeries([1, 2, 3]) + assert dataseries.dtype == int + noise_free_map._verify_type("foo", [int], dataseries, fix=False, cast_fix_to=int) + + with pytest.raises(AssertionError): + noise_free_map._verify_type("foo", [float], dataseries, fix=False, cast_fix_to=float) + + output = noise_free_map._verify_type("foo", [float], dataseries, fix=True, cast_fix_to=float) + assert output.dtype == float + + +@pytest.mark.parametrize("fix", [False, True]) +def test_verify_types(noise_free_map: Map, fix: bool) -> None: + functions_to_test = [ + noise_free_map._verify_amplitude_type, + noise_free_map._verify_phase_type, + noise_free_map._verify_uncertainty_type, + ] + + expected_fixed_types = [ + rs.StructureFactorAmplitudeDtype(), + rs.PhaseDtype(), + rs.StandardDeviationDtype(), + ] + + dataseries_to_use = [ + noise_free_map.amplitudes, + noise_free_map.phases, + noise_free_map.uncertainties, + rs.DataSeries(np.arange(10)), + ] + + # success_matrix[i][j] = functions_to_test[i] expected to pass on dataseries_to_use[j] + success_matrix = [ + [True, False, False, False], + [False, True, False, False], + [False, False, True, False], + ] + + for i, fxn in enumerate(functions_to_test): + for j, ds in enumerate(dataseries_to_use): + if success_matrix[i][j] or fix: + output = fxn(ds, fix=fix) # should pass + if fix: + assert output.dtype == expected_fixed_types[i] + else: + with pytest.raises(AssertionError): + fxn(ds, fix=fix) + + +def test_setitem(noise_free_map: Map, noisy_map: Map) -> None: + noisy_map.amplitudes = noise_free_map.amplitudes + noisy_map.phases = noise_free_map.phases + noisy_map.set_uncertainties(noise_free_map.uncertainties) # should work even if already set + + +def test_unallowed_setitem_disabled(noise_free_map: Map) -> None: + with pytest.raises(MapMutabilityError): + noise_free_map["unallowed_column_name"] = noise_free_map.amplitudes + + +def test_insert_disabled(noise_free_map: Map) -> None: + position = 0 + column = "foo" + value = [0, 1] + with pytest.raises(MapMutabilityError): + noise_free_map.insert(position, column, value) + + +def test_drop_columns_disabled(noise_free_map: Map) -> None: + noise_free_map.drop([0, 0, 0], axis=0) + with pytest.raises(MapMutabilityError): + noise_free_map.drop("F", axis=1) + with pytest.raises(MapMutabilityError): + noise_free_map.drop(columns=["F"]) + + +def test_get_hkls(noise_free_map: Map) -> None: + hkl = noise_free_map.get_hkls() + assert len(hkl.shape) == 2 + assert hkl.shape[0] > 0 + assert hkl.shape[1] == 3 + + +def test_get_hkls_consistent_with_reciprocalspaceship(noise_free_map: Map) -> None: + meteor_hkl = noise_free_map.get_hkls() + rs_hkl = rs.DataSet(noise_free_map).get_hkls() + np.testing.assert_array_equal(meteor_hkl, rs_hkl) + + +def test_compute_dhkl(noise_free_map: Map) -> None: + d_hkl = noise_free_map.compute_dHKL() + assert np.max(d_hkl) == 10.0 + assert np.min(d_hkl) == 1.0 + assert d_hkl.shape == noise_free_map.amplitudes.shape + + +def test_resolution_limits(random_difference_map: Map) -> None: + dmax, dmin = random_difference_map.resolution_limits + assert dmax == 10.0 + assert dmin == 1.0 + + +def test_get_set_fixed_columns(noise_free_map: Map) -> None: + assert isinstance(noise_free_map.amplitudes, rs.DataSeries) + assert isinstance(noise_free_map.phases, rs.DataSeries) + assert isinstance(noise_free_map.uncertainties, rs.DataSeries) + + noise_free_map.amplitudes *= 2.0 + noise_free_map.phases *= 2.0 + noise_free_map.uncertainties *= 2.0 + + +def test_has_uncertainties(noise_free_map: Map) -> None: + assert noise_free_map.has_uncertainties + del noise_free_map["SIGF"] + assert not noise_free_map.has_uncertainties + + +@pytest.mark.filterwarnings("ignore:Pandas doesn't allow columns to be created via a new attribute") +def test_set_uncertainties() -> None: + test_map = Map.from_dict( + {"F": rs.DataSeries([2.0, 3.0, 4.0]), "PHI": rs.DataSeries([0.0, 0.0, 0.0])}, + ) + + assert not test_map.has_uncertainties + with pytest.raises(AttributeError): + _ = test_map.uncertainties + + # this would normally generate the suppressed warning, but we are more strict and raise + with pytest.raises(AttributeError): + test_map.uncertainties = rs.DataSeries([2.0, 3.0, 4.0]) + + test_map.set_uncertainties(rs.DataSeries([1.0, 1.0, 1.0])) + assert test_map.has_uncertainties + assert len(test_map.uncertainties) == 3 + + +def test_misconfigured_columns() -> None: + test_map = Map.from_dict( + {"F": rs.DataSeries([2.0, 3.0, 4.0]), "PHI": rs.DataSeries([0.0, 0.0, 0.0])}, + ) + del test_map["F"] + with pytest.raises(RuntimeError): + test_map.set_uncertainties(rs.DataSeries([2.0, 3.0, 4.0])) + + +def test_complex_amplitudes_smoke(noise_free_map: Map) -> None: + c_array = noise_free_map.complex_amplitudes + assert isinstance(c_array, np.ndarray) + assert np.issubdtype(c_array.dtype, np.complexfloating) + + +def test_complex_amplitudes() -> None: + index = pd.Index(np.arange(4)) + amp = rs.DataSeries(np.ones(4), index=index, name="F") + phase = rs.DataSeries(np.arange(4) * 90.0, index=index, name="PHI") + + ds = rs.concat([amp, phase], axis=1) + rsmap = Map(ds) + + expected = np.array([1.0, 0.0, -1.0, 0.0]) + 1j * np.array([0.0, 1.0, 0.0, -1.0]) + np.testing.assert_almost_equal(rsmap.complex_amplitudes, expected) + + +def test_from_dataset(noise_free_map: Map) -> None: + map_as_dataset = rs.DataSet(noise_free_map) + map2 = Map( + map_as_dataset, + amplitude_column=noise_free_map._amplitude_column, + phase_column=noise_free_map._phase_column, + uncertainty_column=noise_free_map._uncertainty_column, + ) + pd.testing.assert_frame_equal(noise_free_map, map2) + + +def test_to_structurefactor(noise_free_map: Map) -> None: + c_dataseries = noise_free_map.to_structurefactor() + c_array = noise_free_map.complex_amplitudes + np.testing.assert_almost_equal(c_dataseries.to_numpy(), c_array) + + +def from_structurefactor(noise_free_map: Map) -> None: + map2 = Map.from_structurefactor(noise_free_map.complex_amplitudes, index=noise_free_map.index) + pd.testing.assert_frame_equal(noise_free_map, map2) + + +def test_to_ccp4_map(noise_free_map: Map) -> None: + ccp4_map = noise_free_map.to_ccp4_map(map_sampling=3) + assert ccp4_map.grid.shape == (30, 30, 30) + + +def test_gemmi_mtz_roundtrip(noise_free_map: Map) -> None: + gemmi_mtz = noise_free_map.to_gemmi() + assert isinstance(gemmi_mtz, gemmi.Mtz) + map2 = Map.from_gemmi(gemmi_mtz) + pd.testing.assert_frame_equal(noise_free_map, map2) + + +def test_from_ccp4_map(ccp4_map: gemmi.Ccp4Map) -> None: + resolution = 1.0 + rsmap = Map.from_ccp4_map(ccp4_map, high_resolution_limit=resolution) + assert len(rsmap) > 0 + + +@pytest.mark.parametrize("map_sampling", [1, 2, 2.25, 3, 5]) +def test_ccp4_map_round_trip( + map_sampling: int, + random_difference_map: Map, +) -> None: + realspace_map = random_difference_map.to_ccp4_map(map_sampling=map_sampling) + + _, dmin = random_difference_map.resolution_limits + output_coefficients = Map.from_ccp4_map(realspace_map, high_resolution_limit=dmin) + + random_difference_map.canonicalize_amplitudes() + output_coefficients.canonicalize_amplitudes() + + pd.testing.assert_series_equal( + random_difference_map.amplitudes, + output_coefficients.amplitudes, + atol=1e-3, + ) + assert_phases_allclose( + random_difference_map.phases.to_numpy(), + output_coefficients.phases.to_numpy(), + ) + + +def test_to_and_from_mtz_file(noise_free_map: Map, tmp_path: Path) -> None: + file_path = tmp_path / "tmp.mtz" + noise_free_map.write_mtz(file_path) + loaded = Map.read_mtz_file(file_path) + pd.testing.assert_frame_equal(noise_free_map, loaded) diff --git a/test/unit/test_scale.py b/test/unit/test_scale.py index 717114d..5a41fed 100644 --- a/test/unit/test_scale.py +++ b/test/unit/test_scale.py @@ -4,8 +4,9 @@ import reciprocalspaceship as rs from meteor import scale +from meteor.rsmap import Map from meteor.scale import _compute_anisotropic_scale_factors -from meteor.utils import MapColumns +from meteor.testing import MapColumns @pytest.fixture @@ -13,7 +14,8 @@ def miller_dataseries() -> rs.DataSeries: miller_indices = [(0, 0, 0), (1, 0, 0), (0, 1, 0), (0, 0, 1), (1, 1, 0)] data = np.array([8.0, 4.0, 2.0, 1.0, 1.0], dtype=np.float32) return rs.DataSeries( - data, index=pd.MultiIndex.from_tuples(miller_indices, names=["H", "K", "L"]) + data, + index=pd.MultiIndex.from_tuples(miller_indices, names=["H", "K", "L"]), ) @@ -27,7 +29,8 @@ def test_compute_anisotropic_scale_factors_smoke(miller_dataseries: rs.DataSerie def test_compute_scale_factors_identical(miller_dataseries: rs.DataSeries) -> None: scale_factors = scale.compute_scale_factors( - reference_values=miller_dataseries, values_to_scale=miller_dataseries + reference_values=miller_dataseries, + values_to_scale=miller_dataseries, ) assert (scale_factors == 1.0).all() @@ -46,7 +49,8 @@ def test_compute_scale_factors_identical(miller_dataseries: rs.DataSeries) -> No def test_compute_scale_factors_shuffle_indices(miller_dataseries: rs.DataSeries) -> None: shuffled_miller_dataseries = miller_dataseries.sample(frac=1) scale_factors = scale.compute_scale_factors( - reference_values=miller_dataseries, values_to_scale=shuffled_miller_dataseries + reference_values=miller_dataseries, + values_to_scale=shuffled_miller_dataseries, ) assert (scale_factors == 1.0).all() @@ -56,7 +60,8 @@ def test_compute_scale_factors_scalar(miller_dataseries: rs.DataSeries) -> None: doubled_miller_dataseries = miller_dataseries / multiple scale_factors = scale.compute_scale_factors( - reference_values=miller_dataseries, values_to_scale=doubled_miller_dataseries + reference_values=miller_dataseries, + values_to_scale=doubled_miller_dataseries, ) np.testing.assert_array_almost_equal(scale_factors, multiple) @@ -66,57 +71,62 @@ def test_compute_scale_factors_anisotropic(miller_dataseries: rs.DataSeries) -> flat_miller_dataseries[:] = np.ones(len(miller_dataseries)) scale_factors = scale.compute_scale_factors( - reference_values=miller_dataseries, values_to_scale=flat_miller_dataseries + reference_values=miller_dataseries, + values_to_scale=flat_miller_dataseries, ) np.testing.assert_array_almost_equal(scale_factors, miller_dataseries.values) def test_scale_datasets( - random_difference_map: rs.DataSet, test_diffmap_columns: MapColumns + random_difference_map: Map, + test_map_columns: MapColumns, ) -> None: multiple = 2.0 - doubled_random_difference_map = random_difference_map.copy() - doubled_random_difference_map[test_diffmap_columns.amplitude] /= multiple + doubled_difference_map = random_difference_map.copy() + doubled_difference_map[test_map_columns.amplitude] /= multiple scaled = scale.scale_datasets( reference_dataset=random_difference_map, - dataset_to_scale=doubled_random_difference_map, - column_to_compare=test_diffmap_columns.amplitude, + dataset_to_scale=doubled_difference_map, + column_to_compare=test_map_columns.amplitude, weight_using_uncertainties=False, ) np.testing.assert_array_almost_equal( - scaled[test_diffmap_columns.amplitude], - random_difference_map[test_diffmap_columns.amplitude], + scaled[test_map_columns.amplitude], + random_difference_map[test_map_columns.amplitude], ) np.testing.assert_array_almost_equal( - scaled[test_diffmap_columns.phase], random_difference_map[test_diffmap_columns.phase] + scaled[test_map_columns.phase], + random_difference_map[test_map_columns.phase], ) def test_scale_datasets_with_errors( - random_difference_map: rs.DataSet, test_diffmap_columns: MapColumns + random_difference_map: Map, + test_map_columns: MapColumns, ) -> None: multiple = 2.0 doubled_difference_map = random_difference_map.copy() - doubled_difference_map[test_diffmap_columns.amplitude] /= multiple + doubled_difference_map[test_map_columns.amplitude] /= multiple scaled = scale.scale_datasets( reference_dataset=random_difference_map, dataset_to_scale=doubled_difference_map, - column_to_compare=test_diffmap_columns.amplitude, - uncertainty_column=str(test_diffmap_columns.uncertainty), + column_to_compare=test_map_columns.amplitude, + uncertainty_column=str(test_map_columns.uncertainty), weight_using_uncertainties=True, ) np.testing.assert_array_almost_equal( - scaled[test_diffmap_columns.amplitude], - random_difference_map[test_diffmap_columns.amplitude], + scaled[test_map_columns.amplitude], + random_difference_map[test_map_columns.amplitude], ) np.testing.assert_array_almost_equal( - scaled[test_diffmap_columns.phase], random_difference_map[test_diffmap_columns.phase] + scaled[test_map_columns.phase], + random_difference_map[test_map_columns.phase], ) # also make sure we scale the uncertainties np.testing.assert_array_almost_equal( - scaled[test_diffmap_columns.uncertainty] / multiple, - random_difference_map[test_diffmap_columns.uncertainty], + scaled[test_map_columns.uncertainty] / multiple, + random_difference_map[test_map_columns.uncertainty], ) diff --git a/test/unit/test_tv.py b/test/unit/test_tv.py index 168d159..2470e80 100644 --- a/test/unit/test_tv.py +++ b/test/unit/test_tv.py @@ -7,29 +7,15 @@ import reciprocalspaceship as rs from meteor import tv -from meteor.utils import MapColumns, compute_map_from_coefficients +from meteor.rsmap import Map DEFAULT_LAMBDA_VALUES_TO_SCAN = np.logspace(-2, 0, 25) -def rms_between_coefficients( - ds1: rs.DataSet, ds2: rs.DataSet, labels: MapColumns, map_sampling: int = 3 -) -> float: - map1 = compute_map_from_coefficients( - map_coefficients=ds1, - amplitude_label=labels.amplitude, - phase_label=labels.phase, - map_sampling=map_sampling, - ) - map2 = compute_map_from_coefficients( - map_coefficients=ds2, - amplitude_label=labels.amplitude, - phase_label=labels.phase, - map_sampling=map_sampling, - ) - - map1_array = np.array(map1.grid) - map2_array = np.array(map2.grid) +def rms_between_coefficients(map1: Map, map2: Map) -> float: + map_sampling = 3 + map1_array = np.array(map1.to_ccp4_map(map_sampling=map_sampling).grid) + map2_array = np.array(map2.to_ccp4_map(map_sampling=map_sampling).grid) # standardize map1_array /= map1_array.std() @@ -50,14 +36,11 @@ def test_tv_denoise_map_smoke( lambda_values_to_scan: None | Sequence[float], full_output: bool, random_difference_map: rs.DataSet, - test_diffmap_columns: MapColumns, ) -> None: output = tv.tv_denoise_difference_map( - difference_map_coefficients=random_difference_map, + random_difference_map, lambda_values_to_scan=lambda_values_to_scan, full_output=full_output, - difference_map_amplitude_column=test_diffmap_columns.amplitude, - difference_map_phase_column=test_diffmap_columns.phase, ) # type: ignore[call-overload] if full_output: assert len(output) == 2 @@ -70,12 +53,11 @@ def test_tv_denoise_map_smoke( @pytest.mark.parametrize("lambda_values_to_scan", [None, DEFAULT_LAMBDA_VALUES_TO_SCAN]) def test_tv_denoise_map( lambda_values_to_scan: None | Sequence[float], - noise_free_map: rs.DataSet, - noisy_map: rs.DataSet, - test_map_columns: MapColumns, + noise_free_map: Map, + noisy_map: Map, ) -> None: - def rms_to_noise_free(test_map: rs.DataSet) -> float: - return rms_between_coefficients(test_map, noise_free_map, test_map_columns) + def rms_to_noise_free(test_map: Map) -> float: + return rms_between_coefficients(test_map, noise_free_map) # Normally, the `tv_denoise_difference_map` function only returns the best result -- since we # know the ground truth, work around this to test all possible results. @@ -85,9 +67,7 @@ def rms_to_noise_free(test_map: rs.DataSet) -> float: for trial_lambda in DEFAULT_LAMBDA_VALUES_TO_SCAN: denoised_map, result = tv.tv_denoise_difference_map( - difference_map_coefficients=noisy_map, - difference_map_amplitude_column=test_map_columns.amplitude, - difference_map_phase_column=test_map_columns.phase, + noisy_map, lambda_values_to_scan=[ trial_lambda, ], @@ -101,10 +81,8 @@ def rms_to_noise_free(test_map: rs.DataSet) -> float: # now run the denoising algorithm and make sure we get a result that's close # to the one that minimizes the RMS error to the ground truth denoised_map, result = tv.tv_denoise_difference_map( - difference_map_coefficients=noisy_map, + noisy_map, lambda_values_to_scan=lambda_values_to_scan, - difference_map_amplitude_column=test_map_columns.amplitude, - difference_map_phase_column=test_map_columns.phase, full_output=True, ) diff --git a/test/unit/test_utils.py b/test/unit/test_utils.py index e2d1018..e94913e 100644 --- a/test/unit/test_utils.py +++ b/test/unit/test_utils.py @@ -1,12 +1,12 @@ -import gemmi import numpy as np import pandas as pd import pytest import reciprocalspaceship as rs from pandas import testing as pdt -from meteor import testing as mt from meteor import utils +from meteor.rsmap import Map +from meteor.testing import MapColumns NP_RNG = np.random.default_rng() @@ -15,10 +15,19 @@ def omit_nones_in_list(input_list: list) -> list: return [x for x in input_list if x] -def test_resolution_limits(random_difference_map: rs.DataSet) -> None: - dmax, dmin = utils.resolution_limits(random_difference_map) - assert dmax == 10.0 - assert dmin == 1.0 +def test_filter_common_indices() -> None: + df1 = pd.DataFrame({"A": [1, 2, 3]}, index=[0, 1, 2]) + df2 = pd.DataFrame({"B": [4, 5, 6]}, index=[1, 2, 3]) + filtered_df1, filtered_df2 = utils.filter_common_indices(df1, df2) + assert len(filtered_df1) == 2 + assert len(filtered_df2) == 2 + + +def test_filter_common_indices_empty_intersection() -> None: + df1 = pd.DataFrame({"A": [1, 2, 3]}, index=[0, 1, 2]) + df2 = pd.DataFrame({"B": [4, 5, 6]}, index=[4, 5, 6]) + with pytest.raises(IndexError): + _, _ = utils.filter_common_indices(df1, df2) @pytest.mark.parametrize( @@ -30,57 +39,55 @@ def test_resolution_limits(random_difference_map: rs.DataSet) -> None: (8.0, 2.0), ], ) -def test_cut_resolution( - random_difference_map: rs.DataSet, dmax_limit: float, dmin_limit: float -) -> None: - dmax_before_cut, dmin_before_cut = utils.resolution_limits(random_difference_map) +def test_cut_resolution(random_difference_map: Map, dmax_limit: float, dmin_limit: float) -> None: + dmax_before_cut, dmin_before_cut = random_difference_map.resolution_limits expected_dmax_upper_bound: float = max(omit_nones_in_list([dmax_before_cut, dmax_limit])) expected_dmin_lower_bound: float = min(omit_nones_in_list([dmin_before_cut, dmin_limit])) random_intensities = utils.cut_resolution( - random_difference_map, dmax_limit=dmax_limit, dmin_limit=dmin_limit + random_difference_map, + dmax_limit=dmax_limit, + dmin_limit=dmin_limit, ) assert len(random_intensities) > 0 - dmax, dmin = utils.resolution_limits(random_intensities) + dmax, dmin = random_intensities.resolution_limits assert dmax <= expected_dmax_upper_bound assert dmin >= expected_dmin_lower_bound @pytest.mark.parametrize("inplace", [False, True]) def test_canonicalize_amplitudes( - inplace: bool, random_difference_map: rs.DataSet, test_diffmap_columns: utils.MapColumns + inplace: bool, random_difference_map: Map, test_map_columns: MapColumns ) -> None: # ensure at least one amplitude is negative, one phase is outside [-180,180) index_single_hkl = 0 - random_difference_map.loc[index_single_hkl, test_diffmap_columns.amplitude] = -1.0 - random_difference_map.loc[index_single_hkl, test_diffmap_columns.phase] = -470.0 + random_difference_map.loc[index_single_hkl, test_map_columns.amplitude] = -1.0 + random_difference_map.loc[index_single_hkl, test_map_columns.phase] = -470.0 if inplace: canonicalized = random_difference_map utils.canonicalize_amplitudes( canonicalized, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, + amplitude_label=test_map_columns.amplitude, + phase_label=test_map_columns.phase, inplace=inplace, ) else: canonicalized = utils.canonicalize_amplitudes( random_difference_map, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, + amplitude_label=test_map_columns.amplitude, + phase_label=test_map_columns.phase, inplace=inplace, ) - assert ( - canonicalized[test_diffmap_columns.amplitude] >= 0.0 - ).all(), "not all amplitudes positive" - assert (canonicalized[test_diffmap_columns.phase] >= -180.0).all(), "not all phases > -180" - assert (canonicalized[test_diffmap_columns.phase] <= 180.0).all(), "not all phases < +180" + assert (canonicalized[test_map_columns.amplitude] >= 0.0).all(), "not all amps positive" + assert (canonicalized[test_map_columns.phase] >= -180.0).all(), "not all phases > -180" + assert (canonicalized[test_map_columns.phase] <= 180.0).all(), "not all phases < +180" np.testing.assert_almost_equal( - np.array(np.abs(random_difference_map[test_diffmap_columns.amplitude])), - np.array(canonicalized[test_diffmap_columns.amplitude]), + np.array(np.abs(random_difference_map[test_map_columns.amplitude])), + np.array(canonicalized[test_map_columns.amplitude]), ) @@ -99,32 +106,18 @@ def test_average_phase_diff_in_degrees_shape_mismatch() -> None: utils.average_phase_diff_in_degrees(arr1, arr2) -def test_rs_dataseries_to_complex_array() -> None: - index = pd.Index(np.arange(4)) - amp = rs.DataSeries(np.ones(4), index=index) - phase = rs.DataSeries(np.arange(4) * 90.0, index=index) - - carray = utils.rs_dataseries_to_complex_array(amp, phase) - expected = np.array([1.0, 0.0, -1.0, 0.0]) + 1j * np.array([0.0, 1.0, 0.0, -1.0]) - - np.testing.assert_almost_equal(carray, expected) - - -def test_rs_dataseries_to_complex_array_index_mismatch() -> None: - amp = rs.DataSeries(np.ones(4), index=[0, 1, 2, 3]) - phase = rs.DataSeries(np.arange(4) * 90.0, index=[1, 2, 3, 4]) - with pytest.raises(utils.ShapeMismatchError): - utils.rs_dataseries_to_complex_array(amp, phase) - - def test_complex_array_to_rs_dataseries() -> None: carray = np.array([1.0, 0.0, -1.0, 0.0]) + 1j * np.array([0.0, 1.0, 0.0, -1.0]) index = pd.Index(np.arange(4)) - expected_amp = rs.DataSeries(np.ones(4), index=index).astype(rs.StructureFactorAmplitudeDtype()) - expected_phase = rs.DataSeries([0.0, 90.0, 180.0, -90.0], index=index).astype(rs.PhaseDtype()) + expected_amp = rs.DataSeries(np.ones(4), index=index, name="F").astype( + rs.StructureFactorAmplitudeDtype(), + ) + expected_phase = rs.DataSeries([0.0, 90.0, 180.0, -90.0], index=index, name="PHI").astype( + rs.PhaseDtype(), + ) - amp, phase = utils.complex_array_to_rs_dataseries(carray, index) + amp, phase = utils.complex_array_to_rs_dataseries(carray, index=index) pdt.assert_series_equal(amp, expected_amp) pdt.assert_series_equal(phase, expected_phase) @@ -133,74 +126,4 @@ def test_complex_array_to_rs_dataseries_index_mismatch() -> None: carray = np.array([1.0]) + 1j * np.array([1.0]) index = pd.Index(np.arange(2)) with pytest.raises(utils.ShapeMismatchError): - utils.complex_array_to_rs_dataseries(carray, index) - - -def test_complex_array_dataseries_roundtrip() -> None: - n = 5 - carray = NP_RNG.normal(size=n) + 1j * NP_RNG.normal(size=n) - indices = pd.Index(np.arange(n)) - - ds_amplitudes, ds_phases = utils.complex_array_to_rs_dataseries(carray, indices) - - assert isinstance(ds_amplitudes, rs.DataSeries) - assert isinstance(ds_phases, rs.DataSeries) - - assert ds_amplitudes.dtype == rs.StructureFactorAmplitudeDtype() - assert ds_phases.dtype == rs.PhaseDtype() - - pdt.assert_index_equal(ds_amplitudes.index, indices) - pdt.assert_index_equal(ds_phases.index, indices) - - carray2 = utils.rs_dataseries_to_complex_array(ds_amplitudes, ds_phases) - np.testing.assert_almost_equal(carray, carray2, decimal=5) - - -def test_compute_map_from_coefficients( - random_difference_map: rs.DataSet, test_diffmap_columns: utils.MapColumns -) -> None: - diffmap = utils.compute_map_from_coefficients( - map_coefficients=random_difference_map, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, - map_sampling=1, - ) - assert isinstance(diffmap, gemmi.Ccp4Map) - - -@pytest.mark.parametrize("map_sampling", [1, 2, 2.25, 3, 5]) -def test_map_to_coefficients_round_trip( - map_sampling: int, random_difference_map: rs.DataSet, test_diffmap_columns: utils.MapColumns -) -> None: - realspace_map = utils.compute_map_from_coefficients( - map_coefficients=random_difference_map, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, - map_sampling=map_sampling, - ) - - _, dmin = utils.resolution_limits(random_difference_map) - - output_coefficients = utils.compute_coefficients_from_map( - ccp4_map=realspace_map, - high_resolution_limit=dmin, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, - ) - - utils.canonicalize_amplitudes( - output_coefficients, - amplitude_label=test_diffmap_columns.amplitude, - phase_label=test_diffmap_columns.phase, - inplace=True, - ) - - pd.testing.assert_series_equal( - random_difference_map[test_diffmap_columns.amplitude], - output_coefficients[test_diffmap_columns.amplitude], - atol=1e-3, - ) - mt.assert_phases_allclose( - random_difference_map[test_diffmap_columns.phase].to_numpy(), - output_coefficients[test_diffmap_columns.phase].to_numpy(), - ) + utils.complex_array_to_rs_dataseries(carray, index=index)