From cb3980cd428fcafa0eb1ea9583e840525e10b8a0 Mon Sep 17 00:00:00 2001 From: Andrew Sy Kim Date: Mon, 16 Dec 2024 18:26:54 +0000 Subject: [PATCH] add unit tests Signed-off-by: Andrew Sy Kim --- docs/reference/api.md | 2 +- ray-operator/apis/ray/v1/rayjob_types.go | 6 +- .../controllers/ray/rayjob_controller.go | 17 +- .../controllers/ray/rayjob_controller_test.go | 510 ++++++++++++++++++ 4 files changed, 522 insertions(+), 13 deletions(-) diff --git a/docs/reference/api.md b/docs/reference/api.md index 99d6bf597d..b472571b91 100644 --- a/docs/reference/api.md +++ b/docs/reference/api.md @@ -174,7 +174,7 @@ _Appears in:_ | `clusterSelector` _object (keys:string, values:string)_ | clusterSelector is used to select running rayclusters by labels | | | | `submitterConfig` _[SubmitterConfig](#submitterconfig)_ | Configurations of submitter k8s job. | | | | `managedBy` _string_ | ManagedBy is an optional configuration for the controller or entity that manages a RayJob.
The value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'.
The kuberay-operator reconciles a RayJob which doesn't have this field at all or
the field value is the reserved string 'ray.io/kuberay-operator',
but delegates reconciling the RayJob with 'kueue.x-k8s.io/multikueue' to the Kueue.
The field is immutable. | | | -| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'.
If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
This field requires the RayJobDeletionPolicy feature gate to be enabled. | | | +| `deletionPolicy` _[DeletionPolicy](#deletionpolicy)_ | deletionPolicy indicates what resources of the RayJob are deleted upon job completion.
Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'.
If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'.
This field requires the RayJobDeletionPolicy feature gate to be enabled. | | | | `entrypoint` _string_ | INSERT ADDITIONAL SPEC FIELDS - desired state of cluster
Important: Run "make" to regenerate code after modifying this file | | | | `runtimeEnvYAML` _string_ | RuntimeEnvYAML represents the runtime environment configuration
provided as a multi-line YAML string. | | | | `jobId` _string_ | If jobId is not set, a new jobId will be auto-generated. | | | diff --git a/ray-operator/apis/ray/v1/rayjob_types.go b/ray-operator/apis/ray/v1/rayjob_types.go index 3ec550c83e..dbdf1aae89 100644 --- a/ray-operator/apis/ray/v1/rayjob_types.go +++ b/ray-operator/apis/ray/v1/rayjob_types.go @@ -67,8 +67,8 @@ type DeletionPolicy string const ( DeleteClusterDeletionPolicy DeletionPolicy = "DeleteCluster" DeleteWorkersDeletionPolicy DeletionPolicy = "DeleteWorkers" - DeleteSelfDeltionPolicy DeletionPolicy = "DeleteSelf" - DeleteNoneDeletionPolicy DeletionPolicy = "None" + DeleteSelfDeletionPolicy DeletionPolicy = "DeleteSelf" + DeleteNoneDeletionPolicy DeletionPolicy = "DeleteNone" ) type SubmitterConfig struct { @@ -105,7 +105,7 @@ type RayJobSpec struct { // +kubebuilder:validation:XValidation:rule="self in ['ray.io/kuberay-operator', 'kueue.x-k8s.io/multikueue']",message="the managedBy field value must be either 'ray.io/kuberay-operator' or 'kueue.x-k8s.io/multikueue'" ManagedBy *string `json:"managedBy,omitempty"` // deletionPolicy indicates what resources of the RayJob are deleted upon job completion. - // Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'None'. + // Valid values are 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf' or 'DeleteNone'. // If unset, deletion policy is based on 'spec.shutdownAfterJobFinishes'. // This field requires the RayJobDeletionPolicy feature gate to be enabled. // +kubebuilder:validation:XValidation:rule="self in ['DeleteCluster', 'DeleteWorkers', 'DeleteSelf', 'DeleteNone']",message="the deletionPolicy field value must be either 'DeleteCluster', 'DeleteWorkers', 'DeleteSelf', or 'DeleteNone'" diff --git a/ray-operator/controllers/ray/rayjob_controller.go b/ray-operator/controllers/ray/rayjob_controller.go index a1f4c9a5b4..8139c42c4c 100644 --- a/ray-operator/controllers/ray/rayjob_controller.go +++ b/ray-operator/controllers/ray/rayjob_controller.go @@ -375,9 +375,9 @@ func (r *RayJobReconciler) Reconcile(ctx context.Context, request ctrl.Request) logger.Info("Deleting RayCluster", "RayCluster", rayJobInstance.Status.RayClusterName) _, err = r.deleteClusterResources(ctx, rayJobInstance) case rayv1.DeleteWorkersDeletionPolicy: - logger.Info("Scaling all worker replicas to 0", "RayCluster", rayJobInstance.Status.RayClusterName) - _, err = r.scaleWorkerReplicasToZero(ctx, rayJobInstance) - case rayv1.DeleteSelfDeltionPolicy: + logger.Info("Suspending all worker groups", "RayCluster", rayJobInstance.Status.RayClusterName) + err = r.suspendWorkerGroups(ctx, rayJobInstance) + case rayv1.DeleteSelfDeletionPolicy: logger.Info("Deleting RayJob") err = r.Client.Delete(ctx, rayJobInstance) default: @@ -655,29 +655,28 @@ func (r *RayJobReconciler) deleteClusterResources(ctx context.Context, rayJobIns return isClusterDeleted, nil } -func (r *RayJobReconciler) scaleWorkerReplicasToZero(ctx context.Context, rayJobInstance *rayv1.RayJob) (bool, error) { +func (r *RayJobReconciler) suspendWorkerGroups(ctx context.Context, rayJobInstance *rayv1.RayJob) error { logger := ctrl.LoggerFrom(ctx) clusterIdentifier := common.RayJobRayClusterNamespacedName(rayJobInstance) cluster := rayv1.RayCluster{} if err := r.Get(ctx, clusterIdentifier, &cluster); err != nil { - return false, err + return err } for i := range cluster.Spec.WorkerGroupSpecs { - cluster.Spec.WorkerGroupSpecs[i].Replicas = ptr.To[int32](0) - cluster.Spec.WorkerGroupSpecs[i].MinReplicas = ptr.To[int32](0) + cluster.Spec.WorkerGroupSpecs[i].Suspend = ptr.To[bool](true) } if err := r.Update(ctx, &cluster); err != nil { r.Recorder.Eventf(rayJobInstance, corev1.EventTypeWarning, string(utils.FailedToUpdateRayCluster), "Failed to update cluster %s/%s: %v", cluster.Namespace, cluster.Name, err) - return false, err + return err } logger.Info("All worker groups for RayCluster has been scaled to 0", "RayCluster", clusterIdentifier) r.Recorder.Eventf(rayJobInstance, corev1.EventTypeNormal, string(utils.UpdatedRayCluster), "Updated cluster %s/%s", cluster.Namespace, cluster.Name) - return true, nil + return nil } // SetupWithManager sets up the controller with the Manager. diff --git a/ray-operator/controllers/ray/rayjob_controller_test.go b/ray-operator/controllers/ray/rayjob_controller_test.go index 856d8e8a7f..b6089b5960 100644 --- a/ray-operator/controllers/ray/rayjob_controller_test.go +++ b/ray-operator/controllers/ray/rayjob_controller_test.go @@ -30,6 +30,7 @@ import ( rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1" "github.com/ray-project/kuberay/ray-operator/controllers/ray/common" "github.com/ray-project/kuberay/ray-operator/controllers/ray/utils" + "github.com/ray-project/kuberay/ray-operator/pkg/features" "github.com/ray-project/kuberay/ray-operator/test/support" batchv1 "k8s.io/api/batch/v1" @@ -835,4 +836,513 @@ var _ = Context("RayJob with different submission modes", func() { }) }) }) + + Describe("RayJob with DeletionPolicy=DeleteCluster", Ordered, func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) + + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletecluster", namespace) + deletionPolicy := rayv1.DeleteClusterDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Verify RayJob spec", func() { + Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteClusterDeletionPolicy)) + }) + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + defer fakeRayDashboardClient.GetJobInfoMock.Store(nil) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + + // RayJob transitions to Complete. + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + }) + + It("If DeletionPolicy=DeleteCluster, RayCluster should be deleted, but not the submitter Job.", func() { + Eventually( + func() bool { + return apierrors.IsNotFound(getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster)()) + }, + time.Second*3, time.Millisecond*500).Should(BeTrue()) + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + Consistently( + getResourceFunc(ctx, namespacedName, job), + time.Second*3, time.Millisecond*500).Should(BeNil()) + }) + }) + + Describe("RayJob with DeletionPolicy=DeleteWorkers", Ordered, func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) + + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deleteworkers", namespace) + deletionPolicy := rayv1.DeleteWorkersDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Verify RayJob spec", func() { + Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteWorkersDeletionPolicy)) + }) + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + defer fakeRayDashboardClient.GetJobInfoMock.Store(nil) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + + // RayJob transitions to Complete. + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + }) + + It("If DeletionPolicy=DeleteWorkers, all workers should be deleted, but not the Head pod and submitter Job", func() { + // RayCluster exists + Consistently( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check worker group is suspended + Expect(*rayCluster.Spec.WorkerGroupSpecs[0].Suspend).To(Equal(true)) + + // 0 worker Pods exist + workerPods := corev1.PodList{} + workerLabels := common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToListOptions() + Eventually( + listResourceFunc(ctx, &workerPods, workerLabels...), + time.Second*3, time.Millisecond*500).Should(Equal(0), "expected 0 workers") + + // Head Pod is still running + headPods := corev1.PodList{} + headLabels := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions() + Consistently( + listResourceFunc(ctx, &headPods, headLabels...), + time.Second*3, time.Millisecond*500).Should(Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items) + + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + Consistently( + getResourceFunc(ctx, namespacedName, job), + time.Second*3, time.Millisecond*500).Should(BeNil()) + }) + }) + + Describe("RayJob with DeletionPolicy=DeleteSelf", Ordered, func() { + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deleteself", namespace) + deletionPolicy := rayv1.DeleteSelfDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + }) + + It("If DeletionPolicy=DeleteSelf, the RayJob is deleted", func() { + Eventually( + func() bool { + return apierrors.IsNotFound(k8sClient.Get(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob)) + }, time.Second*5, time.Millisecond*500).Should(BeTrue()) + }) + }) + + Describe("RayJob with DeletionPolicy=DeleteNone", Ordered, func() { + features.SetFeatureGateDuringTest(GinkgoTB(), features.RayJobDeletionPolicy, true) + + ctx := context.Background() + namespace := "default" + rayJob := rayJobTemplate("rayjob-test-deletionpolicy-deletenone", namespace) + deletionPolicy := rayv1.DeleteNoneDeletionPolicy + rayJob.Spec.DeletionPolicy = &deletionPolicy + rayJob.Spec.ShutdownAfterJobFinishes = false + rayCluster := &rayv1.RayCluster{} + + It("Verify RayJob spec", func() { + Expect(*rayJob.Spec.DeletionPolicy).To(Equal(rayv1.DeleteNoneDeletionPolicy)) + }) + + It("Create a RayJob custom resource", func() { + err := k8sClient.Create(ctx, rayJob) + Expect(err).NotTo(HaveOccurred(), "Failed to create RayJob") + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "Should be able to see RayJob: %v", rayJob.Name) + }) + + It("RayJobs's JobDeploymentStatus transitions from New to Initializing.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusInitializing), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Initializing state, Status.RayClusterName, Status.JobId, and Status.StartTime must be set. + Expect(rayJob.Status.RayClusterName).NotTo(BeEmpty()) + Expect(rayJob.Status.JobId).NotTo(BeEmpty()) + Expect(rayJob.Status.StartTime).NotTo(BeNil()) + }) + + It("In Initializing state, the RayCluster should eventually be created.", func() { + Eventually( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Check whether RayCluster is consistent with RayJob's RayClusterSpec. + Expect(rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(rayJob.Spec.RayClusterSpec.WorkerGroupSpecs[0].Replicas)) + Expect(rayCluster.Spec.RayVersion).To(Equal(rayJob.Spec.RayClusterSpec.RayVersion)) + + // TODO (kevin85421): Check the RayCluster labels and annotations. + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRNameLabelKey, rayJob.Name)) + Expect(rayCluster.Labels).Should(HaveKeyWithValue(utils.RayOriginatedFromCRDLabelKey, utils.RayOriginatedFromCRDLabelValue(utils.RayJobCRD))) + }) + + It("Make RayCluster.Status.State to be rayv1.Ready", func() { + // The RayCluster is not 'Ready' yet because Pods are not running and ready. + Expect(rayCluster.Status.State).NotTo(Equal(rayv1.Ready)) //nolint:staticcheck // https://github.com/ray-project/kuberay/pull/2288 + + updateHeadPodToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + updateWorkerPodsToRunningAndReady(ctx, rayJob.Status.RayClusterName, namespace) + + // The RayCluster.Status.State should be Ready. + Eventually( + getClusterState(ctx, namespace, rayCluster.Name), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.Ready)) + }) + + It("RayJobs's JobDeploymentStatus transitions from Initializing to Running.", func() { + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // In Running state, the RayJob's Status.DashboardURL must be set. + Expect(rayJob.Status.DashboardURL).NotTo(BeEmpty()) + + // In Running state, the submitter Kubernetes Job must be created if this RayJob is in K8sJobMode. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + }) + + It("RayJobs's JobDeploymentStatus transitions from Running to Complete.", func() { + // Update fake dashboard client to return job info with "Succeeded" status. + getJobInfo := func(context.Context, string) (*utils.RayJobInfo, error) { //nolint:unparam // This is a mock function so parameters are required + return &utils.RayJobInfo{JobStatus: rayv1.JobStatusSucceeded}, nil + } + fakeRayDashboardClient.GetJobInfoMock.Store(&getJobInfo) + defer fakeRayDashboardClient.GetJobInfoMock.Store(nil) + + // RayJob transitions to Complete if and only if the corresponding submitter Kubernetes Job is Complete or Failed. + Consistently( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*3, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusRunning), "JobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + + // Update the submitter Kubernetes Job to Complete. + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + err := k8sClient.Get(ctx, namespacedName, job) + Expect(err).NotTo(HaveOccurred(), "failed to get Kubernetes Job") + + // Update the submitter Kubernetes Job to Complete. + conditions := []batchv1.JobCondition{ + {Type: batchv1.JobComplete, Status: corev1.ConditionTrue}, + } + job.Status.Conditions = conditions + Expect(k8sClient.Status().Update(ctx, job)).Should(Succeed()) + + // RayJob transitions to Complete. + Eventually( + getRayJobDeploymentStatus(ctx, rayJob), + time.Second*5, time.Millisecond*500).Should(Equal(rayv1.JobDeploymentStatusComplete), "jobDeploymentStatus = %v", rayJob.Status.JobDeploymentStatus) + }) + + It("If DeletionPolicy=DeleteNone, no resources are deleted", func() { + // RayJob exists + Consistently( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Name, Namespace: namespace}, rayJob), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayJob %v not found", rayJob) + + // RayCluster exists + Consistently( + getResourceFunc(ctx, client.ObjectKey{Name: rayJob.Status.RayClusterName, Namespace: namespace}, rayCluster), + time.Second*3, time.Millisecond*500).Should(BeNil(), "RayCluster %v not found", rayJob.Status.RayClusterName) + + // Worker replicas set to 3 + Expect(*rayCluster.Spec.WorkerGroupSpecs[0].Replicas).To(Equal(int32(3))) + + // 3 worker Pods exist + workerPods := corev1.PodList{} + workerLabels := common.RayClusterWorkerPodsAssociationOptions(rayCluster).ToListOptions() + Consistently( + listResourceFunc(ctx, &workerPods, workerLabels...), + time.Second*3, time.Millisecond*500).Should(Equal(3), "expected 3 workers") + + // Head Pod is still running + headPods := corev1.PodList{} + headLabels := common.RayClusterHeadPodsAssociationOptions(rayCluster).ToListOptions() + Consistently( + listResourceFunc(ctx, &headPods, headLabels...), + time.Second*3, time.Millisecond*500).Should(Equal(1), "Head pod list should have only 1 Pod = %v", headPods.Items) + + namespacedName := common.RayJobK8sJobNamespacedName(rayJob) + job := &batchv1.Job{} + Consistently( + getResourceFunc(ctx, namespacedName, job), + time.Second*3, time.Millisecond*500).Should(BeNil()) + }) + }) })