From 59bab1382fd01bc0a6f1cbb040a00e817a605e3c Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Fri, 25 Aug 2023 13:03:43 -0400 Subject: [PATCH] remove unused queues (#605) * remove unused queues * optimize dispatch perf * remove cleanup logic * fix tests * fix failing tests --- .../queuejob/queuejob_controller_ex.go | 67 ++++++++----------- 1 file changed, 28 insertions(+), 39 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 83876ec1..cdf8ab53 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -93,10 +93,10 @@ type XController struct { // QueueJobs that need to be initialized // Add labels and selectors to AppWrapper - initQueue *cache.FIFO + //initQueue *cache.FIFO // QueueJobs that need to sync up after initialization - updateQueue *cache.FIFO + //updateQueue *cache.FIFO // eventQueue that need to sync up eventQueue *cache.FIFO @@ -241,9 +241,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * arbclients: clientset.NewForConfigOrDie(config), eventQueue: cache.NewFIFO(GetQueueJobKey), agentEventQueue: cache.NewFIFO(GetQueueJobKey), - initQueue: cache.NewFIFO(GetQueueJobKey), - updateQueue: cache.NewFIFO(GetQueueJobKey), - qjqueue: NewSchedulingQueue(), + //initQueue: cache.NewFIFO(GetQueueJobKey), + //updateQueue: cache.NewFIFO(GetQueueJobKey), + qjqueue: NewSchedulingQueue(), //cache is turned-off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588 //cache: clusterstatecache.New(config), schedulingAW: nil, @@ -1262,7 +1262,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s", qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), 
qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) //call update etcd here to retrigger AW execution for failed quota - + //TODO: quota management tests fail if this is converted into go-routine, need to inspect why? qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage) } @@ -1702,31 +1702,12 @@ func (cc *XController) deleteQueueJob(obj interface{}) { klog.Errorf("[Informer-deleteQJ] obj is not AppWrapper. obj=%+v", obj) return } - current_ts := metav1.NewTime(time.Now()) - klog.V(10).Infof("[Informer-deleteQJ] %s *Delay=%.6f seconds before enqueue &qj=%p Version=%s Status=%+v Deletion Timestame=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, qj.GetDeletionTimestamp()) - accessor, err := meta.Accessor(qj) - if err != nil { - klog.V(10).Infof("[Informer-deleteQJ] Error obtaining the accessor for AW job: %s", qj.Name) - qj.SetDeletionTimestamp(&current_ts) - } else { - accessor.SetDeletionTimestamp(&current_ts) - } - // validate that app wraper has not been marked for deletion by the infomer's delete handler - if qj.DeletionTimestamp != nil { - klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name) - // cleanup resources for running job, ignoring errors - if err00 := cc.Cleanup(context.Background(), qj); err00 != nil { - klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00) - } - // empty finalizers and delete the queuejob again - if accessor, err00 := meta.Accessor(qj); err00 == nil { - accessor.SetFinalizers(nil) - } - // we delete the job from the queue if it is there, ignoring errors - cc.qjqueue.Delete(qj) - cc.eventQueue.Delete(qj) - klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name) + // we delete the job from the queue if it is there, ignoring errors + if cc.serverOption.QuotaEnabled && cc.quotaManager != nil { 
cc.quotaManager.Release(qj) } + cc.qjqueue.Delete(qj) + cc.eventQueue.Delete(qj) } func (cc *XController) enqueue(obj interface{}) error { @@ -1888,19 +1869,27 @@ func (cc *XController) worker() { //if everything passes then CanRun is set to true and AW is ready for dispatch if !queuejob.Status.CanRun && (queuejob.Status.State != arbv1.AppWrapperStateActive) { cc.ScheduleNext(queuejob) - return nil + //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with + //Sync queuejob will not unwrap an AW to spawn genericItems + if queuejob.Status.CanRun { + + // errs := make(chan error, 1) + // go func() { + // errs <- cc.syncQueueJob(ctx, queuejob) + // }() + + // // later: + // if err := <-errs; err != nil { + // return err + // } + if err := cc.syncQueueJob(ctx, queuejob); err != nil { + // If any error, requeue it. + return err + } - } - //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with - //Sync queuejob will not unwrap an AW to spawn genericItems - if queuejob.Status.CanRun { - if err := cc.syncQueueJob(ctx, queuejob); err != nil { - // If any error, requeue it. - return err } } - //asmalvan- ends klog.V(10).Infof("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v", queuejob.Name, time.Now().Sub(queuejob.Status.ControllerFirstTimestamp.Time).Seconds(), queuejob, queuejob.ResourceVersion, queuejob.Status)