Skip to content
This repository has been archived by the owner on Oct 9, 2023. It is now read-only.

Introduce a CRD Definition Version to allow backward incompatible changes & use it to change LP Exec ID #475

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 3 additions & 2 deletions pkg/apis/flyteworkflow/v1alpha1/iface.go
Original file line number Diff line number Diff line change
Expand Up @@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface {
SetExecutionError(executionError *core.ExecutionError)
}

// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus.
// p returns ExecutableBranchNodeStatus, which permits some mutations
// ExecutableBranchNode is an interface for a Branch node. All the methods are purely read only except for
// GetExecutionStatus, which returns ExecutableBranchNodeStatus and permits some mutations.
type ExecutableBranchNode interface {
GetIf() ExecutableIfBlock
GetElse() *NodeID
Expand Down Expand Up @@ -443,6 +443,7 @@ type Meta interface {
GetSecurityContext() core.SecurityContext
IsInterruptible() bool
GetEventVersion() EventVersion
GetDefinitionVersion() WorkflowDefinitionVersion
GetRawOutputDataConfig() RawOutputDataConfig
}

Expand Down
22 changes: 22 additions & 0 deletions pkg/apis/flyteworkflow/v1alpha1/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,15 @@ const ShardKeyspaceSize = 32
const StartNodeID = "start-node"
const EndNodeID = "end-node"

// WorkflowDefinitionVersion is the numeric version tag of a workflow definition.
type WorkflowDefinitionVersion uint32

// Known workflow definition versions. The values are written out explicitly so
// that reordering these declarations can never silently renumber a version.
const (
	WorkflowDefinitionVersion0 WorkflowDefinitionVersion = 0
	WorkflowDefinitionVersion1 WorkflowDefinitionVersion = 1
)

// LatestWorkflowDefinitionVersion is the newest definition version declared above.
var LatestWorkflowDefinitionVersion = WorkflowDefinitionVersion1

// +genclient
// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object

Expand Down Expand Up @@ -82,12 +91,25 @@ func (in *FlyteWorkflow) GetEventVersion() EventVersion {
return EventVersion0
}

// GetDefinitionVersion reports the definition version recorded in the workflow's
// WorkflowMeta. When WorkflowMeta or its DefinitionVersion is unset, it falls back
// to WorkflowDefinitionVersion0.
func (in *FlyteWorkflow) GetDefinitionVersion() WorkflowDefinitionVersion {
	// No metadata, or metadata without a recorded version: use the fallback.
	if in.WorkflowMeta == nil || in.WorkflowMeta.DefinitionVersion == nil {
		return WorkflowDefinitionVersion0
	}

	return *in.WorkflowMeta.DefinitionVersion
}

// GetExecutionConfig returns the ExecutionConfig field stored on the workflow.
func (in *FlyteWorkflow) GetExecutionConfig() ExecutionConfig {
return in.ExecutionConfig
}

// WorkflowMeta carries version metadata for a FlyteWorkflow.
type WorkflowMeta struct {
// EventVersion is serialized under the JSON key "eventVersion" and omitted when empty.
EventVersion EventVersion `json:"eventVersion,omitempty"`
// DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without
// affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or
// latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies
// on the latest version should be gated behind this.
// It is a pointer so that "never set" (nil) can be told apart from an explicit version; it is serialized under the
// JSON key "defVersion" and omitted when nil.
DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"`
}

type EventVersion int
Expand Down
11 changes: 11 additions & 0 deletions pkg/controller/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,16 @@ func (p *Propeller) Initialize(ctx context.Context) error {
return p.workflowExecutor.Initialize(ctx)
}

// SetDefinitionVersionIfEmpty records version on wf only when no definition version
// has been set yet. A missing WorkflowMeta is allocated first; an already-populated
// DefinitionVersion is left untouched.
func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion) {
	meta := wf.WorkflowMeta
	if meta == nil {
		meta = &v1alpha1.WorkflowMeta{}
		wf.WorkflowMeta = meta
	}

	// Never overwrite a version that was already recorded.
	if meta.DefinitionVersion != nil {
		return
	}

	meta.DefinitionVersion = &version
}

// TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state.
// The desired state here is the entire workflow is completed, actual state is each nodes current execution state.
func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
Expand Down Expand Up @@ -120,6 +130,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
if !mutableW.GetExecutionStatus().IsTerminated() {
var err error
SetFinalizerIfEmpty(mutableW, FinalizerKey)
SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)

func() {
t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
Expand Down
30 changes: 24 additions & 6 deletions pkg/controller/nodes/subworkflow/launchplan.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,10 +56,12 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
if err != nil {
return handler.UnknownTransition, err
}
childID, err := GetChildWorkflowExecutionID(

childID, err := GetChildWorkflowExecutionIDForExecution(
parentNodeExecutionID,
nCtx.CurrentAttempt(),
nCtx,
)

if err != nil {
return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil
}
Expand Down Expand Up @@ -110,15 +112,31 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
})), nil
}

// GetChildWorkflowExecutionIDForExecution derives the child workflow execution ID for
// the current attempt of nCtx, choosing the ID scheme from the definition version of
// the enclosing execution: version 0 uses GetChildWorkflowExecutionID, every other
// version uses GetChildWorkflowExecutionIDV2.
func GetChildWorkflowExecutionIDForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) {
	switch nCtx.ExecutionContext().GetDefinitionVersion() {
	case v1alpha1.WorkflowDefinitionVersion0:
		// Executions recorded with version 0 keep the original ID scheme.
		return GetChildWorkflowExecutionID(parentNodeExecID, nCtx.CurrentAttempt())
	default:
		return GetChildWorkflowExecutionIDV2(parentNodeExecID, nCtx.CurrentAttempt())
	}
}

func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
if err != nil {
return handler.UnknownTransition, err
}

// Handle launch plan
childID, err := GetChildWorkflowExecutionID(
childID, err := GetChildWorkflowExecutionIDForExecution(
parentNodeExecutionID,
nCtx.CurrentAttempt(),
nCtx,
)

if err != nil {
Expand Down Expand Up @@ -203,9 +221,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx
if err != nil {
return err
}
childID, err := GetChildWorkflowExecutionID(
childID, err := GetChildWorkflowExecutionIDForExecution(
parentNodeExecutionID,
nCtx.CurrentAttempt(),
nCtx,
)
if err != nil {
// THIS SHOULD NEVER HAPPEN
Expand Down
23 changes: 23 additions & 0 deletions pkg/controller/nodes/subworkflow/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,26 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem
Name: name,
}, nil
}

// GetChildWorkflowExecutionIDV2 builds the identifier of a child workflow execution
// from the parent node execution and the attempt number. The child keeps the parent's
// project and domain; its name is a unique ID derived from the parent execution name,
// the node ID and the attempt, prefixed with as much of the parent execution name as
// EnsureExecIDWithinLength allows within maxLengthForSubWorkflow.
func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) {
	parentExec := nodeExecID.ExecutionId
	attemptStr := strconv.Itoa(int(attempt))

	uniqueName, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, parentExec.Name, nodeExecID.NodeId, attemptStr)
	if err != nil {
		return nil, err
	}

	// Restriction on name is 20 chars
	childName := EnsureExecIDWithinLength(parentExec.Name, uniqueName, maxLengthForSubWorkflow)

	childID := &core.WorkflowExecutionIdentifier{
		Project: parentExec.Project,
		Domain:  parentExec.Domain,
		Name:    childName,
	}

	return childID, nil
}

// EnsureExecIDWithinLength concatenates execID and subName into a string of at most
// maxLength bytes. subName is always kept in preference to execID: execID is truncated
// from the right to whatever room is left after subName.
//
// If subName alone does not fit, it is truncated to maxLength bytes and execID is
// dropped entirely. A non-positive maxLength yields the empty string. Lengths are
// measured in bytes, so callers are expected to pass single-byte (ASCII) identifiers.
func EnsureExecIDWithinLength(execID, subName string, maxLength int) string {
	if maxLength <= 0 {
		return ""
	}

	// Without this guard the remaining budget below would go negative and
	// slicing execID with a negative bound panics at runtime.
	if len(subName) >= maxLength {
		return subName[:maxLength]
	}

	remaining := maxLength - len(subName)
	if len(execID) <= remaining {
		return execID + subName
	}

	return execID[:remaining] + subName
}