-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathncga.py
More file actions
138 lines (113 loc) · 5.2 KB
/
ncga.py
File metadata and controls
138 lines (113 loc) · 5.2 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
from __future__ import annotations
import time
import random
import multiprocessing
import itertools
from typing import Optional
from deap import tools
#import networkx as nx
# my library
from galib import GA, my_multiprocessing_map
from evaluator import Evaluator
from allocatorunit import AllocatorUnit
#----------------------------------------------------------------------------------------
class NCGA(GA):
    """Time-boxed multi-objective GA that breeds neighbours along one objective.

    Each generation the current population is sorted by a single objective
    value, adjacent individuals in that ordering are mated, the children are
    mutated and evaluated, and the next population is chosen from parents plus
    children with ``deap.tools.selSPEA2``.

    NOTE(review): the name presumably stands for "Neighborhood Cultivation
    GA"; ``self.toolbox``, ``self.stats`` and ``self.logbook`` are used but not
    created here, so they are assumed to be provided by ``GA`` -- confirm
    against ``galib``.
    """

    # Accepted values for ``sort_method``.
    SORT_METHOD_LIST = ['cyclic', 'random']

    def __init__(self,
                 seed: AllocatorUnit | bytes | str,
                 mate_pb: float = 1,
                 mutation_pb: float = 0.5,
                 archive_size: int = 40,
                 offspring_size: Optional[int] = None,
                 sort_method: str = 'cyclic'):
        """Configure the algorithm.

        Args:
            seed: Forwarded unchanged to ``GA.__init__``.
            mate_pb: Value passed as the third argument to ``toolbox.mate``
                for every parent pair.
            mutation_pb: Value passed as the second argument to
                ``toolbox.mutate`` for every offspring.
            archive_size: Size of the population kept after each selection.
            offspring_size: Number of parents taken from the sorted
                population each generation. Must be even because parents are
                mated in pairs. ``None`` uses ``archive_size`` rounded down
                to an even number.
            sort_method: ``'cyclic'`` rotates through the objectives in order,
                ``'random'`` picks the sorting objective at random.

        Raises:
            ValueError: If ``offspring_size`` is odd or ``sort_method`` is not
                listed in ``SORT_METHOD_LIST``.
        """
        super().__init__(seed)
        self.toolbox.register("select", tools.selSPEA2)
        self.mate_pb = mate_pb
        self.mutation_pb = mutation_pb
        self.pop_num = archive_size

        if offspring_size is None:
            self.offspring_size = archive_size - (archive_size % 2)
        elif offspring_size % 2 == 0:
            # Store the caller's value (previously the attribute was assigned
            # to itself, so an explicit offspring_size was never applied).
            self.offspring_size = offspring_size
        else:
            raise ValueError("offspring_size must be a multiple of 2.")

        if sort_method not in NCGA.SORT_METHOD_LIST:
            raise ValueError("Invalid sort_method.")
        self.sort_method = sort_method

    ##-----------------------------------------------------------------------------------
    def _evaluate_invalid(self, individuals) -> int:
        """Evaluate every individual whose fitness is not valid.

        Uses ``toolbox.map`` so the work is distributed when a pool-backed map
        has been registered. Returns the number of individuals evaluated.
        """
        invalid_ind = [ind for ind in individuals if not ind.fitness.valid]
        fitnesses = self.toolbox.map(self.toolbox.evaluate, invalid_ind)
        for ind, fit in zip(invalid_ind, fitnesses):
            ind.fitness.values = fit
        return len(invalid_ind)

    def _record_generation(self, gen: int, pop, evals: int) -> None:
        """Append per-objective min/avg/max statistics of ``pop`` to the logbook."""
        stats = self.stats.compile(pop)
        record = {eval_name: {"min": stats["min"][i],
                              "avg": stats["avg"][i],
                              "max": stats["max"][i]}
                  for i, eval_name in enumerate(Evaluator.eval_list())}
        self.logbook.record(gen=gen, evals=evals, **record)

    def run(self, exectution_time: float, process_num: int = 1) -> tools.ParetoFront:
        """Evolve until the time budget is used up and return the Pareto front.

        Args:
            exectution_time: Wall-clock budget in seconds (compared against
                ``time.time()`` differences). A new generation is started as
                long as the elapsed time is below this value, so the total
                run time can exceed it by up to one generation. The spelling
                is kept for backward compatibility with keyword callers.
            process_num: ``1`` evaluates with the built-in ``map``; any other
                value creates a ``multiprocessing.Pool(process_num)`` and
                evaluates through ``my_multiprocessing_map``.

        Returns:
            The ``tools.ParetoFront`` updated with every generation's
            population.

        Raises:
            ValueError: If ``self.sort_method`` is not a supported method
                (only possible if the attribute was changed after
                construction).
        """
        # multiprocessing settings
        pool = None
        if process_num != 1:
            pool = multiprocessing.Pool(process_num)

        try:
            if pool is not None:
                self.toolbox.register("map", my_multiprocessing_map, pool)
            else:
                self.toolbox.register("map", map)

            hall_of_fame = tools.ParetoFront()
            gen = 0
            # One probability per parent pair / per offspring, consumed by map().
            mate_pb_array = [self.mate_pb] * (self.offspring_size // 2)
            mut_pb_array = [self.mutation_pb] * self.offspring_size

            # start timer
            start_time = time.time()

            # generate and evaluate the 0th population
            pop = self.toolbox.population(self.pop_num)
            evals = self._evaluate_invalid(pop)
            hall_of_fame.update(pop)
            self._record_generation(0, pop, evals)

            while time.time() - start_time < exectution_time:
                # update generation number
                gen += 1

                # choose the objective that defines the neighbourhood ordering
                objective_num = len(Evaluator.eval_list())
                if self.sort_method == 'cyclic':
                    index = (gen - 1) % objective_num
                elif self.sort_method == 'random':
                    index = random.randrange(objective_num)
                else:
                    raise ValueError("Invalid sort_method.")

                # generate offsprings: mate neighbours (0,1), (2,3), ... of the
                # population sorted in ascending order of the chosen objective
                parents = sorted(
                    pop, key=lambda ind: ind.fitness.values[index]
                )[0:self.offspring_size]
                offsprings = list(itertools.chain.from_iterable(
                    map(self.toolbox.mate, parents[::2], parents[1::2], mate_pb_array)))

                # offsprings' mutation (each call returns an iterable, hence the chain)
                offsprings = list(itertools.chain.from_iterable(
                    map(self.toolbox.mutate, offsprings, mut_pb_array)))

                # evaluate offsprings
                evals = self._evaluate_invalid(offsprings)

                # selection
                pop = self.toolbox.select(pop + offsprings, self.pop_num)
                # NOTE: an earlier experiment that injected extra random
                # individuals into the population at this point is disabled.

                # update hall of fame and record
                hall_of_fame.update(pop)
                self._record_generation(gen, pop, evals)
        finally:
            # Always release the worker processes, even if evolution failed.
            if pool is not None:
                pool.close()
                pool.join()

        print(self.logbook.stream)
        print("# of individuals in hall_of_fame: {}".format(len(hall_of_fame)))

        # Tabulate the objective values of every individual on the front.
        indbook = tools.Logbook()
        indbook.header = ['index'] + Evaluator.eval_list()
        for i, ind in enumerate(hall_of_fame):
            record = dict(zip(Evaluator.eval_list(), ind.fitness.values))
            indbook.record(index=i, **record)
        print(indbook.stream)

        return hall_of_fame