diff --git a/cmd/informer/main.go b/cmd/informer/main.go
index 70cd320..285d075 100644
--- a/cmd/informer/main.go
+++ b/cmd/informer/main.go
@@ -1,10 +1,10 @@
 package main
 
 import (
+	"context"
 	"flag"
 
 	"github.com/norbjd/k8s-pod-cpu-booster/pkg/informer"
-
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/rest"
 	"k8s.io/klog/v2"
@@ -12,8 +12,27 @@ import (
 
 func main() {
 	klog.InitFlags(nil)
+
+	var id string
+	var leaseLockNamespace string
+	var leaseLockName string
+
+	flag.StringVar(&id, "id", "", "the lease holder identity")
+	flag.StringVar(&leaseLockNamespace, "lease-lock-namespace", "", "the lease lock resource namespace")
+	flag.StringVar(&leaseLockName, "lease-lock-name", "", "the lease lock resource name")
 	flag.Parse()
+
+	if id == "" {
+		klog.Fatal("lease holder identity is required (missing id flag)")
+	}
+	if leaseLockNamespace == "" {
+		klog.Fatal("unable to get lease lock resource namespace (missing lease-lock-namespace flag)")
+	}
+	if leaseLockName == "" {
+		klog.Fatal("unable to get lease lock resource name (missing lease-lock-name flag)")
+	}
 
 	config, err := rest.InClusterConfig()
 	if err != nil {
 		panic(err)
@@ -24,5 +43,8 @@ func main() {
 		panic(err)
 	}
 
-	informer.Run(clientset)
+	ctx, cancel := context.WithCancel(context.Background())
+	defer cancel()
+
+	informer.Run(ctx, clientset, id, leaseLockNamespace, leaseLockName)
 }
diff --git a/helm/templates/pod-cpu-boost-reset.yaml b/helm/templates/pod-cpu-boost-reset.yaml
index 80adced..aaa8be1 100644
--- a/helm/templates/pod-cpu-boost-reset.yaml
+++ b/helm/templates/pod-cpu-boost-reset.yaml
@@ -1,4 +1,9 @@
 ---
+apiVersion: coordination.k8s.io/v1
+kind: Lease
+metadata:
+  name: pod-cpu-boost-reset
+---
 apiVersion: v1
 kind: ServiceAccount
 metadata:
@@ -32,12 +37,40 @@ subjects:
     name: pod-cpu-boost-reset
     namespace: {{ .Release.Namespace }}
 ---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: acquire-lease
+rules:
+  - apiGroups:
+      - coordination.k8s.io
+    resources:
+      - leases
+    resourceNames:
+      - pod-cpu-boost-reset
+    verbs:
+      - get
+      - update
+---
+apiVersion: rbac.authorization.k8s.io/v1
+kind: RoleBinding
+metadata:
+  name: pod-cpu-boost-reset
+roleRef:
+  apiGroup: rbac.authorization.k8s.io
+  kind: Role
+  name: acquire-lease
+subjects:
+  - kind: ServiceAccount
+    name: pod-cpu-boost-reset
+    namespace: {{ .Release.Namespace }}
+---
 apiVersion: apps/v1
 kind: Deployment
 metadata:
   name: pod-cpu-boost-reset
 spec:
-  replicas: 1 # for now, we don't support multiple replicas
+  replicas: 3
   selector:
     matchLabels:
       app: pod-cpu-boost-reset
@@ -49,6 +82,15 @@ spec:
       containers:
         - name: pod-cpu-boost-reset
           image: {{ .Values.informer.image }}
+          args:
+            - --id=$(POD_NAME)
+            - --lease-lock-namespace={{ .Release.Namespace }}
+            - --lease-lock-name=pod-cpu-boost-reset
+          env:
+            - name: POD_NAME
+              valueFrom:
+                fieldRef:
+                  fieldPath: metadata.name
           imagePullPolicy: {{ .Values.informer.imagePullPolicy }}
           resources: {{ toYaml .Values.resources }}
diff --git a/pkg/informer/informer.go b/pkg/informer/informer.go
index 5b085ec..7cc2247 100644
--- a/pkg/informer/informer.go
+++ b/pkg/informer/informer.go
@@ -4,6 +4,7 @@ import (
 	"context"
 	"errors"
 	"fmt"
+	"time"
 
 	"github.com/google/go-cmp/cmp"
 	"github.com/norbjd/k8s-pod-cpu-booster/pkg/shared"
@@ -15,6 +16,8 @@ import (
 	"k8s.io/client-go/informers/internalinterfaces"
 	"k8s.io/client-go/kubernetes"
 	"k8s.io/client-go/tools/cache"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
 	"k8s.io/client-go/util/retry"
 	"k8s.io/klog/v2"
 )
@@ -26,9 +29,43 @@ const (
 	cpuBoostDoneLabelValue = "has-been-boosted"
 )
 
+func Run(ctx context.Context, clientset *kubernetes.Clientset, leaseHolderIdentity, leaseLockNamespace, leaseLockName string) {
+	lock := &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      leaseLockName,
+			Namespace: leaseLockNamespace,
+		},
+		Client: clientset.CoordinationV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: leaseHolderIdentity,
+		},
+	}
+
+	leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+		Lock:            lock,
+		ReleaseOnCancel: true,
+		LeaseDuration:   3 * time.Second,
+		RenewDeadline:   2 * time.Second,
+		RetryPeriod:     1 * time.Second,
+		Callbacks: leaderelection.LeaderCallbacks{
+			OnStartedLeading: func(ctx context.Context) {
+				run(clientset)
+			},
+			OnStoppedLeading: func() {
+				klog.Infof("stop leading: %s", leaseHolderIdentity)
+			},
+			OnNewLeader: func(identity string) {
+				if identity != leaseHolderIdentity {
+					klog.Infof("new leader elected: %s", identity)
+				}
+			},
+		},
+	})
+}
+
 // Inspired by:
 // - https://www.cncf.io/blog/2019/10/15/extend-kubernetes-via-a-shared-informer/
-func Run(clientset *kubernetes.Clientset) {
+func run(clientset *kubernetes.Clientset) {
 	factory := informers.NewSharedInformerFactoryWithOptions(clientset, 0, informers.WithTweakListOptions(podCPUBoosterTweakFunc()))
 
 	informer := factory.Core().V1().Pods().Informer()
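
A note on ReleaseOnCancel: true (this is commentary, not part of the diff): the early Lease release only happens if the context passed to leaderelection.RunOrDie is actually cancelled before the process exits, and the defer cancel() in main.go above does not run when the kubelet sends SIGTERM. A minimal sketch of signal-aware wiring for main(), assuming signal.NotifyContext from os/signal (Go 1.16+, with "os" and "syscall" added to the imports); everything else stays as in the diff:

	// Sketch only: replaces the context.WithCancel lines in main().
	// ctx is cancelled when SIGTERM or SIGINT arrives, so RunOrDie returns,
	// releases the Lease (ReleaseOnCancel), and a standby replica can take
	// over without waiting out the remaining LeaseDuration (3s here).
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM, os.Interrupt)
	defer stop()

	informer.Run(ctx, clientset, id, leaseLockNamespace, leaseLockName)

Either way, the election is easy to observe after a rollout: kubectl -n <release-namespace> get lease pod-cpu-boost-reset shows the current holder (the pod name injected via --id=$(POD_NAME)) in the HOLDER column.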