Merge pull request #1103 from Icarus9913/feat/wk/improve-subnet
improve subnet controller performance
weizhoublue authored Nov 28, 2022
2 parents 8f46b25 + 3cf27c9 commit 1c2f223
Showing 5 changed files with 45 additions and 25 deletions.
6 changes: 3 additions & 3 deletions cmd/spiderpool-controller/cmd/config.go
@@ -56,7 +56,7 @@ var envInfo = []envConf{
{"SPIDERPOOL_WORKLOADENDPOINT_MAX_HISTORY_RECORDS", "100", false, nil, nil, &controllerContext.Cfg.WorkloadEndpointMaxHistoryRecords},
{"SPIDERPOOL_IPPOOL_MAX_ALLOCATED_IPS", "5000", false, nil, nil, &controllerContext.Cfg.IPPoolMaxAllocatedIPs},
{"SPIDERPOOL_SUBNET_RESYNC_PERIOD", "300", false, nil, nil, &controllerContext.Cfg.SubnetResyncPeriod},
{"SPIDERPOOL_SUBNET_INFORMER_WORKERS", "3", false, nil, nil, &controllerContext.Cfg.SubnetInformerWorkers},
{"SPIDERPOOL_SUBNET_INFORMER_WORKERS", "5", false, nil, nil, &controllerContext.Cfg.SubnetInformerWorkers},
{"SPIDERPOOL_SUBNET_INFORMER_MAX_WORKQUEUE_LENGTH", "10000", false, nil, nil, &controllerContext.Cfg.SubnetInformerMaxWorkqueueLength},
{"SPIDERPOOL_UPDATE_CR_MAX_RETRIES", "3", false, nil, nil, &controllerContext.Cfg.UpdateCRMaxRetries},
{"SPIDERPOOL_UPDATE_CR_RETRY_UNIT_TIME", "300", false, nil, nil, &controllerContext.Cfg.UpdateCRRetryUnitTime},
@@ -82,8 +82,8 @@ var envInfo = []envConf{
{"VERSION", "", false, &controllerContext.Cfg.AppVersion, nil, nil},
{"SPIDERPOOL_ENABLE_SUBNET_DELETE_STALE_IPPOOL", "false", true, nil, &controllerContext.Cfg.EnableSubnetDeleteStaleIPPool, nil},
{"SPIDERPOOL_AUTO_IPPOOL_HANDLER_MAX_WORKQUEUE_LENGTH", "10000", true, nil, nil, &controllerContext.Cfg.IPPoolInformerMaxWorkQueueLength},
{"SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION", "5", true, nil, nil, &controllerContext.Cfg.WorkQueueRequeueDelayDuration},
{"SPIDERPOOL_IPPOOL_INFORMER_WORKERS", "3", true, nil, nil, &controllerContext.Cfg.IPPoolInformerWorkers},
{"SPIDERPOOL_AUTO_IPPOOL_SCALE_RETRY_DELAY_DURATION", "1", true, nil, nil, &controllerContext.Cfg.WorkQueueRequeueDelayDuration},
{"SPIDERPOOL_IPPOOL_INFORMER_WORKERS", "5", true, nil, nil, &controllerContext.Cfg.IPPoolInformerWorkers},
{"SPIDERPOOL_WORKQUEUE_MAX_RETRIES", "500", true, nil, nil, &controllerContext.Cfg.WorkQueueMaxRetries},
}
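
For context, each envConf entry binds an environment variable to a typed config field, falling back to the listed default. Below is a minimal sketch of how such a table-driven parser could look; the field names and the semantics of the boolean flag are assumptions for illustration, not Spiderpool's actual definitions.

package config

import (
	"fmt"
	"os"
	"strconv"
)

// envConf mirrors the table layout above: one destination pointer per
// supported type; exactly one of strDest/boolDest/intDest is non-nil.
type envConf struct {
	envName      string
	defaultValue string
	required     bool // assumed meaning: fail if the variable is unset
	strDest      *string
	boolDest     *bool
	intDest      *int
}

// parseEnv walks the table, applying defaults and converting values.
func parseEnv(confs []envConf) error {
	for _, c := range confs {
		val := os.Getenv(c.envName)
		if val == "" {
			if c.required {
				return fmt.Errorf("required env %s is not set", c.envName)
			}
			val = c.defaultValue
		}
		switch {
		case c.strDest != nil:
			*c.strDest = val
		case c.boolDest != nil:
			b, err := strconv.ParseBool(val)
			if err != nil {
				return fmt.Errorf("env %s: %w", c.envName, err)
			}
			*c.boolDest = b
		case c.intDest != nil:
			i, err := strconv.Atoi(val)
			if err != nil {
				return fmt.Errorf("env %s: %w", c.envName, err)
			}
			*c.intDest = i
		}
	}
	return nil
}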

10 changes: 5 additions & 5 deletions pkg/ippoolmanager/ippool_informer.go
@@ -268,18 +268,18 @@ func (c *poolInformerController) Run(workers int, stopCh <-chan struct{}) error

for i := 0; i < workers; i++ {
informerLogger.Sugar().Debugf("Starting All IPPool processing worker '%d'", i)
- go wait.Until(c.runAllIPPoolWorker, time.Second, stopCh)
+ go wait.Until(c.runAllIPPoolWorker, 500*time.Millisecond, stopCh)
}

if c.poolMgr.config.EnableSpiderSubnet && enableV4 {
informerLogger.Debug("Staring IPv4 Auto-created IPPool processing worker")
defer c.poolMgr.v4AutoCreatedRateLimitQueue.ShutDown()
- go wait.Until(c.runV4AutoCreatePoolWorker, time.Second, stopCh)
+ go wait.Until(c.runV4AutoCreatePoolWorker, 500*time.Millisecond, stopCh)
}
if c.poolMgr.config.EnableSpiderSubnet && enableV6 {
informerLogger.Debug("Staring IPv6 Auto-created IPPool processing worker")
defer c.poolMgr.v6AutoCreatedRateLimitQueue.ShutDown()
- go wait.Until(c.runV6AutoCreatePoolWorker, time.Second, stopCh)
+ go wait.Until(c.runV6AutoCreatePoolWorker, 500*time.Millisecond, stopCh)
}

informerLogger.Info("IPPool controller workers started")
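
The Run loop above halves wait.Until's restart period from time.Second to 500*time.Millisecond. As a sketch of the pattern (startWorkers and runWorker are stand-ins, not the real methods): wait.Until calls the function, and whenever it returns, waits the period and calls it again until stopCh closes, so a worker that exits gets back to draining its queue twice as fast.

package sketch

import (
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

// startWorkers launches N goroutines; each re-enters runWorker 500ms
// after it returns. In practice a workqueue-backed runWorker blocks on
// queue.Get() and rarely returns, so the period mostly governs how
// quickly a worker restarts after an early exit.
func startWorkers(workers int, runWorker func(), stopCh <-chan struct{}) {
	for i := 0; i < workers; i++ {
		go wait.Until(runWorker, 500*time.Millisecond, stopCh)
	}
}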
@@ -358,14 +358,14 @@ func (c *poolInformerController) processNextWorkItem(workQueue workqueue.RateLim

if apierrors.IsConflict(err) {
workQueue.AddRateLimited(poolName)
log.Sugar().Warnf("encountered update conflict '%v', retrying...", err)
log.Sugar().Warnf("encountered ippool informer update conflict '%v', retrying...", err)
return nil
}

// if the requeue delay duration is set to a nonnegative number, we requeue the item; otherwise, we discard it.
if c.poolMgr.config.WorkQueueRequeueDelayDuration >= 0 {
if workQueue.NumRequeues(obj) < c.poolMgr.config.WorkQueueMaxRetries {
log.Sugar().Errorf("encountered error '%v', requeue it after '%v'", err, c.poolMgr.config.WorkQueueRequeueDelayDuration)
log.Sugar().Errorf("encountered ippool informer error '%v', requeue it after '%v'", err, c.poolMgr.config.WorkQueueRequeueDelayDuration)
workQueue.AddAfter(poolName, c.poolMgr.config.WorkQueueRequeueDelayDuration)
return nil
}
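
The retry policy implemented above reads as one small decision ladder: conflicts always retry with the rate limiter's backoff; other errors requeue after a fixed delay while the retry budget lasts; everything else is dropped. A hedged sketch of that ladder (handleErr, maxRetries, and requeueDelay are illustrative names, not Spiderpool's actual fields):

package sketch

import (
	"time"

	apierrors "k8s.io/apimachinery/pkg/api/errors"
	"k8s.io/client-go/util/workqueue"
)

// handleErr sketches the requeue policy used by processNextWorkItem.
func handleErr(queue workqueue.RateLimitingInterface, key interface{}, err error, maxRetries int, requeueDelay time.Duration) {
	if err == nil {
		queue.Forget(key) // success: reset this key's backoff counter
		return
	}
	if apierrors.IsConflict(err) {
		queue.AddRateLimited(key) // optimistic-lock conflict: retry with backoff
		return
	}
	if requeueDelay >= 0 && queue.NumRequeues(key) < maxRetries {
		queue.AddAfter(key, requeueDelay) // other errors: retry after a fixed delay
		return
	}
	queue.Forget(key) // retries exhausted or requeueing disabled: drop it
}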
8 changes: 1 addition & 7 deletions pkg/ippoolmanager/ippool_manager.go
@@ -369,7 +369,6 @@ func (im *ipPoolManager) DeleteIPPool(ctx context.Context, pool *spiderpoolv1.Sp
// Notice: we shouldn't retry inside this method; the upper-level caller will requeue the work item once we return an error.
func (im *ipPoolManager) ScaleIPPoolWithIPs(ctx context.Context, pool *spiderpoolv1.SpiderIPPool, ipRanges []string, action ippoolmanagertypes.ScaleAction, desiredIPNum int) error {
log := logutils.FromContext(ctx)
- rand.Seed(time.Now().UnixNano())

var err error

@@ -425,11 +424,6 @@
}

func (im *ipPoolManager) UpdateDesiredIPNumber(ctx context.Context, pool *spiderpoolv1.SpiderIPPool, ipNum int) error {
- pool, err := im.GetIPPoolByName(ctx, pool.Name)
- if nil != err {
- return err
- }
-
if pool.Status.AutoDesiredIPCount == nil {
pool.Status.AutoDesiredIPCount = new(int64)
} else {
@@ -439,7 +433,7 @@
}

*pool.Status.AutoDesiredIPCount = int64(ipNum)
- err = im.client.Status().Update(ctx, pool)
+ err := im.client.Status().Update(ctx, pool)
if nil != err {
return err
}
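
UpdateDesiredIPNumber now skips the extra GetIPPoolByName round trip and updates the status of the pool it was handed; if the stored revision has moved on, Status().Update fails with a conflict and the informer's workqueue retries. For comparison, a sketch of the in-place alternative using client-go's conflict-retry helper; updateDesiredIPCount is an illustrative name and the spiderpoolv1 import path is assumed:

package sketch

import (
	"context"

	"k8s.io/client-go/util/retry"
	"sigs.k8s.io/controller-runtime/pkg/client"

	// Assumed import path for the SpiderIPPool API types shown in the diff.
	spiderpoolv1 "github.com/spidernet-io/spiderpool/pkg/k8s/apis/v1"
)

// updateDesiredIPCount retries the whole read-modify-write on conflict
// instead of bubbling the error up to a workqueue.
func updateDesiredIPCount(ctx context.Context, c client.Client, pool *spiderpoolv1.SpiderIPPool, ipNum int64) error {
	return retry.RetryOnConflict(retry.DefaultRetry, func() error {
		// Re-fetch the latest revision before every attempt.
		if err := c.Get(ctx, client.ObjectKeyFromObject(pool), pool); err != nil {
			return err
		}
		if pool.Status.AutoDesiredIPCount == nil {
			pool.Status.AutoDesiredIPCount = new(int64)
		}
		*pool.Status.AutoDesiredIPCount = ipNum
		return c.Status().Update(ctx, pool)
	})
}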
44 changes: 35 additions & 9 deletions pkg/subnetmanager/subnet_controller.go
@@ -506,7 +506,8 @@ func (c *appController) Run(workers int, stopCh <-chan struct{}) error {
}

for i := 0; i < workers; i++ {
- go wait.Until(c.runWorker, time.Second, stopCh)
+ logger.Sugar().Debugf("Starting application controller processing worker '%d'", i)
+ go wait.Until(c.runWorker, 500*time.Millisecond, stopCh)
}

logger.Info("application controller workers started")
@@ -554,15 +555,16 @@ func (c *appController) processNextWorkItem() bool {
// requeue the conflict items
if apierrors.IsConflict(err) {
c.subnetMgr.workQueue.AddRateLimited(obj)
log.Sugar().Warnf("encountered conflict '%v', retrying...", err)
log.Sugar().Warnf("encountered app controller syncHandler conflict '%v', retrying...", err)
return nil
}

// if the requeue delay duration is set to a nonnegative number, we requeue the item; otherwise, we discard it.
if c.subnetMgr.config.RequeueDelayDuration >= 0 {
if c.subnetMgr.workQueue.NumRequeues(obj) < c.subnetMgr.config.MaxWorkqueueLength {
log.Sugar().Errorf("encountered app controller syncHandler error '%v', requeue it after '%v'", err, c.subnetMgr.config.RequeueDelayDuration)
c.subnetMgr.workQueue.AddAfter(obj, c.subnetMgr.config.RequeueDelayDuration)
- return fmt.Errorf("encountered error '%v', requeue it after '%v'", err, c.subnetMgr.config.RequeueDelayDuration)
+ return nil
}

log.Warn("out of work queue max retries, drop it")
@@ -598,7 +600,11 @@ func (c *appController) syncHandler(appKey appWorkQueueKey, log *zap.Logger) (er
switch appKey.AppKind {
case constant.OwnerDeployment:
deployment, err := c.deploymentsLister.Deployments(namespace).Get(name)
- if client.IgnoreNotFound(err) != nil {
+ if nil != err {
+ if apierrors.IsNotFound(err) {
+ log.Sugar().Debugf("application in work queue no longer exists")
+ return nil
+ }
return err
}

@@ -609,7 +615,11 @@

case constant.OwnerReplicaSet:
replicaSet, err := c.replicaSetLister.ReplicaSets(namespace).Get(name)
- if client.IgnoreNotFound(err) != nil {
+ if nil != err {
+ if apierrors.IsNotFound(err) {
+ log.Sugar().Debugf("application in work queue no longer exists")
+ return nil
+ }
return err
}

@@ -620,7 +630,11 @@

case constant.OwnerDaemonSet:
daemonSet, err := c.daemonSetLister.DaemonSets(namespace).Get(name)
- if client.IgnoreNotFound(err) != nil {
+ if nil != err {
+ if apierrors.IsNotFound(err) {
+ log.Sugar().Debugf("application in work queue no longer exists")
+ return nil
+ }
return err
}

@@ -631,7 +645,11 @@

case constant.OwnerStatefulSet:
statefulSet, err := c.statefulSetLister.StatefulSets(namespace).Get(name)
- if client.IgnoreNotFound(err) != nil {
+ if nil != err {
+ if apierrors.IsNotFound(err) {
+ log.Sugar().Debugf("application in work queue no longer exists")
+ return nil
+ }
return err
}

@@ -642,7 +660,11 @@

case constant.OwnerJob:
job, err := c.jobLister.Jobs(namespace).Get(name)
- if client.IgnoreNotFound(err) != nil {
+ if nil != err {
+ if apierrors.IsNotFound(err) {
+ log.Sugar().Debugf("application in work queue no longer exists")
+ return nil
+ }
return err
}

@@ -653,7 +675,11 @@

case constant.OwnerCronJob:
cronJob, err := c.cronJobLister.CronJobs(namespace).Get(name)
- if client.IgnoreNotFound(err) != nil {
+ if nil != err {
+ if apierrors.IsNotFound(err) {
+ log.Sugar().Debugf("application in work queue no longer exists")
+ return nil
+ }
return err
}
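
Every case in syncHandler swaps client.IgnoreNotFound(err) != nil for an explicit apierrors.IsNotFound branch, and the difference is behavioral, not cosmetic: IgnoreNotFound swallows the error but lets execution fall through with a nil object, whereas the new form logs and ends the sync early. A minimal side-by-side sketch (syncOld and syncNew are illustrative, using the Deployment case as the example):

package sketch

import (
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	appslisters "k8s.io/client-go/listers/apps/v1"
	"sigs.k8s.io/controller-runtime/pkg/client"
)

// syncOld: on NotFound, err is discarded and d is nil, so any later
// dereference of d would panic.
func syncOld(lister appslisters.DeploymentLister, namespace, name string) error {
	d, err := lister.Deployments(namespace).Get(name)
	if client.IgnoreNotFound(err) != nil {
		return err
	}
	_ = d // may be nil here if the Deployment was deleted
	return nil
}

// syncNew: a deleted application cleanly ends the sync; only unexpected
// errors propagate to the workqueue for a retry.
func syncNew(lister appslisters.DeploymentLister, namespace, name string) error {
	d, err := lister.Deployments(namespace).Get(name)
	if err != nil {
		if apierrors.IsNotFound(err) {
			return nil // no longer exists; nothing to reconcile
		}
		return err
	}
	_ = d
	return nil
}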

2 changes: 1 addition & 1 deletion pkg/subnetmanager/subnet_informer.go
@@ -206,7 +206,7 @@ func (sc *SubnetController) Run(workers int, stopCh <-chan struct{}) error {

informerLogger.Info("Starting workers")
for i := 0; i < workers; i++ {
- go wait.Until(sc.runWorker, time.Second, stopCh)
+ go wait.Until(sc.runWorker, 500*time.Millisecond, stopCh)
}

informerLogger.Info("Started workers")
