Skip to content

Commit

Permalink
support immediate mode storage
Browse files Browse the repository at this point in the history
Signed-off-by: zhouhao_yewu <[email protected]>
  • Loading branch information
zhouhaoA1 committed May 31, 2024
1 parent 786c3be commit abe1b2a
Show file tree
Hide file tree
Showing 6 changed files with 148 additions and 29 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,90 @@ func (r *RootPodReconciler) SetupWithManager(mgr manager.Manager) error {
Complete(r)
}

func (r *RootPodReconciler) createPvcInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, pvcs []string, rootpod *corev1.Pod, cn *leafUtils.ClusterNode) error {
for _, pvcName := range pvcs {
rootPVC := &corev1.PersistentVolumeClaim{}
err := r.RootClient.Get(ctx, types.NamespacedName{Namespace: rootpod.Namespace, Name: pvcName}, rootPVC)
if err != nil {
return fmt.Errorf("could not get pvc %s from root cluster: %v", pvcName, err)
}

anno := rootPVC.GetAnnotations()
anno = utils.AddResourceClusters(anno, lr.ClusterName)
anno[utils.KosmosGlobalLabel] = "true"
rootPVC.SetAnnotations(anno)
err = r.RootClient.Update(ctx, rootPVC)
if err != nil {
return fmt.Errorf("could not update pvc %s/%s in host cluster: %v", rootpod.Namespace, pvcName, err)
}

leafPvc, err := lr.Clientset.CoreV1().PersistentVolumeClaims(rootpod.Namespace).Get(ctx, pvcName, metav1.GetOptions{})
if err != nil {
if errors.IsNotFound(err) {
leafPvc = rootPVC.DeepCopy()
err = utils.ResetMetadata(leafPvc)
if err != nil {
return err
}
if rootPVC.Status.Phase == corev1.ClaimBound {
anno[utils.KosmosPvcImmediateMode] = "true"
}
delete(anno, utils.PVCSelectedNodeKey)
leafPvc.SetAnnotations(anno)
leafPvc, err = lr.Clientset.CoreV1().PersistentVolumeClaims(rootpod.Namespace).Create(ctx, leafPvc, metav1.CreateOptions{})
if err != nil {
klog.Errorf("Failed to create pvc %s/%s err: %v", rootpod.Namespace, pvcName, err)
return err
}
klog.V(4).Infof("Create pvc %s/%s success", rootpod.Namespace, pvcName)
if rootPVC.Status.Phase == corev1.ClaimBound {
return r.createImmediateModePvInLeafCluster(ctx, lr, leafPvc)
}
}
return fmt.Errorf("could not check pvc %s/%s in external cluster: %v", rootpod.Namespace, pvcName, err)
} else {
if utils.IsImmediateModePvc(leafPvc.Annotations) && leafPvc.Status.Phase != corev1.ClaimBound {
_, err := lr.Clientset.CoreV1().PersistentVolumes().Get(ctx, leafPvc.Spec.VolumeName, metav1.GetOptions{})
if errors.IsNotFound(err) {
return r.createImmediateModePvInLeafCluster(ctx, lr, leafPvc)
}
}
}
}
return nil
}

func (r *RootPodReconciler) createImmediateModePvInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, leafPvc *corev1.PersistentVolumeClaim) error {
rootPV := &corev1.PersistentVolume{}
err := r.RootClient.Get(ctx, types.NamespacedName{Namespace: "", Name: leafPvc.Spec.VolumeName}, rootPV)
if err != nil {
return fmt.Errorf("could not get pv %s from root cluster: %v", leafPvc.Spec.VolumeName, err)
}
pv := rootPV.DeepCopy()
anno := pv.Annotations
if anno == nil {
anno = make(map[string]string)
}
anno[utils.KosmosGlobalLabel] = "true"
anno[utils.KosmosPvcImmediateMode] = "true"
pv.SetAnnotations(anno)
err = utils.ResetMetadata(pv)
if err != nil {
return err
}
pv.Spec.ClaimRef.ResourceVersion = leafPvc.ResourceVersion
pv.Spec.ClaimRef.UID = leafPvc.UID
_, err = lr.Clientset.CoreV1().PersistentVolumes().Create(ctx, pv, metav1.CreateOptions{})
if err != nil {
if errors.IsAlreadyExists(err) {
return nil
}
klog.Errorf("Failed to create pv %s err: %v", pv.Name, err)
return err
}
return nil
}

func (r *RootPodReconciler) createStorageInLeafCluster(ctx context.Context, lr *leafUtils.LeafResource, gvr schema.GroupVersionResource, resourcenames []string, rootpod *corev1.Pod, cn *leafUtils.ClusterNode) error {
ns := rootpod.Namespace
storageHandler, err := NewStorageHandler(gvr)
Expand Down Expand Up @@ -769,10 +853,11 @@ func (r *RootPodReconciler) createVolumes(ctx context.Context, lr *leafUtils.Lea
return false, err
}
klog.V(4).Info("Trying to creating dependent pvc")
if err := r.createStorageInLeafCluster(ctx, lr, utils.GVR_PVC, pvcsWithoutEs, basicPod, clusterNodeInfo); err != nil {
if err := r.createPvcInLeafCluster(ctx, lr, pvcsWithoutEs, basicPod, clusterNodeInfo); err != nil {
klog.Error(err)
return false, nil
}

klog.V(4).Infof("Create pvc %v of %v/%v success", pvcsWithoutEs, basicPod.Namespace, basicPod.Name)
// }
return true, nil
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -177,15 +177,15 @@ func (l *LeafPVController) SetupWithManager(mgr manager.Manager) error {
For(&v1.PersistentVolume{}, builder.WithPredicates(predicate.Funcs{
CreateFunc: func(createEvent event.CreateEvent) bool {
curr := createEvent.Object.(*v1.PersistentVolume)
return !podutils.IsOneWayPV(curr)
return !podutils.IsOneWayPV(curr) && !utils.IsImmediateModePvc(curr.Annotations)
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
curr := updateEvent.ObjectNew.(*v1.PersistentVolume)
return !podutils.IsOneWayPV(curr)
return !podutils.IsOneWayPV(curr) && !utils.IsImmediateModePvc(curr.Annotations)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
curr := deleteEvent.Object.(*v1.PersistentVolume)
return !podutils.IsOneWayPV(curr)
return !podutils.IsOneWayPV(curr) && !utils.IsImmediateModePvc(curr.Annotations)
},
GenericFunc: func(genericEvent event.GenericEvent) bool {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ func (l *LeafPVCController) SetupWithManager(mgr manager.Manager) error {
},
UpdateFunc: func(updateEvent event.UpdateEvent) bool {
pvc := updateEvent.ObjectOld.(*v1.PersistentVolumeClaim)
return utils.IsObjectGlobal(&pvc.ObjectMeta) && !podutils.IsOneWayPVC(pvc)
return utils.IsObjectGlobal(&pvc.ObjectMeta) && !podutils.IsOneWayPVC(pvc) && !utils.IsImmediateModePvc(pvc.Annotations)
},
DeleteFunc: func(deleteEvent event.DeleteEvent) bool {
return false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,19 @@ func (r *RootPVCController) cleanupPvc(pvc *v1.PersistentVolumeClaim) (reconcile
return reconcile.Result{}, nil
}

lr, err := r.GlobalLeafManager.GetLeafResource(clusters[0])
if err != nil {
klog.Warningf("pvc leaf %q: %q doesn't existed in LeafResources", pvc.GetNamespace(), pvc.GetName())
return reconcile.Result{}, nil
}

if err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", pvc.GetNamespace(), pvc.GetName(), err)
for _, cluster := range clusters {
lr, err := r.GlobalLeafManager.GetLeafResource(cluster)
if err != nil {
klog.Errorf("Delete pvc %s/%s from leaf cluster %s, get leafresource error %s.", pvc.GetNamespace(), pvc.GetName(), cluster, err.Error())
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, err
}

if err = lr.Clientset.CoreV1().PersistentVolumeClaims(pvc.GetNamespace()).Delete(context.TODO(), pvc.GetName(), metav1.DeleteOptions{}); err != nil {
if !errors.IsNotFound(err) {
klog.Errorf("delete pvc from leaf cluster failed, %q: %q, error: %v", pvc.GetNamespace(), pvc.GetName(), err)
return reconcile.Result{RequeueAfter: utils.DefaultRequeueTime}, err
}
}
}
return reconcile.Result{}, nil
}
1 change: 1 addition & 0 deletions pkg/utils/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ const (
KosmosNodeTaintEffect = "NoSchedule"
KosmosPodLabel = "kosmos-io/pod"
KosmosGlobalLabel = "kosmos.io/global"
KosmosPvcImmediateMode = "kosmos/pvc-immediate-mode"
KosmosSelectorKey = "kosmos.io/cluster-selector"
KosmosTrippedLabels = "kosmos-io/tripped"
KosmosConvertLabels = "kosmos-io/convert-policy"
Expand Down
61 changes: 46 additions & 15 deletions pkg/utils/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package utils

import (
"encoding/json"
"fmt"
"reflect"
"strings"

jsonpatch "github.com/evanphx/json-patch"
Expand Down Expand Up @@ -133,30 +135,59 @@ func IsObjectUnstructuredGlobal(obj map[string]string) bool {
return false
}

func ResetMetadata(obj interface{}) error {
value := reflect.ValueOf(obj)
if value.Kind() != reflect.Ptr {
return fmt.Errorf("obj must be a pointer")
}

value = value.Elem()
metaField := value.FieldByName("ObjectMeta")
if !metaField.IsValid() || metaField.Kind() != reflect.Struct {
return fmt.Errorf("obj does not have an ObjectMeta field")
}
metaField.FieldByName("ResourceVersion").SetString("")
metaField.FieldByName("UID").SetString("")
metaField.FieldByName("Generation").SetInt(0)
metaField.FieldByName("SelfLink").SetString("")
ownerRefsField := metaField.FieldByName("OwnerReferences")
if ownerRefsField.IsValid() && ownerRefsField.Kind() == reflect.Slice && ownerRefsField.CanSet() {
ownerRefsField.Set(reflect.MakeSlice(ownerRefsField.Type(), 0, 0))
}
return nil
}

func IsImmediateModePvc(annotations map[string]string) bool {
if _, ok := annotations[KosmosPvcImmediateMode]; ok {
return true
}
return false
}

func AddResourceClusters(anno map[string]string, clusterName string) map[string]string {
if anno == nil {
anno = map[string]string{}
anno = make(map[string]string)
}

ownerStr := anno[KosmosResourceOwnersAnnotations]
if ownerStr == "" {
anno[KosmosResourceOwnersAnnotations] = clusterName
return anno
}
owners := strings.Split(anno[KosmosResourceOwnersAnnotations], ",")
newowners := make([]string, 0)

flag := false
owners := strings.Split(ownerStr, ",")
found := false
for _, v := range owners {
if len(v) == 0 {
continue
}
newowners = append(newowners, v)
if v == clusterName {
// already existed
flag = true
if strings.TrimSpace(v) == clusterName {
found = true
break
}
}

if !flag {
newowners = append(newowners, clusterName)
if !found {
owners = append(owners, clusterName)
anno[KosmosResourceOwnersAnnotations] = strings.Join(owners, ",")
}

anno[KosmosResourceOwnersAnnotations] = strings.Join(newowners, ",")
return anno
}

Expand Down

0 comments on commit abe1b2a

Please sign in to comment.