PIC operator #24

Merged — 28 commits, merged on Jan 30, 2025
Changes from all commits
2 changes: 1 addition & 1 deletion epde/__init__.py
@@ -1,4 +1,4 @@
-from .interface.interface import EpdeSearch
+from .interface.interface import EpdeSearch, EpdeMultisample
from .interface.logger import Logger
from .interface.equation_translator import translate_equation

328 changes: 187 additions & 141 deletions epde/cache/cache.py

Large diffs are not rendered by default.

22 changes: 22 additions & 0 deletions epde/cache/ctrl_cache.py
@@ -0,0 +1,22 @@
from typing import Union, List, Tuple

import torch

class ControlNNContainer():
def __init__(self, output_num: int = 1, args: List[Tuple[Union[int, List]]] = [(0, [None,]),],
net: torch.nn.Sequential = None, device: str = 'cpu'):
self.net_args = args
self.net = net if isinstance(net, torch.nn.Sequential) else self.create_shallow_nn(len(self.net_args),
output_num, device)

@staticmethod
def create_shallow_nn(arg_num: int = 1, output_num: int = 1,
device: str = 'cpu') -> torch.nn.Sequential: # net: torch.nn.Sequential = None,
hidden_neurons = 256
layers = [torch.nn.Linear(arg_num, hidden_neurons, device=device),
torch.nn.ReLU(),
torch.nn.Linear(hidden_neurons, output_num, device=device)]
control_nn = torch.nn.Sequential(*layers)
control_nn.to(device)
print('control_nn', control_nn, next(control_nn.parameters()).device, 'should be ', device)
return control_nn
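Below is a minimal, hypothetical usage sketch (not part of the PR diff) of the `ControlNNContainer` added above: the `args` entries pair a component index with the derivative axes that feed the control network, and a shallow 256-neuron MLP is built automatically when no `net` is supplied. The import path `epde.cache.ctrl_cache` follows the file location shown here and is an assumption about the packaged module.

```python
# Hypothetical usage sketch, assuming the module path epde/cache/ctrl_cache.py.
import torch
from epde.cache.ctrl_cache import ControlNNContainer

# args describe the control-net inputs as (component, derivative axes) pairs:
# (0, [None]) -> the function value u itself, (0, [0]) -> du/dx along axis 0.
container = ControlNNContainer(output_num=1,
                               args=[(0, [None,]), (0, [0,])],
                               device='cpu')

# With no `net` passed, a shallow arg_num-256-1 MLP is created automatically.
sample_inputs = torch.randn(16, len(container.net_args))   # 16 points, 2 features
control_values = container.net(sample_inputs)              # shape: (16, 1)
print(control_values.shape)
```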
2 changes: 2 additions & 0 deletions epde/control/__init__.py
@@ -0,0 +1,2 @@
from .constr import ConstrLocation, ConditionalLoss, ControlConstrEq, ControlConstrNEq
from .control import ControlExp
326 changes: 326 additions & 0 deletions epde/control/constr.py

Large diffs are not rendered by default.

350 changes: 350 additions & 0 deletions epde/control/control.py

Large diffs are not rendered by default.

128 changes: 128 additions & 0 deletions epde/control/optim.py
@@ -0,0 +1,128 @@
from abc import ABC
from collections import OrderedDict
from typing import List, Dict

from warnings import warn

import numpy as np
import torch

class FirstOrderOptimizerNp(ABC):
behavior = 'None'
def __init__(self, parameters: np.ndarray, optimized: np.ndarray):
raise NotImplementedError('Calling __init__ of an abstract optimizer')

def step(self, gradient: np.ndarray):
raise NotImplementedError('Calling step of an abstract optimizer')

class AdamOptimizerNp(FirstOrderOptimizerNp):
behavior = 'Gradient'
def __init__(self, optimized: np.ndarray, parameters: np.ndarray = np.array([0.001, 0.9, 0.999, 1e-8])):
'''
parameters[0] - alpha, parameters[1] - beta_1, parameters[2] - beta_2
parameters[3] - eps
'''
self.reset(optimized, parameters)

def reset(self, optimized: np.ndarray, parameters: np.ndarray):
self._moment = np.zeros_like(optimized)
self._second_moment = np.zeros_like(optimized)
self._second_moment_max = np.zeros_like(optimized)
self.parameters = parameters
self.time = 0

def step(self, gradient: np.ndarray, optimized: np.ndarray):
self.time += 1
self._moment = self.parameters[1] * self._moment + (1-self.parameters[1]) * gradient
self._second_moment = self.parameters[2] * self._second_moment +\
(1-self.parameters[2]) * np.power(gradient, 2)
moment_cor = self._moment/(1 - np.power(self.parameters[1], self.time))
second_moment_cor = self._second_moment/(1 - np.power(self.parameters[2], self.time))
return optimized - self.parameters[0]*moment_cor/(np.sqrt(second_moment_cor)+self.parameters[3])

class FirstOrderOptimizer(ABC):
behavior = 'Gradient'
def __init__(self, optimized: List[torch.Tensor], parameters: list):
raise NotImplementedError('Calling __init__ of an abstract optimizer')

def reset(self, optimized: Dict[str, torch.Tensor], parameters: np.ndarray):
raise NotImplementedError('Calling reset method of an abstract optimizer')

def step(self, gradient: Dict[str, torch.Tensor], optimized: Dict[str, torch.Tensor],
*args, **kwargs) -> Dict[str, torch.Tensor]:
raise NotImplementedError('Calling step of an abstract optimizer')

class AdamOptimizer(FirstOrderOptimizer):
behavior = 'Gradient'
def __init__(self, optimized: List[torch.Tensor], parameters: list = [0.001, 0.9, 0.999, 1e-8]):
'''
parameters[0] - alpha, parameters[1] - beta_1, parameters[2] - beta_2
parameters[3] - eps
'''
self.reset(optimized, parameters)

def reset(self, optimized: Dict[str, torch.Tensor], parameters: np.ndarray):
self._moment = [torch.zeros_like(param_subtensor) for param_subtensor in optimized.values()]
self._second_moment = [torch.zeros_like(param_subtensor) for param_subtensor in optimized.values()]
self.parameters = parameters
self.time = 0

def step(self, gradient: Dict[str, torch.Tensor], optimized: Dict[str, torch.Tensor],
*args, **kwargs) -> Dict[str, torch.Tensor]:
self.time += 1

self._moment = [self.parameters[1] * self._moment[tensor_idx] + (1-self.parameters[1]) * grad_subtensor
for tensor_idx, grad_subtensor in enumerate(gradient.values())]

self._second_moment = [self.parameters[2]*self._second_moment[tensor_idx] +
(1-self.parameters[2])*torch.pow(grad_subtensor, 2)
for tensor_idx, grad_subtensor in enumerate(gradient.values())]

moment_cor = [moment_tensor/(1 - self.parameters[1] ** self.time) for moment_tensor in self._moment]
second_moment_cor = [sm_tensor/(1 - self.parameters[2] ** self.time) for sm_tensor in self._second_moment]
return OrderedDict([(subtensor_key, optimized[subtensor_key] - self.parameters[0] * moment_cor[tensor_idx]/\
(torch.sqrt(second_moment_cor[tensor_idx]) + self.parameters[3]))
for tensor_idx, subtensor_key in enumerate(optimized.keys())])

class CoordDescentOptimizer(FirstOrderOptimizer):
behavior = 'Coordinate'
def __init__(self, optimized: List[torch.Tensor], parameters: list = [0.001,]):
'''
parameters[0] - alpha (step size); coordinate descent uses only this single parameter
'''
self.reset(optimized, parameters)

def reset(self, optimized: Dict[str, torch.Tensor], parameters: np.ndarray):
self.parameters = parameters
self.time = 0

def step(self, gradient: Dict[str, torch.Tensor], optimized: Dict[str, torch.Tensor],
*args, **kwargs) -> Dict[str, torch.Tensor]:
self.time += 1
assert 'loc' in kwargs.keys(), 'Missing location of parameter value shift in coordinate descent.'
loc = kwargs['loc']
if torch.isclose(gradient[loc[0]][tuple(loc[1:])],
torch.tensor((0,)).to(device=gradient[loc[0]][tuple(loc[1:])].device).float()):
warn(f'Gradient at {loc} is close to zero: {gradient[loc[0]][tuple(loc[1:])]}.')
optimized[loc[0]][tuple(loc[1:])] = optimized[loc[0]][tuple(loc[1:])] -\
self.parameters[0]*gradient[loc[0]][tuple(loc[1:])]
return optimized

# TODO: implement coordinate descent
#np.power(self.parameters[1], self.time)) # np.power(self.parameters[2], self.time)

# class LBFGS(FirstOrderOptimizer):
# def __init__(self, optimized: List[torch.Tensor], parameters: list = []):
# pass

# def step(self, gradient: Dict[str, torch.Tensor], optimized: Dict[str, torch.Tensor]) -> Dict[str, torch.Tensor]:
# pass

# def update_hessian(self, gradient: Dict[str, torch.Tensor], x_vals: Dict[str, torch.Tensor]):
# # Use self._prev_grad
# for i in range(self._mem_size - 1):
# alpha =

# def get_alpha(self):
# return alpha
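For orientation, here is a hedged usage sketch (an assumption, not code from the PR) of the dictionary-based `AdamOptimizer` defined above: parameters are kept in an `OrderedDict` keyed by name, and `step` returns a new `OrderedDict` with the Adam update applied per tensor. The gradient values below are synthetic stand-ins.

```python
# Hypothetical sketch, assuming the module path epde/control/optim.py from this branch.
from collections import OrderedDict
import torch
from epde.control.optim import AdamOptimizer

# Control-net parameters keyed by name, state_dict()-style.
params = OrderedDict([('weight', torch.randn(4, 2)), ('bias', torch.zeros(4))])
optimizer = AdamOptimizer(params, parameters=[1e-3, 0.9, 0.999, 1e-8])

for _ in range(100):
    # Gradients would normally come from autograd or finite differences;
    # here they are stand-ins with the same keys and shapes as `params`.
    grads = OrderedDict([(key, 2 * tensor) for key, tensor in params.items()])
    params = optimizer.step(grads, params)   # returns an updated OrderedDict
```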
58 changes: 58 additions & 0 deletions epde/control/utils.py
@@ -0,0 +1,58 @@
from typing import List, Tuple, Union
from collections import OrderedDict

import numpy as np
import torch

from epde.supplementary import BasicDeriv, AutogradDeriv

def prepare_control_inputs(model: Union[torch.nn.Sequential, List[np.ndarray]], grid: torch.Tensor,
args: List[Tuple[Union[int, List]]], diff_method: BasicDeriv = None) -> torch.Tensor:
'''
Recompute the control ANN arguments tensor from the solutions of
controlled equations $L \mathbf{u}(t, \mathbf{x}, \mathbf{c}) = 0$,
calculating necessary derivatives, as `args`

Args:
model (`torch.nn.Sequential`): solution of the controlled equation $\mathbf{u}(t, \mathbf{x})$.

grid (`torch.Tensor`): tensor of the grids m x n, where m - number of points in the domain, n - number of NN inputs.

args (`List[Tuple[Union[int, List]]]`): list of derivative-operator arguments, each given as a (component, derivative axes) pair.

        diff_method (`BasicDeriv`, optional): differentiation backend; `AutogradDeriv` is used when not specified.

Returns:
`torch.Tensor`: tensor of arguments for the control ANN.
'''
if diff_method is None:
diff_method = AutogradDeriv

differentiator = diff_method()
ctrl_inputs = [differentiator.take_derivative(u = model, args = grid,
axes = arg[1], component = arg[0]).reshape(-1, 1) for arg in args]
if not isinstance(model, torch.nn.Sequential):
ctrl_inputs = [torch.from_numpy(inp).reshape((-1, 1)) for inp in ctrl_inputs]
ctrl_inputs = torch.cat(ctrl_inputs, dim = 1).float()
# print(f'ctrl_inputs shape is {ctrl_inputs.shape}')
return ctrl_inputs

# if isinstance(model, torch.nn.Sequential):
# differentiator = AutogradDeriv()
# ctrl_inputs = torch.cat([differentiator.take_derivative(u = model, args = grid, axes = arg[1],
# component = arg[0]).reshape(-1, 1) for arg in args], dim = 1).float()
# else:
# assert isinstance(grid, torch.Tensor) and grid.ndim == 2 and grid.shape[-1] == 1
# grid = grid.detach().cpu().numpy()
# differentiator = FDDeriv()
# ctrl_inputs = torch.cat([torch.from_numpy(differentiator.take_derivative(model, grid, axes = arg[1],
# component = arg[0])).reshape(-1, 1)
# for arg in args], dim = 1).float()

@torch.no_grad()
def eps_increment_diff(input_params: OrderedDict, loc: List[Union[str, Tuple[int]]],
forward: bool = True, eps = 1e-4): # input_keys: list, prev_loc: List = None,
if forward:
input_params[loc[0]][tuple(loc[1:])] += eps
else:
input_params[loc[0]][tuple(loc[1:])] -= 2*eps
return input_params
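The `eps_increment_diff` helper shifts a single parameter entry in place (+eps on the forward pass, then -2*eps to reach the backward point), which makes central finite differences straightforward. The sketch below is an illustrative assumption of how it could be combined with a loss evaluation; `loss_fn` and the parameter layout are hypothetical.

```python
# A minimal sketch (assumption, not from the PR) of a central finite-difference
# estimate of d(loss)/d(param) for one coordinate via eps_increment_diff.
from collections import OrderedDict
import torch
from epde.control.utils import eps_increment_diff

def loss_fn(params: OrderedDict) -> float:
    # Hypothetical stand-in loss: sum of squares of all control parameters.
    return float(sum((tensor ** 2).sum() for tensor in params.values()))

params = OrderedDict([('weight', torch.randn(3, 2))])
loc = ['weight', 0, 1]          # parameter tensor key followed by the index inside it
eps = 1e-4

params = eps_increment_diff(params, loc, forward=True, eps=eps)    # theta + eps
loss_plus = loss_fn(params)
params = eps_increment_diff(params, loc, forward=False, eps=eps)   # theta - eps
loss_minus = loss_fn(params)
params = eps_increment_diff(params, loc, forward=True, eps=eps)    # restore theta

grad_estimate = (loss_plus - loss_minus) / (2 * eps)
```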
2 changes: 0 additions & 2 deletions epde/eq_mo_objectives.py
@@ -28,8 +28,6 @@ def equation_fitness(system, equation_key):
error : float.
The value of the error metric.
'''
-# print(f'System, for which we evaluate fitness: {system.text_form}')
-# print(f'For equation key {equation_key}, {system.vals[equation_key].fitness_calculated}')
assert system.vals[equation_key].fitness_calculated, 'Trying to call fitness before its evaluation.'
res = system.vals[equation_key].fitness_value
return res