[Scheduled Actions V2] WIP Generator component #6905
base: sched2_core
Conversation
	"go.temporal.io/server/service/worker/scheduler"
	scheduler1 "go.temporal.io/server/service/worker/scheduler"
Duplicate import.
Will fix
}

// process time range between last high water mark and system time
t1 := timestamp.TimeValue(generator.LastProcessedTime)
Why not this? (out of curiosity)
-t1 := timestamp.TimeValue(generator.LastProcessedTime)
+t1 := generator.LastProcessedTime.AsTime()
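(One nil-handling nuance, in case it matters for this field: timestamp.TimeValue returns the zero time.Time for a nil proto timestamp, while calling AsTime() on a nil timestamp yields the Unix epoch.)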
I'm not sure, artifact from original implementation I suppose. will fix :)
t1 := timestamp.TimeValue(generator.LastProcessedTime)
t2 := time.Now()
if t2.Before(t1) {
	e.Logger.Warn("Time went backwards",
Wondering how this log would be printed. Could be worth baking a logger with some relevant tags.
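A minimal sketch of the "baked" logger idea, assuming the log.With and tag.NewStringTag helpers from go.temporal.io/server/common/log; the tag keys, and the assumption that the executor has the namespace and schedule ID at hand, are illustrative rather than taken from this PR:

```go
import (
	"go.temporal.io/server/common/log"
	"go.temporal.io/server/common/log/tag"
)

// newSchedulerLogger bakes schedule-identifying tags into the logger once, so
// every Warn/Error emitted while executing generator tasks carries them.
func newSchedulerLogger(base log.Logger, namespaceID, scheduleID string) log.Logger {
	return log.With(base,
		tag.NewStringTag("namespace-id", namespaceID),
		tag.NewStringTag("schedule-id", scheduleID),
	)
}
```

With something like that in place, the "Time went backwards" warning would identify which schedule hit it without each call site re-tagging.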
	return common.ValidateTask(node, TransitionBuffer)
}

var TransitionSleep = hsm.NewTransition(
I typically put the transitions with the state machine. I think we should try and keep that consistent.
Sure, will do.
TaskTypeSleep  = "scheduler.generator.Sleep"
TaskTypeBuffer = "scheduler.generator.Buffer"
Wondering why you need two tasks here; why not buffer in the sleep task under a write lock? That way there are fewer DB writes. You should only need the buffer task if you plan to do any IO or other type of side effect.
If that's an option (buffering under a write lock), that works for me, since there shouldn't be any regular contention - posted a question on your other comment, re: ConflictToken vs write locking.
I was thinking about WAITING versus EXECUTING in the context of the Backfiller, as well, where a user's backfill request will trigger an immediate Backfiller state transition and task to start backfilling. For Generator, if we don't have a separate EXECUTING task to transition to after a user updates, how would we signal the sleeping Generator sub-state machine? If re-entering the WAITING state works, then we can probably also get rid of EXECUTING on the Backfiller.
	return err
}
if refreshedScheduler.ConflictToken != scheduler.ConflictToken {
	// schedule was updated while processing, retry
Do we actually want to retry the task here? I commented on the transition definition too but I think you can do all of this under a write lock since there's no IO involved AFAICT.
Maybe not. If we were doing this under the write lock on the generator node, is the expectation that we'd also attempt to acquire the generator node for write access during an update request? I've been assuming that env.Access scopes apply only to the given ref; do they/can they apply more broadly?
The lock is held for the entire mutable state record including the whole tree structure.
Ah! Okay, I'd been thinking that each component could potentially map to its own mutable state. One MS simplifies that. Will update to just fail out the task (I believe ErrStaleReference is for this purpose?)
Let's start with a clean slate and delete the PoC scheduler component and all of its protos.
if !ok {
	return 0, fmt.Errorf("%w: expected state1 to be a Generator instance, got %v", hsm.ErrIncompatibleType, s1)
}
s2, ok := a.(Generator)
-s2, ok := a.(Generator)
+s2, ok := b.(Generator)
// We're reprocessing since the most recent event after an update. Discard actions before
// the update time (which was just set to "now"). This doesn't have to be guarded with
// hasMinVersion because this condition couldn't happen in previous versions.
I imagine this comment should be rewritten somewhat. We don't have versions anymore, at least. Also, I'm not sure how updates fit into this execution model; it may not be true that "we just set update time to 'now'".
Oops, yes, will rewrite this.

// Returns the next time result, or an error if the schedule cannot be compiled.
func (e taskExecutor) getNextTime(s core.Scheduler, after time.Time) (scheduler.GetNextTimeResult, error) {
	spec, err := s.CompiledSpec(e.SpecBuilder)
just curious if these can get cached at all. it's not that expensive to construct so probably not worth it, except for backfills where it's doing this in a loop during one transition
oh, I see, the caching is elsewhere. nm
if s1.State() > s2.State() {
	return 1, nil
} else if s1.State() < s2.State() {
	return -1, nil
}

return 0, nil
-if s1.State() > s2.State() {
-	return 1, nil
-} else if s1.State() < s2.State() {
-	return -1, nil
-}
-return 0, nil
+return cmp.Compare(s1.State(), s2.State()), nil
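(cmp.Compare is from the standard library's cmp package, added in Go 1.21; it returns -1, 0, or +1, matching the contract above.)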
bufferedStarts = append(bufferedStarts, &schedpb.BufferedStart{
	NominalTime:   timestamppb.New(next.Nominal),
	ActualTime:    timestamppb.New(next.Next),
	OverlapPolicy: scheduler.OverlapPolicy(),
-OverlapPolicy: scheduler.OverlapPolicy(),
+OverlapPolicy: overlapPolicy,
?
Reviewed all the way to spec_processor.go; will defer to @dnr to verify the business logic, for lack of context.
// to the schedule's specification. Manually requested actions (from an immediate
// request or backfill) are separately handled in the Backfiller sub state machine.
Generator struct {
	*schedspb.GeneratorInternal
Consider removing the redundant pointer here and embedding the struct directly.
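(If going that route, it may be worth checking go vet's copylocks output: protobuf-generated structs embed a DoNotCopy marker, so a by-value embed gets flagged anywhere the wrapper struct is copied.)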

// Process time range between last high water mark and system time.
t1 := generator.LastProcessedTime.AsTime()
t2 := time.Now().UTC()
Use env.Now() instead; this will potentially allow you to inject a fake time source and fast-forward for testing purposes.
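As a small, generic illustration of why that helps (this is the plain injectable-clock pattern using go.temporal.io/server/common/clock, not the hsm environment's own API; processWindow is a made-up name):

```go
import (
	"time"

	"go.temporal.io/server/common/clock"
)

// processWindow computes the [lastProcessed, now] range from an injected time
// source instead of calling time.Now() directly, so tests can substitute a
// fake source and fast-forward the generator deterministically.
func processWindow(ts clock.TimeSource, lastProcessed time.Time) (t1, t2 time.Time) {
	return lastProcessed, ts.Now().UTC()
}
```

Production code would hand it a real time source (clock.NewRealTimeSource()); tests a controllable one.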
res, err := e.SpecProcessor.ProcessTimeRange(scheduler, t1, t2, false, nil)
if err != nil {
	// An error here should be impossible, send to the DLQ.
	e.Logger.Error("Error processing time range", tag.Error(err))
Consider baking a logger with the relevant scheduler attributes as tags.
	err,
	serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
These should typically be reversed:
-err,
-serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
+serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
+err,
})
if err != nil {
	return fmt.Errorf(
		"%w: unable to transition Executor to Executing state",
Typically the wrapped error comes at the end.
-"%w: unable to transition Executor to Executing state",
+"unable to transition Executor to Executing state: %w",
}

return Scheduler{
	SchedulerInternal: servercommon.CloneProto(prevScheduler.SchedulerInternal),
Same.

func (g Generator) tasks() ([]hsm.Task, error) {
	switch g.State() { // nolint:exhaustive
	case enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING:
Shouldn't we always be buffering?
func (g Generator) tasks() ([]hsm.Task, error) {
	switch g.State() { // nolint:exhaustive
	case enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING:
		return []hsm.Task{BufferTask{deadline: g.NextInvocationTime.AsTime()}}, nil
Isn't NextInvocationTime a computed property? Based on the first item in the buffer? Seems like a better representation. I may be commenting out of context so LMK if I'm off here.
	panic("TODO: CompareState not yet implemented for Generator")
}

var TransitionBuffer = hsm.NewTransition(
I've mentioned this before, but to reiterate: there's only one state, so you probably don't need to define state transitions.
If you insist on defining this transition, you should accept a transition from BUFFERING to BUFFERING.
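For reference, a self-transition might look roughly like the sketch below; the event type (EventBuffer), the output() helper, and the enum type name are assumptions inferred from the constants used elsewhere in this PR, not its actual definitions:

```go
// Sketch only: listing BUFFERING as both source and destination makes a
// re-trigger (e.g. after an update) a legal transition instead of an error.
var TransitionBuffer = hsm.NewTransition(
	[]enumsspb.SchedulerGeneratorState{enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING},
	enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING,
	func(g Generator, _ EventBuffer) (hsm.TransitionOutput, error) {
		return g.output() // regenerate the next buffer task
	},
)
```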
err = hsm.MachineTransition(node, func(g Generator) (hsm.TransitionOutput, error) {
	wakeupTime := res.NextWakeupTime
	g.LastProcessedTime = timestamppb.New(res.LastActionTime)
	g.NextInvocationTime = timestamppb.New(wakeupTime)
I'd be careful caching this property; it may be better to compute it on the fly and store it only on the generated task's deadline.
Based on #6904
What changed?
Why?
How did you test it?