1. Add more workers for the IPPool and App controllers
2. Decrease the requeue delay duration
3. Fix the application controller error handler

Signed-off-by: Icarus9913 [email protected]
Icarus9913 committed Nov 28, 2022
1 parent 8f46b25 commit 3cf27c9
Showing 5 changed files with 45 additions and 25 deletions.
6 changes: 3 additions & 3 deletions cmd/spiderpool-controller/cmd/config.go
@@ -56,7 +56,7 @@ var envInfo = []envConf{
 	{"SPIDERPOOL_WORKLOADENDPOINT_MAX_HISTORY_RECORDS", "100", false, nil, nil, &controllerContext.Cfg.WorkloadEndpointMaxHistoryRecords},
 	{"SPIDERPOOL_IPPOOL_MAX_ALLOCATED_IPS", "5000", false, nil, nil, &controllerContext.Cfg.IPPoolMaxAllocatedIPs},
 	{"SPIDERPOOL_SUBNET_RESYNC_PERIOD", "300", false, nil, nil, &controllerContext.Cfg.SubnetResyncPeriod},
-	{"SPIDERPOOL_SUBNET_INFORMER_WORKERS", "3", false, nil, nil, &controllerContext.Cfg.SubnetInformerWorkers},
+	{"SPIDERPOOL_SUBNET_INFORMER_WORKERS", "5", false, nil, nil, &controllerContext.Cfg.SubnetInformerWorkers},
 	{"SPIDERPOOL_SUBNET_INFORMER_MAX_WORKQUEUE_LENGTH", "10000", false, nil, nil, &controllerContext.Cfg.SubnetInformerMaxWorkqueueLength},
 	{"SPIDERPOOL_UPDATE_CR_MAX_RETRIES", "3", false, nil, nil, &controllerContext.Cfg.UpdateCRMaxRetries},
 	{"SPIDERPOOL_UPDATE_CR_RETRY_UNIT_TIME", "300", false, nil, nil, &controllerContext.Cfg.UpdateCRRetryUnitTime},
@@ -82,8 +82,8 @@ var envInfo = []envConf{
 	{"VERSION", "", false, &controllerContext.Cfg.AppVersion, nil, nil},
 	{"SPIDERPOOL_ENABLE_SUBNET_DELETE_STALE_IPPOOL", "false", true, nil, &controllerContext.Cfg.EnableSubnetDeleteStaleIPPool, nil},
 	{"SPIDERPOOL_AUTO_IPPOOL_HANDLER_MAX_WORKQUEUE_LENGTH", "10000", true, nil, nil, &controllerContext.Cfg.IPPoolInformerMaxWorkQueueLength},
-	{"SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION", "5", true, nil, nil, &controllerContext.Cfg.WorkQueueRequeueDelayDuration},
-	{"SPIDERPOOL_IPPOOL_INFORMER_WORKERS", "3", true, nil, nil, &controllerContext.Cfg.IPPoolInformerWorkers},
+	{"SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION", "1", true, nil, nil, &controllerContext.Cfg.WorkQueueRequeueDelayDuration},
+	{"SPIDERPOOL_IPPOOL_INFORMER_WORKERS", "5", true, nil, nil, &controllerContext.Cfg.IPPoolInformerWorkers},
 	{"SPIDERPOOL_WORKQUEUE_MAX_RETRIES", "500", true, nil, nil, &controllerContext.Cfg.WorkQueueMaxRetries},
 }

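Only defaults change in config.go: the subnet and IPPool informer worker counts go from 3 to 5, and the auto-IPPool scale retry delay drops from 5 to 1. Each envInfo entry pairs an environment-variable name with a default string and a typed destination pointer; the sketch below shows how such an entry could be resolved. The field names, the parseEnv helper, and the fallback logic are assumptions for illustration only, not Spiderpool's actual parsing code.

package main

import (
	"fmt"
	"os"
	"strconv"
)

// envConf mirrors the shape of the entries above: an env var name, a default,
// a flag, and one typed destination pointer. Field names are assumed here.
type envConf struct {
	envName      string
	defaultValue string
	required     bool
	strDest      *string
	boolDest     *bool
	intDest      *int
}

// parseEnv resolves one entry: read the variable, fall back to the default,
// then store the parsed value into whichever destination pointer is non-nil.
func parseEnv(c envConf) error {
	val, ok := os.LookupEnv(c.envName)
	if !ok || val == "" {
		if c.required && c.defaultValue == "" {
			return fmt.Errorf("required env %s is not set", c.envName)
		}
		val = c.defaultValue
	}

	switch {
	case c.strDest != nil:
		*c.strDest = val
	case c.boolDest != nil:
		b, err := strconv.ParseBool(val)
		if err != nil {
			return fmt.Errorf("env %s: %w", c.envName, err)
		}
		*c.boolDest = b
	case c.intDest != nil:
		i, err := strconv.Atoi(val)
		if err != nil {
			return fmt.Errorf("env %s: %w", c.envName, err)
		}
		*c.intDest = i
	}
	return nil
}

func main() {
	var workers int
	// With SPIDERPOOL_IPPOOL_INFORMER_WORKERS unset, this resolves to the new
	// default of 5 introduced by this commit.
	entry := envConf{envName: "SPIDERPOOL_IPPOOL_INFORMER_WORKERS", defaultValue: "5", intDest: &workers}
	if err := parseEnv(entry); err != nil {
		panic(err)
	}
	fmt.Println("ippool informer workers:", workers)
}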
10 changes: 5 additions & 5 deletions pkg/ippoolmanager/ippool_informer.go
@@ -268,18 +268,18 @@ func (c *poolInformerController) Run(workers int, stopCh <-chan struct{}) error
 
 	for i := 0; i < workers; i++ {
 		informerLogger.Sugar().Debugf("Starting All IPPool processing worker '%d'", i)
-		go wait.Until(c.runAllIPPoolWorker, time.Second, stopCh)
+		go wait.Until(c.runAllIPPoolWorker, 500*time.Millisecond, stopCh)
 	}
 
 	if c.poolMgr.config.EnableSpiderSubnet && enableV4 {
 		informerLogger.Debug("Staring IPv4 Auto-created IPPool processing worker")
 		defer c.poolMgr.v4AutoCreatedRateLimitQueue.ShutDown()
-		go wait.Until(c.runV4AutoCreatePoolWorker, time.Second, stopCh)
+		go wait.Until(c.runV4AutoCreatePoolWorker, 500*time.Millisecond, stopCh)
 	}
 	if c.poolMgr.config.EnableSpiderSubnet && enableV6 {
 		informerLogger.Debug("Staring IPv6 Auto-created IPPool processing worker")
 		defer c.poolMgr.v6AutoCreatedRateLimitQueue.ShutDown()
-		go wait.Until(c.runV6AutoCreatePoolWorker, time.Second, stopCh)
+		go wait.Until(c.runV6AutoCreatePoolWorker, 500*time.Millisecond, stopCh)
 	}
 
 	informerLogger.Info("IPPool controller workers started")
@@ -358,14 +358,14 @@ func (c *poolInformerController) processNextWorkItem(workQueue workqueue.RateLim
 
 		if apierrors.IsConflict(err) {
 			workQueue.AddRateLimited(poolName)
-			log.Sugar().Warnf("encountered update conflict '%v', retrying...", err)
+			log.Sugar().Warnf("encountered ippool informer update conflict '%v', retrying...", err)
 			return nil
 		}
 
 		// if we set nonnegative number for the requeue delay duration, we will requeue it. otherwise we will discard it.
 		if c.poolMgr.config.WorkQueueRequeueDelayDuration >= 0 {
 			if workQueue.NumRequeues(obj) < c.poolMgr.config.WorkQueueMaxRetries {
-				log.Sugar().Errorf("encountered error '%v', requeue it after '%v'", err, c.poolMgr.config.WorkQueueRequeueDelayDuration)
+				log.Sugar().Errorf("encountered ippool informer error '%v', requeue it after '%v'", err, c.poolMgr.config.WorkQueueRequeueDelayDuration)
 				workQueue.AddAfter(poolName, c.poolMgr.config.WorkQueueRequeueDelayDuration)
 				return nil
 			}
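The processNextWorkItem hunk above only sharpens the log wording, but the retry policy it logs is worth spelling out: update conflicts go back through the rate limiter, other errors are requeued after a flat delay while the item still has retry budget, and anything past that budget (or with delayed retries disabled via a negative duration) is dropped. A hedged sketch of that decision as a standalone helper; the function name and parameters are illustrative, not Spiderpool's API:

package main

import (
	"fmt"
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/workqueue"
)

// handleSyncError applies the retry policy sketched above to one work item.
func handleSyncError(queue workqueue.RateLimitingInterface, key string, err error,
	requeueDelay time.Duration, maxRetries int) {
	if err == nil {
		queue.Forget(key) // success: clear the rate limiter's history for this key
		return
	}

	if apierrors.IsConflict(err) {
		// Update conflicts are expected under contention; retry with backoff.
		queue.AddRateLimited(key)
		return
	}

	if requeueDelay >= 0 && queue.NumRequeues(key) < maxRetries {
		// Transient failure: retry after the configured flat delay.
		queue.AddAfter(key, requeueDelay)
		return
	}

	// Retries disabled (negative delay) or exhausted: drop the item.
	fmt.Println("out of work queue max retries, drop it:", key)
	queue.Forget(key)
}

func main() {
	queue := workqueue.NewRateLimitingQueue(workqueue.DefaultControllerRateLimiter())
	defer queue.ShutDown()
	handleSyncError(queue, "kube-system/auto-pool", nil, time.Second, 500)
}

In the informer the delay comes from WorkQueueRequeueDelayDuration (SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION, whose default this commit lowers from 5 to 1) and the budget from WorkQueueMaxRetries (SPIDERPOOL_WORKQUEUE_MAX_RETRIES, default 500).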
8 changes: 1 addition & 7 deletions pkg/ippoolmanager/ippool_manager.go
@@ -369,7 +369,6 @@ func (im *ipPoolManager) DeleteIPPool(ctx context.Context, pool *spiderpoolv1.Sp
 // Notice: we shouldn't get retries in this method and the upper level calling function will requeue the workqueue once we return an error,
 func (im *ipPoolManager) ScaleIPPoolWithIPs(ctx context.Context, pool *spiderpoolv1.SpiderIPPool, ipRanges []string, action ippoolmanagertypes.ScaleAction, desiredIPNum int) error {
 	log := logutils.FromContext(ctx)
-	rand.Seed(time.Now().UnixNano())
 
 	var err error
 
@@ -425,11 +424,6 @@ }
 }
 
 func (im *ipPoolManager) UpdateDesiredIPNumber(ctx context.Context, pool *spiderpoolv1.SpiderIPPool, ipNum int) error {
-	pool, err := im.GetIPPoolByName(ctx, pool.Name)
-	if nil != err {
-		return err
-	}
-
 	if pool.Status.AutoDesiredIPCount == nil {
 		pool.Status.AutoDesiredIPCount = new(int64)
 	} else {
@@ -439,7 +433,7 @@ }
 	}
 
 	*pool.Status.AutoDesiredIPCount = int64(ipNum)
-	err = im.client.Status().Update(ctx, pool)
+	err := im.client.Status().Update(ctx, pool)
 	if nil != err {
 		return err
 	}
44 changes: 35 additions & 9 deletions pkg/subnetmanager/subnet_controller.go
@@ -506,7 +506,8 @@ func (c *appController) Run(workers int, stopCh <-chan struct{}) error {
 	}
 
 	for i := 0; i < workers; i++ {
-		go wait.Until(c.runWorker, time.Second, stopCh)
+		logger.Sugar().Debugf("Starting application controller processing worker '%d'", i)
+		go wait.Until(c.runWorker, 500*time.Millisecond, stopCh)
 	}
 
 	logger.Info("application controller workers started")
@@ -554,15 +555,16 @@ func (c *appController) processNextWorkItem() bool {
 		// requeue the conflict items
 		if apierrors.IsConflict(err) {
 			c.subnetMgr.workQueue.AddRateLimited(obj)
-			log.Sugar().Warnf("encountered conflict '%v', retrying...", err)
+			log.Sugar().Warnf("encountered app controller syncHandler conflict '%v', retrying...", err)
 			return nil
 		}
 
 		// if we set nonnegative number for the requeue delay duration, we will requeue it. otherwise we will discard it.
 		if c.subnetMgr.config.RequeueDelayDuration >= 0 {
 			if c.subnetMgr.workQueue.NumRequeues(obj) < c.subnetMgr.config.MaxWorkqueueLength {
+				log.Sugar().Errorf("encountered app controller syncHandler error '%v', requeue it after '%v'", err, c.subnetMgr.config.RequeueDelayDuration)
 				c.subnetMgr.workQueue.AddAfter(obj, c.subnetMgr.config.RequeueDelayDuration)
-				return fmt.Errorf("encountered error '%v', requeue it after '%v'", err, c.subnetMgr.config.RequeueDelayDuration)
+				return nil
 			}
 
 			log.Warn("out of work queue max retries, drop it")
@@ -598,7 +600,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er
 	switch appKey.AppKind {
 	case constant.OwnerDeployment:
 		deployment, err := c.deploymentsLister.Deployments(namespace).Get(name)
-		if client.IgnoreNotFound(err) != nil {
+		if nil != err {
+			if apierrors.IsNotFound(err) {
+				log.Sugar().Debugf("application in work queue no longer exists")
+				return nil
+			}
 			return err
 		}

@@ -609,7 +615,11 @@
 
 	case constant.OwnerReplicaSet:
 		replicaSet, err := c.replicaSetLister.ReplicaSets(namespace).Get(name)
-		if client.IgnoreNotFound(err) != nil {
+		if nil != err {
+			if apierrors.IsNotFound(err) {
+				log.Sugar().Debugf("application in work queue no longer exists")
+				return nil
+			}
 			return err
 		}

@@ -620,7 +630,11 @@
 
 	case constant.OwnerDaemonSet:
 		daemonSet, err := c.daemonSetLister.DaemonSets(namespace).Get(name)
-		if client.IgnoreNotFound(err) != nil {
+		if nil != err {
+			if apierrors.IsNotFound(err) {
+				log.Sugar().Debugf("application in work queue no longer exists")
+				return nil
+			}
 			return err
 		}

@@ -631,7 +645,11 @@
 
 	case constant.OwnerStatefulSet:
 		statefulSet, err := c.statefulSetLister.StatefulSets(namespace).Get(name)
-		if client.IgnoreNotFound(err) != nil {
+		if nil != err {
+			if apierrors.IsNotFound(err) {
+				log.Sugar().Debugf("application in work queue no longer exists")
+				return nil
+			}
 			return err
 		}

@@ -642,7 +660,11 @@
 
 	case constant.OwnerJob:
 		job, err := c.jobLister.Jobs(namespace).Get(name)
-		if client.IgnoreNotFound(err) != nil {
+		if nil != err {
+			if apierrors.IsNotFound(err) {
+				log.Sugar().Debugf("application in work queue no longer exists")
+				return nil
+			}
 			return err
 		}

@@ -653,7 +675,11 @@
 
	case constant.OwnerCronJob:
 		cronJob, err := c.cronJobLister.CronJobs(namespace).Get(name)
-		if client.IgnoreNotFound(err) != nil {
+		if nil != err {
+			if apierrors.IsNotFound(err) {
+				log.Sugar().Debugf("application in work queue no longer exists")
+				return nil
+			}
 			return err
 		}

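The syncHandler changes above are the "fix the application controller error handler" item from the commit message. client.IgnoreNotFound(err) != nil only decides whether the error is returned, so a NotFound error used to fall through with a nil object for the rest of the sync; the new branches treat a deleted app as work that no longer exists and return early. The same fix is applied to all six kinds: Deployment, ReplicaSet, DaemonSet, StatefulSet, Job, and CronJob. Below is a reduced, self-contained sketch of the old and new shapes; the fake getDeployment lookup and the printed messages are stand-ins for the real lister and logger:

package main

import (
	"fmt"

	appsv1 "k8s.io/api/apps/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/apimachinery/pkg/runtime/schema"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// getDeployment stands in for the lister call; here it always reports NotFound,
// as happens when the app is deleted after its key was queued.
func getDeployment(namespace, name string) (*appsv1.Deployment, error) {
	gr := schema.GroupResource{Group: "apps", Resource: "deployments"}
	return nil, apierrors.NewNotFound(gr, name)
}

// buggySync mirrors the old handler: IgnoreNotFound only filters the returned
// error, so a NotFound falls through here with deployment == nil.
func buggySync(namespace, name string) error {
	deployment, err := getDeployment(namespace, name)
	if client.IgnoreNotFound(err) != nil {
		return err
	}
	return fmt.Errorf("kept syncing a deleted app: %v", deployment) // deployment is nil here
}

// fixedSync mirrors the new handler: a deleted app is simply no longer work.
func fixedSync(namespace, name string) error {
	deployment, err := getDeployment(namespace, name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			fmt.Println("application in work queue no longer exists")
			return nil
		}
		return err
	}
	fmt.Println("syncing", deployment.Name)
	return nil
}

func main() {
	fmt.Println("buggy:", buggySync("default", "demo")) // keeps going with a nil object
	fmt.Println("fixed:", fixedSync("default", "demo")) // logs and drops the stale key
}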
2 changes: 1 addition & 1 deletion pkg/subnetmanager/subnet_informer.go
@@ -206,7 +206,7 @@ func (sc *SubnetController) Run(workers int, stopCh <-chan struct{}) error
 
 	informerLogger.Info("Starting workers")
 	for i := 0; i < workers; i++ {
-		go wait.Until(sc.runWorker, time.Second, stopCh)
+		go wait.Until(sc.runWorker, 500*time.Millisecond, stopCh)
 	}
 
 	informerLogger.Info("Started workers")
