From af47c5ca5601a358cc7d281589eda2718a8f9525 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 24 Aug 2023 11:34:32 -0400 Subject: [PATCH 1/5] remove unused queues --- pkg/controller/queuejob/queuejob_controller_ex.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 83876ec1..1703fb18 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -93,10 +93,10 @@ type XController struct { // QueueJobs that need to be initialized // Add labels and selectors to AppWrapper - initQueue *cache.FIFO + //initQueue *cache.FIFO // QueueJobs that need to sync up after initialization - updateQueue *cache.FIFO + //updateQueue *cache.FIFO // eventQueue that need to sync up eventQueue *cache.FIFO @@ -241,9 +241,9 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) * arbclients: clientset.NewForConfigOrDie(config), eventQueue: cache.NewFIFO(GetQueueJobKey), agentEventQueue: cache.NewFIFO(GetQueueJobKey), - initQueue: cache.NewFIFO(GetQueueJobKey), - updateQueue: cache.NewFIFO(GetQueueJobKey), - qjqueue: NewSchedulingQueue(), + //initQueue: cache.NewFIFO(GetQueueJobKey), + //updateQueue: cache.NewFIFO(GetQueueJobKey), + qjqueue: NewSchedulingQueue(), //cache is turned-off, issue: https://github.com/project-codeflare/multi-cluster-app-dispatcher/issues/588 //cache: clusterstatecache.New(config), schedulingAW: nil, From e82838a5927930775613693c04f128e9256677bb Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 24 Aug 2023 14:14:43 -0400 Subject: [PATCH 2/5] optimize dispatch perf --- .../queuejob/queuejob_controller_ex.go | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 1703fb18..7304a0f8 100644 
--- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1702,6 +1702,9 @@ func (cc *XController) deleteQueueJob(obj interface{}) { klog.Errorf("[Informer-deleteQJ] obj is not AppWrapper. obj=%+v", obj) return } + // we delete the job from the queue if it is there, ignoring errors + cc.qjqueue.Delete(qj) + cc.eventQueue.Delete(qj) current_ts := metav1.NewTime(time.Now()) klog.V(10).Infof("[Informer-deleteQJ] %s *Delay=%.6f seconds before enqueue &qj=%p Version=%s Status=%+v Deletion Timestame=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, qj.GetDeletionTimestamp()) accessor, err := meta.Accessor(qj) @@ -1722,9 +1725,6 @@ func (cc *XController) deleteQueueJob(obj interface{}) { if accessor, err00 := meta.Accessor(qj); err00 == nil { accessor.SetFinalizers(nil) } - // we delete the job from the queue if it is there, ignoring errors - cc.qjqueue.Delete(qj) - cc.eventQueue.Delete(qj) klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name) } } @@ -1888,19 +1888,27 @@ func (cc *XController) worker() { //if everything passes then CanRun is set to true and AW is ready for dispatch if !queuejob.Status.CanRun && (queuejob.Status.State != arbv1.AppWrapperStateActive) { cc.ScheduleNext(queuejob) - return nil + //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with + //Sync queuejob will not unwrap an AW to spawn genericItems + if queuejob.Status.CanRun { + + // errs := make(chan error, 1) + // go func() { + // errs <- cc.syncQueueJob(ctx, queuejob) + // }() + + // // later: + // if err := <-errs; err != nil { + // return err + // } + if err := cc.syncQueueJob(ctx, queuejob); err != nil { + // If any error, requeue it. 
+ return err + } - } - //When an AW passes ScheduleNext gate then we want to progress AW to Running to begin with - //Sync queuejob will not unwrap an AW to spawn genericItems - if queuejob.Status.CanRun { - if err := cc.syncQueueJob(ctx, queuejob); err != nil { - // If any error, requeue it. - return err } } - //asmalvan- ends klog.V(10).Infof("[worker] Ending %s Delay=%.6f seconds &newQJ=%p Version=%s Status=%+v", queuejob.Name, time.Now().Sub(queuejob.Status.ControllerFirstTimestamp.Time).Seconds(), queuejob, queuejob.ResourceVersion, queuejob.Status) From acc58fcd36fa4412259ccbb073f0cdfdfc5a3689 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 24 Aug 2023 18:38:59 -0400 Subject: [PATCH 3/5] remove cleanup logic --- .../queuejob/queuejob_controller_ex.go | 22 ------------------- 1 file changed, 22 deletions(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 7304a0f8..9c190e88 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1705,28 +1705,6 @@ func (cc *XController) deleteQueueJob(obj interface{}) { // we delete the job from the queue if it is there, ignoring errors cc.qjqueue.Delete(qj) cc.eventQueue.Delete(qj) - current_ts := metav1.NewTime(time.Now()) - klog.V(10).Infof("[Informer-deleteQJ] %s *Delay=%.6f seconds before enqueue &qj=%p Version=%s Status=%+v Deletion Timestame=%+v", qj.Name, time.Now().Sub(qj.Status.ControllerFirstTimestamp.Time).Seconds(), qj, qj.ResourceVersion, qj.Status, qj.GetDeletionTimestamp()) - accessor, err := meta.Accessor(qj) - if err != nil { - klog.V(10).Infof("[Informer-deleteQJ] Error obtaining the accessor for AW job: %s", qj.Name) - qj.SetDeletionTimestamp(&current_ts) - } else { - accessor.SetDeletionTimestamp(&current_ts) - } - // validate that app wraper has not been marked for deletion by the infomer's delete handler - if qj.DeletionTimestamp != nil { - 
klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s set for deletion.", qj.Namespace, qj.Name) - // cleanup resources for running job, ignoring errors - if err00 := cc.Cleanup(context.Background(), qj); err00 != nil { - klog.Warningf("Failed to cleanup resources for app wrapper '%s/%s', err = %v", qj.Namespace, qj.Name, err00) - } - // empty finalizers and delete the queuejob again - if accessor, err00 := meta.Accessor(qj); err00 == nil { - accessor.SetFinalizers(nil) - } - klog.V(3).Infof("[Informer-deleteQJ] AW job=%s/%s deleted.", qj.Namespace, qj.Name) - } } func (cc *XController) enqueue(obj interface{}) error { From f96374a821f56bd6ff3d1c91dc007fe265a56987 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 24 Aug 2023 21:34:17 -0400 Subject: [PATCH 4/5] fix tests --- pkg/controller/queuejob/queuejob_controller_ex.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 9c190e88..674949b4 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1262,7 +1262,7 @@ func (qjm *XController) ScheduleNext(qj *arbv1.AppWrapper) { klog.Infof("[ScheduleNext] [Agent Mode] Blocking dispatch for app wrapper '%s/%s' due to quota limits, activeQ=%t Unsched=%t &qj=%p Version=%s Status=%+v msg=%s", qj.Namespace, qj.Name, time.Now().Sub(HOLStartTime), qjm.qjqueue.IfExistActiveQ(qj), qjm.qjqueue.IfExistUnschedulableQ(qj), qj, qj.ResourceVersion, qj.Status, msg) //call update etcd here to retrigger AW execution for failed quota - + //TODO: quota management tests fail if this is converted into go-routine, need to inspect why? 
qjm.backoff(context.Background(), qj, dispatchFailedReason, dispatchFailedMessage) } @@ -1703,6 +1703,7 @@ func (cc *XController) deleteQueueJob(obj interface{}) { return } // we delete the job from the queue if it is there, ignoring errors + cc.quotaManager.Release(qj) cc.qjqueue.Delete(qj) cc.eventQueue.Delete(qj) } From 32df8646a4f578c452ad0c051eb930fe321fd9c7 Mon Sep 17 00:00:00 2001 From: Abhishek Malvankar Date: Thu, 24 Aug 2023 23:37:20 -0400 Subject: [PATCH 5/5] fix failing tests --- pkg/controller/queuejob/queuejob_controller_ex.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/pkg/controller/queuejob/queuejob_controller_ex.go b/pkg/controller/queuejob/queuejob_controller_ex.go index 674949b4..cdf8ab53 100644 --- a/pkg/controller/queuejob/queuejob_controller_ex.go +++ b/pkg/controller/queuejob/queuejob_controller_ex.go @@ -1703,7 +1703,9 @@ func (cc *XController) deleteQueueJob(obj interface{}) { return } // we delete the job from the queue if it is there, ignoring errors - cc.quotaManager.Release(qj) + if cc.serverOption.QuotaEnabled && cc.quotaManager != nil { + cc.quotaManager.Release(qj) + } cc.qjqueue.Delete(qj) cc.eventQueue.Delete(qj) }