From 763285d283ae4304731e71cc41bf553e394197a8 Mon Sep 17 00:00:00 2001 From: Fabrizio Pandini Date: Fri, 20 Dec 2024 16:20:08 +0100 Subject: [PATCH] =?UTF-8?q?=F0=9F=8C=B1=20Reconcile=20topology=20only=20wh?= =?UTF-8?q?en=20necessary=20(#11605)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit * Reconcile topology only when necessary * Address comments * Allow resync for the cluster object --- .../topology/cluster/cluster_controller.go | 117 +++++++++++++++++- 1 file changed, 113 insertions(+), 4 deletions(-) diff --git a/internal/controllers/topology/cluster/cluster_controller.go b/internal/controllers/topology/cluster/cluster_controller.go index 0bc91eb609c0..a5cd93bf1b06 100644 --- a/internal/controllers/topology/cluster/cluster_controller.go +++ b/internal/controllers/topology/cluster/cluster_controller.go @@ -19,22 +19,28 @@ package cluster import ( "context" "fmt" + "reflect" "time" "github.com/go-logr/logr" "github.com/pkg/errors" apierrors "k8s.io/apimachinery/pkg/api/errors" + "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" kerrors "k8s.io/apimachinery/pkg/util/errors" "k8s.io/client-go/tools/record" + "k8s.io/klog/v2" "k8s.io/utils/ptr" ctrl "sigs.k8s.io/controller-runtime" "sigs.k8s.io/controller-runtime/pkg/builder" "sigs.k8s.io/controller-runtime/pkg/cache/informertest" "sigs.k8s.io/controller-runtime/pkg/client" + "sigs.k8s.io/controller-runtime/pkg/client/apiutil" "sigs.k8s.io/controller-runtime/pkg/controller" + "sigs.k8s.io/controller-runtime/pkg/event" "sigs.k8s.io/controller-runtime/pkg/handler" "sigs.k8s.io/controller-runtime/pkg/log" + "sigs.k8s.io/controller-runtime/pkg/predicate" "sigs.k8s.io/controller-runtime/pkg/reconcile" clusterv1 "sigs.k8s.io/cluster-api/api/v1beta1" @@ -103,8 +109,11 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt predicateLog := ctrl.LoggerFrom(ctx).WithValues("controller", "topology/cluster") c, err := 
ctrl.NewControllerManagedBy(mgr). For(&clusterv1.Cluster{}, builder.WithPredicates( - // Only reconcile Cluster with topology. - predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), + // Only reconcile Cluster with topology and with changes relevant for this controller. + predicates.All(mgr.GetScheme(), predicateLog, + predicates.ClusterHasTopology(mgr.GetScheme(), predicateLog), + clusterChangeIsRelevant(mgr.GetScheme(), predicateLog), + ), )). Named("topology/cluster"). WatchesRawSource(r.ClusterCache.GetClusterSource("topology/cluster", func(_ context.Context, o client.Object) []ctrl.Request { @@ -118,16 +127,17 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt Watches( &clusterv1.MachineDeployment{}, handler.EnqueueRequestsFromMapFunc(r.machineDeploymentToCluster), - // Only trigger Cluster reconciliation if the MachineDeployment is topology owned. + // Only trigger Cluster reconciliation if the MachineDeployment is topology owned, the resource is changed, and the change is relevant. builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog), + machineDeploymentChangeIsRelevant(mgr.GetScheme(), predicateLog), )), ). Watches( &expv1.MachinePool{}, handler.EnqueueRequestsFromMapFunc(r.machinePoolToCluster), - // Only trigger Cluster reconciliation if the MachinePool is topology owned. + // Only trigger Cluster reconciliation if the MachinePool is topology owned and the resource is changed. 
builder.WithPredicates(predicates.All(mgr.GetScheme(), predicateLog, predicates.ResourceIsChanged(mgr.GetScheme(), predicateLog), predicates.ResourceIsTopologyOwned(mgr.GetScheme(), predicateLog), @@ -155,6 +165,105 @@ func (r *Reconciler) SetupWithManager(ctx context.Context, mgr ctrl.Manager, opt return nil } +func clusterChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { + dropNotRelevant := func(cluster *clusterv1.Cluster) *clusterv1.Cluster { + c := cluster.DeepCopy() + // Drop metadata fields which are impacted by not relevant changes. + c.ObjectMeta.ManagedFields = nil + c.ObjectMeta.ResourceVersion = "" + + // Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this and + // selectively drop changes not relevant for this controller. + c.Status.V1Beta2 = nil + return c + } + + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log := logger.WithValues("predicate", "ClusterChangeIsRelevant", "eventType", "update") + if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil { + log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) + } + + if e.ObjectOld.GetResourceVersion() == e.ObjectNew.GetResourceVersion() { + log.V(6).Info("Cluster resync event, allowing further processing") + return true + } + + oldObj, ok := e.ObjectOld.(*clusterv1.Cluster) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectOld)) + return false + } + oldObj = dropNotRelevant(oldObj) + + newObj, ok := e.ObjectNew.(*clusterv1.Cluster) + if !ok { + log.V(4).Info("Expected Cluster", "type", fmt.Sprintf("%T", e.ObjectNew)) + return false + } + newObj = dropNotRelevant(newObj) + + if reflect.DeepEqual(oldObj, newObj) { + log.V(6).Info("Cluster does not have relevant changes, blocking further processing") + return false + } + log.V(6).Info("Cluster has relevant changes, allowing further processing") + return true + }, + CreateFunc: func(event.CreateEvent) bool { 
return true }, + DeleteFunc: func(event.DeleteEvent) bool { return true }, + GenericFunc: func(event.GenericEvent) bool { return true }, + } +} + +func machineDeploymentChangeIsRelevant(scheme *runtime.Scheme, logger logr.Logger) predicate.Funcs { + dropNotRelevant := func(machineDeployment *clusterv1.MachineDeployment) *clusterv1.MachineDeployment { + md := machineDeployment.DeepCopy() + // Drop metadata fields which are impacted by not relevant changes. + md.ObjectMeta.ManagedFields = nil + md.ObjectMeta.ResourceVersion = "" + + // Drop changes on v1beta2 conditions; when v1beta2 conditions will be moved top level, we will review this and + // selectively drop changes not relevant for this controller. + md.Status.V1Beta2 = nil + return md + } + + return predicate.Funcs{ + UpdateFunc: func(e event.UpdateEvent) bool { + log := logger.WithValues("predicate", "MachineDeploymentChangeIsRelevant", "eventType", "update") + if gvk, err := apiutil.GVKForObject(e.ObjectOld, scheme); err == nil { + log = log.WithValues(gvk.Kind, klog.KObj(e.ObjectOld)) + } + + oldObj, ok := e.ObjectOld.(*clusterv1.MachineDeployment) + if !ok { + log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectOld)) + return false + } + oldObj = dropNotRelevant(oldObj) + + newObj, ok := e.ObjectNew.(*clusterv1.MachineDeployment) + if !ok { + log.V(4).Info("Expected MachineDeployment", "type", fmt.Sprintf("%T", e.ObjectNew)) + return false + } + newObj = dropNotRelevant(newObj) + + if reflect.DeepEqual(oldObj, newObj) { + log.V(6).Info("MachineDeployment does not have relevant changes, blocking further processing") + return false + } + log.V(6).Info("MachineDeployment has relevant changes, allowing further processing") + return true + }, + CreateFunc: func(event.CreateEvent) bool { return true }, + DeleteFunc: func(event.DeleteEvent) bool { return true }, + GenericFunc: func(event.GenericEvent) bool { return true }, + } +} + // SetupForDryRun prepares the Reconciler for a dry run 
execution. func (r *Reconciler) SetupForDryRun(recorder record.EventRecorder) { r.desiredStateGenerator = desiredstate.NewGenerator(r.Client, r.ClusterCache, r.RuntimeClient)