package gocron

import (
    "context"
    "fmt"
    "strconv"
    "sync"
    "time"

    "github.com/google/uuid"
    "github.com/jonboulle/clockwork"
)
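
// executor receives job IDs from the scheduler and runs the corresponding
// jobs, handling limit mode, singleton mode, distributed elector/locker
// checks, and monitoring hooks.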
type executor struct {
    // context used for shutting down
    ctx context.Context
    // cancel used by the executor to signal a stop of its functions
    cancel context.CancelFunc
    // clock used for regular time or mocking time
    clock clockwork.Clock
    // the executor's logger
    logger Logger
    // receives jobs scheduled to execute
    jobsIn chan jobIn
    // sends out jobs for rescheduling
    jobsOutForRescheduling chan uuid.UUID
    // sends out jobs once completed
    jobsOutCompleted chan uuid.UUID
    // used to request jobs from the scheduler
    jobOutRequest chan jobOutRequest
    // used by the executor to receive a stop signal from the scheduler
    stopCh chan struct{}
    // the timeout value when stopping
    stopTimeout time.Duration
    // used to signal that the executor has completed shutdown
    done chan error
    // runners for any singleton type jobs
    // map[uuid.UUID]singletonRunner
    singletonRunners *sync.Map
    // config for limit mode
    limitMode *limitModeConfig
    // the elector when running distributed instances
    elector Elector
    // the locker when running distributed instances
    locker Locker
    // monitor for reporting metrics
    monitor Monitor
    // monitorStatus for reporting metrics with job status
    monitorStatus MonitorStatus
}
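
// jobIn carries the ID of a job to run and whether the executor should send
// the ID back out for rescheduling.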
type jobIn struct {
    id            uuid.UUID
    shouldSendOut bool
}
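
// singletonRunner is the per-job runner used for jobs configured with
// singleton mode; it runs one instance of that job at a time.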
type singletonRunner struct {
    in                chan jobIn
    rescheduleLimiter chan struct{}
}
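
// limitModeConfig holds the configuration and channels for the executor-wide
// concurrency limit (limit mode).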
type limitModeConfig struct {
    started           bool
    mode              LimitMode
    limit             uint
    rescheduleLimiter chan struct{}
    in                chan jobIn
    // singletonJobs is used to track singleton jobs that are running
    // in the limit mode runner. This is used to prevent the same job
    // from running multiple times across limit mode runners when both
    // a limit mode and singleton mode are enabled.
    singletonJobs   map[uuid.UUID]struct{}
    singletonJobsMu sync.Mutex
}
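
// start runs the executor's main loop, receiving job IDs from the scheduler
// and dispatching them to standard, singleton, or limit mode execution until
// a stop signal is received.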
func (e *executor) start() {
    e.logger.Debug("gocron: executor started")

    // creating the executor's context here as the executor
    // is the only goroutine that should access this context
    // any other uses within the executor should create a context
    // using the executor context as parent.
    e.ctx, e.cancel = context.WithCancel(context.Background())

    // the standardJobsWg tracks the standard jobs that are currently running -
    // those without singleton mode or a concurrency limit
    standardJobsWg := &waitGroupWithMutex{}
    singletonJobsWg := &waitGroupWithMutex{}
    limitModeJobsWg := &waitGroupWithMutex{}

    // create a fresh map for tracking singleton runners
    e.singletonRunners = &sync.Map{}

    // start the for loop that is the executor
    // selecting on channels for work to do
    for {
        select {
        // job ids in are sent from 1 of 2 places:
        // 1. the scheduler sends directly when jobs
        //    are run immediately.
        // 2. sent from the time.AfterFunc calls that the scheduler
        //    spins up for job schedules
        case jIn := <-e.jobsIn:
            select {
            case <-e.stopCh:
                e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
                return
            default:
            }
            // this context is used to cancel the requests made to the
            // scheduler for a job (via requestJobCtx) when the executor shuts down
            ctx, cancel := context.WithCancel(e.ctx)

            if e.limitMode != nil && !e.limitMode.started {
                // check if we are already running the limit mode runners
                // if not, spin up the required number, i.e. the limit
                e.limitMode.started = true
                for i := e.limitMode.limit; i > 0; i-- {
                    limitModeJobsWg.Add(1)
                    go e.limitModeRunner("limitMode-"+strconv.Itoa(int(i)), e.limitMode.in, limitModeJobsWg, e.limitMode.mode, e.limitMode.rescheduleLimiter)
                }
            }

            // spin off into a goroutine to unblock the executor and
            // allow processing of more work
            go func() {
                // make sure to cancel the above context per the docs:
                // "Canceling this context releases resources associated with it, so code should
                // call cancel as soon as the operations running in this Context complete."
                defer cancel()

                // check for limit mode - this spins up a separate runner which handles
                // limiting the total number of concurrently running jobs
                if e.limitMode != nil {
                    if e.limitMode.mode == LimitModeReschedule {
                        select {
                        // rescheduleLimiter is a channel the size of the limit
                        // this blocks publishing to the channel and keeps
                        // the executor from building up a waiting queue
                        // and forces rescheduling
                        case e.limitMode.rescheduleLimiter <- struct{}{}:
                            e.limitMode.in <- jIn
                        default:
                            // all runners are busy, reschedule the work for later
                            // which means we just skip it here and do nothing
                            // TODO when metrics are added, this should increment a rescheduled metric
                            e.sendOutForRescheduling(&jIn)
                        }
                    } else {
                        // since we're not using LimitModeReschedule, but instead LimitModeWait,
                        // we do want to queue up the work to the limit mode runners and allow them
                        // to work through the channel backlog. A hard limit of 1000 is in place
                        // at which point this call would block.
                        // TODO when metrics are added, this should increment a wait metric
                        e.sendOutForRescheduling(&jIn)
                        e.limitMode.in <- jIn
                    }
                } else {
                    // no limit mode, so we're either running a regular job or
                    // a job with singleton mode
                    //
                    // get the job, so we can figure out what kind it is and how
                    // to execute it
                    j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
                    if j == nil {
                        // safety check, as it'd be a strange bug if this occurred
                        return
                    }

                    if j.singletonMode {
                        // for singleton mode, get the existing runner for the job
                        // or spin up a new one
                        runner := &singletonRunner{}
                        runnerSrc, ok := e.singletonRunners.Load(jIn.id)
                        if !ok {
                            runner.in = make(chan jobIn, 1000)
                            if j.singletonLimitMode == LimitModeReschedule {
                                runner.rescheduleLimiter = make(chan struct{}, 1)
                            }
                            e.singletonRunners.Store(jIn.id, runner)
                            singletonJobsWg.Add(1)
                            go e.singletonModeRunner("singleton-"+jIn.id.String(), runner.in, singletonJobsWg, j.singletonLimitMode, runner.rescheduleLimiter)
                        } else {
                            runner = runnerSrc.(*singletonRunner)
                        }

                        if j.singletonLimitMode == LimitModeReschedule {
                            // reschedule mode uses the limiter channel to check
                            // for a running job and reschedules if the channel is full.
                            select {
                            case runner.rescheduleLimiter <- struct{}{}:
                                runner.in <- jIn
                                e.sendOutForRescheduling(&jIn)
                            default:
                                // runner is busy, reschedule the work for later
                                // which means we just skip it here and do nothing
                                e.incrementJobCounter(*j, SingletonRescheduled)
                                e.sendOutForRescheduling(&jIn)
                            }
                        } else {
                            // wait mode, fill up that queue (buffered channel, so it's ok)
                            runner.in <- jIn
                            e.sendOutForRescheduling(&jIn)
                        }
                    } else {
                        select {
                        case <-e.stopCh:
                            e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
                            return
                        default:
                        }

                        // we've gotten to the basic / standard jobs --
                        // the ones without anything special that just want
                        // to be run. Add to the WaitGroup so that
                        // stopping or shutting down can wait for the jobs to
                        // complete.
                        standardJobsWg.Add(1)
                        go func(j internalJob) {
                            e.runJob(j, jIn)
                            standardJobsWg.Done()
                        }(*j)
                    }
                }
            }()
        case <-e.stopCh:
            e.stop(standardJobsWg, singletonJobsWg, limitModeJobsWg)
            return
        }
    }
}
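
// sendOutForRescheduling sends the job ID to the scheduler for rescheduling
// when requested, then clears shouldSendOut so the ID isn't sent out a second
// time from runJob.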
func (e *executor) sendOutForRescheduling(jIn *jobIn) {
    if jIn.shouldSendOut {
        select {
        case e.jobsOutForRescheduling <- jIn.id:
        case <-e.ctx.Done():
            return
        }
    }
    // we need to set this to false now because, for non-limit jobs,
    // we also send out from the e.runJob function, and in that case
    // we don't want to send out twice.
    jIn.shouldSendOut = false
}
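
// limitModeRunner is one of the runners spun up when a concurrency limit is
// configured; it consumes jobs from the shared limit mode channel until the
// executor's context is cancelled.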
func (e *executor) limitModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
    e.logger.Debug("gocron: limitModeRunner starting", "name", name)
    for {
        select {
        case jIn := <-in:
            select {
            case <-e.ctx.Done():
                e.logger.Debug("gocron: limitModeRunner shutting down", "name", name)
                wg.Done()
                return
            default:
            }

            ctx, cancel := context.WithCancel(e.ctx)
            j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
            cancel()
            if j != nil {
                if j.singletonMode {
                    e.limitMode.singletonJobsMu.Lock()
                    _, ok := e.limitMode.singletonJobs[jIn.id]
                    if ok {
                        // this job is already running, so don't run it
                        // but instead reschedule it
                        e.limitMode.singletonJobsMu.Unlock()
                        if jIn.shouldSendOut {
                            select {
                            case <-e.ctx.Done():
                                return
                            case <-j.ctx.Done():
                                return
                            case e.jobsOutForRescheduling <- j.id:
                            }
                        }
                        // remove the limiter block, as this particular job
                        // was a singleton already running, and we want to
                        // allow another job to be scheduled
                        if limitMode == LimitModeReschedule {
                            <-rescheduleLimiter
                        }
                        continue
                    }
                    e.limitMode.singletonJobs[jIn.id] = struct{}{}
                    e.limitMode.singletonJobsMu.Unlock()
                }
                e.runJob(*j, jIn)

                if j.singletonMode {
                    e.limitMode.singletonJobsMu.Lock()
                    delete(e.limitMode.singletonJobs, jIn.id)
                    e.limitMode.singletonJobsMu.Unlock()
                }
            }

            // remove the limiter block to allow another job to be scheduled
            if limitMode == LimitModeReschedule {
                <-rescheduleLimiter
            }
        case <-e.ctx.Done():
            e.logger.Debug("limitModeRunner shutting down", "name", name)
            wg.Done()
            return
        }
    }
}
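
// singletonModeRunner consumes jobs for a single singleton mode job from its
// dedicated channel, running them one at a time until the executor's context
// is cancelled.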
func (e *executor) singletonModeRunner(name string, in chan jobIn, wg *waitGroupWithMutex, limitMode LimitMode, rescheduleLimiter chan struct{}) {
    e.logger.Debug("gocron: singletonModeRunner starting", "name", name)
    for {
        select {
        case jIn := <-in:
            select {
            case <-e.ctx.Done():
                e.logger.Debug("gocron: singletonModeRunner shutting down", "name", name)
                wg.Done()
                return
            default:
            }

            ctx, cancel := context.WithCancel(e.ctx)
            j := requestJobCtx(ctx, jIn.id, e.jobOutRequest)
            cancel()
            if j != nil {
                // need to set shouldSendOut = false here, as sendOutForRescheduling was already
                // called when the job was sent to this singleton mode runner, and the duplicative
                // call inside the runJob function needs to be skipped.
                jIn.shouldSendOut = false
                e.runJob(*j, jIn)
            }

            // remove the limiter block to allow another job to be scheduled
            if limitMode == LimitModeReschedule {
                <-rescheduleLimiter
            }
        case <-e.ctx.Done():
            e.logger.Debug("singletonModeRunner shutting down", "name", name)
            wg.Done()
            return
        }
    }
}
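
// runJob performs the leader election / locking checks, runs the job's
// lifecycle callbacks and function, and reports completion, timings, and
// counters.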
func (e *executor) runJob(j internalJob, jIn jobIn) {
    if j.ctx == nil {
        return
    }
    select {
    case <-e.ctx.Done():
        return
    case <-j.ctx.Done():
        return
    default:
    }

    if j.stopTimeReached(e.clock.Now()) {
        return
    }

    if e.elector != nil {
        if err := e.elector.IsLeader(j.ctx); err != nil {
            e.sendOutForRescheduling(&jIn)
            e.incrementJobCounter(j, Skip)
            return
        }
    } else if j.locker != nil {
        lock, err := j.locker.Lock(j.ctx, j.name)
        if err != nil {
            _ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
            e.sendOutForRescheduling(&jIn)
            e.incrementJobCounter(j, Skip)
            return
        }
        defer func() { _ = lock.Unlock(j.ctx) }()
    } else if e.locker != nil {
        lock, err := e.locker.Lock(j.ctx, j.name)
        if err != nil {
            _ = callJobFuncWithParams(j.afterLockError, j.id, j.name, err)
            e.sendOutForRescheduling(&jIn)
            e.incrementJobCounter(j, Skip)
            return
        }
        defer func() { _ = lock.Unlock(j.ctx) }()
    }

    _ = callJobFuncWithParams(j.beforeJobRuns, j.id, j.name)

    e.sendOutForRescheduling(&jIn)
    select {
    case e.jobsOutCompleted <- j.id:
    case <-e.ctx.Done():
    }

    startTime := time.Now()
    var err error
    if j.afterJobRunsWithPanic != nil {
        err = e.callJobWithRecover(j)
    } else {
        err = callJobFuncWithParams(j.function, j.parameters...)
    }
    e.recordJobTiming(startTime, time.Now(), j)
    if err != nil {
        _ = callJobFuncWithParams(j.afterJobRunsWithError, j.id, j.name, err)
        e.incrementJobCounter(j, Fail)
        e.recordJobTimingWithStatus(startTime, time.Now(), j, Fail, err)
    } else {
        _ = callJobFuncWithParams(j.afterJobRuns, j.id, j.name)
        e.incrementJobCounter(j, Success)
        e.recordJobTimingWithStatus(startTime, time.Now(), j, Success, nil)
    }
}
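
// callJobWithRecover runs the job's function with a deferred recover,
// reporting a panic via the afterJobRunsWithPanic callback and returning it
// as an error wrapping ErrPanicRecovered.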
func (e *executor) callJobWithRecover(j internalJob) (err error) {
    defer func() {
        if recoverData := recover(); recoverData != nil {
            _ = callJobFuncWithParams(j.afterJobRunsWithPanic, j.id, j.name, recoverData)

            // if a panic occurred, we should return an error
            err = fmt.Errorf("%w from %v", ErrPanicRecovered, recoverData)
        }
    }()

    return callJobFuncWithParams(j.function, j.parameters...)
}

func (e *executor) recordJobTiming(start time.Time, end time.Time, j internalJob) {
    if e.monitor != nil {
        e.monitor.RecordJobTiming(start, end, j.id, j.name, j.tags)
    }
}

func (e *executor) recordJobTimingWithStatus(start time.Time, end time.Time, j internalJob, status JobStatus, err error) {
    if e.monitorStatus != nil {
        e.monitorStatus.RecordJobTimingWithStatus(start, end, j.id, j.name, j.tags, status, err)
    }
}

func (e *executor) incrementJobCounter(j internalJob, status JobStatus) {
    if e.monitor != nil {
        e.monitor.IncrementJob(j.id, j.name, j.tags, status)
    }
}
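
// stop cancels the executor's context and waits, up to stopTimeout, for the
// standard, singleton, and limit mode jobs to finish, then reports the result
// on the done channel.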
func (e *executor) stop(standardJobsWg, singletonJobsWg, limitModeJobsWg *waitGroupWithMutex) {
    e.logger.Debug("gocron: stopping executor")
    // we've been asked to stop. This is either because the scheduler has been told
    // to stop all jobs or the scheduler has been asked to completely shut down.
    //
    // cancel tells all the functions to stop their work and send in a done response
    e.cancel()

    // the wait for job channels are used to report back whether we successfully waited
    // for all jobs to complete or if we hit the configured timeout.
    waitForJobs := make(chan struct{}, 1)
    waitForSingletons := make(chan struct{}, 1)
    waitForLimitMode := make(chan struct{}, 1)

    // the waiter context is used to cancel the functions waiting on jobs.
    // this is done to avoid goroutine leaks.
    waiterCtx, waiterCancel := context.WithCancel(context.Background())

    // wait for standard jobs to complete
    go func() {
        e.logger.Debug("gocron: waiting for standard jobs to complete")
        go func() {
            // this is done in a separate goroutine, so we aren't
            // blocked by the WaitGroup's Wait call in the event
            // that the waiter context is cancelled.
            // This particular goroutine could leak in the event that
            // some long-running standard job doesn't complete.
            standardJobsWg.Wait()
            e.logger.Debug("gocron: standard jobs completed")
            waitForJobs <- struct{}{}
        }()
        <-waiterCtx.Done()
    }()

    // wait for the per-job singleton mode runner jobs to complete
    go func() {
        e.logger.Debug("gocron: waiting for singleton jobs to complete")
        go func() {
            singletonJobsWg.Wait()
            e.logger.Debug("gocron: singleton jobs completed")
            waitForSingletons <- struct{}{}
        }()
        <-waiterCtx.Done()
    }()

    // wait for limit mode runners to complete
    go func() {
        e.logger.Debug("gocron: waiting for limit mode jobs to complete")
        go func() {
            limitModeJobsWg.Wait()
            e.logger.Debug("gocron: limitMode jobs completed")
            waitForLimitMode <- struct{}{}
        }()
        <-waiterCtx.Done()
    }()

    // now either wait for all the jobs to complete,
    // or hit the timeout.
    var count int
    timeout := time.Now().Add(e.stopTimeout)
    for time.Now().Before(timeout) && count < 3 {
        select {
        case <-waitForJobs:
            count++
        case <-waitForSingletons:
            count++
        case <-waitForLimitMode:
            count++
        default:
        }
    }
    if count < 3 {
        e.done <- ErrStopJobsTimedOut
        e.logger.Debug("gocron: executor stopped - timed out")
    } else {
        e.done <- nil
        e.logger.Debug("gocron: executor stopped")
    }
    waiterCancel()

    if e.limitMode != nil {
        e.limitMode.started = false
    }
}
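
// Illustrative usage sketch - not part of this file. The executor is internal;
// jobs reach jobsIn via the scheduler. Assuming the public gocron v2 API
// (gocron.NewScheduler, WithLimitConcurrentJobs, WithSingletonMode - verify
// against the package docs), something like the following exercises both the
// limit mode and singleton mode paths handled above:
//
//	s, _ := gocron.NewScheduler(
//		gocron.WithLimitConcurrentJobs(2, gocron.LimitModeReschedule),
//	)
//	_, _ = s.NewJob(
//		gocron.DurationJob(time.Second),
//		gocron.NewTask(func() { /* job work */ }),
//		gocron.WithSingletonMode(gocron.LimitModeWait),
//	)
//	s.Start()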