Skip to content

Commit

Permalink
homogenize on weight not lambda (#42)
Browse files Browse the repository at this point in the history
  • Loading branch information
tjlane authored Oct 25, 2024
1 parent d71405a commit 22dcd60
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 54 deletions.
6 changes: 3 additions & 3 deletions meteor/iterative.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def _complex_derivative_from_iterative_tv(
metadata.append(
{
"iteration": num_iterations,
"tv_weight": tv_metadata.optimal_lambda,
"tv_weight": tv_metadata.optimal_weight,
"negentropy_after_tv": tv_metadata.optimal_negentropy,
"average_phase_change": phase_change,
},
Expand Down Expand Up @@ -160,7 +160,7 @@ def iterative_tv_phase_retrieval(
Fh' = (D_F' + F) * [|Fh| / |D_F' + F|] Fourier space projection onto experimental set
D_F = Fh' - F
Where the TV lambda parameter is determined using golden section optimization. The algorithm
Where the TV weight parameter is determined using golden section optimization. The algorithm
iterates until the changes in the derivative phase drop below a specified threshold.
Parameters
Expand Down Expand Up @@ -202,7 +202,7 @@ def tv_denoise_closure(difference: np.ndarray) -> tuple[np.ndarray, TvDenoiseRes

denoised_map, tv_metadata = tv_denoise_difference_map(
diffmap,
lambda_values_to_scan=tv_weights_to_scan,
weights_to_scan=tv_weights_to_scan,
full_output=True,
)

Expand Down
2 changes: 1 addition & 1 deletion meteor/settings.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

TV_LAMBDA_RANGE: tuple[float, float] = (1e-3, 1.0)
TV_WEIGHT_RANGE: tuple[float, float] = (1e-3, 1.0)
TV_STOP_TOLERANCE: float = 0.00000005
TV_MAX_NUM_ITER: int = 50
MAP_SAMPLING: int = 3
Expand Down
56 changes: 29 additions & 27 deletions meteor/tv.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from __future__ import annotations

from dataclasses import dataclass, field
from dataclasses import dataclass
from typing import Literal, Sequence, overload

import numpy as np
Expand All @@ -9,19 +9,20 @@
from .rsmap import Map
from .settings import (
MAP_SAMPLING,
TV_LAMBDA_RANGE,
TV_MAX_NUM_ITER,
TV_STOP_TOLERANCE,
TV_WEIGHT_RANGE,
)
from .validate import ScalarMaximizer, negentropy


@dataclass
class TvDenoiseResult:
optimal_lambda: float
optimal_weight: float
optimal_negentropy: float
map_sampling_used_for_tv: float
lambdas_scanned: set[float] = field(default_factory=set)
weights_scanned: list[float]
negentropy_at_weights: list[float]


def _tv_denoise_array(*, map_as_array: np.ndarray, weight: float) -> np.ndarray:
Expand All @@ -41,7 +42,7 @@ def tv_denoise_difference_map(
difference_map: Map,
*,
full_output: Literal[False],
lambda_values_to_scan: Sequence[float] | np.ndarray | None = None,
weights_to_scan: Sequence[float] | np.ndarray | None = None,
) -> Map: ...


Expand All @@ -50,26 +51,26 @@ def tv_denoise_difference_map(
difference_map: Map,
*,
full_output: Literal[True],
lambda_values_to_scan: Sequence[float] | np.ndarray | None = None,
weights_to_scan: Sequence[float] | np.ndarray | None = None,
) -> tuple[Map, TvDenoiseResult]: ...


def tv_denoise_difference_map(
difference_map: Map,
*,
full_output: bool = False,
lambda_values_to_scan: Sequence[float] | np.ndarray | None = None,
weights_to_scan: Sequence[float] | np.ndarray | None = None,
) -> Map | tuple[Map, TvDenoiseResult]:
"""Single-pass TV denoising of a difference map.
Automatically selects the optimal level of regularization (the TV lambda parameter) by
Automatically selects the optimal level of regularization (the TV weight, aka lambda) by
maximizing the negentropy of the denoised map. Two modes can be used to dictate which
candidate values of lambda are assessed:
candidate values of weights are assessed:
1. By default (`lambda_values_to_scan=None`), the golden-section search algorithm selects
a lambda value according to the bounds and convergence criteria set in meteor.settings.
2. Alternatively, an explicit list of lambda values to assess can be provided using
`lambda_values_to_scan`.
1. By default (`weights_to_scan=None`), the golden-section search algorithm selects
a weights value according to the bounds and convergence criteria set in meteor.settings.
2. Alternatively, an explicit list of weights values to assess can be provided using
`weights_to_scan`.
Parameters
----------
Expand All @@ -78,25 +79,25 @@ def tv_denoise_difference_map(
that will be used to compute the difference map.
full_output : bool, optional
If `True`, the function returns both the denoised map coefficients and a `TvDenoiseResult`
object containing the optimal lambda and the associated negentropy. If `False`, only
object containing the optimal weight and the associated negentropy. If `False`, only
the denoised map coefficients are returned. Default is `False`.
lambda_values_to_scan : Sequence[float] | None, optional
A sequence of lambda values to explicitly scan for determining the optimal value. If
weights_to_scan : Sequence[float] | None, optional
A sequence of weight values to explicitly scan for determining the optimal value. If
`None`, the function uses the golden-section search method to determine the optimal
lambda. Default is `None`.
weight. Default is `None`.
Returns
-------
Map | tuple[Map, TvDenoiseResult]
If `full_output` is `False`, returns a `Map`, the denoised map coefficients.
If `full_output` is `True`, returns a tuple containing:
- `Map`: The denoised map coefficients.
- `TvDenoiseResult`: An object w/ the optimal lambda and the corresponding negentropy.
- `TvDenoiseResult`: An object w/ the optimal weight and the corresponding negentropy.
Raises
------
AssertionError
If the golden-section search fails to find an optimal lambda.
If the golden-section search fails to find an optimal weight.
Notes
-----
Expand All @@ -110,20 +111,20 @@ def tv_denoise_difference_map(
-------
>>> coefficients = Map.read_mtz("./path/to/difference_map.mtz", ...) # load dataset
>>> denoised_map, result = tv_denoise_difference_map(coefficients, full_output=True)
>>> print(f"Optimal Lambda: {result.optimal_lambda}, Negentropy: {result.optimal_negentropy}")
>>> print(f"Optimal: {result.optimal_weight}, Negentropy: {result.optimal_negentropy}")
"""
realspace_map = difference_map.to_ccp4_map(map_sampling=MAP_SAMPLING)
realspace_map_array = np.array(realspace_map.grid)

def negentropy_objective(tv_lambda: float) -> float:
denoised_map = _tv_denoise_array(map_as_array=realspace_map_array, weight=tv_lambda)
def negentropy_objective(tv_weight: float) -> float:
denoised_map = _tv_denoise_array(map_as_array=realspace_map_array, weight=tv_weight)
return negentropy(denoised_map)

maximizer = ScalarMaximizer(objective=negentropy_objective)
if lambda_values_to_scan is not None:
maximizer.optimize_over_explicit_values(arguments_to_scan=lambda_values_to_scan)
if weights_to_scan is not None:
maximizer.optimize_over_explicit_values(arguments_to_scan=weights_to_scan)
else:
maximizer.optimize_with_golden_algorithm(bracket=TV_LAMBDA_RANGE)
maximizer.optimize_with_golden_algorithm(bracket=TV_WEIGHT_RANGE)

# denoise using the optimized parameters and convert to an rs.DataSet
final_realspace_map_as_array = _tv_denoise_array(
Expand Down Expand Up @@ -152,10 +153,11 @@ def negentropy_objective(tv_lambda: float) -> float:

if full_output:
tv_result = TvDenoiseResult(
optimal_lambda=maximizer.argument_optimum,
optimal_weight=maximizer.argument_optimum,
optimal_negentropy=maximizer.objective_maximum,
map_sampling_used_for_tv=MAP_SAMPLING,
lambdas_scanned=maximizer.values_evaluated,
weights_scanned=maximizer.values_evaluated,
negentropy_at_weights=maximizer.objective_at_values,
)
return final_map, tv_result

Expand Down
17 changes: 11 additions & 6 deletions meteor/validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,13 +95,15 @@ def __init__(self, *, objective: Callable[[float], float]) -> None:
self.objective = objective
self.argument_optimum: float = np.nan
self.objective_maximum: float = -np.inf
self.values_evaluated: set[float] = set()
self.values_evaluated: list[float] = []
self.objective_at_values: list[float] = []

def _update_optima(self, argument_test_value: float) -> None:
def _update_optima(self, argument_test_value: float) -> float:
objective_value = self.objective(argument_test_value)
if objective_value > self.objective_maximum:
self.argument_optimum = argument_test_value
self.objective_maximum = objective_value
return objective_value

def optimize_over_explicit_values(
self, *, arguments_to_scan: Sequence[float] | np.ndarray
Expand All @@ -115,8 +117,9 @@ def optimize_over_explicit_values(
A list or array of argument values to evaluate.
"""
for argument_test_value in arguments_to_scan:
self._update_optima(argument_test_value)
self.values_evaluated.add(argument_test_value)
objective_value = self._update_optima(argument_test_value)
self.values_evaluated.append(argument_test_value)
self.objective_at_values.append(objective_value)

def optimize_with_golden_algorithm(
self,
Expand All @@ -142,8 +145,10 @@ def optimize_with_golden_algorithm(

def _objective_with_value_tracking(argument_test_value: float) -> float:
"""Adds the evaluated value to self.values_evaluated"""
self.values_evaluated.add(argument_test_value)
return -self.objective(argument_test_value) # negative: we want max
objective_value = self.objective(argument_test_value)
self.values_evaluated.append(argument_test_value)
self.objective_at_values.append(objective_value)
return -objective_value # negative: we want max

optimizer_result = minimize_scalar(
_objective_with_value_tracking,
Expand Down
5 changes: 3 additions & 2 deletions test/unit/test_iterative.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,10 @@ def simple_tv_function(fourier_array: np.ndarray) -> tuple[np.ndarray, TvDenoise
real_space = np.fft.ifftn(fourier_array).real
denoised = denoise_tv_chambolle(real_space, weight=weight) # type: ignore[no-untyped-call]
result = TvDenoiseResult(
optimal_lambda=weight,
optimal_weight=weight,
optimal_negentropy=0.0,
lambdas_scanned={weight},
weights_scanned=[weight],
negentropy_at_weights=[0.0],
map_sampling_used_for_tv=0,
)
return np.fft.fftn(denoised), result
Expand Down
28 changes: 14 additions & 14 deletions test/unit/test_tv.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from meteor import tv
from meteor.rsmap import Map

DEFAULT_LAMBDA_VALUES_TO_SCAN = np.logspace(-2, 0, 25)
DEFAULT_WEIGHTS_TO_SCAN = np.logspace(-2, 0, 25)


def rms_between_coefficients(map1: Map, map2: Map) -> float:
Expand All @@ -25,21 +25,21 @@ def rms_between_coefficients(map1: Map, map2: Map) -> float:


@pytest.mark.parametrize(
"lambda_values_to_scan",
"weights_to_scan",
[
None,
[-1.0, 0.0, 1.0],
],
)
@pytest.mark.parametrize("full_output", [False, True])
def test_tv_denoise_map_smoke(
lambda_values_to_scan: None | Sequence[float],
weights_to_scan: None | Sequence[float],
full_output: bool,
random_difference_map: Map,
) -> None:
output = tv.tv_denoise_difference_map(
random_difference_map,
lambda_values_to_scan=lambda_values_to_scan,
weights_to_scan=weights_to_scan,
full_output=full_output,
) # type: ignore[call-overload]
if full_output:
Expand All @@ -54,17 +54,17 @@ def test_tv_denoise_zero_weight(random_difference_map: Map) -> None:
weight = 0.0
output = tv.tv_denoise_difference_map(
random_difference_map,
lambda_values_to_scan=[weight],
weights_to_scan=[weight],
full_output=False,
)
random_difference_map.canonicalize_amplitudes()
output.canonicalize_amplitudes()
pd.testing.assert_frame_equal(random_difference_map, output, rtol=1e-2)


@pytest.mark.parametrize("lambda_values_to_scan", [None, DEFAULT_LAMBDA_VALUES_TO_SCAN])
@pytest.mark.parametrize("weights_to_scan", [None, DEFAULT_WEIGHTS_TO_SCAN])
def test_tv_denoise_map(
lambda_values_to_scan: None | Sequence[float],
weights_to_scan: None | Sequence[float],
noise_free_map: Map,
noisy_map: Map,
) -> None:
Expand All @@ -75,28 +75,28 @@ def rms_to_noise_free(test_map: Map) -> float:
# know the ground truth, work around this to test all possible results.

lowest_rms: float = np.inf
best_lambda: float = 0.0
best_weight: float = 0.0

for trial_lambda in DEFAULT_LAMBDA_VALUES_TO_SCAN:
for trial_weight in DEFAULT_WEIGHTS_TO_SCAN:
denoised_map, result = tv.tv_denoise_difference_map(
noisy_map,
lambda_values_to_scan=[
trial_lambda,
weights_to_scan=[
trial_weight,
],
full_output=True,
)
rms = rms_to_noise_free(denoised_map)
if rms < lowest_rms:
lowest_rms = rms
best_lambda = trial_lambda
best_weight = trial_weight

# now run the denoising algorithm and make sure we get a result that's close
# to the one that minimizes the RMS error to the ground truth
denoised_map, result = tv.tv_denoise_difference_map(
noisy_map,
lambda_values_to_scan=lambda_values_to_scan,
weights_to_scan=weights_to_scan,
full_output=True,
)

assert rms_to_noise_free(denoised_map) < rms_to_noise_free(noisy_map), "error didnt drop"
np.testing.assert_allclose(result.optimal_lambda, best_lambda, rtol=0.5, err_msg="opt lambda")
np.testing.assert_allclose(result.optimal_weight, best_weight, rtol=0.5, err_msg="opt weight")
4 changes: 3 additions & 1 deletion test/unit/test_validate.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,8 @@ def test_negentropy_maximizer_explicit() -> None:
maximizer.optimize_over_explicit_values(arguments_to_scan=test_values)
assert_almost_equal(maximizer.argument_optimum, 1.0)
assert_almost_equal(maximizer.objective_maximum, 0.0)
assert set(test_values) == maximizer.values_evaluated
assert list(test_values) == maximizer.values_evaluated
assert len(maximizer.values_evaluated) == len(maximizer.objective_at_values)


def test_negentropy_maximizer_golden() -> None:
Expand All @@ -49,3 +50,4 @@ def test_negentropy_maximizer_golden() -> None:
assert_almost_equal(maximizer.argument_optimum, 1.0, decimal=2)
assert_almost_equal(maximizer.objective_maximum, 0.0, decimal=2)
assert len(maximizer.values_evaluated) > 0
assert len(maximizer.values_evaluated) == len(maximizer.objective_at_values)

0 comments on commit 22dcd60

Please sign in to comment.