Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add separate PVC for wal #43

Open
wants to merge 11 commits into
base: v0.7.0-dev
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
70 changes: 70 additions & 0 deletions pkg/apis/cpo.opensource.cybertec.at/v1/crds.go
Original file line number Diff line number Diff line change
Expand Up @@ -1347,6 +1347,76 @@ var PostgresCRDResourceValidation = apiextv1.CustomResourceValidation{
},
},
},
"walPvc": {
Type: "object",
Required: []string{"size"},
Properties: map[string]apiextv1.JSONSchemaProps{
"iops": {
Type: "integer",
},
"selector": {
Type: "object",
Properties: map[string]apiextv1.JSONSchemaProps{
"matchExpressions": {
Type: "array",
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "object",
Required: []string{"key", "operator"},
Properties: map[string]apiextv1.JSONSchemaProps{
"key": {
Type: "string",
},
"operator": {
Type: "string",
Enum: []apiextv1.JSON{
{
Raw: []byte(`"DoesNotExist"`),
},
{
Raw: []byte(`"Exists"`),
},
{
Raw: []byte(`"In"`),
},
{
Raw: []byte(`"NotIn"`),
},
},
},
"values": {
Type: "array",
Items: &apiextv1.JSONSchemaPropsOrArray{
Schema: &apiextv1.JSONSchemaProps{
Type: "string",
},
},
},
},
},
},
},
"matchLabels": {
Type: "object",
XPreserveUnknownFields: util.True(),
},
},
},
"size": {
Type: "string",
Pattern: "^(\\d+(e\\d+)?|\\d+(\\.\\d+)?(e\\d+)?[EPTGMK]i?)$",
},
"storageClass": {
Type: "string",
},
"subPath": {
Type: "string",
},
"throughput": {
Type: "integer",
},
},
},
},
},
"status": {
Expand Down
1 change: 1 addition & 0 deletions pkg/apis/cpo.opensource.cybertec.at/v1/postgresql_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ type PostgresSpec struct {
Backup *Backup `json:"backup,omitempty"`
TDE *TDE `json:"tde,omitempty"`
Monitoring *Monitoring `json:"monitor,omitempty"`
WalPvc *Volume `json:"walPvc,omitempty"`
}

// +k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 24 additions & 2 deletions pkg/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -617,7 +617,16 @@ func (c *Cluster) compareStatefulSetWith(oldSts, newSts *appsv1.StatefulSet) *co
needsReplace = true
reasons = append(reasons, "new statefulset's volumeClaimTemplates contains different number of volumes to the old one")
}
for i := 0; i < len(oldSts.Spec.VolumeClaimTemplates); i++ {

//Account for the deleted PVC for wal
lenVCT := 0
if len(oldSts.Spec.VolumeClaimTemplates) < len(newSts.Spec.VolumeClaimTemplates) {
lenVCT = len(oldSts.Spec.VolumeClaimTemplates)
} else {
lenVCT = len(newSts.Spec.VolumeClaimTemplates)
}

for i := 0; i < lenVCT; i++ {
name := oldSts.Spec.VolumeClaimTemplates[i].Name
// Some generated fields like creationTimestamp make it not possible to use DeepCompare on ObjectMeta
if name != newSts.Spec.VolumeClaimTemplates[i].Name {
Expand Down Expand Up @@ -1012,8 +1021,17 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
c.syncMonitoringSecret(oldSpec, newSpec)
}

//sync WAL-PVC
if !reflect.DeepEqual(oldSpec.Spec.WalPvc, newSpec.Spec.WalPvc) {
if err := c.syncWalPvc(oldSpec, newSpec); err != nil {
c.logger.Warningf("could not sync PVC WAL %v", err)
}
}

//sync sts when there is a change in the pgbackrest secret, since we need to mount this
if !reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Configuration, newSpec.Spec.Backup.Pgbackrest.Configuration) {
if oldSpec.Spec.Backup != nil && newSpec.Spec.Backup != nil &&
oldSpec.Spec.Backup.Pgbackrest != nil && newSpec.Spec.Backup.Pgbackrest != nil &&
!reflect.DeepEqual(oldSpec.Spec.Backup.Pgbackrest.Configuration, newSpec.Spec.Backup.Pgbackrest.Configuration) {
syncStatefulSet = true
}

Expand Down Expand Up @@ -1068,6 +1086,10 @@ func (c *Cluster) Update(oldSpec, newSpec *cpov1.Postgresql) error {
updateFailed = true
return
}
if oldSpec.Spec.WalPvc != nil {
//if pvc wal is removed then carry the relevant env vars to the new sts

}

if c.restoreInProgress() {
c.applyRestoreStatefulSetSyncOverrides(newSs, oldSs)
Expand Down
37 changes: 34 additions & 3 deletions pkg/cluster/k8sres.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,6 +649,10 @@ func isBootstrapOnlyParameter(param string) bool {
return result
}

func getWALPVCName(cluster_name string) string {
return "walpvc" + cluster_name
}

func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount {
return []v1.VolumeMount{
{
Expand All @@ -659,6 +663,14 @@ func generateVolumeMounts(volume cpov1.Volume) []v1.VolumeMount {
}
}

func generateWalVolumeMounts(volume cpov1.Volume, cluster_name string) v1.VolumeMount {
return v1.VolumeMount{
Name: getWALPVCName(cluster_name),
MountPath: constants.PostgresPVCWalMount,
SubPath: volume.SubPath,
}
}

func generateContainer(
name string,
dockerImage *string,
Expand Down Expand Up @@ -1005,6 +1017,10 @@ func (c *Cluster) generateSpiloPodEnvVars(
envVars = append(envVars, v1.EnvVar{Name: "cpo_monitoring_stack", Value: "true"})
}

if spec.WalPvc != nil {
envVars = append(envVars, v1.EnvVar{Name: "NEWWALDIR", Value: constants.PostgresPVCWalMount})
envVars = append(envVars, v1.EnvVar{Name: "OLDWALDIR",Value: ""})
}
if c.OpConfig.EnablePgVersionEnvVar {
envVars = append(envVars, v1.EnvVar{Name: "PGVERSION", Value: c.GetDesiredMajorVersion()})
}
Expand Down Expand Up @@ -1290,9 +1306,9 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu
sidecarContainers []v1.Container
podTemplate *v1.PodTemplateSpec
volumeClaimTemplate *v1.PersistentVolumeClaim
WalPvcClaim *v1.PersistentVolumeClaim
additionalVolumes = spec.AdditionalVolumes
)

defaultResources := makeDefaultResources(&c.OpConfig)
resourceRequirements, err := c.generateResourceRequirements(
spec.Resources, defaultResources, constants.PostgresContainerName)
Expand Down Expand Up @@ -1365,6 +1381,10 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu

volumeMounts := generateVolumeMounts(spec.Volume)

if spec.WalPvc != nil {
volumeMounts = append(volumeMounts, generateWalVolumeMounts(*spec.WalPvc, c.Spec.ClusterName))
}

// configure TLS with a custom secret volume
if spec.TLS != nil && spec.TLS.SecretName != "" {
getSpiloTLSEnv := func(k string) string {
Expand Down Expand Up @@ -1509,7 +1529,13 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu
additionalVolumes = append(additionalVolumes, c.generateCertSecretVolume())
}
}

if spec.WalPvc != nil {
WalPvcClaim, err = c.generatePersistentVolumeClaimTemplate(spec.WalPvc.Size,
spec.WalPvc.StorageClass, spec.WalPvc.Selector, getWALPVCName(spec.ClusterName))
if err != nil {
c.logger.Errorf("could not generate volume claim template for WAL directory: %v", err)
}
}
// generate pod template for the statefulset, based on the spilo container and sidecars
podTemplate, err = c.generatePodTemplate(
c.Namespace,
Expand Down Expand Up @@ -1578,6 +1604,11 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu
persistentVolumeClaimRetentionPolicy.WhenScaled = appsv1.RetainPersistentVolumeClaimRetentionPolicyType
}

final_vols := []v1.PersistentVolumeClaim{*volumeClaimTemplate}
if spec.WalPvc != nil {
final_vols = []v1.PersistentVolumeClaim{*volumeClaimTemplate, *WalPvcClaim}
}

statefulSet := &appsv1.StatefulSet{
ObjectMeta: metav1.ObjectMeta{
Name: c.statefulSetName(),
Expand All @@ -1590,7 +1621,7 @@ func (c *Cluster) generateStatefulSet(spec *cpov1.PostgresSpec) (*appsv1.Statefu
Selector: c.labelsSelector(TYPE_POSTGRESQL),
ServiceName: c.serviceName(Master),
Template: *podTemplate,
VolumeClaimTemplates: []v1.PersistentVolumeClaim{*volumeClaimTemplate},
VolumeClaimTemplates: final_vols,
UpdateStrategy: updateStrategy,
PodManagementPolicy: podManagementPolicy,
PersistentVolumeClaimRetentionPolicy: &persistentVolumeClaimRetentionPolicy,
Expand Down
39 changes: 37 additions & 2 deletions pkg/cluster/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,11 @@ func (c *Cluster) Sync(newSpec *cpov1.Postgresql) error {
return err
}

// sync volume may already transition volumes to gp3, if iops/throughput or type is specified
if err = c.syncWalPvc(&oldSpec, newSpec); err != nil {
return fmt.Errorf("could not sync WAL-PVC: %v", err)
}

//sync volume may already transition volumes to gp3, if iops/throughput or type is specified
if err = c.syncVolumes(); err != nil {
return err
}
Expand Down Expand Up @@ -1632,7 +1636,7 @@ func (c *Cluster) syncPgbackrestJob(forceRemove bool) error {
if err := c.createPgbackrestJob(job); err != nil {
return fmt.Errorf("could not create a pgbackrest cronjob: %v", err)
}
c.logger.Info("pgbackrest cronjob for %v %v has been successfully created", rep, schedul)
c.logger.Infof("pgbackrest cronjob for %v %v has been successfully created", rep, schedul)
}
}
}
Expand Down Expand Up @@ -1756,6 +1760,37 @@ func (c *Cluster) syncMonitoringSecret(oldSpec, newSpec *cpov1.Postgresql) error
return nil
}

func (c *Cluster) syncWalPvc(oldSpec, newSpec *cpov1.Postgresql) error {
c.logger.Info("syncing PVC for WAL")
c.setProcessName("syncing PVC for WAL")

if newSpec.Spec.WalPvc == nil && oldSpec.Spec.WalPvc != nil {

containers := c.Statefulset.Spec.Template.Spec.Containers
for _, con := range containers {
con.Env = append(con.Env, v1.EnvVar{Name: "NEWWALDIR", Value: ""})
con.Env = append(con.Env, v1.EnvVar{Name: "OLDWALDIR",Value: constants.PostgresPVCWalMount})
}
// run the script to move the wal files and then remove the pvc
//result, err = c.ExecCommand(podName, "scripts/move_wal_dir.sh" + constants.PostgresPVCWalMount + " " + constants.PostgresWALPath)

pvcs, err := c.listPersistentVolumeClaims()
if err != nil {
return fmt.Errorf("Could not list PVCs")
} else {
for _, pvc := range pvcs {
if strings.Contains(pvc.Name, getWALPVCName(c.Spec.ClusterName)) {
c.logger.Infof("deleting WAL-PVC %q", util.NameFromMeta(pvc.ObjectMeta))
if err := c.KubeClient.PersistentVolumeClaims(pvc.Namespace).Delete(context.TODO(), pvc.Name, c.deleteOptions); err != nil {
return fmt.Errorf("could not delete WAL PVC: %v", err)
}
}
}
}
}
return nil
}

func generateRootCertificate(
privateKey *ecdsa.PrivateKey, serialNumber *big.Int,
) (*x509.Certificate, error) {
Expand Down
3 changes: 3 additions & 0 deletions pkg/util/constants/postgresql.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,4 +18,7 @@ const (

RunVolumeName = "postgresql-run"
RunVolumePath = "/var/run/postgresql"

PostgresPVCWalMount = "/home/postgres/pvc/"
PostgresWALPath = PostgresDataPath + "/pg_wal"
)
Loading