Merge pull request kubernetes#113270 from rrangith/fix/create-pvc-for-pending-pod

Automatically recreate PVC for pending STS pod

datadog:patch
k8s-ci-robot authored and nyodas committed May 3, 2023
1 parent 345742c commit 38dd835
Showing 7 changed files with 277 additions and 0 deletions.
10 changes: 10 additions & 0 deletions pkg/controller/statefulset/stateful_pod_control.go
@@ -50,6 +50,8 @@ type StatefulPodControlInterface interface {
// DeleteStatefulPod deletes a Pod in a StatefulSet. The pod's PVCs are not deleted. If the delete is successful,
// the returned error is nil.
DeleteStatefulPod(set *apps.StatefulSet, pod *v1.Pod) error
// CreateMissingPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, and updates its retention policy
CreateMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error
}

func NewRealStatefulPodControl(
@@ -174,6 +176,14 @@ func (spc *realStatefulPodControl) recordClaimEvent(verb string, set *apps.State
}
}

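// CreateMissingPersistentVolumeClaims creates any PersistentVolumeClaims required by pod that do not yet
// exist, delegating to createPersistentVolumeClaims.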
func (spc *realStatefulPodControl) CreateMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
if err := spc.createPersistentVolumeClaims(set, pod); err != nil {
return err
}

return nil
}

// createPersistentVolumeClaims creates all of the required PersistentVolumeClaims for pod, which must be a member of
// set. If all of the claims for Pod are successfully created, the returned error is nil. If creation fails, this method
// may be called again until no error is returned, indicating the PersistentVolumeClaims for pod are consistent with
12 changes: 12 additions & 0 deletions pkg/controller/statefulset/stateful_set_control.go
@@ -434,6 +434,18 @@ func (ssc *defaultStatefulSetControl) updateStatefulSet(
// pod created, no more work possible for this round
continue
}

// If the Pod is in the Pending state, trigger PVC creation to create any missing PVCs
if isPending(replicas[i]) {
klog.V(4).Infof(
"StatefulSet %s/%s is triggering PVC creation for pending Pod %s",
set.Namespace,
set.Name,
replicas[i].Name)
if err := ssc.podControl.CreateMissingPersistentVolumeClaims(set, replicas[i]); err != nil {
return &status, err
}
}
// If we find a Pod that is currently terminating, we must wait until graceful deletion
// completes before we continue to make progress.
if isTerminating(replicas[i]) && monotonic {
47 changes: 47 additions & 0 deletions pkg/controller/statefulset/stateful_set_control_test.go
@@ -98,6 +98,7 @@ func TestStatefulSetControl(t *testing.T) {
{UpdatePodFailure, simpleSetFn},
{UpdateSetStatusFailure, simpleSetFn},
{PodRecreateDeleteFailure, simpleSetFn},
{RecreatesPVCForPendingPod, simpleSetFn},
}

for _, testCase := range testCases {
@@ -451,6 +452,45 @@ func PodRecreateDeleteFailure(t *testing.T, set *apps.StatefulSet, invariants in
}
}

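// RecreatesPVCForPendingPod deletes the PVCs backing the first replica, marks that Pod as Pending, and
// verifies via the invariants check that a subsequent sync recreates the missing claims.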
func RecreatesPVCForPendingPod(t *testing.T, set *apps.StatefulSet, invariants invariantFunc) {
client := fake.NewSimpleClientset()
om, _, ssc, _ := setupController(client)
selector, err := metav1.LabelSelectorAsSelector(set.Spec.Selector)
if err != nil {
t.Error(err)
}
pods, err := om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
if err := invariants(set, om); err != nil {
t.Error(err)
}
pods, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
for _, claim := range getPersistentVolumeClaims(set, pods[0]) {
om.claimsIndexer.Delete(&claim)
}
pods[0].Status.Phase = v1.PodPending
om.podsIndexer.Update(pods[0])
if _, err := ssc.UpdateStatefulSet(set, pods); err != nil {
t.Errorf("Error updating StatefulSet %s", err)
}
// invariants check if there are any missing PVCs for the Pods
if err := invariants(set, om); err != nil {
t.Error(err)
}
_, err = om.podsLister.Pods(set.Namespace).List(selector)
if err != nil {
t.Error(err)
}
}

func TestStatefulSetControlScaleDownDeleteError(t *testing.T) {
invariants := assertMonotonicInvariants
set := newStatefulSet(3)
@@ -1874,6 +1914,13 @@ func (spc *fakeStatefulPodControl) DeleteStatefulPod(set *apps.StatefulSet, pod
return nil
}

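// CreateMissingPersistentVolumeClaims mirrors the real implementation by recording every claim for pod in
// the fake claims indexer.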
func (spc *fakeStatefulPodControl) CreateMissingPersistentVolumeClaims(set *apps.StatefulSet, pod *v1.Pod) error {
for _, claim := range getPersistentVolumeClaims(set, pod) {
spc.claimsIndexer.Update(&claim)
}
return nil
}

var _ StatefulPodControlInterface = &fakeStatefulPodControl{}

type fakeStatefulSetStatusUpdater struct {
5 changes: 5 additions & 0 deletions pkg/controller/statefulset/stateful_set_utils.go
@@ -213,6 +213,11 @@ func isCreated(pod *v1.Pod) bool {
return pod.Status.Phase != ""
}

// isPending returns true if pod has a Phase of PodPending
func isPending(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodPending
}

// isFailed returns true if pod has a Phase of PodFailed
func isFailed(pod *v1.Pod) bool {
return pod.Status.Phase == v1.PodFailed
161 changes: 161 additions & 0 deletions test/e2e/apps/statefulset.go
@@ -20,6 +20,8 @@ import (
"context"
"encoding/json"
"fmt"
"regexp"
"strconv"
"strings"
"sync"
"time"
@@ -36,6 +38,7 @@ import (
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/strategicpatch"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/apimachinery/pkg/watch"
clientset "k8s.io/client-go/kubernetes"
@@ -1154,8 +1157,166 @@ var _ = SIGDescribe("StatefulSet", func() {
framework.ExpectNoError(err)
e2estatefulset.WaitForStatusAvailableReplicas(c, ss, 1)
})

ginkgo.Describe("Automatically recreate PVC for pending pod when PVC is missing", func() {
ssName := "ss"
labels := map[string]string{
"foo": "bar",
"baz": "blah",
}
headlessSvcName := "test"
var statefulPodMounts []v1.VolumeMount
var ss *appsv1.StatefulSet

ginkgo.BeforeEach(func(ctx context.Context) {
statefulPodMounts = []v1.VolumeMount{{Name: "datadir", MountPath: "/data/"}}
ss = e2estatefulset.NewStatefulSet(ssName, ns, headlessSvcName, 1, statefulPodMounts, nil, labels)
})

ginkgo.AfterEach(func(ctx context.Context) {
if ginkgo.CurrentGinkgoTestDescription().Failed {
framework.DumpDebugInfo(c, ns)
}
framework.Logf("Deleting all statefulset in ns %v", ns)
e2estatefulset.DeleteAllStatefulSets(c, ns)
})

ginkgo.It("PVC should be recreated when pod is pending due to missing PVC [Disruptive][Serial]", func(ctx context.Context) {
e2epv.SkipIfNoDefaultStorageClass(c)

readyNode, err := e2enode.GetRandomReadySchedulableNode(c)
framework.ExpectNoError(err)
hostLabel := "kubernetes.io/hostname"
hostLabelVal := readyNode.Labels[hostLabel]

ss.Spec.Template.Spec.NodeSelector = map[string]string{hostLabel: hostLabelVal} // force the pod on a specific node
ginkgo.By("Creating statefulset " + ssName + " in namespace " + ns)
_, err = c.AppsV1().StatefulSets(ns).Create(context.TODO(), ss, metav1.CreateOptions{})
framework.ExpectNoError(err)

ginkgo.By("Confirming PVC exists")
err = verifyStatefulSetPVCsExist(c, ss, []int{0})
framework.ExpectNoError(err)

ginkgo.By("Confirming Pod is ready")
e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
podName := getStatefulSetPodNameAtIndex(0, ss)
pod, err := c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)

nodeName := pod.Spec.NodeName
framework.ExpectEqual(nodeName, readyNode.Name)
node, err := c.CoreV1().Nodes().Get(context.TODO(), nodeName, metav1.GetOptions{})
framework.ExpectNoError(err)

oldData, err := json.Marshal(node)
framework.ExpectNoError(err)

node.Spec.Unschedulable = true

newData, err := json.Marshal(node)
framework.ExpectNoError(err)

// cordon node, to make sure pod does not get scheduled to the node until the pvc is deleted
patchBytes, err := strategicpatch.CreateTwoWayMergePatch(oldData, newData, v1.Node{})
framework.ExpectNoError(err)
ginkgo.By("Cordoning Node")
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, patchBytes, metav1.PatchOptions{})
framework.ExpectNoError(err)
cordoned := true

defer func() {
if cordoned {
uncordonNode(c, oldData, newData, nodeName)
}
}()

// wait for the node to be unschedulable
e2enode.WaitForNodeSchedulable(c, nodeName, 10*time.Second, false)

ginkgo.By("Deleting Pod")
err = c.CoreV1().Pods(ns).Delete(context.TODO(), podName, metav1.DeleteOptions{})
framework.ExpectNoError(err)

// wait for the pod to be recreated
e2estatefulset.WaitForStatusCurrentReplicas(c, ss, 1)
_, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)

pvcList, err := c.CoreV1().PersistentVolumeClaims(ns).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
framework.ExpectNoError(err)
framework.ExpectEqual(len(pvcList.Items), 1)
pvcName := pvcList.Items[0].Name

ginkgo.By("Deleting PVC")
err = c.CoreV1().PersistentVolumeClaims(ns).Delete(context.TODO(), pvcName, metav1.DeleteOptions{})
framework.ExpectNoError(err)

uncordonNode(c, oldData, newData, nodeName)
cordoned = false

ginkgo.By("Confirming PVC recreated")
err = verifyStatefulSetPVCsExist(c, ss, []int{0})
framework.ExpectNoError(err)

ginkgo.By("Confirming Pod is ready after being recreated")
e2estatefulset.WaitForStatusReadyReplicas(c, ss, 1)
pod, err = c.CoreV1().Pods(ns).Get(context.TODO(), podName, metav1.GetOptions{})
framework.ExpectNoError(err)
framework.ExpectEqual(pod.Spec.NodeName, readyNode.Name) // confirm the pod was scheduled back to the original node
})
})
})

// verifyStatefulSetPVCsExist confirms that exactly the PVCs for ss with the specified ids exist. This polls until
// the situation occurs, an error happens, or until timeout (in the latter case an error is also returned). Beware
// that this cannot tell if a PVC will be deleted at some point in the future, so if used to confirm that no PVCs
// are deleted, the caller should wait for some event giving the PVCs a reasonable chance to be deleted, before
// calling this function.
func verifyStatefulSetPVCsExist(c clientset.Interface, ss *appsv1.StatefulSet, claimIds []int) error {
idSet := map[int]struct{}{}
for _, id := range claimIds {
idSet[id] = struct{}{}
}
return wait.PollImmediate(e2estatefulset.StatefulSetPoll, e2estatefulset.StatefulSetTimeout, func() (bool, error) {
pvcList, err := c.CoreV1().PersistentVolumeClaims(ss.Namespace).List(context.TODO(), metav1.ListOptions{LabelSelector: klabels.Everything().String()})
if err != nil {
framework.Logf("WARNING: Failed to list pvcs for verification, retrying: %v", err)
return false, nil
}
for _, claim := range ss.Spec.VolumeClaimTemplates {
pvcNameRE := regexp.MustCompile(fmt.Sprintf("^%s-%s-([0-9]+)$", claim.Name, ss.Name))
seenPVCs := map[int]struct{}{}
for _, pvc := range pvcList.Items {
matches := pvcNameRE.FindStringSubmatch(pvc.Name)
if len(matches) != 2 {
continue
}
ordinal, err := strconv.ParseInt(matches[1], 10, 32)
if err != nil {
framework.Logf("ERROR: bad pvc name %s (%v)", pvc.Name, err)
return false, err
}
if _, found := idSet[int(ordinal)]; !found {
return false, nil // Retry until the PVCs are consistent.
} else {
seenPVCs[int(ordinal)] = struct{}{}
}
}
if len(seenPVCs) != len(idSet) {
framework.Logf("Found %d of %d PVCs", len(seenPVCs), len(idSet))
return false, nil // Retry until the PVCs are consistent.
}
}
return true, nil
})
}

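// uncordonNode reverts the earlier cordon by applying the reverse strategic merge patch, marking nodeName
// schedulable again.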
func uncordonNode(c clientset.Interface, oldData, newData []byte, nodeName string) {
ginkgo.By("Uncordoning Node")
// uncordon node, by reverting patch
revertPatchBytes, err := strategicpatch.CreateTwoWayMergePatch(newData, oldData, v1.Node{})
framework.ExpectNoError(err)
_, err = c.CoreV1().Nodes().Patch(context.TODO(), nodeName, types.StrategicMergePatchType, revertPatchBytes, metav1.PatchOptions{})
framework.ExpectNoError(err)
}

func kubectlExecWithRetries(ns string, args ...string) (out string) {
var err error
for i := 0; i < 3; i++ {
17 changes: 17 additions & 0 deletions test/e2e/framework/node/wait.go
@@ -143,6 +143,23 @@ func WaitForNodeToBeReady(c clientset.Interface, name string, timeout time.Durat
return WaitConditionToBe(c, name, v1.NodeReady, true, timeout)
}

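// WaitForNodeSchedulable polls until the node's schedulable status matches wantSchedulable, returning true on
// success and false if the timeout elapses first.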
func WaitForNodeSchedulable(c clientset.Interface, name string, timeout time.Duration, wantSchedulable bool) bool {
e2elog.Logf("Waiting up to %v for node %s to be schedulable: %t", timeout, name, wantSchedulable)
for start := time.Now(); time.Since(start) < timeout; time.Sleep(poll) {
node, err := c.CoreV1().Nodes().Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
e2elog.Logf("Couldn't get node %s", name)
continue
}

if IsNodeSchedulable(node) == wantSchedulable {
return true
}
}
e2elog.Logf("Node %s didn't reach desired schedulable status (%t) within %v", name, wantSchedulable, timeout)
return false
}

// CheckReady waits up to timeout for cluster to has desired size and
// there is no not-ready nodes in it. By cluster size we mean number of schedulable Nodes.
func CheckReady(c clientset.Interface, size int, timeout time.Duration) ([]v1.Node, error) {
Expand Down
25 changes: 25 additions & 0 deletions test/e2e/framework/statefulset/wait.go
@@ -171,6 +171,31 @@ func WaitForStatusReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expect
}
}

// WaitForStatusCurrentReplicas waits for the ss.Status.CurrentReplicas to be equal to expectedReplicas
func WaitForStatusCurrentReplicas(c clientset.Interface, ss *appsv1.StatefulSet, expectedReplicas int32) {
framework.Logf("Waiting for statefulset status.currentReplicas updated to %d", expectedReplicas)

ns, name := ss.Namespace, ss.Name
pollErr := wait.PollImmediate(StatefulSetPoll, StatefulSetTimeout,
func() (bool, error) {
ssGet, err := c.AppsV1().StatefulSets(ns).Get(context.TODO(), name, metav1.GetOptions{})
if err != nil {
return false, err
}
if ssGet.Status.ObservedGeneration < ss.Generation {
return false, nil
}
if ssGet.Status.CurrentReplicas != expectedReplicas {
framework.Logf("Waiting for stateful set status.currentReplicas to become %d, currently %d", expectedReplicas, ssGet.Status.CurrentReplicas)
return false, nil
}
return true, nil
})
if pollErr != nil {
framework.Failf("Failed waiting for stateful set status.currentReplicas updated to %d: %v", expectedReplicas, pollErr)
}
}

// Saturate waits for all Pods in ss to become Running and Ready.
func Saturate(c clientset.Interface, ss *appsv1.StatefulSet) {
var i int32
