From 3cf27c9be5a4a39e75e01fa0780e6e8fdc721e18 Mon Sep 17 00:00:00 2001 From: Icarus9913 Date: Mon, 28 Nov 2022 15:22:19 +0800 Subject: [PATCH] 1.add more workers for IPPool and App controller 2.decrease the requeue delay duration 3.fix application controller error handler Signed-off-by: Icarus9913 icaruswu66@qq.com --- cmd/spiderpool-controller/cmd/config.go | 6 ++-- pkg/ippoolmanager/ippool_informer.go | 10 +++--- pkg/ippoolmanager/ippool_manager.go | 8 +---- pkg/subnetmanager/subnet_controller.go | 44 ++++++++++++++++++++----- pkg/subnetmanager/subnet_informer.go | 2 +- 5 files changed, 45 insertions(+), 25 deletions(-) diff --git a/cmd/spiderpool-controller/cmd/config.go b/cmd/spiderpool-controller/cmd/config.go index 3dd58d7533..6d28a1d98e 100644 --- a/cmd/spiderpool-controller/cmd/config.go +++ b/cmd/spiderpool-controller/cmd/config.go @@ -56,7 +56,7 @@ var envInfo = []envConf{ {"SPIDERPOOL_WORKLOADENDPOINT_MAX_HISTORY_RECORDS", "100", false, nil, nil, &controllerContext.Cfg.WorkloadEndpointMaxHistoryRecords}, {"SPIDERPOOL_IPPOOL_MAX_ALLOCATED_IPS", "5000", false, nil, nil, &controllerContext.Cfg.IPPoolMaxAllocatedIPs}, {"SPIDERPOOL_SUBNET_RESYNC_PERIOD", "300", false, nil, nil, &controllerContext.Cfg.SubnetResyncPeriod}, - {"SPIDERPOOL_SUBNET_INFORMER_WORKERS", "3", false, nil, nil, &controllerContext.Cfg.SubnetInformerWorkers}, + {"SPIDERPOOL_SUBNET_INFORMER_WORKERS", "5", false, nil, nil, &controllerContext.Cfg.SubnetInformerWorkers}, {"SPIDERPOOL_SUBNET_INFORMER_MAX_WORKQUEUE_LENGTH", "10000", false, nil, nil, &controllerContext.Cfg.SubnetInformerMaxWorkqueueLength}, {"SPIDERPOOL_UPDATE_CR_MAX_RETRIES", "3", false, nil, nil, &controllerContext.Cfg.UpdateCRMaxRetries}, {"SPIDERPOOL_UPDATE_CR_RETRY_UNIT_TIME", "300", false, nil, nil, &controllerContext.Cfg.UpdateCRRetryUnitTime}, @@ -82,8 +82,8 @@ var envInfo = []envConf{ {"VERSION", "", false, &controllerContext.Cfg.AppVersion, nil, nil}, {"SPIDERPOOL_ENABLE_SUBNET_DELETE_STALE_IPPOOL", "false", true, nil, &controllerContext.Cfg.EnableSubnetDeleteStaleIPPool, nil}, {"SPIDERPOOL_AUTO_IPPOOL_HANDLER_MAX_WORKQUEUE_LENGTH", "10000", true, nil, nil, &controllerContext.Cfg.IPPoolInformerMaxWorkQueueLength}, - {"SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION", "5", true, nil, nil, &controllerContext.Cfg.WorkQueueRequeueDelayDuration}, - {"SPIDERPOOL_IPPOOL_INFORMER_WORKERS", "3", true, nil, nil, &controllerContext.Cfg.IPPoolInformerWorkers}, + {"SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION", "1", true, nil, nil, &controllerContext.Cfg.WorkQueueRequeueDelayDuration}, + {"SPIDERPOOL_IPPOOL_INFORMER_WORKERS", "5", true, nil, nil, &controllerContext.Cfg.IPPoolInformerWorkers}, {"SPIDERPOOL_WORKQUEUE_MAX_RETRIES", "500", true, nil, nil, &controllerContext.Cfg.WorkQueueMaxRetries}, } diff --git a/pkg/ippoolmanager/ippool_informer.go b/pkg/ippoolmanager/ippool_informer.go index ebcb1e26ec..977f1c2fd8 100644 --- a/pkg/ippoolmanager/ippool_informer.go +++ b/pkg/ippoolmanager/ippool_informer.go @@ -268,18 +268,18 @@ func (c *poolInformerController) Run(workers int, stopCh <-chan struct{}) error for i := 0; i < workers; i++ { informerLogger.Sugar().Debugf("Starting All IPPool processing worker '%d'", i) - go wait.Until(c.runAllIPPoolWorker, time.Second, stopCh) + go wait.Until(c.runAllIPPoolWorker, 500*time.Millisecond, stopCh) } if c.poolMgr.config.EnableSpiderSubnet && enableV4 { informerLogger.Debug("Staring IPv4 Auto-created IPPool processing worker") defer c.poolMgr.v4AutoCreatedRateLimitQueue.ShutDown() - go 
wait.Until(c.runV4AutoCreatePoolWorker, time.Second, stopCh) + go wait.Until(c.runV4AutoCreatePoolWorker, 500*time.Millisecond, stopCh) } if c.poolMgr.config.EnableSpiderSubnet && enableV6 { informerLogger.Debug("Staring IPv6 Auto-created IPPool processing worker") defer c.poolMgr.v6AutoCreatedRateLimitQueue.ShutDown() - go wait.Until(c.runV6AutoCreatePoolWorker, time.Second, stopCh) + go wait.Until(c.runV6AutoCreatePoolWorker, 500*time.Millisecond, stopCh) } informerLogger.Info("IPPool controller workers started") @@ -358,14 +358,14 @@ func (c *poolInformerController) processNextWorkItem(workQueue workqueue.RateLim if apierrors.IsConflict(err) { workQueue.AddRateLimited(poolName) - log.Sugar().Warnf("encountered update conflict '%v', retrying...", err) + log.Sugar().Warnf("encountered ippool informer update conflict '%v', retrying...", err) return nil } // if we set nonnegative number for the requeue delay duration, we will requeue it. otherwise we will discard it. if c.poolMgr.config.WorkQueueRequeueDelayDuration >= 0 { if workQueue.NumRequeues(obj) < c.poolMgr.config.WorkQueueMaxRetries { - log.Sugar().Errorf("encountered error '%v', requeue it after '%v'", err, c.poolMgr.config.WorkQueueRequeueDelayDuration) + log.Sugar().Errorf("encountered ippool informer error '%v', requeue it after '%v'", err, c.poolMgr.config.WorkQueueRequeueDelayDuration) workQueue.AddAfter(poolName, c.poolMgr.config.WorkQueueRequeueDelayDuration) return nil } diff --git a/pkg/ippoolmanager/ippool_manager.go b/pkg/ippoolmanager/ippool_manager.go index e4bcc1731c..a1d0a72f12 100644 --- a/pkg/ippoolmanager/ippool_manager.go +++ b/pkg/ippoolmanager/ippool_manager.go @@ -369,7 +369,6 @@ func (im *ipPoolManager) DeleteIPPool(ctx context.Context, pool *spiderpoolv1.Sp // Notice: we shouldn't get retries in this method and the upper level calling function will requeue the workqueue once we return an error, func (im *ipPoolManager) ScaleIPPoolWithIPs(ctx context.Context, pool *spiderpoolv1.SpiderIPPool, ipRanges []string, action ippoolmanagertypes.ScaleAction, desiredIPNum int) error { log := logutils.FromContext(ctx) - rand.Seed(time.Now().UnixNano()) var err error @@ -425,11 +424,6 @@ func (im *ipPoolManager) ScaleIPPoolWithIPs(ctx context.Context, pool *spiderpoo } func (im *ipPoolManager) UpdateDesiredIPNumber(ctx context.Context, pool *spiderpoolv1.SpiderIPPool, ipNum int) error { - pool, err := im.GetIPPoolByName(ctx, pool.Name) - if nil != err { - return err - } - if pool.Status.AutoDesiredIPCount == nil { pool.Status.AutoDesiredIPCount = new(int64) } else { @@ -439,7 +433,7 @@ func (im *ipPoolManager) UpdateDesiredIPNumber(ctx context.Context, pool *spider } *pool.Status.AutoDesiredIPCount = int64(ipNum) - err = im.client.Status().Update(ctx, pool) + err := im.client.Status().Update(ctx, pool) if nil != err { return err } diff --git a/pkg/subnetmanager/subnet_controller.go b/pkg/subnetmanager/subnet_controller.go index 65a078e9fa..77b9cf5da5 100644 --- a/pkg/subnetmanager/subnet_controller.go +++ b/pkg/subnetmanager/subnet_controller.go @@ -506,7 +506,8 @@ func (c *appController) Run(workers int, stopCh <-chan struct{}) error { } for i := 0; i < workers; i++ { - go wait.Until(c.runWorker, time.Second, stopCh) + logger.Sugar().Debugf("Starting application controller processing worker '%d'", i) + go wait.Until(c.runWorker, 500*time.Millisecond, stopCh) } logger.Info("application controller workers started") @@ -554,15 +555,16 @@ func (c *appController) processNextWorkItem() bool { // requeue the conflict items if 
apierrors.IsConflict(err) { c.subnetMgr.workQueue.AddRateLimited(obj) - log.Sugar().Warnf("encountered conflict '%v', retrying...", err) + log.Sugar().Warnf("encountered app controller syncHandler conflict '%v', retrying...", err) return nil } // if we set nonnegative number for the requeue delay duration, we will requeue it. otherwise we will discard it. if c.subnetMgr.config.RequeueDelayDuration >= 0 { if c.subnetMgr.workQueue.NumRequeues(obj) < c.subnetMgr.config.MaxWorkqueueLength { + log.Sugar().Errorf("encountered app controller syncHandler error '%v', requeue it after '%v'", err, c.subnetMgr.config.RequeueDelayDuration) c.subnetMgr.workQueue.AddAfter(obj, c.subnetMgr.config.RequeueDelayDuration) - return fmt.Errorf("encountered error '%v', requeue it after '%v'", err, c.subnetMgr.config.RequeueDelayDuration) + return nil } log.Warn("out of work queue max retries, drop it") @@ -598,7 +600,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er switch appKey.AppKind { case constant.OwnerDeployment: deployment, err := c.deploymentsLister.Deployments(namespace).Get(name) - if client.IgnoreNotFound(err) != nil { + if nil != err { + if apierrors.IsNotFound(err) { + log.Sugar().Debugf("application in work queue no longer exists") + return nil + } return err } @@ -609,7 +615,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er case constant.OwnerReplicaSet: replicaSet, err := c.replicaSetLister.ReplicaSets(namespace).Get(name) - if client.IgnoreNotFound(err) != nil { + if nil != err { + if apierrors.IsNotFound(err) { + log.Sugar().Debugf("application in work queue no longer exists") + return nil + } return err } @@ -620,7 +630,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er case constant.OwnerDaemonSet: daemonSet, err := c.daemonSetLister.DaemonSets(namespace).Get(name) - if client.IgnoreNotFound(err) != nil { + if nil != err { + if apierrors.IsNotFound(err) { + log.Sugar().Debugf("application in work queue no longer exists") + return nil + } return err } @@ -631,7 +645,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er case constant.OwnerStatefulSet: statefulSet, err := c.statefulSetLister.StatefulSets(namespace).Get(name) - if client.IgnoreNotFound(err) != nil { + if nil != err { + if apierrors.IsNotFound(err) { + log.Sugar().Debugf("application in work queue no longer exists") + return nil + } return err } @@ -642,7 +660,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er case constant.OwnerJob: job, err := c.jobLister.Jobs(namespace).Get(name) - if client.IgnoreNotFound(err) != nil { + if nil != err { + if apierrors.IsNotFound(err) { + log.Sugar().Debugf("application in work queue no longer exists") + return nil + } return err } @@ -653,7 +675,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er case constant.OwnerCronJob: cronJob, err := c.cronJobLister.CronJobs(namespace).Get(name) - if client.IgnoreNotFound(err) != nil { + if nil != err { + if apierrors.IsNotFound(err) { + log.Sugar().Debugf("application in work queue no longer exists") + return nil + } return err } diff --git a/pkg/subnetmanager/subnet_informer.go b/pkg/subnetmanager/subnet_informer.go index 6f5aacd142..eb6e4fe779 100644 --- a/pkg/subnetmanager/subnet_informer.go +++ b/pkg/subnetmanager/subnet_informer.go @@ -206,7 +206,7 @@ func (sc *SubnetController) Run(workers int, stopCh <-chan struct{}) error { 
informerLogger.Info("Starting workers") for i := 0; i < workers; i++ { - go wait.Until(sc.runWorker, time.Second, stopCh) + go wait.Until(sc.runWorker, 500*time.Millisecond, stopCh) } informerLogger.Info("Started workers")