Skip to content

Commit

Permalink
initial commit of DEAPGAOpt
Browse files Browse the repository at this point in the history
  • Loading branch information
dmark04 committed May 3, 2023
1 parent 2679b52 commit 66cc422
Show file tree
Hide file tree
Showing 3 changed files with 140 additions and 1 deletion.
132 changes: 132 additions & 0 deletions qiaopt/optimization.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
import numpy as np
from abc import ABCMeta, abstractmethod

import random
from deap import base, creator, tools
from typing import Callable, Dict, List, Tuple, Union

from qiaopt.ga import ModGA
from qiaopt.pre import Experiment

Expand Down Expand Up @@ -231,6 +235,134 @@ def overwrite_internal_data_points(self, data_points: list) -> None:
self.ga_instance.population = data_array


class DEAPGAOpt(GenericOptimization):
@staticmethod
def create_individual(creator, input_values: List[float], index=None):
individual = creator(input_values)
individual.sol_idx = index
return individual

def __init__(self, fitness_func: Callable, initial_population: List, num_generations: int, num_parents_mating: int,
gene_space: Union[Dict, List] = None, refinement_factors=None, logging_level=1,
allow_duplicate_genes=False, **deap_kwargs):
super().__init__(function=fitness_func, refinement_factors=refinement_factors, logging_level=logging_level,
extrema=self.MINIMUM, evolutionary=True)

if deap_kwargs is None:
deap_kwargs = {}

self.num_generations = num_generations
self.num_parents_mating = num_parents_mating
self.deap_kwargs = deap_kwargs
self.gene_space = gene_space
self.allow_duplicate_genes = allow_duplicate_genes
self.current_generation = 0

self.cxpb = deap_kwargs.get("cxpb", 0.5)
self.mutpb = deap_kwargs.get("mutpb", 0.2)

creator.create("FitnessMax", base.Fitness, weights=(-1.0 if self.extrema == self.MINIMUM else 1.0,))
creator.create("Individual", list, fitness=creator.FitnessMax, sol_idx=None)

self.toolbox = base.Toolbox()

self.toolbox.register("mate", tools.cxTwoPoint)
self.toolbox.register("mutate", tools.mutUniformInt, low=0, up=len(initial_population) - 1, indpb=0.5)
self.toolbox.register("select", tools.selBest)
self.toolbox.register("evaluate", self._objective_func)

# Create the initial population using the create_individual method
self.population = [self.create_individual(creator.Individual, ind, index=idx)
for idx, ind in enumerate(initial_population)]


def _objective_func(self, individual, sol_idx) -> Tuple[float,]:
if self.gene_space is not None and not self._check_constraints(individual):
return (float('-inf'),)
print(individual, sol_idx, individual.sol_idx)
fitness = self.function(None, individual, sol_idx)
return (fitness,)

def _check_constraints(self, individual: List) -> bool:
if isinstance(self.gene_space, list):
for index, value in enumerate(individual):
if self.gene_space[index] is not None:
if not (self.gene_space[index]['low'] <= value <= self.gene_space[index]['high']):
return False
elif isinstance(self.gene_space, dict):
for value in individual:
if not (self.gene_space['low'] <= value <= self.gene_space['high']):
return False
else:
raise TypeError(f"Unacceptable type {type(self.gene_space)} for gene_space.")
return True

def _check_uniqueness(self, population: List) -> List:
seen = set()
new_population = []
for ind in population:
key = tuple(ind)
if key not in seen or self.allow_duplicate_genes:
seen.add(key)
new_population.append(ind)
else:
while key in seen:
new_ind = self.toolbox.individual()
key = tuple(new_ind)
seen.add(key)
new_population.append(new_ind)
return new_population

def execute(self) -> None:
if self.current_generation < self.num_generations:
offspring = self.toolbox.select(self.population, len(self.population))
# todo: somehow something still goes wrong with the indexing.. can we get rid of the awkward indexing
# todo: restrictions at this point? maybe this while struggle is no longer required in this project?
offspring = list(offspring)

for child1, child2 in zip(offspring[::2], offspring[1::2]):
if random.random() < self.cxpb:
self.toolbox.mate(child1, child2)
del child1.fitness.values
del child2.fitness.values

for mutant in offspring:
if random.random() < self.mutpb:
self.toolbox.mutate(mutant)
del mutant.fitness.values

invalid_ind = [ind for ind in offspring if not ind.fitness.valid]
# Update fitnesses with the correct sol_idx
fitnesses = [self._objective_func(ind, ind.sol_idx) for ind in invalid_ind]
for ind, fit in zip(invalid_ind, fitnesses):
ind.fitness.values = fit

# Update sol_idx for the new population
new_population = self.toolbox.select(self.population + offspring, len(self.population))
for idx, ind in enumerate(new_population):
ind.sol_idx = idx

self.population = new_population
self.population = self._check_uniqueness(self.population)
self.current_generation += 1
else:
print("All generations completed.")

def get_best_solution(self) -> Tuple[List, float, int]:
best_ind = tools.selBest(self.population, 1)[0]
best_fitness = best_ind.fitness.values[0]
best_solution = best_ind[:]
best_solution_idx = self.population.index(best_ind)
return best_solution, best_fitness, best_solution_idx

def get_new_points(self) -> List[Tuple]:
return [tuple(ind) for ind in self.population]

def overwrite_internal_data_points(self, data_points: List) -> None:
self.population = [creator.Individual(point) for point in data_points]
self.population = list(map(self.toolbox.clone, self.population))


class CGOpt(GenericOptimization):
"""
CG algorithm
Expand Down
8 changes: 7 additions & 1 deletion qiaopt/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import pandas

from qiaopt.pre import Experiment
from qiaopt.optimization import Optimizer, GAOpt, GenericOptimization
from qiaopt.optimization import Optimizer, GAOpt, GenericOptimization, DEAPGAOpt

from qcg.pilotjob.api.job import Jobs
from qcg.pilotjob.api.manager import LocalManager
Expand Down Expand Up @@ -361,6 +361,12 @@ def set_optimization_algorithm(self) -> GenericOptimization:
# gene_type=param_types,
gene_space=constraints,
**opt_info.parameters)
elif opt_info.name == 'DEAPGA':
optimization_alg = DEAPGAOpt(initial_population=self.experiment.data_points,
fitness_func=self.input_params_to_cost_value,
# gene_type=param_types,
gene_space=constraints,
**opt_info.parameters)
else:
raise ValueError('Unknown optimization algorithm.')
else:
Expand Down
1 change: 1 addition & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
deap
flake8
matplotlib
numpy
Expand Down

0 comments on commit 66cc422

Please sign in to comment.