Skip to content

Commit

Permalink
[RayJob] implement deletion policy API
Browse files Browse the repository at this point in the history
Signed-off-by: Andrew Sy Kim <[email protected]>
  • Loading branch information
andrewsykim committed Dec 30, 2024
1 parent f2d7c1f commit 6ddd641
Show file tree
Hide file tree
Showing 10 changed files with 134 additions and 4 deletions.
14 changes: 14 additions & 0 deletions docs/reference/api.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,19 @@ _Appears in:_
| `volumeMounts` _[VolumeMount](https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.28/#volumemount-v1-core) array_ | Optional list of volumeMounts. This is needed for enabling TLS for the autoscaler container. | | |


#### DeletionPolicy

_Underlying type:_ _string_





_Appears in:_
- [RayJobSpec](#rayjobspec)





#### HeadGroupSpec
Expand Down Expand Up @@ -161,6 +174,7 @@ _Appears in:_
| `clusterSelector` _object (keys:string, values:string)_ | clusterSelector is used to select running rayclusters by labels | | |
| `submitterConfig` _[SubmitterConfig](#submitterconfig)_ | Configurations of submitter k8s job. | | |
| `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayJob.<br />The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.<br />The kuberay-operator reconciles a RayJob which doesn't have this field at all or<br />the field value is the reserved string 'ray.io/kuberay-operator',<br />but delegates reconciling the RayJob with 'kueue.x-k8s.io/multikueue' to the Kueue.<br />The field is immutable. | | |
| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.<br />Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.<br />If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.<br />This field requires the RayJobDeletionPolicy feature gate to be enabled. | | |
| `entrypoint` _string_ | INSERT ADDITIONAL SPEC FIELDS - desired state of cluster<br />Important: Run "make" to regenerate code after modifying this file | | |
| `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration<br />provided as a multi-line YAML string. | | |
| `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | |
Expand Down
6 changes: 6 additions & 0 deletions helm-chart/kuberay-operator/crds/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions helm-chart/kuberay-operator/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,8 @@ batchScheduler:
featureGates:
- name: RayClusterStatusConditions
enabled: true
- name: RayJobDeletionPolicy
enabled: false

# Path to the operator binary
operatorComand: /manager
Expand Down
15 changes: 15 additions & 0 deletions ray-operator/apis/ray/v1/rayjob_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,15 @@ const (
InteractiveMode JobSubmissionMode = "InteractiveMode" // Don't submit job in KubeRay. Instead, wait for user to submit job and provide the job submission ID.
)

type DeletionPolicy string

const (
DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster"
DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers"
DeleteSelfDeltionPolicy DeletionPolicy = "DeleteSelf"
DeleteNoneDeletionPolicy DeletionPolicy = "None"
)

type SubmitterConfig struct {
// BackoffLimit of the submitter k8s job.
BackoffLimit *int32 `json:"backoffLimit,omitempty"`
Expand Down Expand Up @@ -95,6 +104,12 @@ type RayJobSpec struct {
// +kubebuilder:validation:XValidation:rule="self == oldSelf",message="the managedBy field is immutable"
// +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'"
ManagedBy *string `json:"managedBy,omitempty"`
// deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
// Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
// If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
// This field requires the RayJobDeletionPolicy feature gate to be enabled.
// +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'"
DeletionPolicy *DeletionPolicy `json:"deletionPolicy,omitempty"`
// INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
// Important: Run "make" to regenerate code after modifying this file
Entrypoint string `json:"entrypoint,omitempty"`
Expand Down
5 changes: 5 additions & 0 deletions ray-operator/apis/ray/v1/zz_generated.deepcopy.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 6 additions & 0 deletions ray-operator/config/crd/bases/ray.io_rayjobs.yaml

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

71 changes: 67 additions & 4 deletions ray-operator/controllers/ray/rayjob_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (

"github.com/ray-project/kuberay/ray-operator/controllers/ray/common"
"github.com/ray-project/kuberay/ray-operator/controllers/ray/utils"
"github.com/ray-project/kuberay/ray-operator/pkg/features"

"k8s.io/apimachinery/pkg/runtime"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -347,10 +348,46 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
case rayv1.JobDeploymentStatusComplete, rayv1.JobDeploymentStatusFailed:
// If this RayJob uses an existing RayCluster (i.e., ClusterSelector is set), we should not delete the RayCluster.
logger.Info(string(rayJobInstance.Status.JobDeploymentStatus), "RayJob", rayJobInstance.Name, "ShutdownAfterJobFinishes", rayJobInstance.Spec.ShutdownAfterJobFinishes, "ClusterSelector", rayJobInstance.Spec.ClusterSelector)
if rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)
ttlSeconds := rayJobInstance.Spec.TTLSecondsAfterFinished
nowTime := time.Now()
shutdownTime := rayJobInstance.Status.EndTime.Add(time.Duration(ttlSeconds) * time.Second)

if features.Enabled(features.RayJobDeletionPolicy) &&
rayJobInstance.Spec.DeletionPolicy != nil &&
*rayJobInstance.Spec.DeletionPolicy != rayv1.DeleteNoneDeletionPolicy &&
len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
"deletionPolicy", rayJobInstance.Spec.DeletionPolicy,
"ttlSecondsAfterFinished", ttlSeconds,
"Status.endTime", rayJobInstance.Status.EndTime,
"Now", nowTime,
"ShutdownTime", shutdownTime)
if shutdownTime.After(nowTime) {
delta := int32(time.Until(shutdownTime.Add(2 * time.Second)).Seconds())
logger.Info("shutdownTime not reached, requeue this RayJob for n seconds", "seconds", delta)
return ctrl.Result{RequeueAfter: time.Duration(delta) * time.Second}, nil
}

switch *rayJobInstance.Spec.DeletionPolicy {
case rayv1.DeleteClusterDeletionPolicy:
logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.deleteClusterResources(ctx, rayJobInstance)
case rayv1.DeleteWorkersDeletionPolicy:
logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName)
_, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance)
case rayv1.DeleteSelfDeltionPolicy:
logger.Info("Deleting RayJob")
err = r.Client.Delete(ctx, rayJobInstance)
default:
}
if err != nil {
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}

if (!features.Enabled(features.RayJobDeletionPolicy) || rayJobInstance.Spec.DeletionPolicy == nil) && rayJobInstance.Spec.ShutdownAfterJobFinishes && len(rayJobInstance.Spec.ClusterSelector) == 0 {
logger.Info(
"RayJob deployment status",
"jobDeploymentStatus", rayJobInstance.Status.JobDeploymentStatus,
Expand All @@ -377,6 +414,7 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request)
return ctrl.Result{RequeueAfter: RayJobDefaultRequeueDuration}, err
}
}

// If the RayJob is completed, we should not requeue it.
return ctrl.Result{}, nil
default:
Expand Down Expand Up @@ -617,6 +655,31 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns
return isClusterDeleted, nil
}

func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) {
logger := ctrl.LoggerFrom(ctx)
clusterIdentifier := common.RayJobRayClusterNamespacedName(rayJobInstance)

cluster := rayv1.RayCluster{}
if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil {
return false, err
}

for i := range cluster.Spec.WorkerGroupSpecs {
cluster.Spec.WorkerGroupSpecs[i].Replicas = ptr.To[int32](0)
cluster.Spec.WorkerGroupSpecs[i].MinReplicas = ptr.To[int32](0)
}

if err := r.Update(ctx, &cluster); err != nil {
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update cluster %s/%s: %v", cluster.Namespace, cluster.Name, err)
return false, err
}

logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier)
r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name)

return true, nil
}

// SetupWithManager sets up the controller with the Manager.
func (r *RayJobReconciler) SetupWithManager(mgr ctrl.Manager, reconcileConcurrency int) error {
return ctrl.NewControllerManagedBy(mgr).
Expand Down
2 changes: 2 additions & 0 deletions ray-operator/controllers/ray/utils/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,9 +264,11 @@ const (
FailedToCreateRayJobSubmitter K8sEventType = "FailedToCreateRayJobSubmitter"
FailedToDeleteRayJobSubmitter K8sEventType = "FailedToDeleteRayJobSubmitter"
CreatedRayCluster K8sEventType = "CreatedRayCluster"
UpdatedRayCluster K8sEventType = "UpdatedRayCluster"
DeletedRayCluster K8sEventType = "DeletedRayCluster"
FailedToCreateRayCluster K8sEventType = "FailedToCreateRayCluster"
FailedToDeleteRayCluster K8sEventType = "FailedToDeleteRayCluster"
FailedToUpdateRayCluster K8sEventType = "FailedToUpdateRayCluster"

// RayService event list
InvalidRayServiceSpec K8sEventType = "InvalidRayServiceSpec"
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

8 changes: 8 additions & 0 deletions ray-operator/pkg/features/features.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,13 @@ const (
//
// Enables new conditions in RayCluster status
RayClusterStatusConditions featuregate.Feature = "RayClusterStatusConditions"

// owner: @andrewsykim
// rep: N/A
// alpha: v1.3
//
// Enables new deletion policy API in RayJob
RayJobDeletionPolicy featuregate.Feature = "RayJobDeletionPolicy"
)

func init() {
Expand All @@ -25,6 +32,7 @@ func init() {

var defaultFeatureGates = map[featuregate.Feature]featuregate.FeatureSpec{
RayClusterStatusConditions: {Default: true, PreRelease: featuregate.Beta},
RayJobDeletionPolicy: {Default: false, PreRelease: featuregate.Alpha},
}

// SetFeatureGateDuringTest is a helper method to override feature gates in tests.
Expand Down

0 comments on commit 6ddd641

Please sign in to comment.