diff --git a/pkg/apis/flyteworkflow/v1alpha1/iface.go b/pkg/apis/flyteworkflow/v1alpha1/iface.go
index 26cfd2321..1ceddd36f 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/iface.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/iface.go
@@ -236,8 +236,8 @@ type MutableDynamicNodeStatus interface {
 	SetExecutionError(executionError *core.ExecutionError)
 }
 
-// Interface for Branch node. All the methods are purely read only except for the GetExecutionStatus.
-// p returns ExecutableBranchNodeStatus, which permits some mutations
+// ExecutableBranchNode is an interface for Branch node. All the methods are purely read only except for the
+// GetExecutionStatus. It returns ExecutableBranchNodeStatus, which permits some mutations.
 type ExecutableBranchNode interface {
 	GetIf() ExecutableIfBlock
 	GetElse() *NodeID
@@ -443,6 +443,7 @@ type Meta interface {
 	GetSecurityContext() core.SecurityContext
 	IsInterruptible() bool
 	GetEventVersion() EventVersion
+	GetDefinitionVersion() WorkflowDefinitionVersion
 	GetRawOutputDataConfig() RawOutputDataConfig
 }
 
diff --git a/pkg/apis/flyteworkflow/v1alpha1/workflow.go b/pkg/apis/flyteworkflow/v1alpha1/workflow.go
index d96a87d14..11e73a167 100644
--- a/pkg/apis/flyteworkflow/v1alpha1/workflow.go
+++ b/pkg/apis/flyteworkflow/v1alpha1/workflow.go
@@ -23,6 +23,17 @@ const ShardKeyspaceSize = 32
 const StartNodeID = "start-node"
 const EndNodeID = "end-node"
 
+// WorkflowDefinitionVersion identifies the revision of the logic propeller used to populate a workflow CRD.
+type WorkflowDefinitionVersion uint32
+
+// LatestWorkflowDefinitionVersion is the version stamped on workflows that do not carry one yet.
+var LatestWorkflowDefinitionVersion = WorkflowDefinitionVersion1
+
+const (
+	WorkflowDefinitionVersion0 WorkflowDefinitionVersion = iota
+	WorkflowDefinitionVersion1
+)
+
 // +genclient
 // +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
 
@@ -82,12 +93,27 @@ func (in *FlyteWorkflow) GetEventVersion() EventVersion {
 	return EventVersion0
 }
 
+// GetDefinitionVersion returns the definition version recorded in the workflow's metadata, or
+// WorkflowDefinitionVersion0 when the metadata or the version is not set.
+func (in *FlyteWorkflow) GetDefinitionVersion() WorkflowDefinitionVersion {
+	if meta := in.WorkflowMeta; meta != nil && meta.DefinitionVersion != nil {
+		return *meta.DefinitionVersion
+	}
+
+	return WorkflowDefinitionVersion0
+}
+
 func (in *FlyteWorkflow) GetExecutionConfig() ExecutionConfig {
 	return in.ExecutionConfig
 }
 
 type WorkflowMeta struct {
 	EventVersion EventVersion `json:"eventVersion,omitempty"`
+	// DefinitionVersion allows propeller code that populates the CRD to evolve (in backward incompatible ways) without
+	// affecting in-flight executions. Once an execution starts, propeller will populate this field with the current or
+	// latest version. If a newer propeller version is deployed midway that comes with a newer version, code that relies
+	// on the latest version should be gated behind this.
+	DefinitionVersion *WorkflowDefinitionVersion `json:"defVersion,omitempty"`
 }
 
 type EventVersion int
diff --git a/pkg/controller/handler.go b/pkg/controller/handler.go
index e40e4d045..0f653de57 100644
--- a/pkg/controller/handler.go
+++ b/pkg/controller/handler.go
@@ -83,6 +83,18 @@ func (p *Propeller) Initialize(ctx context.Context) error {
 	return p.workflowExecutor.Initialize(ctx)
 }
 
+// SetDefinitionVersionIfEmpty records version as the workflow's definition version unless one is already set,
+// creating the WorkflowMeta when it is missing.
+func SetDefinitionVersionIfEmpty(wf *v1alpha1.FlyteWorkflow, version v1alpha1.WorkflowDefinitionVersion) {
+	if wf.WorkflowMeta == nil {
+		wf.WorkflowMeta = &v1alpha1.WorkflowMeta{}
+	}
+
+	if wf.WorkflowMeta.DefinitionVersion == nil {
+		wf.WorkflowMeta.DefinitionVersion = &version
+	}
+}
+
 // TryMutateWorkflow will try to mutate the workflow by traversing it and reconciling the desired and actual state.
 // The desired state here is the entire workflow is completed, actual state is each nodes current execution state.
 func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.FlyteWorkflow) (*v1alpha1.FlyteWorkflow, error) {
@@ -120,6 +132,7 @@ func (p *Propeller) TryMutateWorkflow(ctx context.Context, originalW *v1alpha1.F
 	if !mutableW.GetExecutionStatus().IsTerminated() {
 		var err error
 		SetFinalizerIfEmpty(mutableW, FinalizerKey)
+		SetDefinitionVersionIfEmpty(mutableW, v1alpha1.LatestWorkflowDefinitionVersion)
 
 		func() {
 			t := p.metrics.RawWorkflowTraversalTime.Start(ctx)
diff --git a/pkg/controller/nodes/subworkflow/launchplan.go b/pkg/controller/nodes/subworkflow/launchplan.go
index 8d14968d0..037c41b88 100644
--- a/pkg/controller/nodes/subworkflow/launchplan.go
+++ b/pkg/controller/nodes/subworkflow/launchplan.go
@@ -56,10 +56,12 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
 	if err != nil {
 		return handler.UnknownTransition, err
 	}
-	childID, err := GetChildWorkflowExecutionID(
+
+	childID, err := GetChildWorkflowExecutionIDForExecution(
 		parentNodeExecutionID,
-		nCtx.CurrentAttempt(),
+		nCtx,
 	)
+
 	if err != nil {
 		return handler.DoTransition(handler.TransitionTypeEphemeral, handler.PhaseInfoFailure(core.ExecutionError_SYSTEM, errors.RuntimeExecutionError, "failed to create unique ID", nil)), nil
 	}
@@ -110,15 +112,33 @@ func (l *launchPlanHandler) StartLaunchPlan(ctx context.Context, nCtx handler.No
 	})), nil
 }
 
+// GetChildWorkflowExecutionIDForExecution builds the child workflow execution ID for the current attempt, using the
+// original scheme for WorkflowDefinitionVersion0 executions and GetChildWorkflowExecutionIDV2 for all others.
+func GetChildWorkflowExecutionIDForExecution(parentNodeExecID *core.NodeExecutionIdentifier, nCtx handler.NodeExecutionContext) (*core.WorkflowExecutionIdentifier, error) {
+	// Executions stamped with version 0 (or with no version at all) keep the original ID scheme.
+	if nCtx.ExecutionContext().GetDefinitionVersion() == v1alpha1.WorkflowDefinitionVersion0 {
+		return GetChildWorkflowExecutionID(
+			parentNodeExecID,
+			nCtx.CurrentAttempt(),
+		)
+	}
+
+	return GetChildWorkflowExecutionIDV2(
+		parentNodeExecID,
+		nCtx.CurrentAttempt(),
+	)
+}
+
 func (l *launchPlanHandler) CheckLaunchPlanStatus(ctx context.Context, nCtx handler.NodeExecutionContext) (handler.Transition, error) {
 	parentNodeExecutionID, err := getParentNodeExecutionID(nCtx)
 	if err != nil {
 		return handler.UnknownTransition, err
 	}
+
 	// Handle launch plan
-	childID, err := GetChildWorkflowExecutionID(
+	childID, err := GetChildWorkflowExecutionIDForExecution(
 		parentNodeExecutionID,
-		nCtx.CurrentAttempt(),
+		nCtx,
 	)
 
 	if err != nil {
@@ -203,9 +223,9 @@ func (l *launchPlanHandler) HandleAbort(ctx context.Context, nCtx handler.NodeEx
 	if err != nil {
 		return err
 	}
-	childID, err := GetChildWorkflowExecutionID(
+	childID, err := GetChildWorkflowExecutionIDForExecution(
 		parentNodeExecutionID,
-		nCtx.CurrentAttempt(),
+		nCtx,
 	)
 	if err != nil {
 		// THIS SHOULD NEVER HAPPEN
diff --git a/pkg/controller/nodes/subworkflow/util.go b/pkg/controller/nodes/subworkflow/util.go
index 0b5bf715b..cedef8469 100644
--- a/pkg/controller/nodes/subworkflow/util.go
+++ b/pkg/controller/nodes/subworkflow/util.go
@@ -22,3 +22,39 @@ func GetChildWorkflowExecutionID(nodeExecID *core.NodeExecutionIdentifier, attem
 		Name:    name,
 	}, nil
 }
+
+// GetChildWorkflowExecutionIDV2 derives the child workflow execution ID from a fixed-length unique ID of the parent
+// execution name, node ID and attempt, combined with the parent execution name by EnsureExecIDWithinLength.
+func GetChildWorkflowExecutionIDV2(nodeExecID *core.NodeExecutionIdentifier, attempt uint32) (*core.WorkflowExecutionIdentifier, error) {
+	name, err := encoding.FixedLengthUniqueIDForParts(maxLengthForSubWorkflow, nodeExecID.ExecutionId.Name, nodeExecID.NodeId, strconv.Itoa(int(attempt)))
+	if err != nil {
+		return nil, err
+	}
+
+	// Restriction on name is 20 chars
+	return &core.WorkflowExecutionIdentifier{
+		Project: nodeExecID.ExecutionId.Project,
+		Domain:  nodeExecID.ExecutionId.Domain,
+		Name:    EnsureExecIDWithinLength(nodeExecID.ExecutionId.Name, name, maxLengthForSubWorkflow),
+	}, nil
+}
+
+// EnsureExecIDWithinLength prefixes subName with as much of execID as fits in maxLength bytes. subName is kept intact
+// whenever it fits; when it alone exceeds maxLength it is truncated instead of panicking on a negative slice bound.
+func EnsureExecIDWithinLength(execID, subName string, maxLength int) string {
+	if maxLength <= 0 {
+		return ""
+	}
+
+	maxLengthRemaining := maxLength - len(subName)
+	if maxLengthRemaining <= 0 {
+		// No room for any part of execID; the result must still honor maxLength.
+		return subName[:maxLength]
+	}
+
+	if len(execID) < maxLengthRemaining {
+		return execID + subName
+	}
+
+	return execID[:maxLengthRemaining] + subName
+}