|
"""
Simple single-threaded algorithm to show how the 4 phases of a genetic algorithm work
(Evaluation, Selection, Crossover and Mutation)
https://en.wikipedia.org/wiki/Genetic_algorithm
Author: D4rkia
"""
| 7 | + |
| 8 | +import random |
| 9 | +from typing import List, Tuple |
| 10 | + |
# Maximum size of the population. A bigger population could be faster but is more
# memory expensive.
N_POPULATION = 200
# Number of elements selected in every generation for evolution. The selection takes
# place from the best to the worst of that generation and must be smaller than
# N_POPULATION.
N_SELECTED = 50
# Probability that an element of a generation mutates, changing one of its genes.
# Mutation draws the replacement from the whole gene list, so every gene can appear
# during evolution.
MUTATION_PROBABILITY = 0.4
# Re-seed the shared generator without an explicit seed. Seeding it with
# random.randint(0, 1000) would restrict every run to one of only 1001 possible
# starting states, which reduces randomness instead of improving it.
random.seed()
| 21 | + |
| 22 | + |
def _evaluate(item: str, main_target: str) -> Tuple[str, float]:
    """
    Score ``item`` by counting the characters that sit in the right position.

    >>> _evaluate("Helxo Worlx", "Hello World")
    ('Helxo Worlx', 9.0)
    """
    score = sum(1 for gene, wanted in zip(item, main_target) if gene == wanted)
    return (item, float(score))


def _crossover(parent_1: str, parent_2: str) -> Tuple[str, str]:
    """
    Slice two strings at the same random point and swap their tails.

    >>> _crossover("aaaa", "aaaa")
    ('aaaa', 'aaaa')
    """
    random_slice = random.randint(0, len(parent_1) - 1)
    child_1 = parent_1[:random_slice] + parent_2[random_slice:]
    child_2 = parent_2[:random_slice] + parent_1[random_slice:]
    return (child_1, child_2)


def _mutate(child: str, genes: List[str]) -> str:
    """
    With probability MUTATION_PROBABILITY replace one random gene of ``child``
    with a gene drawn from ``genes``; otherwise return ``child`` unchanged.
    """
    child_list = list(child)
    if child_list and random.uniform(0, 1) < MUTATION_PROBABILITY:
        # randrange picks every position with the same probability; the previous
        # ``randint(0, len(child)) - 1`` hit the last gene twice as often.
        child_list[random.randrange(len(child_list))] = random.choice(genes)
    return "".join(child_list)


def _select(
    parent_1: Tuple[str, float],
    mating_pool: List[Tuple[str, float]],
    genes: List[str],
) -> List[str]:
    """
    Pair ``parent_1`` with random partners from ``mating_pool`` and return the
    mutated children.

    ``parent_1`` is an ``(item, normalized_score)`` pair; the number of pairings
    grows with the score and is capped at 10. Each pairing yields two children.
    """
    pop = []
    # Generate more children proportionally to the fitness score
    child_n = min(int(parent_1[1] * 100) + 1, 10)
    for _ in range(child_n):
        parent_2 = random.choice(mating_pool)[0]
        child_1, child_2 = _crossover(parent_1[0], parent_2)
        pop.append(_mutate(child_1, genes))
        pop.append(_mutate(child_2, genes))
    return pop


def basic(target: str, genes: List[str], debug: bool = True) -> Tuple[int, int, str]:
    """
    Evolve random strings until one of them is equal to ``target``.

    Verify that the target contains no genes besides the ones inside genes variable,
    then repeat Evaluation, Selection, Crossover and Mutation until a perfect match
    is found.

    Args:
        target: the string the population has to evolve into.
        genes: the characters that individuals may be built from.
        debug: when true, print the best individual every 10 generations.

    Returns:
        A tuple ``(generation, total_population, best_string)`` where ``generation``
        is the number of generations evaluated, ``total_population`` is the number
        of individuals evaluated over all generations and ``best_string`` is the
        match (equal to ``target``).

    Raises:
        ValueError: if N_POPULATION is smaller than N_SELECTED, or if ``target``
            contains a character that is not in ``genes``.

    >>> from string import ascii_lowercase
    >>> basic("doctest", ascii_lowercase, debug=False)[2]
    'doctest'
    >>> genes = list(ascii_lowercase)
    >>> genes.remove("e")
    >>> basic("test", genes)
    Traceback (most recent call last):
    ...
    ValueError: ['e'] is not in genes list, evolution cannot converge
    >>> genes.remove("s")
    >>> basic("test", genes)
    Traceback (most recent call last):
    ...
    ValueError: ['e', 's'] is not in genes list, evolution cannot converge
    >>> genes.remove("t")
    >>> basic("test", genes)
    Traceback (most recent call last):
    ...
    ValueError: ['e', 's', 't'] is not in genes list, evolution cannot converge
    """

    # Verify if N_POPULATION is bigger than N_SELECTED
    if N_POPULATION < N_SELECTED:
        raise ValueError(f"{N_POPULATION} must be bigger than {N_SELECTED}")
    # Verify that the target contains no genes besides the ones inside genes variable.
    not_in_genes_list = sorted({c for c in target if c not in genes})
    if not_in_genes_list:
        raise ValueError(
            f"{not_in_genes_list} is not in genes list, evolution cannot converge"
        )

    # Generate random starting population
    population = [
        "".join(random.choice(genes) for _ in range(len(target)))
        for _ in range(N_POPULATION)
    ]

    # Just some logs to know what the algorithm is doing
    generation, total_population = 0, 0

    # This loop will end when we will find a perfect match for our target
    while True:
        generation += 1
        total_population += len(population)

        # Evaluation: scoring is a cheap per-character comparison, so a thread pool
        # would probably be slower than this plain loop.
        population_score = [_evaluate(item, target) for item in population]

        # Check if there is a matching evolution
        population_score = sorted(population_score, key=lambda x: x[1], reverse=True)
        if population_score[0][0] == target:
            return (generation, total_population, population_score[0][0])

        # Print the best result every 10 generations
        # just to know that the algorithm is working
        if debug and generation % 10 == 0:
            print(
                f"\nGeneration: {generation}"
                f"\nTotal Population:{total_population}"
                f"\nBest score: {population_score[0][1]}"
                f"\nBest string: {population_score[0][0]}"
            )

        # Flush the old population keeping some of the best evolutions.
        # Keeping these avoids regression of evolution. The survivors are taken from
        # the sorted scores; slicing the unsorted population would keep arbitrary
        # individuals instead of the best ones.
        population = [item for item, _ in population_score[: N_POPULATION // 3]]
        # Normalize population score from 0 to 1
        population_score = [
            (item, score / len(target)) for item, score in population_score
        ]

        # Selection: only the N_SELECTED best individuals are allowed to breed,
        # both as first and as second parent.
        mating_pool = population_score[:N_SELECTED]
        for parent_1 in mating_pool:
            population.extend(_select(parent_1, mating_pool, genes))
            # Check if the population has already reached the maximum value and if
            # so, break the cycle. If this check is disabled the algorithm will take
            # forever to compute large strings but will also calculate small strings
            # in a lot fewer generations
            if len(population) > N_POPULATION:
                break
| 161 | + |
| 162 | + |
if __name__ == "__main__":
    # Demo run: evolve a sentence out of a mixed alphabet and report the outcome.
    genes_list = list(
        " ABCDEFGHIJKLMNOPQRSTUVWXYZabcdefghijklm"
        "nopqrstuvwxyz.,;!?+-*#@^'èéòà€ù=)(&%$£/\\"
    )
    target_str = (
        "This is a genetic algorithm to evaluate, combine, evolve, and mutate a string!"
    )
    # basic returns (generation, total_population, best_string), which fills the
    # three placeholders of the report in order.
    report_template = "\nGeneration: %s\nTotal Population: %s\nTarget: %s"
    outcome = basic(target_str, genes_list)
    print(report_template % outcome)
0 commit comments