diff --git a/pkg/controller/statefulset/stateful_pod_control.go b/pkg/controller/statefulset/stateful_pod_control.go
index b995d0892f5b2..985127c1d2d04 100644
--- a/pkg/controller/statefulset/stateful_pod_control.go
+++ b/pkg/controller/statefulset/stateful_pod_control.go
@@ -50,6 +50,8 @@ type StatefulPodControlInterface interface {
 	// DeleteStatefulPod deletes a Pod in a StatefulSet. The pods PVCs are not deleted. If the delete is successful,
 	// the returned error is nil.
 	DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
+	// CreateMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy
+	CreateMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error
 }
 
 func NewRealStatefulPodControl(
@@ -174,6 +176,14 @@ func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.State
 	}
 }
 
+func (spc *realStatefulPodControl) CreateMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
+	if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
+		return err
+	}
+
+	return nil
+}
+
 // createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
 // set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
 // may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
diff --git a/pkg/controller/statefulset/stateful_set_control.go b/pkg/controller/statefulset/stateful_set_control.go
index d43b8e621c199..8e812313c03cb 100644
--- a/pkg/controller/statefulset/stateful_set_control.go
+++ b/pkg/controller/statefulset/stateful_set_control.go
@@ -434,6 +434,18 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
 			// pod created, no more work possible for this round
 			continue
 		}
+
+		// If the Pod is in pending state then trigger PVC creation to create missing PVCs
+		if isPending(replicas[i]) {
+			klog.V(4).Infof(
+				"StatefulSet %s/%s is triggering PVC creation for pending Pod %s",
+				set.Namespace,
+				set.Name,
+				replicas[i].Name)
+			if err := ssc.podControl.CreateMissingPersistentVolumeClaims(set, replicas[i]); err != nil {
+				return &status, err
+			}
+		}
 		// If we find a Pod that is currently terminating, we must wait until graceful deletion
 		// completes before we continue to make progress.
 		if isTerminating(replicas[i]) && monotonic {
diff --git a/pkg/controller/statefulset/stateful_set_control_test.go b/pkg/controller/statefulset/stateful_set_control_test.go
index 6911e799fb245..7b10d94a41b58 100644
--- a/pkg/controller/statefulset/stateful_set_control_test.go
+++ b/pkg/controller/statefulset/stateful_set_control_test.go
@@ -98,6 +98,7 @@ func TestStatefulSetControl(t *testing.T) {
 		{UpdatePodFailure, simpleSetFn},
 		{UpdateSetStatusFailure, simpleSetFn},
 		{PodRecreateDeleteFailure, simpleSetFn},
+		{RecreatesPVCForPendingPod, simpleSetFn},
 	}
 
 	for _, testCase := range testCases {
@@ -451,6 +452,45 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
 	}
 }
 
+func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
+	client := fake.NewSimpleClientset()
+	om, _, ssc, _ := setupController(client)
+	selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
+	if err != nil {
+		t.Error(err)
+	}
+	pods, err := om.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Error(err)
+	}
+	if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
+		t.Errorf("Error updating StatefulSet %s", err)
+	}
+	if err := invariants(set, om); err != nil {
+		t.Error(err)
+	}
+	pods, err = om.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Error(err)
+	}
+	for _, claim := range getPersistentVolumeClaims(set, pods[0]) {
+		om.claimsIndexer.Delete(&claim)
+	}
+	pods[0].Status.Phase = v1.PodPending
+	om.podsIndexer.Update(pods[0])
+	if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
+		t.Errorf("Error updating StatefulSet %s", err)
+	}
+	// invariants check if there are any missing PVCs for the Pods
+	if err := invariants(set, om); err != nil {
+		t.Error(err)
+	}
+	_, err = om.podsLister.Pods(set.Namespace).List(selector)
+	if err != nil {
+		t.Error(err)
+	}
+}
+
 func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
 	invariants := assertMonotonicInvariants
 	set := newStatefulSet(3)
@@ -1874,6 +1914,13 @@ func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
 	return nil
 }
 
+func (spc *fakeStatefulPodControl) CreateMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
+	for _, claim := range getPersistentVolumeClaims(set, pod) {
+		spc.claimsIndexer.Update(&claim)
+	}
+	return nil
+}
+
 var _ StatefulPodControlInterface = &fakeStatefulPodControl{}
 
 type fakeStatefulSetStatusUpdater struct {
diff --git a/pkg/controller/statefulset/stateful_set_utils.go b/pkg/controller/statefulset/stateful_set_utils.go
index 88ae93d2834b9..1e13bcfbd5b86 100644
--- a/pkg/controller/statefulset/stateful_set_utils.go
+++ b/pkg/controller/statefulset/stateful_set_utils.go
@@ -213,6 +213,11 @@ func isCreated(pod *v1.Pod) bool {
 	return pod.Status.Phase != ""
 }
 
+// isPending returns true if pod has a Phase of PodPending
+func isPending(pod *v1.Pod) bool {
+	return pod.Status.Phase == v1.PodPending
+}
+
 // isFailed returns true if pod has a Phase of PodFailed
 func isFailed(pod *v1.Pod) bool {
 	return pod.Status.Phase == v1.PodFailed
diff --git a/test/e2e/apps/statefulset.go b/test/e2e/apps/statefulset.go
index d3924189c3469..c3f02ef167c76 100644
--- a/test/e2e/apps/statefulset.go
+++ b/test/e2e/apps/statefulset.go
@@ -20,6 +20,8 @@ import (
 	"context"
 	"encoding/json"
 	"fmt"
+	"regexp"
+	"strconv"
 	"strings"
 	"sync"
 	"time"
@@ -36,6 +38,7 @@ import (
 	"k8s.io/apimachinery/pkg/runtime/schema"
 	"k8s.io/apimachinery/pkg/types"
 	"k8s.io/apimachinery/pkg/util/intstr"
+	"k8s.io/apimachinery/pkg/util/strategicpatch"
 	"k8s.io/apimachinery/pkg/util/wait"
 	"k8s.io/apimachinery/pkg/watch"
 	clientset "k8s.io/client-go/kubernetes"
@@ -1154,8 +1157,166 @@ var _ = SIGDescribe("StatefulSet", func() {
 		framework.ExpectNoError(err)
 		e2estatefulset.WaitForStatusAvailableReplicas(c, ss, 1)
 	})
+
+	ginkgo.Describe("Automatically recreate PVC for pending pod when PVC is missing", func() {
+		ssName := "ss"
+		labels := map[string]string{
+			"foo": "bar",
+			"baz": "blah",
+		}
+		headlessSvcName := "test"
+		var statefulPodMounts []v1.VolumeMount
+		var ss *appsv1.StatefulSet
+
+		ginkgo.BeforeEach(func(ctx context.Context) {
+			statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
+			ss = e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, labels)
+		})
+
+		ginkgo.AfterEach(func(ctx context.Context) {
+			if ginkgo.CurrentGinkgoTestDescription().Failed {
+				framework.DumpDebugInfo(c, ns)
+			}
+			framework.Logf("Deleting all statefulset in ns %v", ns)
+			e2estatefulset.DeleteAllStatefulSets(c, ns)
+		})
+
+		ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func(ctx context.Context) {
+			e2epv.SkipIfNoDefaultStorageClass(c)
+
+			readyNode, err := e2enode.GetRandomReadySchedulableNode(c)
+			framework.ExpectNoError(err)
+			hostLabel := "kubernetes.io/hostname"
+			hostLabelVal := readyNode.Labels[hostLabel]
+
+			ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node
+			ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
+			_, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Confirming PVC exists")
+			err = verifyStatefulSetPVCsExist(c, ss, []int{0})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Confirming Pod is ready")
+			e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
+			podName := getStatefulSetPodNameAtIndex(0, ss)
+			pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+
+			nodeName := pod.Spec.NodeName
+			framework.ExpectEqual(nodeName, readyNode.Name)
+			node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+
+			oldData, err := json.Marshal(node)
+			framework.ExpectNoError(err)
+
+			node.Spec.Unschedulable = true
+
+			newData, err := json.Marshal(node)
+			framework.ExpectNoError(err)
+
+			// cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted
+			patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
+			framework.ExpectNoError(err)
+			ginkgo.By("Cordoning Node")
+			_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
+			framework.ExpectNoError(err)
+			cordoned := true
+
+			defer func() {
+				if cordoned {
+					uncordonNode(c, oldData, newData, nodeName)
+				}
+			}()
+
+			// wait for the node to be unschedulable
+			e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false)
+
+			ginkgo.By("Deleting Pod")
+			err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{})
+			framework.ExpectNoError(err)
+
+			// wait for the pod to be recreated
+			e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1)
+			_, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+
+			pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
+			framework.ExpectNoError(err)
+			framework.ExpectEqual(len(pvcList.Items), 1)
+			pvcName := pvcList.Items[0].Name
+
+			ginkgo.By("Deleting PVC")
+			err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{})
+			framework.ExpectNoError(err)
+
+			uncordonNode(c, oldData, newData, nodeName)
+			cordoned = false
+
+			ginkgo.By("Confirming PVC recreated")
+			err = verifyStatefulSetPVCsExist(c, ss, []int{0})
+			framework.ExpectNoError(err)
+
+			ginkgo.By("Confirming Pod is ready after being recreated")
+			e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
+			pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
+			framework.ExpectNoError(err)
+			framework.ExpectEqual(pod.Spec.NodeName, readyNode.Name) // confirm the pod was scheduled back to the original node
+		})
+	})
 })
 
+// verifyStatefulSetPVCsExist confirms that exactly the PVCs for ss with the specified ids exist. This polls until the situation occurs, an error happens, or until timeout (in the latter case an error is also returned). Beware that this cannot tell if a PVC will be deleted at some point in the future, so if used to confirm that no PVCs are deleted, the caller should wait for some event giving the PVCs a reasonable chance to be deleted, before calling this function.
+func verifyStatefulSetPVCsExist(c clientset.Interface, ss *appsv1.StatefulSet, claimIds []int) error {
+	idSet := map[int]struct{}{}
+	for _, id := range claimIds {
+		idSet[id] = struct{}{}
+	}
+	return wait.PollImmediate(e2estatefulset.StatefulSetPoll, e2estatefulset.StatefulSetTimeout, func() (bool, error) {
+		pvcList, err := c.CoreV1().PersistentVolumeClaims(ss.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
+		if err != nil {
+			framework.Logf("WARNING: Failed to list pvcs for verification, retrying: %v", err)
+			return false, nil
+		}
+		for _, claim := range ss.Spec.VolumeClaimTemplates {
+			pvcNameRE := regexp.MustCompile(fmt.Sprintf("^%s-%s-([0-9]+)$", claim.Name, ss.Name))
+			seenPVCs := map[int]struct{}{}
+			for _, pvc := range pvcList.Items {
+				matches := pvcNameRE.FindStringSubmatch(pvc.Name)
+				if len(matches) != 2 {
+					continue
+				}
+				ordinal, err := strconv.ParseInt(matches[1], 10, 32)
+				if err != nil {
+					framework.Logf("ERROR: bad pvc name %s (%v)", pvc.Name, err)
+					return false, err
+				}
+				if _, found := idSet[int(ordinal)]; !found {
+					return false, nil // Retry until the PVCs are consistent.
+				} else {
+					seenPVCs[int(ordinal)] = struct{}{}
+				}
+			}
+			if len(seenPVCs) != len(idSet) {
+				framework.Logf("Found %d of %d PVCs", len(seenPVCs), len(idSet))
+				return false, nil // Retry until the PVCs are consistent.
+			}
+		}
+		return true, nil
+	})
+}
+
+func uncordonNode(c clientset.Interface, oldData, newData []byte, nodeName string) {
+	ginkgo.By("Uncordoning Node")
+	// uncordon node, by reverting patch
+	revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
+	framework.ExpectNoError(err)
+	_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{})
+	framework.ExpectNoError(err)
+}
+
 func kubectlExecWithRetries(ns string, args ...string) (out string) {
 	var err error
 	for i := 0; i < 3; i++ {
diff --git a/test/e2e/framework/node/wait.go b/test/e2e/framework/node/wait.go
index c903ab0779c86..6ded5457103b6 100644
--- a/test/e2e/framework/node/wait.go
+++ b/test/e2e/framework/node/wait.go
@@ -143,6 +143,23 @@ func WaitForNodeToBeReady(c clientset.Interface, name string, timeout time.Durat
 	return WaitConditionToBe(c, name, v1.NodeReady, true, timeout)
 }
 
+func WaitForNodeSchedulable(c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool {
+	e2elog.Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable)
+	for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
+		node, err := c.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
+		if err != nil {
+			e2elog.Logf("Couldn't get node %s", name)
+			continue
+		}
+
+		if IsNodeSchedulable(node) == wantSchedulable {
+			return true
+		}
+	}
+	e2elog.Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout)
+	return false
+}
+
 // CheckReady waits up to timeout for cluster to has desired size and
 // there is no not-ready nodes in it. By cluster size we mean number of schedulable Nodes.
 func CheckReady(c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
diff --git a/test/e2e/framework/statefulset/wait.go b/test/e2e/framework/statefulset/wait.go
index b1fb70a5cfe52..740013a86d750 100644
--- a/test/e2e/framework/statefulset/wait.go
+++ b/test/e2e/framework/statefulset/wait.go
@@ -171,6 +171,31 @@ func WaitForStatusReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expect
 	}
 }
 
+// WaitForStatusCurrentReplicas waits for the ss.Status.CurrentReplicas to be equal to expectedReplicas
+func WaitForStatusCurrentReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
+	framework.Logf("Waiting for statefulset status.currentReplicas updated to %d", expectedReplicas)
+
+	ns, name := ss.Namespace, ss.Name
+	pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
+		func() (bool, error) {
+			ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{})
+			if err != nil {
+				return false, err
+			}
+			if ssGet.Status.ObservedGeneration < ss.Generation {
+				return false, nil
+			}
+			if ssGet.Status.CurrentReplicas != expectedReplicas {
+				framework.Logf("Waiting for stateful set status.currentReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.CurrentReplicas)
+				return false, nil
+			}
+			return true, nil
+		})
+	if pollErr != nil {
+		framework.Failf("Failed waiting for stateful set status.currentReplicas updated to %d: %v", expectedReplicas, pollErr)
+	}
+}
+
 // Saturate waits for all Pods in ss to become Running and Ready.
 func Saturate(c clientset.Interface, ss *appsv1.StatefulSet) {
 	var i int32
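
Note for reviewers: the new e2e helper `verifyStatefulSetPVCsExist` keys off the StatefulSet claim naming convention `<volumeClaimTemplate>-<statefulSetName>-<ordinal>` (e.g. `datadir-ss-0`), which is also why deleting that claim leaves the replacement pod Pending until the controller recreates it. A minimal standalone sketch of the name matching, for illustration only (`expectedPVCName` is a hypothetical helper, not part of this patch):

```go
package main

import (
	"fmt"
	"regexp"
	"strconv"
)

// expectedPVCName follows the StatefulSet convention of naming claims
// <volumeClaimTemplate>-<statefulSetName>-<ordinal>, e.g. "datadir-ss-0".
func expectedPVCName(claimTemplate, setName string, ordinal int) string {
	return fmt.Sprintf("%s-%s-%d", claimTemplate, setName, ordinal)
}

func main() {
	// Same pattern the e2e helper compiles for each volume claim template.
	pvcNameRE := regexp.MustCompile(fmt.Sprintf("^%s-%s-([0-9]+)$", "datadir", "ss"))

	name := expectedPVCName("datadir", "ss", 0)
	if matches := pvcNameRE.FindStringSubmatch(name); len(matches) == 2 {
		ordinal, err := strconv.Atoi(matches[1])
		if err == nil {
			fmt.Printf("%s belongs to ordinal %d\n", name, ordinal)
		}
	}
}
```

Running this prints `datadir-ss-0 belongs to ordinal 0`, mirroring the ordinal the e2e regexp extracts when it checks that exactly the expected claims exist.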