diff --git a/Makefile b/Makefile index 95d9fb8..763bd9c 100644 --- a/Makefile +++ b/Makefile @@ -186,5 +186,5 @@ uninstall: $(PIP) uninstall fandango-fuzzer -y # python -m evaluation.vs_isla.run_evaluation -evaluate: +evaluate evaluation: $(PYTHON) -m evaluation.vs_isla.run_evaluation 1 diff --git a/src/fandango/evolution/adaptation.py b/src/fandango/evolution/adaptation.py new file mode 100644 index 0000000..b61137a --- /dev/null +++ b/src/fandango/evolution/adaptation.py @@ -0,0 +1,82 @@ +from typing import List, Tuple + +from fandango.evolution.evaluation import Evaluator +from fandango.language.grammar import DerivationTree +from fandango.logger import LOGGER + + +class AdaptiveTuner: + def __init__(self, initial_mutation_rate: float, initial_crossover_rate: float): + self.mutation_rate = initial_mutation_rate + self.crossover_rate = initial_crossover_rate + + def update_parameters( + self, + generation: int, + prev_best_fitness: float, + current_best_fitness: float, + population: List[DerivationTree], + evaluator: Evaluator, + ) -> Tuple[float, float]: + diversity_map = evaluator.compute_diversity_bonus(population) + avg_diversity = ( + sum(diversity_map.values()) / len(diversity_map) if diversity_map else 0 + ) + + fitness_improvement_threshold = ( + 0.01 # minimal improvement to be considered significant + ) + diversity_low_threshold = 0.1 # low diversity threshold + + # Adaptive Mutation + if ( + current_best_fitness - prev_best_fitness + ) < fitness_improvement_threshold or avg_diversity < diversity_low_threshold: + new_mutation_rate = min(1.0, self.mutation_rate * 1.1) + LOGGER.info( + f"Generation {generation}: Increasing mutation rate from {self.mutation_rate:.2f} to {new_mutation_rate:.2f}" + ) + self.mutation_rate = new_mutation_rate + else: + new_mutation_rate = max(0.01, self.mutation_rate * 0.95) + LOGGER.info( + f"Generation {generation}: Decreasing mutation rate from {self.mutation_rate:.2f} to {new_mutation_rate:.2f}" + ) + self.mutation_rate = new_mutation_rate + + # Adaptive Crossover + if avg_diversity < diversity_low_threshold: + new_crossover_rate = min(1.0, self.crossover_rate * 1.05) + LOGGER.info( + f"Generation {generation}: Increasing crossover rate from {self.crossover_rate:.2f} to {new_crossover_rate:.2f}" + ) + self.crossover_rate = new_crossover_rate + else: + new_crossover_rate = max(0.1, self.crossover_rate * 0.98) + LOGGER.info( + f"Generation {generation}: Decreasing crossover rate from {self.crossover_rate:.2f} to {new_crossover_rate:.2f}" + ) + self.crossover_rate = new_crossover_rate + + return self.mutation_rate, self.crossover_rate + + def log_generation_statistics( + self, + generation: int, + evaluation: List[Tuple[DerivationTree, float, List]], + population: List[DerivationTree], + evaluator: Evaluator, + ): + best_fitness = max(fitness for _, fitness, _ in evaluation) + avg_fitness = sum(fitness for _, fitness, _ in evaluation) / len(evaluation) + diversity_bonus = evaluator.compute_diversity_bonus(population) + avg_diversity = ( + sum(diversity_bonus.values()) / len(diversity_bonus) + if diversity_bonus + else 0 + ) + LOGGER.info( + f"Generation {generation} stats -- Best fitness: {best_fitness:.2f}, " + f"Avg fitness: {avg_fitness:.2f}, Avg diversity: {avg_diversity:.2f}, " + f"Population size: {len(population)}" + ) diff --git a/src/fandango/evolution/algorithm.py b/src/fandango/evolution/algorithm.py index e4ad56e..2d0f3d7 100644 --- a/src/fandango/evolution/algorithm.py +++ b/src/fandango/evolution/algorithm.py @@ -1,14 +1,16 @@ -# evolution/algorithm.py +# fandango/evolution/algorithm.py import enum import logging import random import time -from typing import Dict, List, Tuple +from typing import List, Tuple, Union from fandango.constraints.base import Constraint -from fandango.constraints.fitness import Comparison, ComparisonSide, FailingTree +from fandango.evolution.adaptation import AdaptiveTuner from fandango.evolution.crossover import CrossoverOperator, SimpleSubtreeCrossover +from fandango.evolution.evaluation import Evaluator from fandango.evolution.mutation import MutationOperator, SimpleMutation +from fandango.evolution.population import PopulationManager from fandango.language.grammar import DerivationTree, Grammar from fandango.logger import LOGGER, clear_visualization, visualize_evaluation @@ -29,7 +31,7 @@ def __init__( constraints: List[Constraint], population_size: int = 100, desired_solutions: int = 0, - initial_population: List[DerivationTree | str] = None, + initial_population: List[Union[DerivationTree, str]] = None, max_generations: int = 500, expected_fitness: float = 1.0, elitism_rate: float = 0.1, @@ -47,86 +49,49 @@ def __init__( diversity_k: int = 5, diversity_weight: float = 1.0, ): - """ - Initialize the FANDANGO genetic algorithm. The algorithm will evolve a population of individuals - that are generated by the given grammar, and will evaluate them against the given constraints. - - Note: This version enforces uniqueness of individuals in the population and solutions. - A maximum number of attempts is used when generating individuals to avoid infinite loops. - - :param grammar: The grammar used to generate individuals. - :param constraints: The constraints used to evaluate individuals. - :param population_size: The size of the population. - :param desired_solutions: The number of perfect solutions to find before stopping the algorithm. - :param initial_population: A list of individuals to use as the initial population. - :param max_generations: The maximum number of generations to run the algorithm. - :param expected_fitness: The fitness value that the algorithm should aim to achieve. - :param elitism_rate: The rate of individuals that will be preserved in the next generation. - :param crossover_method: The crossover operator to use. - :param crossover_rate: The rate of individuals that will undergo crossover. - :param mutation_method: The mutation operator to use. - :param mutation_rate: The rate of individuals that will undergo mutation. - :param tournament_size: The size of the tournament selection. - :param destruction_rate: The rate of individuals that will be destroyed. - :param logger_level: If set, the level of logging to use. - :param start_symbol: The start symbol to use with the grammar. - :param warnings_are_errors: If set, turns warnings into errors. - :param best_effort: If set, returns also solutions not satisfying all constraints. - :param random_seed: The random seed to use for reproducibility. - :param diversity_k: The k value for k-path coverage diversity bonus. - :param diversity_weight: The weight of the diversity bonus. - """ if tournament_size > 1: raise ValueError( f"Parameter tournament_size must be in range ]0, 1], but is {tournament_size}." ) if random_seed is not None: random.seed(random_seed) - if logger_level is not None: LOGGER.setLevel(logger_level.value) - LOGGER.info("---------- Initializing FANDANGO algorithm ---------- ") + + self.fixes_made = 0 self.grammar = grammar self.constraints = constraints self.population_size = population_size self.expected_fitness = expected_fitness - - self.crossover_rate = crossover_rate - self.crossover_operator = crossover_method - - self.mutation_method = mutation_method - self.mutation_rate = mutation_rate - - self.tournament_size = max(2, int(population_size * tournament_size)) - self.max_generations = max_generations self.elitism_rate = elitism_rate self.destruction_rate = destruction_rate self.start_symbol = start_symbol - - self.diversity_k = diversity_k - self.diversity_weight = diversity_weight - - self.fitness_cache = {} - - self.fixes_made = 0 - self.checks_made = 0 - self.crossovers_made = 0 - self.mutations_made = 0 - - self.time_taken = None - + self.tournament_size = max(2, int(population_size * tournament_size)) + self.max_generations = max_generations self.warnings_are_errors = warnings_are_errors self.best_effort = best_effort - # Initialize population and solutions with uniqueness enforced - self.solution = [] - self.solution_set = set() # to ensure solutions are unique (using their hash) - self.desired_solutions = desired_solutions + # Instantiate managers + self.population_manager = PopulationManager( + grammar, start_symbol, population_size, warnings_are_errors + ) + self.evaluator = Evaluator( + grammar, + constraints, + expected_fitness, + diversity_k, + diversity_weight, + warnings_are_errors, + ) + self.adaptive_tuner = AdaptiveTuner(mutation_rate, crossover_rate) + self.crossover_operator = crossover_method + self.mutation_method = mutation_method + # Initialize population if initial_population is not None: LOGGER.info("Saving the provided initial population...") - self.population = [] + unique_population = [] unique_hashes = set() for individual in initial_population: try: @@ -145,19 +110,16 @@ def __init__( h = hash(tree) if h not in unique_hashes: unique_hashes.add(h) - self.population.append(tree) + unique_population.append(tree) except Exception as e: LOGGER.error( f"Error processing initial individual: {individual} - {e}" ) if self.warnings_are_errors: raise - # Fill remaining population if needed attempts = 0 - max_attempts = (self.population_size - len(self.population)) * 10 - while ( - len(self.population) < self.population_size and attempts < max_attempts - ): + max_attempts = (population_size - len(unique_population)) * 10 + while len(unique_population) < population_size and attempts < max_attempts: try: candidate = self.fix_individual( self.grammar.fuzz(self.start_symbol) @@ -165,82 +127,49 @@ def __init__( h = hash(candidate) if h not in unique_hashes: unique_hashes.add(h) - self.population.append(candidate) + unique_population.append(candidate) except Exception as e: LOGGER.error(f"Error generating/fixing candidate during init: {e}") attempts += 1 - if len(self.population) < self.population_size: + if len(unique_population) < population_size: LOGGER.warning( - f"Could not generate full unique initial population. Final size is {len(self.population)}." + f"Could not generate full unique initial population. Final size is {len(unique_population)}." ) + self.population = unique_population else: - LOGGER.info( - f"Generating initial population (size: {self.population_size})..." - ) + LOGGER.info(f"Generating initial population (size: {population_size})...") st_time = time.time() - self.population = self.generate_random_initial_population() + self.population = ( + self.population_manager.generate_random_initial_population( + self.fix_individual + ) + ) LOGGER.info( f"Initial population generated in {time.time() - st_time:.2f} seconds" ) - # Evaluate population - self.evaluation = self.evaluate_population() + # Evaluate initial population + self.evaluation = self.evaluator.evaluate_population(self.population) self.fitness = ( - sum(fitness for _, fitness, _ in self.evaluation) / self.population_size + sum(fitness for _, fitness, _ in self.evaluation) / population_size ) - def add_unique_individual( - self, - population: List[DerivationTree], - candidate: DerivationTree, - unique_set: set, - ) -> bool: - """ - Helper method to add a candidate to the given population if its hash is not already in unique_set. - Returns True if the candidate was added. - """ - h = hash(candidate) - if h not in unique_set: - unique_set.add(h) - population.append(candidate) - return True - return False - - def generate_random_initial_population(self) -> List[DerivationTree]: - """ - Generate the initial population of individuals ensuring uniqueness. - """ - unique_population = [] - unique_hashes = set() - attempts = 0 - max_attempts = self.population_size * 10 # safeguard against infinite loops - - while len(unique_population) < self.population_size and attempts < max_attempts: - try: - candidate = self.fix_individual(self.grammar.fuzz(self.start_symbol)) - self.add_unique_individual(unique_population, candidate, unique_hashes) - except Exception as e: - LOGGER.error(f"Error during initial population generation: {e}") - attempts += 1 - - if len(unique_population) < self.population_size: - LOGGER.warning( - "Could not generate a full population of unique individuals. " - f"Population size reduced to {len(unique_population)}." - ) - - return unique_population + self.fixes_made = 0 + self.checks_made = self.evaluator.checks_made + self.crossovers_made = 0 + self.mutations_made = 0 + self.time_taken = None + self.solution = self.evaluator.solution + self.solution_set = self.evaluator.solution_set + self.desired_solutions = desired_solutions def fix_individual(self, individual: DerivationTree) -> DerivationTree: - """ - Fix an individual by replacing failing subtrees if Comparison.EQUAL constraints are involved. - Includes error recovery by logging any exceptions during fixes. - """ try: - evaluation = self.evaluate_individual(individual) - failing_trees = evaluation[1] + fitness_val, failing_trees = self.evaluator.evaluate_individual(individual) for failing_tree in failing_trees: for operator, value, side in failing_tree.suggestions: + from fandango.constraints.fitness import Comparison, ComparisonSide + if operator == Comparison.EQUAL and side == ComparisonSide.LEFT: suggested_tree = self.grammar.parse( str(value), failing_tree.tree.symbol @@ -260,116 +189,6 @@ def fix_individual(self, individual: DerivationTree) -> DerivationTree: raise return individual - def compute_diversity_bonus( - self, individuals: List[DerivationTree] - ) -> Dict[int, float]: - """ - Compute a diversity bonus for each individual based on the rarity of its k-paths. - The bonus is computed as the (weighted) average of the reciprocal frequencies of its k-paths. - :param individuals: The list of derivation trees. - :return: A dictionary mapping each individual’s index to its diversity bonus. - """ - k = self.diversity_k - ind_kpaths: Dict[int, set] = {} - for idx, tree in enumerate(individuals): - paths = self.grammar._extract_k_paths_from_tree(tree, k) - ind_kpaths[idx] = paths - - frequency: Dict[tuple, int] = {} - for paths in ind_kpaths.values(): - for path in paths: - frequency[path] = frequency.get(path, 0) + 1 - - bonus: Dict[int, float] = {} - for idx, paths in ind_kpaths.items(): - if paths: - bonus_score = sum(1.0 / frequency[path] for path in paths) / len(paths) - else: - bonus_score = 0.0 - bonus[idx] = bonus_score * self.diversity_weight - - return bonus - - def evaluate_individual( - self, individual: DerivationTree - ) -> Tuple[float, List[FailingTree]]: - """ - Evaluate a single individual and update the solution list if the individual is perfect. - Uniqueness in the solution list is enforced via self.solution_set. - """ - key = hash(individual) - if key in self.fitness_cache: - if ( - self.fitness_cache[key][0] >= self.expected_fitness - and key not in self.solution_set - ): - self.solution_set.add(key) - self.solution.append(individual) - return self.fitness_cache[key] - - fitness = 0.0 - failing_trees = [] - - for constraint in self.constraints: - try: - result = constraint.fitness(individual) - if result.success: - fitness += result.fitness() - else: - failing_trees.extend(result.failing_trees) - fitness += result.fitness() - self.checks_made += 1 - except Exception as e: - LOGGER.error(f"Error evaluating constraint {constraint}: {e}") - # Optionally, decide how to penalize this individual. - fitness += 0.0 - - try: - fitness /= len(self.constraints) - except ZeroDivisionError: - fitness = 1.0 - - if fitness >= self.expected_fitness and key not in self.solution_set: - self.solution_set.add(key) - self.solution.append(individual) - - self.fitness_cache[key] = [fitness, failing_trees] - return fitness, failing_trees - - def evaluate_population( - self, - ) -> List[Tuple[DerivationTree, float, List[FailingTree]]]: - """ - Evaluate the fitness of each individual in the population and add a diversity bonus - based on k-path coverage. - :return: A list of tuples, each containing an individual, its fitness, and the list of failing trees. - """ - evaluation = [] - for individual in self.population: - fitness, failing_trees = self.evaluate_individual(individual) - evaluation.append((individual, fitness, failing_trees)) - if self.diversity_k > 0 and self.diversity_weight > 0: - bonus_map = self.compute_diversity_bonus(self.population) - new_evaluation = [] - for idx, (ind, fitness, failing_trees) in enumerate(evaluation): - new_fitness = fitness + bonus_map.get(idx, 0.0) - new_evaluation.append((ind, new_fitness, failing_trees)) - evaluation = new_evaluation - - return evaluation - - def select_elites(self) -> List[DerivationTree]: - """ - Select the top elite individuals from the current population. - :return: A list of elite individuals. - """ - return [ - x[0] - for x in sorted(self.evaluation, key=lambda x: x[1], reverse=True)[ - : int(self.elitism_rate * self.population_size) - ] - ] - def tournament_selection(self) -> Tuple[DerivationTree, DerivationTree]: try: tournament = random.sample(self.evaluation, k=self.tournament_size) @@ -388,82 +207,9 @@ def tournament_selection(self) -> Tuple[DerivationTree, DerivationTree]: LOGGER.error(f"Error during tournament selection: {e}") raise - def update_parameters( - self, generation: int, prev_best_fitness: float, current_best_fitness: float - ): - """ - Adapt mutation and crossover rates based on the progress in fitness and population diversity. - """ - # Compute diversity metric (using the diversity bonus as a proxy) - diversity_map = self.compute_diversity_bonus(self.population) - avg_diversity = ( - sum(diversity_map.values()) / len(diversity_map) if diversity_map else 0 - ) - - # Example thresholds (you might need to tune these): - fitness_improvement_threshold = ( - 0.01 # minimal improvement to be considered significant - ) - diversity_low_threshold = ( - 0.1 # if average diversity bonus is too low, population might be converging - ) - - # Adaptive Mutation: Increase if little improvement or low diversity, decrease otherwise - if ( - current_best_fitness - prev_best_fitness - ) < fitness_improvement_threshold or avg_diversity < diversity_low_threshold: - # Increase mutation rate (but cap it at 1.0) - new_mutation_rate = min(1.0, self.mutation_rate * 1.1) - LOGGER.info( - f"Generation {generation}: Increasing mutation rate from {self.mutation_rate:.2f} to {new_mutation_rate:.2f}" - ) - self.mutation_rate = new_mutation_rate - else: - # Decrease mutation rate slightly if progress is good - new_mutation_rate = max(0.01, self.mutation_rate * 0.95) - LOGGER.info( - f"Generation {generation}: Decreasing mutation rate from {self.mutation_rate:.2f} to {new_mutation_rate:.2f}" - ) - self.mutation_rate = new_mutation_rate - - # Adaptive Crossover: Optionally, adjust based on diversity - if avg_diversity < diversity_low_threshold: - new_crossover_rate = min(1.0, self.crossover_rate * 1.05) - LOGGER.info( - f"Generation {generation}: Increasing crossover rate from {self.crossover_rate:.2f} to {new_crossover_rate:.2f}" - ) - self.crossover_rate = new_crossover_rate - else: - new_crossover_rate = max(0.1, self.crossover_rate * 0.98) - LOGGER.info( - f"Generation {generation}: Decreasing crossover rate from {self.crossover_rate:.2f} to {new_crossover_rate:.2f}" - ) - self.crossover_rate = new_crossover_rate - - def log_generation_statistics(self, generation: int): - """ - Log detailed statistics for the current generation. - """ - best_fitness = max(fitness for _, fitness, _ in self.evaluation) - avg_fitness = sum(fitness for _, fitness, _ in self.evaluation) / len( - self.evaluation - ) - diversity_bonus = self.compute_diversity_bonus(self.population) - avg_diversity = ( - sum(diversity_bonus.values()) / len(diversity_bonus) - if diversity_bonus - else 0 - ) - LOGGER.info( - f"Generation {generation} stats -- Best fitness: {best_fitness:.2f}, " - f"Avg fitness: {avg_fitness:.2f}, Avg diversity: {avg_diversity:.2f}, " - f"Population size: {len(self.population)}" - ) - def evolve(self) -> List[DerivationTree]: LOGGER.info("---------- Starting evolution ----------") start_time = time.time() - prev_best_fitness = 0.0 for generation in range(1, self.max_generations + 1): @@ -484,22 +230,22 @@ def evolve(self) -> List[DerivationTree]: f"Generation {generation} - Fitness: {self.fitness:.2f} - #solutions found: {len(self.solution)}" ) - # --- Selection & Crossover --- + # Selection & Crossover new_population = self.select_elites() unique_hashes = {hash(ind) for ind in new_population} while len(new_population) < self.population_size: - if random.random() < self.crossover_rate: + if random.random() < self.adaptive_tuner.crossover_rate: try: parent1, parent2 = self.tournament_selection() child1, child2 = self.crossover_operator.crossover( parent1, parent2 ) - self.add_unique_individual( + self.population_manager.add_unique_individual( new_population, child1, unique_hashes ) if len(new_population) < self.population_size: - self.add_unique_individual( + self.population_manager.add_unique_individual( new_population, child2, unique_hashes ) self.crossovers_made += 1 @@ -512,13 +258,13 @@ def evolve(self) -> List[DerivationTree]: if len(new_population) > self.population_size: new_population = new_population[: self.population_size] - # --- Mutation --- + # Mutation mutated_population = [] for individual in new_population: - if random.random() < self.mutation_rate: + if random.random() < self.adaptive_tuner.mutation_rate: try: mutated_individual = self.mutation_method.mutate( - individual, self.grammar, self.evaluate_individual + individual, self.grammar, self.evaluator.evaluate_individual ) mutated_population.append(mutated_individual) self.mutations_made += 1 @@ -529,7 +275,7 @@ def evolve(self) -> List[DerivationTree]: mutated_population.append(individual) new_population = mutated_population - # --- Destruction --- + # Destruction if self.destruction_rate > 0: LOGGER.debug( f"Destroying {self.destruction_rate * 100:.2f}% of the population" @@ -540,70 +286,43 @@ def evolve(self) -> List[DerivationTree]: ] unique_hashes = {hash(ind) for ind in new_population} - # --- Ensure Uniqueness & Fill Population --- + # Ensure Uniqueness & Fill Population unique_temp = {} for ind in new_population: unique_temp[hash(ind)] = ind new_population = list(unique_temp.values()) + new_population = self.population_manager.refill_population( + new_population, self.fix_individual + ) - attempts = 0 - max_attempts = (self.population_size - len(new_population)) * 10 - while ( - len(new_population) < self.population_size and attempts < max_attempts - ): - try: - candidate = self.fix_individual( - self.grammar.fuzz(self.start_symbol) - ) - if hash(candidate) not in unique_hashes: - unique_hashes.add(hash(candidate)) - new_population.append(candidate) - except Exception as e: - LOGGER.error(f"Error during population refill: {e}") - attempts += 1 - - if len(new_population) < self.population_size: - LOGGER.warning( - "Could not generate full unique new population, filling remaining slots with duplicates." - ) - while len(new_population) < self.population_size: - try: - new_population.append(self.grammar.fuzz(self.start_symbol)) - except Exception as e: - LOGGER.error(f"Error during fallback population filling: {e}") - break - - fixed_population = [] - for ind in new_population: - try: - fixed_population.append(self.fix_individual(ind)) - except Exception as e: - LOGGER.error( - f"Error during fixing individual in new population: {e}" - ) - fixed_population.append(ind) + fixed_population = [self.fix_individual(ind) for ind in new_population] self.population = fixed_population[: self.population_size] - self.evaluation = self.evaluate_population() + self.evaluation = self.evaluator.evaluate_population(self.population) self.fitness = ( sum(fitness for _, fitness, _ in self.evaluation) / self.population_size ) - # Retrieve the best fitness of this generation for adaptive tuning current_best_fitness = max(fitness for _, fitness, _ in self.evaluation) - self.update_parameters(generation, prev_best_fitness, current_best_fitness) + self.adaptive_tuner.update_parameters( + generation, + prev_best_fitness, + current_best_fitness, + self.population, + self.evaluator, + ) prev_best_fitness = current_best_fitness - self.log_generation_statistics(generation) + self.adaptive_tuner.log_generation_statistics( + generation, self.evaluation, self.population, self.evaluator + ) visualize_evaluation(generation, self.max_generations, self.evaluation) clear_visualization() self.time_taken = time.time() - start_time - LOGGER.info("---------- Evolution finished ----------") LOGGER.info(f"Perfect solutions found: ({len(self.solution)})") LOGGER.info(f"Fitness of final population: {self.fitness:.2f}") LOGGER.info(f"Time taken: {self.time_taken:.2f} seconds") - LOGGER.debug("---------- FANDANGO statistics ----------") LOGGER.debug(f"Fixes made: {self.fixes_made}") LOGGER.debug(f"Fitness checks: {self.checks_made}") @@ -629,3 +348,11 @@ def evolve(self) -> List[DerivationTree]: return self.population[: self.desired_solutions] return self.solution + + def select_elites(self) -> List[DerivationTree]: + return [ + x[0] + for x in sorted(self.evaluation, key=lambda x: x[1], reverse=True)[ + : int(self.elitism_rate * self.population_size) + ] + ] diff --git a/src/fandango/evolution/evaluation.py b/src/fandango/evolution/evaluation.py new file mode 100644 index 0000000..1746b7d --- /dev/null +++ b/src/fandango/evolution/evaluation.py @@ -0,0 +1,106 @@ +from typing import Dict, List, Tuple + +from fandango.constraints.base import Constraint +from fandango.constraints.fitness import FailingTree +from fandango.language.grammar import DerivationTree, Grammar +from fandango.logger import LOGGER + + +class Evaluator: + def __init__( + self, + grammar: Grammar, + constraints: List[Constraint], + expected_fitness: float, + diversity_k: int, + diversity_weight: float, + warnings_are_errors: bool = False, + ): + self.grammar = grammar + self.constraints = constraints + self.expected_fitness = expected_fitness + self.diversity_k = diversity_k + self.diversity_weight = diversity_weight + self.warnings_are_errors = warnings_are_errors + self.fitness_cache: Dict[int, Tuple[float, List[FailingTree]]] = {} + self.solution = [] + self.solution_set = set() + self.checks_made = 0 + + def compute_diversity_bonus( + self, individuals: List[DerivationTree] + ) -> Dict[int, float]: + k = self.diversity_k + ind_kpaths: Dict[int, set] = {} + for idx, tree in enumerate(individuals): + # Assuming your grammar is available in evaluator + paths = self.grammar._extract_k_paths_from_tree(tree, k) + ind_kpaths[idx] = paths + + frequency: Dict[tuple, int] = {} + for paths in ind_kpaths.values(): + for path in paths: + frequency[path] = frequency.get(path, 0) + 1 + + bonus: Dict[int, float] = {} + for idx, paths in ind_kpaths.items(): + if paths: + bonus_score = sum(1.0 / frequency[path] for path in paths) / len(paths) + else: + bonus_score = 0.0 + bonus[idx] = bonus_score * self.diversity_weight + return bonus + + def evaluate_individual( + self, individual: DerivationTree + ) -> Tuple[float, List[FailingTree]]: + key = hash(individual) + if key in self.fitness_cache: + if ( + self.fitness_cache[key][0] >= self.expected_fitness + and key not in self.solution_set + ): + self.solution_set.add(key) + self.solution.append(individual) + return self.fitness_cache[key] + + fitness = 0.0 + failing_trees: List[FailingTree] = [] + for constraint in self.constraints: + try: + result = constraint.fitness(individual) + if result.success: + fitness += result.fitness() + else: + failing_trees.extend(result.failing_trees) + fitness += result.fitness() + self.checks_made += 1 + except Exception as e: + LOGGER.error(f"Error evaluating constraint {constraint}: {e}") + fitness += 0.0 + try: + fitness /= len(self.constraints) + except ZeroDivisionError: + fitness = 1.0 + + if fitness >= self.expected_fitness and key not in self.solution_set: + self.solution_set.add(key) + self.solution.append(individual) + self.fitness_cache[key] = (fitness, failing_trees) + return fitness, failing_trees + + def evaluate_population( + self, population: List[DerivationTree] + ) -> List[Tuple[DerivationTree, float, List[FailingTree]]]: + evaluation = [] + for individual in population: + fitness, failing_trees = self.evaluate_individual(individual) + evaluation.append((individual, fitness, failing_trees)) + if self.diversity_k > 0 and self.diversity_weight > 0: + bonus_map = self.compute_diversity_bonus(population) + new_evaluation = [] + for idx, (ind, fitness, failing_trees) in enumerate(evaluation): + new_fitness = fitness + bonus_map.get(idx, 0.0) + new_evaluation.append((ind, new_fitness, failing_trees)) + evaluation = new_evaluation + return evaluation diff --git a/src/fandango/evolution/population.py b/src/fandango/evolution/population.py new file mode 100644 index 0000000..9b54ae9 --- /dev/null +++ b/src/fandango/evolution/population.py @@ -0,0 +1,86 @@ +from typing import Callable, List, Set + +from fandango.language.grammar import DerivationTree, Grammar +from fandango.logger import LOGGER + + +class PopulationManager: + def __init__( + self, + grammar: Grammar, + start_symbol: str, + population_size: int, + warnings_are_errors: bool = False, + ): + self.grammar = grammar + self.start_symbol = start_symbol + self.population_size = population_size + self.warnings_are_errors = warnings_are_errors + + def add_unique_individual( + self, + population: List[DerivationTree], + candidate: DerivationTree, + unique_set: Set[int], + ) -> bool: + h = hash(candidate) + if h not in unique_set: + unique_set.add(h) + population.append(candidate) + return True + return False + + def generate_random_initial_population( + self, fix_func: Callable[[DerivationTree], DerivationTree] + ) -> List[DerivationTree]: + unique_population = [] + unique_hashes = set() + attempts = 0 + max_attempts = self.population_size * 10 # safeguard against infinite loops + + while len(unique_population) < self.population_size and attempts < max_attempts: + try: + candidate = fix_func(self.grammar.fuzz(self.start_symbol)) + self.add_unique_individual(unique_population, candidate, unique_hashes) + except Exception as e: + LOGGER.error(f"Error during initial population generation: {e}") + attempts += 1 + + if len(unique_population) < self.population_size: + LOGGER.warning( + f"Could not generate a full population of unique individuals. Population size reduced to {len(unique_population)}." + ) + return unique_population + + def refill_population( + self, + current_population: List[DerivationTree], + fix_func: Callable[[DerivationTree], DerivationTree], + ) -> List[DerivationTree]: + unique_hashes = {hash(ind) for ind in current_population} + attempts = 0 + max_attempts = (self.population_size - len(current_population)) * 10 + + while ( + len(current_population) < self.population_size and attempts < max_attempts + ): + try: + candidate = fix_func(self.grammar.fuzz(self.start_symbol)) + if hash(candidate) not in unique_hashes: + unique_hashes.add(hash(candidate)) + current_population.append(candidate) + except Exception as e: + LOGGER.error(f"Error during population refill: {e}") + attempts += 1 + + if len(current_population) < self.population_size: + LOGGER.warning( + "Could not generate full unique new population, filling remaining slots with duplicates." + ) + while len(current_population) < self.population_size: + try: + current_population.append(self.grammar.fuzz(self.start_symbol)) + except Exception as e: + LOGGER.error(f"Error during fallback population filling: {e}") + break + return current_population diff --git a/tests/test_optimizer.py b/tests/test_optimizer.py index 508ffd2..05d1640 100644 --- a/tests/test_optimizer.py +++ b/tests/test_optimizer.py @@ -47,7 +47,9 @@ def test_generate_initial_population(self): def test_evaluate_fitness(self): # Evaluate the fitness of the population for individual in self.fandango.population: - fitness, failing_trees = self.fandango.evaluate_individual(individual) + fitness, failing_trees = self.fandango.evaluator.evaluate_individual( + individual + ) self.assertIsInstance(fitness, float) self.assertGreaterEqual(fitness, 0.0) self.assertLessEqual(fitness, 1.0) @@ -58,7 +60,9 @@ def test_evaluate_fitness(self): def test_evaluate_population(self): # Evaluate the fitness of the population - evaluation = self.fandango.evaluate_population() + evaluation = self.fandango.evaluator.evaluate_population( + self.fandango.population + ) assert len(evaluation) == len(self.fandango.population) for derivation_tree, fitness, failing_trees in evaluation: self.assertIsInstance(fitness, float) @@ -129,10 +133,14 @@ def test_mutation(self): # Perform mutation mutant1 = self.fandango.mutation_method.mutate( - children[0], self.fandango.grammar, self.fandango.evaluate_individual + children[0], + self.fandango.grammar, + self.fandango.evaluator.evaluate_individual, ) mutant2 = self.fandango.mutation_method.mutate( - children[1], self.fandango.grammar, self.fandango.evaluate_individual + children[1], + self.fandango.grammar, + self.fandango.evaluator.evaluate_individual, ) # Check that the mutated children are of the correct type