Skip to content

Commit

Permalink
feat: much better logging within the algorithm
Browse files Browse the repository at this point in the history
  • Loading branch information
joszamama committed Feb 5, 2025
1 parent 6451ba7 commit 7f78fb6
Showing 1 changed file with 165 additions and 79 deletions.
244 changes: 165 additions & 79 deletions src/fandango/evolution/algorithm.py
Original file line number Diff line number Diff line change
Expand Up @@ -129,37 +129,49 @@ def __init__(
self.population = []
unique_hashes = set()
for individual in initial_population:
if isinstance(individual, str):
tree = self.grammar.parse(individual)
if not tree:
raise ValueError(
f"Failed to parse initial individual: {individual}"
try:
if isinstance(individual, str):
tree = self.grammar.parse(individual)
if not tree:
raise ValueError(
f"Failed to parse initial individual: {individual}"
)
elif isinstance(individual, DerivationTree):
tree = individual
else:
raise TypeError(
"Initial individuals must be DerivationTree or String"
)
elif isinstance(individual, DerivationTree):
tree = individual
else:
raise TypeError(
"Initial individuals must be DerivationTree or String"
h = hash(tree)
if h not in unique_hashes:
unique_hashes.add(h)
self.population.append(tree)
except Exception as e:
LOGGER.error(
f"Error processing initial individual: {individual} - {e}"
)
h = hash(tree)
if h not in unique_hashes:
unique_hashes.add(h)
self.population.append(tree)
if self.warnings_are_errors:
raise
# Fill remaining population if needed
attempts = 0
max_attempts = (self.population_size - len(self.population)) * 10
while (
len(self.population) < self.population_size and attempts < max_attempts
):
candidate = self.fix_individual(self.grammar.fuzz(self.start_symbol))
h = hash(candidate)
if h not in unique_hashes:
unique_hashes.add(h)
self.population.append(candidate)
try:
candidate = self.fix_individual(
self.grammar.fuzz(self.start_symbol)
)
h = hash(candidate)
if h not in unique_hashes:
unique_hashes.add(h)
self.population.append(candidate)
except Exception as e:
LOGGER.error(f"Error generating/fixing candidate during init: {e}")
attempts += 1
if len(self.population) < self.population_size:
LOGGER.warning(
f"Could not generate full unique initial population. Size is {len(self.population)}."
f"Could not generate full unique initial population. Final size is {len(self.population)}."
)
else:
LOGGER.info(
Expand Down Expand Up @@ -204,8 +216,11 @@ def generate_random_initial_population(self) -> List[DerivationTree]:
max_attempts = self.population_size * 10 # safeguard against infinite loops

while len(unique_population) < self.population_size and attempts < max_attempts:
candidate = self.fix_individual(self.grammar.fuzz(self.start_symbol))
self.add_unique_individual(unique_population, candidate, unique_hashes)
try:
candidate = self.fix_individual(self.grammar.fuzz(self.start_symbol))
self.add_unique_individual(unique_population, candidate, unique_hashes)
except Exception as e:
LOGGER.error(f"Error during initial population generation: {e}")
attempts += 1

if len(unique_population) < self.population_size:
Expand All @@ -219,19 +234,30 @@ def generate_random_initial_population(self) -> List[DerivationTree]:
def fix_individual(self, individual: DerivationTree) -> DerivationTree:
"""
Fix an individual by replacing failing subtrees if Comparison.EQUAL constraints are involved.
Includes error recovery by logging any exceptions during fixes.
"""
evaluation = self.evaluate_individual(individual)
failing_trees = evaluation[1]
for failing_tree in failing_trees:
for operator, value, side in failing_tree.suggestions:
if operator == Comparison.EQUAL and side == ComparisonSide.LEFT:
suggested_tree = self.grammar.parse(
str(value), failing_tree.tree.symbol
)
if suggested_tree is None:
continue
individual = individual.replace(failing_tree.tree, suggested_tree)
self.fixes_made += 1
try:
evaluation = self.evaluate_individual(individual)
failing_trees = evaluation[1]
for failing_tree in failing_trees:
for operator, value, side in failing_tree.suggestions:
if operator == Comparison.EQUAL and side == ComparisonSide.LEFT:
suggested_tree = self.grammar.parse(
str(value), failing_tree.tree.symbol
)
if suggested_tree is None:
LOGGER.warning(
f"Suggested fix for {failing_tree.tree.symbol} returned None."
)
continue
individual = individual.replace(
failing_tree.tree, suggested_tree
)
self.fixes_made += 1
except Exception as e:
LOGGER.error(f"Error in fix_individual: {e}")
if self.warnings_are_errors:
raise
return individual

def compute_diversity_bonus(
Expand Down Expand Up @@ -273,33 +299,39 @@ def evaluate_individual(
"""
key = hash(individual)
if key in self.fitness_cache:
if self.fitness_cache[key][0] >= self.expected_fitness:
if key not in self.solution_set:
self.solution_set.add(key)
self.solution.append(individual)
if (
self.fitness_cache[key][0] >= self.expected_fitness
and key not in self.solution_set
):
self.solution_set.add(key)
self.solution.append(individual)
return self.fitness_cache[key]

fitness = 0.0
failing_trees = []

for constraint in self.constraints:
result = constraint.fitness(individual)
if result.success:
fitness += result.fitness()
else:
failing_trees.extend(result.failing_trees)
fitness += result.fitness()
self.checks_made += 1
try:
result = constraint.fitness(individual)
if result.success:
fitness += result.fitness()
else:
failing_trees.extend(result.failing_trees)
fitness += result.fitness()
self.checks_made += 1
except Exception as e:
LOGGER.error(f"Error evaluating constraint {constraint}: {e}")
# Optionally, decide how to penalize this individual.
fitness += 0.0

try:
fitness /= len(self.constraints)
except ZeroDivisionError:
fitness = 1.0

if fitness >= self.expected_fitness:
if key not in self.solution_set:
self.solution_set.add(key)
self.solution.append(individual)
if fitness >= self.expected_fitness and key not in self.solution_set:
self.solution_set.add(key)
self.solution.append(individual)

self.fitness_cache[key] = [fitness, failing_trees]
return fitness, failing_trees
Expand All @@ -316,7 +348,6 @@ def evaluate_population(
for individual in self.population:
fitness, failing_trees = self.evaluate_individual(individual)
evaluation.append((individual, fitness, failing_trees))

if self.diversity_k > 0 and self.diversity_weight > 0:
bonus_map = self.compute_diversity_bonus(self.population)
new_evaluation = []
Expand All @@ -340,16 +371,22 @@ def select_elites(self) -> List[DerivationTree]:
]

def tournament_selection(self) -> Tuple[DerivationTree, DerivationTree]:
tournament = random.sample(self.evaluation, k=self.tournament_size)
tournament.sort(key=lambda x: x[1], reverse=True)
parent1 = tournament[0][0]
if len(tournament) == 2:
parent2 = tournament[1][0] if tournament[1][0] != parent1 else parent1
else:
parent2 = (
tournament[1][0] if tournament[1][0] != parent1 else tournament[2][0]
)
return parent1, parent2
try:
tournament = random.sample(self.evaluation, k=self.tournament_size)
tournament.sort(key=lambda x: x[1], reverse=True)
parent1 = tournament[0][0]
if len(tournament) == 2:
parent2 = tournament[1][0] if tournament[1][0] != parent1 else parent1
else:
parent2 = (
tournament[1][0]
if tournament[1][0] != parent1
else tournament[2][0]
)
return parent1, parent2
except Exception as e:
LOGGER.error(f"Error during tournament selection: {e}")
raise

def update_parameters(
self, generation: int, prev_best_fitness: float, current_best_fitness: float
Expand Down Expand Up @@ -403,6 +440,26 @@ def update_parameters(
)
self.crossover_rate = new_crossover_rate

def log_generation_statistics(self, generation: int):
"""
Log detailed statistics for the current generation.
"""
best_fitness = max(fitness for _, fitness, _ in self.evaluation)
avg_fitness = sum(fitness for _, fitness, _ in self.evaluation) / len(
self.evaluation
)
diversity_bonus = self.compute_diversity_bonus(self.population)
avg_diversity = (
sum(diversity_bonus.values()) / len(diversity_bonus)
if diversity_bonus
else 0
)
LOGGER.info(
f"Generation {generation} stats -- Best fitness: {best_fitness:.2f}, "
f"Avg fitness: {avg_fitness:.2f}, Avg diversity: {avg_diversity:.2f}, "
f"Population size: {len(self.population)}"
)

def evolve(self) -> List[DerivationTree]:
LOGGER.info("---------- Starting evolution ----------")
start_time = time.time()
Expand All @@ -424,8 +481,7 @@ def evolve(self) -> List[DerivationTree]:
break

LOGGER.info(
f"Generation {generation} - Fitness: {self.fitness:.2f} - "
f"#solutions found: {len(self.solution)}"
f"Generation {generation} - Fitness: {self.fitness:.2f} - #solutions found: {len(self.solution)}"
)

# --- Selection & Crossover ---
Expand All @@ -434,14 +490,22 @@ def evolve(self) -> List[DerivationTree]:

while len(new_population) < self.population_size:
if random.random() < self.crossover_rate:
parent1, parent2 = self.tournament_selection()
child1, child2 = self.crossover_operator.crossover(parent1, parent2)
self.add_unique_individual(new_population, child1, unique_hashes)
if len(new_population) < self.population_size:
try:
parent1, parent2 = self.tournament_selection()
child1, child2 = self.crossover_operator.crossover(
parent1, parent2
)
self.add_unique_individual(
new_population, child2, unique_hashes
new_population, child1, unique_hashes
)
self.crossovers_made += 1
if len(new_population) < self.population_size:
self.add_unique_individual(
new_population, child2, unique_hashes
)
self.crossovers_made += 1
except Exception as e:
LOGGER.error(f"Error during crossover: {e}")
continue
else:
break

Expand All @@ -452,11 +516,15 @@ def evolve(self) -> List[DerivationTree]:
mutated_population = []
for individual in new_population:
if random.random() < self.mutation_rate:
mutated_individual = self.mutation_method.mutate(
individual, self.grammar, self.evaluate_individual
)
mutated_population.append(mutated_individual)
self.mutations_made += 1
try:
mutated_individual = self.mutation_method.mutate(
individual, self.grammar, self.evaluate_individual
)
mutated_population.append(mutated_individual)
self.mutations_made += 1
except Exception as e:
LOGGER.error(f"Error during mutation: {e}")
mutated_population.append(individual)
else:
mutated_population.append(individual)
new_population = mutated_population
Expand All @@ -483,20 +551,37 @@ def evolve(self) -> List[DerivationTree]:
while (
len(new_population) < self.population_size and attempts < max_attempts
):
candidate = self.fix_individual(self.grammar.fuzz(self.start_symbol))
if hash(candidate) not in unique_hashes:
unique_hashes.add(hash(candidate))
new_population.append(candidate)
try:
candidate = self.fix_individual(
self.grammar.fuzz(self.start_symbol)
)
if hash(candidate) not in unique_hashes:
unique_hashes.add(hash(candidate))
new_population.append(candidate)
except Exception as e:
LOGGER.error(f"Error during population refill: {e}")
attempts += 1

if len(new_population) < self.population_size:
LOGGER.warning(
"Could not generate full unique new population, filling remaining slots with duplicates."
)
while len(new_population) < self.population_size:
new_population.append(self.grammar.fuzz(self.start_symbol))
try:
new_population.append(self.grammar.fuzz(self.start_symbol))
except Exception as e:
LOGGER.error(f"Error during fallback population filling: {e}")
break

fixed_population = [self.fix_individual(ind) for ind in new_population]
fixed_population = []
for ind in new_population:
try:
fixed_population.append(self.fix_individual(ind))
except Exception as e:
LOGGER.error(
f"Error during fixing individual in new population: {e}"
)
fixed_population.append(ind)
self.population = fixed_population[: self.population_size]
self.evaluation = self.evaluate_population()
self.fitness = (
Expand All @@ -508,6 +593,7 @@ def evolve(self) -> List[DerivationTree]:
self.update_parameters(generation, prev_best_fitness, current_best_fitness)
prev_best_fitness = current_best_fitness

self.log_generation_statistics(generation)
visualize_evaluation(generation, self.max_generations, self.evaluation)

clear_visualization()
Expand Down

0 comments on commit 7f78fb6

Please sign in to comment.