[Scheduled Actions V2] WIP Generator component #6905

Draft · wants to merge 3 commits into base: sched2_core

Conversation

lina-temporal
Contributor

Based on #6904

What changed?

  • Added the WIP (but mostly implementation-complete) Generator component, a sub-state machine of the top-level Scheduler

Why?

  • The generator generates buffered actions, and we want those!

How did you test it?

  • I haven't yet; testing is next (this is a draft PR)
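For context, the component's rough shape as it appears in this diff (the struct embedding and the tasks() method are lifted from the review snippets below; the trailing default return is an assumption):

type Generator struct {
    *schedspb.GeneratorInternal
}

func (g Generator) tasks() ([]hsm.Task, error) {
    switch g.State() { // nolint:exhaustive
    case enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING:
        // Sleep until the next invocation time, then buffer due actions.
        return []hsm.Task{BufferTask{deadline: g.NextInvocationTime.AsTime()}}, nil
    }
    return nil, nil
}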

Comment on lines 20 to 21
"go.temporal.io/server/service/worker/scheduler"
scheduler1 "go.temporal.io/server/service/worker/scheduler"
Member

Duplicate import.
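The fix is presumably just dropping the aliased duplicate:

import (
    "go.temporal.io/server/service/worker/scheduler"
)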

Contributor Author

Will fix

}

// process time range between last high water mark and system time
t1 := timestamp.TimeValue(generator.LastProcessedTime)
Member

Why not this? (out of curiosity)

Suggested change
t1 := timestamp.TimeValue(generator.LastProcessedTime)
t1 := generator.LastProcessedTime.AsTime()
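(One subtlety, if memory serves: the server helper timestamp.TimeValue maps a nil timestamp to the zero time.Time, while the generated AsTime() maps nil to the Unix epoch, so double-check any nil-handling paths when switching.)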

Contributor Author

I'm not sure; an artifact from the original implementation, I suppose. Will fix :)

t1 := timestamp.TimeValue(generator.LastProcessedTime)
t2 := time.Now()
if t2.Before(t1) {
    e.Logger.Warn("Time went backwards",
Member

Wondering how this log would be printed. Could be worth baking a logger with some relevant tags.
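Perhaps something like this sketch (log.With is the server's helper; the specific tag constructors and the in-scope variables here are assumptions):

// Bake the scheduler-scoped tags in once, then reuse the logger.
logger := log.With(e.Logger,
    tag.WorkflowNamespace(namespaceName),
    tag.NewStringTag("schedule-id", scheduleID),
)
logger.Warn("Time went backwards",
    tag.NewTimeTag("last-processed-time", t1),
    tag.NewTimeTag("current-time", t2),
)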

    return common.ValidateTask(node, TransitionBuffer)
}

var TransitionSleep = hsm.NewTransition(
Member

I typically put the transitions with the state machine. I think we should try and keep that consistent.

Contributor Author

Sure, will do.

Comment on lines 35 to 36
TaskTypeSleep = "scheduler.generator.Sleep"
TaskTypeBuffer = "scheduler.generator.Buffer"
Member

Wondering why you need two tasks here; why don't you buffer in the sleep task under a write lock? It's fewer DB writes that way. You should only need the buffer task if you plan to do any IO or other type of side effect.
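A rough sketch of what that could look like, assuming the hsm package's Environment.Access/MachineTransition shapes (the bufferAndReschedule helper is hypothetical):

func (e taskExecutor) executeSleepTask(ctx context.Context, env hsm.Environment, ref hsm.Ref) error {
    return env.Access(ctx, ref, hsm.AccessWrite, func(node *hsm.Node) error {
        return hsm.MachineTransition(node, func(g Generator) (hsm.TransitionOutput, error) {
            // No IO here: buffer the due actions, advance the high water mark,
            // and emit the next sleep task, all in one persisted transition.
            return g.bufferAndReschedule(env.Now()) // hypothetical helper
        })
    })
}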

Contributor Author

> Wondering why you need two tasks here; why don't you buffer in the sleep task under a write lock? It's fewer DB writes that way. You should only need the buffer task if you plan to do any IO or other type of side effect.

If that's an option (buffering under a write lock), that works for me, since there shouldn't be any regular contention. I posted a question on your other comment re: ConflictToken vs. write locking.

Contributor Author
@lina-temporal · Dec 3, 2024

I was thinking about WAITING versus EXECUTING in the context of the Backfiller, as well, where a user's backfill request will trigger an immediate Backfiller state transition and task to start backfilling. For Generator, if we don't have a separate EXECUTING task to transition to after a user updates, how would we signal the sleeping Generator sub-state machine? If re-entering the WAITING state works, then we can probably also get rid of EXECUTING on the Backfiller.

    return err
}
if refreshedScheduler.ConflictToken != scheduler.ConflictToken {
    // schedule was updated while processing, retry
Member

Do we actually want to retry the task here? I commented on the transition definition too but I think you can do all of this under a write lock since there's no IO involved AFAICT.

Contributor Author

Maybe not. If we were doing this under the write lock on the generator node, is the expectation that we'd also attempt to acquire the generator node for write access during an update request? I've been assuming that env.Access scopes apply only to the given ref; do they, or can they, apply more broadly?

Member

The lock is held for the entire mutable state record including the whole tree structure.

Contributor Author

Ah! Okay, I'd been thinking that each component could potentially map to its own mutable state; one MS simplifies that. Will update to just fail out the task (I believe ErrStaleReference is for this purpose?).

Member
@bergundy left a comment

Let's start with a clean slate and delete the PoC scheduler component and all of its protos.

if !ok {
    return 0, fmt.Errorf("%w: expected state1 to be a Generator instance, got %v", hsm.ErrIncompatibleType, s1)
}
s2, ok := a.(Generator)
Member

Suggested change
s2, ok := a.(Generator)
s2, ok := b.(Generator)

Comment on lines 220 to 222
// We're reprocessing since the most recent event after an update. Discard actions before
// the update time (which was just set to "now"). This doesn't have to be guarded with
// hasMinVersion because this condition couldn't happen in previous versions.
Member

I imagine this comment should be rewritten somewhat. We don't have versions anymore, at least. Also, I'm not sure how updates fit into this execution model; it may not be true that "we just set update time to 'now'".

Contributor Author

Oops, yes, will rewrite this.


// Returns the next time result, or an error if the schedule cannot be compiled.
func (e taskExecutor) getNextTime(s core.Scheduler, after time.Time) (scheduler.GetNextTimeResult, error) {
    spec, err := s.CompiledSpec(e.SpecBuilder)
Member
@dnr · Dec 4, 2024

just curious if these can get cached at all. it's not that expensive to construct so probably not worth it, except for backfills where it's doing this in a loop during one transition


oh, I see, the caching is elsewhere. nm

Comment on lines 83 to 89
if s1.State() > s2.State() {
    return 1, nil
} else if s1.State() < s2.State() {
    return -1, nil
}

return 0, nil
Member

Suggested change
if s1.State() > s2.State() {
    return 1, nil
} else if s1.State() < s2.State() {
    return -1, nil
}
return 0, nil
return cmp.Compare(s1.State(), s2.State()), nil
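(cmp.Compare is in the standard library cmp package as of Go 1.21, so this also drops the hand-rolled branching.)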

bufferedStarts = append(bufferedStarts, &schedpb.BufferedStart{
    NominalTime:   timestamppb.New(next.Nominal),
    ActualTime:    timestamppb.New(next.Next),
    OverlapPolicy: scheduler.OverlapPolicy(),
Member

Suggested change
OverlapPolicy: scheduler.OverlapPolicy(),
OverlapPolicy: overlapPolicy,

?

Member
@bergundy left a comment

Reviewed all the way to spec_processor.go; will defer to @dnr to verify the business logic, for lack of context.

// to the schedule's specification. Manually requested actions (from an immediate
// request or backfill) are separately handled in the Backfiller sub state machine.
Generator struct {
    *schedspb.GeneratorInternal
Member

Consider removing the redundant pointer here and embedding the struct directly.
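I.e., something like:

Generator struct {
    schedspb.GeneratorInternal // embedded by value, no pointer indirection
}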


// Process time range between last high water mark and system time.
t1 := generator.LastProcessedTime.AsTime()
t2 := time.Now().UTC()
Member

Use env.Now() instead, this will potentially allow you to inject a fake time source and fast forward for testing purposes.
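E.g. (assuming the hsm Environment is in scope in this executor):

t2 := env.Now().UTC() // backed by an injectable TimeSource in tests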

res, err := e.SpecProcessor.ProcessTimeRange(scheduler, t1, t2, false, nil)
if err != nil {
    // An error here should be impossible, send to the DLQ.
    e.Logger.Error("Error processing time range", tag.Error(err))
Member

Consider baking a logger with the relevant scheduler attributes as tags.

Comment on lines +78 to +79
err,
serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
Member

These should typically be reversed:

Suggested change
err,
serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
serviceerror.NewInternal("Scheduler's Generator failed to process a time range"),
err,

})
if err != nil {
    return fmt.Errorf(
        "%w: unable to transition Executor to Executing state",
Member

Typically the wrapped error comes in the end.

Suggested change
"%w: unable to transition Executor to Executing state",
"unable to transition Executor to Executing state: %w",

}

return Scheduler{
    SchedulerInternal: servercommon.CloneProto(prevScheduler.SchedulerInternal),
Member

Same.


func (g Generator) tasks() ([]hsm.Task, error) {
    switch g.State() { // nolint:exhaustive
    case enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING:
Member

Shouldn't we always be buffering?

func (g Generator) tasks() ([]hsm.Task, error) {
    switch g.State() { // nolint:exhaustive
    case enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING:
        return []hsm.Task{BufferTask{deadline: g.NextInvocationTime.AsTime()}}, nil
Member

Isn't NextInvocationTime a computed property? Based on the first item in the buffer? Seems like a better representation. I may be commenting out of context so LMK if I'm off here.

    panic("TODO: CompareState not yet implemented for Generator")
}

var TransitionBuffer = hsm.NewTransition(
Member

I've mentioned this before but to reiterate, there's only one state, you probably don't need to define state transitions.

Member

If you insist on defining this transition, you should accept a transition from BUFFERING to BUFFERING.
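A sketch of that self-transition, assuming hsm.NewTransition takes (source states, destination state, apply func); the event type and output helper here are hypothetical:

var TransitionBuffer = hsm.NewTransition(
    []enumsspb.SchedulerGeneratorState{
        enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING,
    },
    enumsspb.SCHEDULER_GENERATOR_STATE_BUFFERING, // BUFFERING -> BUFFERING
    func(g Generator, event EventBuffer) (hsm.TransitionOutput, error) {
        return g.output() // hypothetical helper emitting the next BufferTask
    },
)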

err = hsm.MachineTransition(node, func(g Generator) (hsm.TransitionOutput, error) {
    wakeupTime := res.NextWakeupTime
    g.LastProcessedTime = timestamppb.New(res.LastActionTime)
    g.NextInvocationTime = timestamppb.New(wakeupTime)
Member

I'd be careful caching this property, it may be better to compute it on the fly and store it only on the generated task's deadline.
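Roughly (assuming TransitionOutput carries the produced tasks; other names as in this diff):

err = hsm.MachineTransition(node, func(g Generator) (hsm.TransitionOutput, error) {
    g.LastProcessedTime = timestamppb.New(res.LastActionTime)
    // NextInvocationTime is not persisted; it lives only on the task deadline.
    return hsm.TransitionOutput{
        Tasks: []hsm.Task{BufferTask{deadline: res.NextWakeupTime}},
    }, nil
})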
