diff --git a/pkg/servicelb/controller.go b/pkg/servicelb/controller.go index eb28b4bf9117..1d7506528f85 100644 --- a/pkg/servicelb/controller.go +++ b/pkg/servicelb/controller.go @@ -33,6 +33,7 @@ import ( ) var ( + finalizerName = "svccontroller." + version.Program + ".cattle.io/daemonset" svcNameLabel = "svccontroller." + version.Program + ".cattle.io/svcname" svcNamespaceLabel = "svccontroller." + version.Program + ".cattle.io/svcnamespace" daemonsetNodeLabel = "svccontroller." + version.Program + ".cattle.io/enablelb" @@ -189,8 +190,8 @@ func (h *handler) onChangeNode(key string, node *core.Node) (*core.Node, error) // updateService ensures that the Service ingress IP address list is in sync // with the Nodes actually running pods for this service. func (h *handler) updateService(svc *core.Service) (runtime.Object, error) { - if !h.enabled || svc.Spec.Type != core.ServiceTypeLoadBalancer { - return svc, nil + if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer { + return h.removeFinalizer(svc) } pods, err := h.podCache.List(h.klipperLBNamespace, labels.SelectorFromSet(map[string]string{ @@ -216,6 +217,11 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) { } svc = svc.DeepCopy() + svc, err = h.addFinalizer(svc) + if err != nil { + return svc, err + } + svc.Status.LoadBalancer.Ingress = nil for _, ip := range expectedIPs { svc.Status.LoadBalancer.Ingress = append(svc.Status.LoadBalancer.Ingress, core.LoadBalancerIngress{ @@ -223,7 +229,7 @@ func (h *handler) updateService(svc *core.Service) (runtime.Object, error) { }) } - h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", ")) + defer h.recorder.Eventf(svc, core.EventTypeNormal, "UpdatedIngressIP", "LoadBalancer Ingress IP addresses updated: %s", strings.Join(expectedIPs, ", ")) return h.services.Services(svc.Namespace).UpdateStatus(context.TODO(), svc, meta.UpdateOptions{}) } @@ -358,26 +364,41 @@ func (h *handler) deployPod(svc *core.Service) error { if err := h.deleteOldDeployments(svc); err != nil { return err } - objs := objectset.NewObjectSet() - if !h.enabled || svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { - return h.processor.WithOwner(svc).Apply(objs) + + if !h.enabled || svc.DeletionTimestamp != nil || svc.Spec.Type != core.ServiceTypeLoadBalancer || svc.Spec.ClusterIP == "" || svc.Spec.ClusterIP == "None" { + return h.deletePod(svc) } ds, err := h.newDaemonSet(svc) if err != nil { return err } + + objs := objectset.NewObjectSet() if ds != nil { objs.Add(ds) - h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name) + defer h.recorder.Eventf(svc, core.EventTypeNormal, "AppliedDaemonSet", "Applied LoadBalancer DaemonSet %s/%s", ds.Namespace, ds.Name) } return h.processor.WithOwner(svc).Apply(objs) } +// deletePod ensures that there are no DaemonSets for the given service. +func (h *handler) deletePod(svc *core.Service) error { + name := generateName(svc) + if err := h.daemonsets.DaemonSets(h.klipperLBNamespace).Delete(context.TODO(), name, meta.DeleteOptions{}); err != nil { + if apierrors.IsNotFound(err) { + return nil + } + return err + } + defer h.recorder.Eventf(svc, core.EventTypeNormal, "DeletedDaemonSet", "Deleted LoadBalancer DaemonSet %s/%s", h.klipperLBNamespace, name) + return nil +} + // newDaemonSet creates a DaemonSet to ensure that ServiceLB pods are run on // each eligible node. func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { - name := fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8]) + name := generateName(svc) oneInt := intstr.FromInt(1) // If ipv6 is present, we must enable ipv6 forwarding in the manifest @@ -394,6 +415,8 @@ func (h *handler) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) { Namespace: h.klipperLBNamespace, Labels: map[string]string{ nodeSelectorLabel: "false", + svcNameLabel: svc.Name, + svcNamespaceLabel: svc.Namespace, }, }, TypeMeta: meta.TypeMeta{ @@ -562,3 +585,42 @@ func (h *handler) deleteOldDeployments(svc *core.Service) error { } return h.deployments.Deployments(svc.Namespace).Delete(context.TODO(), name, meta.DeleteOptions{}) } + +// addFinalizer ensures that there is a finalizer for this controller on the Service +func (h *handler) addFinalizer(svc *core.Service) (*core.Service, error) { + if !h.hasFinalizer(svc) { + svc.Finalizers = append(svc.Finalizers, finalizerName) + return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{}) + } + return svc, nil +} + +// removeFinalizer ensures that there is not a finalizer for this controller on the Service +func (h *handler) removeFinalizer(svc *core.Service) (*core.Service, error) { + if !h.hasFinalizer(svc) { + return svc, nil + } + + for k, v := range svc.Finalizers { + if v != finalizerName { + continue + } + svc.Finalizers = append(svc.Finalizers[:k], svc.Finalizers[k+1:]...) + } + return h.services.Services(svc.Namespace).Update(context.TODO(), svc, meta.UpdateOptions{}) +} + +// hasFinalizer returns a boolean indicating whether or not there is a finalizer for this controller on the Service +func (h *handler) hasFinalizer(svc *core.Service) bool { + for _, finalizer := range svc.Finalizers { + if finalizer == finalizerName { + return true + } + } + return false +} + +// generateName generates a distinct name for the DaemonSet based on the service name and UID +func generateName(svc *core.Service) string { + return fmt.Sprintf("svclb-%s-%s", svc.Name, svc.UID[:8]) +}