Skip to content

Commit

Permalink
csi: add check for pending clone
Browse files Browse the repository at this point in the history
Incase any pvc is in pending state and the backend
subvolume is create, it will be counted as stale.
this commit adds the error check to make sure user
doesn't delete the subvolume if it the pvc is in
pending state.

Signed-off-by: yati1998 <[email protected]>
  • Loading branch information
yati1998 committed Jul 16, 2024
1 parent 00d1470 commit b8d5479
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions pkg/filesystem/subvolume.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,10 @@ package subvolume
import (
"context"
"encoding/json"
"errors"
"fmt"
"strings"
"syscall"

"github.com/rook/kubectl-rook-ceph/pkg/exec"
"github.com/rook/kubectl-rook-ceph/pkg/k8sutil"
Expand Down Expand Up @@ -175,8 +177,19 @@ func listCephFSSubvolumes(ctx context.Context, clientsets *k8sutil.Clientsets, o
}
// append the subvolume which doesn't have any snapshot attached to it.
for _, sv := range subvol {
state := getSubvolumeState(ctx, clientsets, operatorNamespace, clusterNamespace, fs.Name, sv.Name, svg.Name)
state, err := getSubvolumeState(ctx, clientsets, operatorNamespace, clusterNamespace, fs.Name, sv.Name, svg.Name)
// subvolume info returns error in case of pending clone or if it is not ready
// it is suggested to delete the pvc before deleting the subvolume.
if err != nil {
if errors.Is(err, syscall.EAGAIN) {
logging.Info("Subvolume info not found for: %q %q", sv.Name, err)
logging.Warning("Please delete the pending pv if any before deleting the subvolume %s", sv.Name)
logging.Warning("To avoid stale resources, please scale down the cephfs deployment and then run the cleanup script.")
continue
}
logging.Fatal(fmt.Errorf("failed to get subvolume state: %q %q", sv.Name, err))

}
// Assume the volume is stale unless proven otherwise
stalevol := true
// lookup for subvolume in list of the PV references
Expand All @@ -195,7 +208,6 @@ func listCephFSSubvolumes(ctx context.Context, clientsets *k8sutil.Clientsets, o
// check the state of the stale subvolume
// if it is snapshot-retained then skip listing it.
if state == "snapshot-retained" {
status = state
continue
}
// check if the stale subvolume has snapshots.
Expand All @@ -212,14 +224,14 @@ func listCephFSSubvolumes(ctx context.Context, clientsets *k8sutil.Clientsets, o
}

// getSubvolumeState returns the state of the subvolume
func getSubvolumeState(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace, fsName, SubVol, SubvolumeGroup string) string {
func getSubvolumeState(ctx context.Context, clientsets *k8sutil.Clientsets, operatorNamespace, clusterNamespace, fsName, SubVol, SubvolumeGroup string) (string, error) {
cmd := "ceph"
args := []string{"fs", "subvolume", "info", fsName, SubVol, SubvolumeGroup, "--format", "json"}

subVolumeInfo, errvol := runCommand(ctx, clientsets, operatorNamespace, clusterNamespace, cmd, args)
if errvol != nil {
logging.Error(errvol, "failed to get filesystems")
return ""
logging.Error(errvol, "failed to get subvolume info")
return "", errvol
}
var info map[string]interface{}
err := json.Unmarshal([]byte(subVolumeInfo), &info)
Expand All @@ -230,7 +242,7 @@ func getSubvolumeState(ctx context.Context, clientsets *k8sutil.Clientsets, oper
if !ok {
logging.Fatal(fmt.Errorf("failed to get the state of subvolume: %q", SubVol))
}
return state
return state, nil
}

// gets list of filesystem
Expand Down

0 comments on commit b8d5479

Please sign in to comment.