Skip to content

Commit

Permalink
[Feature] Add an e2e test for K8s Job submitter failures (#2688)
Browse files Browse the repository at this point in the history
  • Loading branch information
simotw authored Dec 28, 2024
1 parent c62910f commit 9d25660
Show file tree
Hide file tree
Showing 2 changed files with 129 additions and 0 deletions.
26 changes: 26 additions & 0 deletions ray-operator/test/e2e/long_running_counter.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
import ray
import os
import time

ray.init()

@ray.remote
class Counter:
def __init__(self):
# Used to verify runtimeEnv
self.name = os.getenv("counter_name")
assert self.name == "test_counter"
self.counter = 0

def inc(self):
self.counter += 1

def get_counter(self):
return "{} got {}".format(self.name, self.counter)

counter = Counter.remote()

for _ in range(60):
ray.get(counter.inc.remote())
print(ray.get(counter.get_counter.remote()))
time.sleep(1)
103 changes: 103 additions & 0 deletions ray-operator/test/e2e/rayjob_recovery_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
package e2e

import (
"fmt"
"testing"
"time"

. "github.com/onsi/gomega"
corev1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

rayv1 "github.com/ray-project/kuberay/ray-operator/apis/ray/v1"
rayv1ac "github.com/ray-project/kuberay/ray-operator/pkg/client/applyconfiguration/ray/v1"
. "github.com/ray-project/kuberay/ray-operator/test/support"
)

func TestRayJobRecovery(t *testing.T) {
test := With(t)
g := NewWithT(t)

// Create a namespace
namespace := test.NewTestNamespace()

// Job scripts
jobsAC := newConfigMap(namespace.Name, files(test, "long_running_counter.py"))
jobs, err := test.Client().Core().CoreV1().ConfigMaps(namespace.Name).Apply(test.Ctx(), jobsAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created ConfigMap %s/%s successfully", jobs.Namespace, jobs.Name)

test.T().Run("RayJob should recover after pod deletion", func(_ *testing.T) {
rayJobAC := rayv1ac.RayJob("counter", namespace.Name).
WithSpec(rayv1ac.RayJobSpec().
WithRayClusterSpec(newRayClusterSpec(mountConfigMap[rayv1ac.RayClusterSpecApplyConfiguration](jobs, "/home/ray/jobs"))).
WithEntrypoint("python /home/ray/jobs/long_running_counter.py").
WithRuntimeEnvYAML(`
env_vars:
counter_name: test_counter
`).
WithShutdownAfterJobFinishes(true).
WithSubmitterPodTemplate(jobSubmitterPodTemplateApplyConfiguration()))

rayJob, err := test.Client().Ray().RayV1().RayJobs(namespace.Name).Apply(test.Ctx(), rayJobAC, TestApplyOptions)
g.Expect(err).NotTo(HaveOccurred())
test.T().Logf("Created RayJob %s/%s successfully", rayJob.Namespace, rayJob.Name)

test.T().Logf("Waiting for RayJob %s/%s to start running", rayJob.Namespace, rayJob.Name)
g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusRunning)))
test.T().Logf("Find RayJob %s/%s running", rayJob.Namespace, rayJob.Name)
// wait for the job to run a bit
test.T().Logf("Sleep RayJob %s/%s 15 seconds", rayJob.Namespace, rayJob.Name)
time.Sleep(15 * time.Second)

// get the running jobpods
jobpods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(test.Ctx(), metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", rayJob.Name),
})
g.Expect(err).NotTo(HaveOccurred())

// remove the running jobpods
propagationPolicy := metav1.DeletePropagationBackground
for _, pod := range jobpods.Items {
test.T().Logf("Delete Pod %s from namespace %s", pod.Name, rayJob.Namespace)
err = test.Client().Core().CoreV1().Pods(namespace.Name).Delete(test.Ctx(), pod.Name, metav1.DeleteOptions{
PropagationPolicy: &propagationPolicy,
})
g.Expect(err).NotTo(HaveOccurred())
}

test.T().Logf("Waiting for new pod to be created and running for RayJob %s/%s", namespace.Name, rayJob.Name)
g.Eventually(func() ([]corev1.Pod, error) {
pods, err := test.Client().Core().CoreV1().Pods(namespace.Name).List(
test.Ctx(),
metav1.ListOptions{
LabelSelector: fmt.Sprintf("job-name=%s", rayJob.Name),
},
)
g.Expect(err).NotTo(HaveOccurred())
return pods.Items, nil
}, TestTimeoutMedium).Should(
WithTransform(func(pods []corev1.Pod) bool {
for _, pod := range pods {
if pod.Status.Phase == corev1.PodRunning {
for _, oldPod := range jobpods.Items {
if pod.Name == oldPod.Name {
continue
}
}
test.T().Logf("Found new running pod %s/%s", pod.Namespace, pod.Name)
return true
}
}
return false
}, BeTrue()),
)

g.Eventually(RayJob(test, rayJob.Namespace, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobStatus, Equal(rayv1.JobStatusSucceeded)))

g.Eventually(RayJob(test, namespace.Name, rayJob.Name), TestTimeoutMedium).
Should(WithTransform(RayJobDeploymentStatus, Equal(rayv1.JobDeploymentStatusComplete)))
})
}

0 comments on commit 9d25660

Please sign in to comment.