-
Notifications
You must be signed in to change notification settings - Fork 2
/
machine_builder.go
230 lines (193 loc) · 6.13 KB
/
machine_builder.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
package process
import (
"context"
"sync"
)
// machineBuilder holds the collaborators used to construct the run and
// shutdown functions for the processes registered to a container.
type machineBuilder struct {
	// injecter is invoked by buildRun once per process, before that process
	// is initialized. buildRun skips the inject step when this field is nil.
	injecter Injecter
	// health is queried and subscribed to by buildRun in order to wait for
	// the health components named by a priority group's processes.
	health *Health
}
// closedErrorsChannel is a global, always closed channel of error values.
// A receive from it never blocks and yields a nil error with ok == false.
//
// The channel is closed as part of the variable's own initializer rather than
// in an init function, so it is already closed the first time any code (including
// other package-level initializers) can observe it.
var closedErrorsChannel = func() chan error {
	ch := make(chan error)
	close(ch)
	return ch
}()
// newMachineBuilder constructs a machineBuilder with default collaborators and
// then applies each of the given config functions to it, in the order supplied.
// The default injecter is a no-op that always returns nil, and the default
// health value is the result of NewHealth.
func newMachineBuilder(configs ...MachineConfigFunc) *machineBuilder {
	noopInject := func(ctx context.Context, meta *Meta) error {
		return nil
	}
	builder := &machineBuilder{
		injecter: InjecterFunc(noopInject),
		health:   NewHealth(),
	}
	for i := range configs {
		configs[i](builder)
	}
	return builder
}
// buildRun creates a function that initializes, runs, and monitors the processes
// registered to the given container. For each priority from low values to high values,
// the function will:
//
// 1. Run the inject hook for each process registered to the target priority. If the
// injecter provided to the builder is nil, this step is skipped. The injecter will
// assign fields of the processes registered at this priority. Initializers registered
// to lower priorities may provide values to be injected into processes registered to
// higher priorities.
//
// 2. Initialize each process. These methods are invoked in parallel. If an error occurs
// during initialization, the application will skip the following step for this and
// remaining priority groups.
//
// 3. Start each process. These methods are invoked in parallel as well. As processes
// are expected to be long-running (with exceptions), this phase has no obvious end.
// The next priority group of processes will begin to execute in the same manner after
// all of the processes registered to this priority have started and the process becomes
// healthy (or the health timeout for an unhealthy process elapses).
//
// On shutdown due to a user signal, an explicit request, or a process error, all of the
// processes registered to the given container are finalized. All finalizer methods are
// invoked in parallel.
//
// Processes registered to priority zero are handled specially in steps 1 and 2: each
// one is injected and initialized as its own single-element group instead of together
// with the other priority-zero processes.
func (b *machineBuilder) buildRun(container *Container) streamErrorFunc {
	// Count every registered process across all priorities. Each Run goroutine
	// below sends at most one error, so a buffer of this size is meant to let
	// those sends complete without a receiver being ready.
	n := 0
	for _, meta := range container.meta {
		n += len(meta)
	}
	var wg sync.WaitGroup
	processErrors := make(chan error, n)
	// healthCheckCtx is canceled as soon as any process's Run returns an error,
	// and also once every Run goroutine has exited (see forwardProcessErrors).
	// It is used to abort a pending waitUntilHealthy.
	healthCheckCtx, healthCheckCancel := context.WithCancel(context.Background())
	var initAndRunEachPriority []streamErrorFunc
	for _, priority := range container.priorities {
		meta := container.meta[priority]
		// Priority zero: one partition per process. Any other priority: a single
		// partition containing every process registered at that priority.
		var partitions [][]*Meta
		if priority == 0 {
			for _, meta := range meta {
				partitions = append(partitions, []*Meta{meta})
			}
		} else {
			partitions = append(partitions, meta)
		}
		// For every partition, build an inject stage followed by an init stage.
		var initEachPriority []streamErrorFunc
		for _, meta := range partitions {
			injectAtPriority := mapMetaParallel(meta, func(meta *Meta) streamErrorFunc {
				return toStreamErrorFunc(func(ctx context.Context) error {
					// The nil check happens when the stage runs, not when it is built.
					if b.injecter == nil {
						return nil
					}
					meta.logger.Info("Running inject hook for %s", meta.Name())
					if err := b.injecter.Inject(ctx, meta); err != nil {
						// Wrap the failure with the process name and operation.
						return &opError{
							source: err,
							metaName: meta.Name(),
							opName: "inject hook",
							message: "failed",
						}
					}
					return nil
				})
			})
			initAtPriority := mapMetaParallel(meta, func(m *Meta) streamErrorFunc {
				return toStreamErrorFunc(m.Init)
			})
			// NOTE(review): chain is defined elsewhere; presumably it runs its
			// arguments in order and stops at the first error — confirm.
			initEachPriority = append(initEachPriority, chain(
				injectAtPriority,
				initAtPriority,
			))
		}
		// The run stage always covers the whole priority group (not a partition).
		runAtPriority := mapMetaParallel(meta, func(meta *Meta) streamErrorFunc {
			return func(ctx context.Context) <-chan error {
				// Register with the WaitGroup before spawning so that the
				// goroutine is accounted for as soon as this stage is invoked.
				wg.Add(1)
				go func() {
					defer wg.Done()
					if err := meta.Run(ctx); err != nil {
						// Stop any in-progress health wait, then report the error.
						healthCheckCancel()
						processErrors <- err
					}
				}()
				// Return an already-closed channel: this stage yields no errors
				// itself and does not wait for the process to exit. Run errors
				// are delivered through processErrors instead.
				return closedErrorsChannel
			}
		})
		// Collect the distinct health keys declared by the processes at this priority.
		healthKeyMap := map[interface{}]struct{}{}
		for _, meta := range meta {
			for _, key := range meta.options.healthKeys {
				healthKeyMap[key] = struct{}{}
			}
		}
		healthKeys := make([]interface{}, 0, len(healthKeyMap))
		for key := range healthKeyMap {
			healthKeys = append(healthKeys, key)
		}
		waitUntilHealthy := toStreamErrorFunc(func(ctx context.Context) error {
			components, err := b.health.GetAll(healthKeys...)
			if err != nil {
				return err
			}
			// Nothing to wait on for this priority group.
			if len(components) == 0 {
				return nil
			}
			ch, cancel := b.health.Subscribe()
			defer cancel()
			for {
				// Block until a value arrives on the subscription channel, the
				// caller's context ends, or the health check is canceled.
				// NOTE(review): component health is only evaluated after a receive
				// on ch; whether Subscribe delivers an initial notification is not
				// visible here — confirm components that are already healthy do not
				// leave this waiting until ctx ends.
				select {
				case <-ctx.Done():
					return ctx.Err()
				case <-ch:
				case <-healthCheckCtx.Done():
					return ErrHealthCheckCanceled
				}
				// Done only when every tracked component reports healthy.
				healthy := true
				for _, component := range components {
					if !component.Healthy() {
						healthy = false
						break
					}
				}
				if healthy {
					return nil
				}
			}
		})
		// Per priority: init partitions, then start processes, then wait for health.
		initAndRunEachPriority = append(initAndRunEachPriority, chain(
			chain(initEachPriority...),
			runAtPriority,
			waitUntilHealthy,
		))
	}
	// forwardProcessErrors exposes processErrors as this stage's error stream and
	// closes it once every Run goroutine registered with wg has returned.
	forwardProcessErrors := func(ctx context.Context) <-chan error {
		go func() {
			wg.Wait()
			close(processErrors)
			healthCheckCancel()
		}()
		return processErrors
	}
	runFinalizers := mapMetaParallel(container.Meta(), func(m *Meta) streamErrorFunc {
		return toStreamErrorFunc(func(ctx context.Context) error {
			// Finalize is given a fresh background context, not the stage's ctx.
			// NOTE(review): presumably so finalizers still run after the run
			// context has been canceled — confirm.
			return m.Finalize(context.Background())
		})
	})
	// NOTE(review): sequence is defined elsewhere; unlike chain it presumably
	// continues to later stages after an earlier stage reports errors — confirm.
	return sequence(
		chain(initAndRunEachPriority...),
		forwardProcessErrors,
		runFinalizers,
	)
}
// buildShutdown returns a function that calls Stop on every process registered
// to the given container. The container's priorities are walked from the last
// entry to the first; the processes at one priority are stopped together via
// mapMetaParallel, and the per-priority groups are combined with sequence.
func (b *machineBuilder) buildShutdown(container *Container) streamErrorFunc {
	// stopProcess adapts a single process's Stop method to a stream function.
	stopProcess := func(m *Meta) streamErrorFunc {
		return toStreamErrorFunc(m.Stop)
	}

	priorities := container.priorities
	last := len(priorities) - 1

	var stoppers []streamErrorFunc
	for offset := range priorities {
		// Mirror the index so the final priority is handled first.
		group := container.meta[priorities[last-offset]]
		stoppers = append(stoppers, mapMetaParallel(group, stopProcess))
	}
	return sequence(stoppers...)
}
// mapMetaParallel applies fn to each of the given meta values, in slice order,
// and combines the resulting stream functions into one using parallel.
func mapMetaParallel(meta []*Meta, fn func(meta *Meta) streamErrorFunc) streamErrorFunc {
	var streams []streamErrorFunc
	for i := range meta {
		streams = append(streams, fn(meta[i]))
	}
	return parallel(streams...)
}