-
Notifications
You must be signed in to change notification settings - Fork 0
/
stepping.py
158 lines (123 loc) · 5.86 KB
/
stepping.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
"""Implementation of a genetic algorithm step calculation.
Steps:
- score current generation
- select parents
- build next generation
"""
import itertools
from multiprocessing import Pool
from collections import Counter, namedtuple
from case import Case
from unit import Unit
from utils import named_functions_interface_decorator
# Sizing of the multiprocessing.Pool created by _multisolve_scoring:
# number of worker processes, and number of tasks a worker handles
# before it is replaced (Pool's `maxtasksperchild`).
MULTIPROC_PROCESSES = 16
MULTIPROC_TASK_PER_CHILD = 32

# Printing: maximal number of (score, count) pairs shown on the 'PROPS' line.
MAX_PRINTED_PROPS = 10

# Value returned by a step function:
#   pop            -- the population produced for the next generation
#   scored_old_pop -- {individual: scoring result} for the scored generation
#   winners        -- individuals sharing the best score when the best
#                     individual produced the expected output, else ()
StepResult = namedtuple('StepResult', ('pop', 'scored_old_pop', 'winners'))
@named_functions_interface_decorator
def named_functions() -> dict:
    """Return the GA step functions, keyed by their short name."""
    available = {'DIV': step}
    # 'SCR': step_cross_first is deliberately left out.
    # TODO: DOESN'T WORK PROPERLY (LOGIC PROBLEM)
    return available
def default_functions() -> tuple:
    """Return the default GA functions, as given by named_functions.as_tuple()."""
    defaults = named_functions.as_tuple()
    return defaults
def step(pop, case, pop_size:int, score:callable,
         select:callable, reproduce:callable, cross: callable, mutate:callable,
         step_number:int=None, callback_stats:callable=(lambda **kwargs: None)) -> 'StepResult':
    """Compute one step, return a StepResult holding the new population.

    This implementation first selects among the scored population, then
    produces the new generation from the selected individuals.
    This behavior could tend to favor diversity.

    pop -- individuals of current generation (Unit instances)
    case -- Case instance, unpacked as (stdin, expected)
    pop_size -- number of individuals in the next generation
    score -- function used for scoring
    select -- function used for selection ; receives {unit: result} and its
              output is converted with dict()
    reproduce -- function used for reproduction ; called as
                 reproduce(selected, pop_size, cross, mutator=mutate)
    cross -- function used for crossing
    mutate -- function used for mutation
    step_number -- number of the current step ; only for cosmetic/logging purpose
    callback_stats -- a callback called with the keyword arguments
                      popsize, max_score, min_score and diversity

    Return StepResult(pop, scored_old_pop, winners):
    pop -- tuple of pop_size individuals forming the next generation
    scored_old_pop -- {unit: result} for the generation given in input
    winners -- units having the best score if the best unit found the
               expected output, else an empty tuple

    """
    assert callable(score)
    assert callable(select)
    assert callable(reproduce)
    assert callable(cross)
    assert callable(mutate)
    assert callable(callback_stats)
    assert isinstance(pop_size, int)
    assert all(isinstance(unit, Unit) for unit in pop)
    assert isinstance(case, Case)
    assert pop
    if step_number is not None:
        print('\n\n# Step {}'.format(step_number))

    stdin, expected = case
    scored_pop = _multisolve_scoring(stdin, expected, pop, score)

    def score_of(unit):
        return scored_pop[unit].score

    best_unit = max(pop, key=score_of)
    best_result = scored_pop[best_unit]
    worst_unit = min(pop, key=score_of)
    succeeded = best_result.found == best_result.expected

    print('SCORES:', sorted({round(r.score, 3) for r in scored_pop.values()}, reverse=True))
    # call the user defined data handling
    callback_stats(
        popsize=len(scored_pop),
        max_score=best_result.score,
        min_score=scored_pop[worst_unit].score,
        # ratio of distinct sources among the scored individuals
        diversity=len({u.source for u in scored_pop}) / len(scored_pop),
    )
    proportions = Counter(r.score for r in scored_pop.values())
    print('PROPS :', proportions.most_common(MAX_PRINTED_PROPS))
    print('OF', len(scored_pop), 'BEST:', round(best_result.score, 3))
    print(f"OUTPUTS: \"{best_result.found}\"\t(expect {best_result.expected})", ('[SUCCESS]' if succeeded else ''))
    print('SOURCE:', best_unit.source)

    # winners are reported only when the best unit produced the expected output
    winners = ()
    if succeeded:
        winners = tuple(unit for unit, result in scored_pop.items()
                        if result.score == best_result.score)

    selected = dict(select(scored_pop))
    # More than one individual is required ; this check also covers the
    # empty selection, so no separate emptiness assertion is needed.
    assert len(selected) > 1, selected
    final = tuple(reproduce(selected, pop_size, cross, mutator=mutate))
    assert len(final) == pop_size, "new pop must have a size of {} ({}), not {}".format(pop_size, type(pop_size), len(final))
    return StepResult(final, scored_pop, winners)
def step_cross_first(pop, case, pop_size:int, score:callable,
                     select:callable, reproduce:callable, mutate:callable,
                     step_number:int=None) -> 'StepResult':
    """Compute one step, return a StepResult holding the new population.

    This implementation first produces the new generation (twice the
    requested size), scores it, keeps the pop_size best, then selects
    among them.
    This behavior could tend to favor mean score increase.

    pop -- individuals of current generation (Unit instances)
    case -- Case instance, unpacked as (stdin, expected)
    pop_size -- number of best individuals kept after scoring
    score -- function used for scoring
    select -- function used for selection ; receives {unit: result}
    reproduce -- function used for reproduction ; called as
                 reproduce(pop, pop_size * 2, mutator=mutate)
    mutate -- function used for mutation
    step_number -- number of the current step ; only for cosmetic/logging purpose

    Return StepResult(pop, scored_old_pop, winners), where scored_old_pop
    is the {unit: result} mapping restricted to the kept individuals.

    NOTE(review): this variant is flagged as not working properly (logic
    problem) and is not registered in named_functions(). The crashes are
    fixed here ; the overall logic still needs to be confirmed.

    """
    assert callable(score)
    assert callable(select)
    assert callable(reproduce)
    assert callable(mutate)
    assert isinstance(pop_size, int)
    assert all(isinstance(unit, Unit) for unit in pop)
    assert isinstance(case, Case)
    assert pop
    if step_number is not None:
        print('\n\n# Step {}'.format(step_number))

    # NOTE(review): step() calls reproduce(selected, pop_size, cross, mutator=...)
    # with a mapping and a crossing function ; here reproduce gets the raw
    # population and no crossing function. Confirm reproduce supports this form.
    pop = tuple(reproduce(pop, pop_size * 2, mutator=mutate))
    assert pop

    stdin, expected = case
    scored_pop = _multisolve_scoring(stdin, expected, pop, score)

    # keep only the bests: rank by decreasing score, keep the pop_size first.
    # sorted() is stable, so equally scored units keep their relative order.
    ranked = sorted(scored_pop.items(), key=lambda item: item[1].score, reverse=True)
    scored_pop = dict(ranked[:pop_size])

    # The best unit is taken from the ranking, not from `pop`: units trimmed
    # out above are no longer keys of scored_pop.
    best_unit, best_result = ranked[0]
    succeeded = best_result.found == best_result.expected

    print('SCORES:', sorted({round(r.score, 3) for r in scored_pop.values()}, reverse=True))
    proportions = Counter(r.score for r in scored_pop.values())
    print('PROPS :', proportions.most_common(MAX_PRINTED_PROPS))
    print('OF', len(scored_pop), 'BEST:', round(best_result.score, 3))
    print(f"OUTPUTS: \"{best_result.found}\"\t(expect {best_result.expected})",
          ("[SUCCESS]" if succeeded else ''))
    print('SOURCE:', best_unit.source)

    # winners are reported only when the best unit produced the expected output
    winners = ()
    if succeeded:
        winners = tuple(unit for unit, result in scored_pop.items()
                        if result.score == best_result.score)

    # NOTE(review): step() wraps the output of select() with dict() ; if select
    # yields (unit, result) pairs, the tuple below holds pairs, not units,
    # and its size is not checked against pop_size. To be confirmed.
    return StepResult(tuple(select(scored_pop)), scored_pop, winners)
def _multisolve_scoring(stdin, expected, pop, score:callable) -> dict:
    """Score each individual of given population in a pool of processes.

    The scoring function is called as score(individual, (stdin, expected))
    for each individual of pop.
    Return {individual: score}.

    """
    case_data = (stdin, expected)
    pool_options = {
        'processes': MULTIPROC_PROCESSES,
        'maxtasksperchild': MULTIPROC_TASK_PER_CHILD,
    }
    with Pool(**pool_options) as workers:
        # one (individual, case_data) argument pair per scoring call
        jobs = zip(pop, itertools.repeat(case_data))
        results = workers.starmap(score, jobs)
        # starmap returns results in the order of the jobs
        return dict(zip(pop, results))