Skip to content

Commit

Permalink
Use a lease in the informer to be able to run multiple replicas
Browse files Browse the repository at this point in the history
  • Loading branch information
norbjd committed Mar 24, 2024
1 parent aad6ee7 commit e1830aa
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 4 deletions.
26 changes: 24 additions & 2 deletions cmd/informer/main.go
Original file line number Diff line number Diff line change
@@ -1,19 +1,38 @@
package main

import (
"context"
"flag"

"github.com/norbjd/k8s-pod-cpu-booster/pkg/informer"

"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
)

func main() {
klog.InitFlags(nil)

var id string
var leaseLockNamespace string
var leaseLockName string

flag.StringVar(&id, "id", "", "the lease lock resource name")
flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
flag.StringVar(&leaseLockName, "lease-lock-name", "", "path to key file")

flag.Parse()

if id == "" {
klog.Fatal("lease holder identity is required (missing id flag)")
}
if leaseLockNamespace == "" {
klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag)")
}
if leaseLockName == "" {
klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag)")
}

config, err := rest.InClusterConfig()
if err != nil {
panic(err)
Expand All @@ -24,5 +43,8 @@ func main() {
panic(err)
}

informer.Run(clientset)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

informer.Run(ctx, clientset, id, leaseLockNamespace, leaseLockName)
}
44 changes: 43 additions & 1 deletion helm/templates/pod-cpu-boost-reset.yaml
Original file line number Diff line number Diff line change
@@ -1,4 +1,9 @@
---
apiVersion: coordination.k8s.io/v1
kind: Lease
metadata:
name: pod-cpu-boost-reset
---
apiVersion: v1
kind: ServiceAccount
metadata:
Expand Down Expand Up @@ -32,12 +37,40 @@ subjects:
name: pod-cpu-boost-reset
namespace: {{ .Release.Namespace }}
---
apiVersion: rbac.authorization.k8s.io/v1
kind: Role
metadata:
name: acquire-lease
rules:
- apiGroups:
- coordination.k8s.io
resources:
- leases
resourceNames:
- pod-cpu-boost-reset
verbs:
- get
- update
---
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
name: pod-cpu-boost-reset
roleRef:
apiGroup: rbac.authorization.k8s.io
kind: Role
name: acquire-lease
subjects:
- kind: ServiceAccount
name: pod-cpu-boost-reset
namespace: {{ .Release.Namespace }}
---
apiVersion: apps/v1
kind: Deployment
metadata:
name: pod-cpu-boost-reset
spec:
replicas: 1 # for now, we don't support multiple replicas
replicas: 3
selector:
matchLabels:
app: pod-cpu-boost-reset
Expand All @@ -49,6 +82,15 @@ spec:
containers:
- name: pod-cpu-boost-reset
image: {{ .Values.informer.image }}
args:
- --id=$(POD_NAME)
- --lease-lock-namespace={{ .Release.Namespace }}
- --lease-lock-name=pod-cpu-boost-reset
env:
- name: POD_NAME
valueFrom:
fieldRef:
fieldPath: metadata.name
imagePullPolicy: {{ .Values.informer.imagePullPolicy }}
resources:
{{ toYaml .Values.resources }}
Expand Down
39 changes: 38 additions & 1 deletion pkg/informer/informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

"github.com/google/go-cmp/cmp"
"github.com/norbjd/k8s-pod-cpu-booster/pkg/shared"
Expand All @@ -15,6 +16,8 @@ import (
"k8s.io/client-go/informers/internalinterfaces"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/leaderelection"
"k8s.io/client-go/tools/leaderelection/resourcelock"
"k8s.io/client-go/util/retry"
"k8s.io/klog/v2"
)
Expand All @@ -26,9 +29,43 @@ const (
cpuBoostDoneLabelValue = "has-been-boosted"
)

func Run(ctx context.Context, clientset *kubernetes.Clientset, leaseHolderIdentity, leaseLockNamespace, leaseLockName string) {
lock := &resourcelock.LeaseLock{
LeaseMeta: metav1.ObjectMeta{
Name: leaseLockName,
Namespace: leaseLockNamespace,
},
Client: clientset.CoordinationV1(),
LockConfig: resourcelock.ResourceLockConfig{
Identity: leaseHolderIdentity,
},
}

leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
Lock: lock,
ReleaseOnCancel: true,
LeaseDuration: 3 * time.Second,
RenewDeadline: 2 * time.Second,
RetryPeriod: 1 * time.Second,
Callbacks: leaderelection.LeaderCallbacks{
OnStartedLeading: func(ctx context.Context) {
run(clientset)
},
OnStoppedLeading: func() {
klog.Infof("stop leading: %s", leaseHolderIdentity)
},
OnNewLeader: func(identity string) {
if identity != leaseHolderIdentity {
klog.Infof("new leader elected: %s", identity)
}
},
},
})
}

// Inspired by:
// - https://www.cncf.io/blog/2019/10/15/extend-kubernetes-via-a-shared-informer/
func Run(clientset *kubernetes.Clientset) {
func run(clientset *kubernetes.Clientset) {
factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithTweakListOptions(podCPUBoosterTweakFunc()))
informer := factory.Core().V1().Pods().Informer()

Expand Down

0 comments on commit e1830aa

Please sign in to comment.