remove pod informer (#601)
* remove pod informer

* remove unneeded interface
asm582 authored Aug 23, 2023
1 parent ccd2acf commit 44696cf
Showing 6 changed files with 126 additions and 444 deletions.
87 changes: 63 additions & 24 deletions pkg/controller/queuejob/queuejob_controller_ex.go
@@ -54,9 +54,7 @@ import (

v1 "k8s.io/api/core/v1"

"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources"
"github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/genericresource"
respod "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/queuejobresources/pod"
"k8s.io/apimachinery/pkg/labels"

arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
@@ -79,9 +77,9 @@ type XController struct {

appwrapperInformer arbinformers.AppWrapperInformer
// resources registered for the AppWrapper
qjobRegisteredResources queuejobresources.RegisteredResources
//qjobRegisteredResources queuejobresources.RegisteredResources
// controllers for these resources
qjobResControls map[arbv1.ResourceType]queuejobresources.Interface
//qjobResControls map[arbv1.ResourceType]queuejobresources.Interface

// Captures all available resources in the cluster
genericresources *genericresource.GenericResources
@@ -140,9 +138,9 @@ type JobAndClusterAgent struct {
}

// RegisterAllQueueJobResourceTypes - registers all resources
func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
respod.Register(regs)
}
// func RegisterAllQueueJobResourceTypes(regs *queuejobresources.RegisteredResources) {
// respod.Register(regs)
// }

func GetQueueJobKey(obj interface{}) (string, error) {
qj, ok := obj.(*arbv1.AppWrapper)
@@ -153,6 +151,47 @@ func GetQueueJobKey(obj interface{}) (string, error) {
return fmt.Sprintf("%s/%s", qj.Namespace, qj.Name), nil
}

// UpdateQueueJobStatus was part of the pod informer; it is now a method of the queuejob controller.
// This change simplifies the controller and enables the move to controller-runtime.
func (qjm *XController) UpdateQueueJobStatus(queuejob *arbv1.AppWrapper) error {

labelSelector := fmt.Sprintf("%s=%s", "appwrapper.mcad.ibm.com", queuejob.Name)
pods, errt := qjm.clients.CoreV1().Pods("").List(context.TODO(), metav1.ListOptions{LabelSelector: labelSelector})
if errt != nil {
return errt
}

running := int32(FilterPods(pods.Items, v1.PodRunning))
podPhases := []v1.PodPhase{v1.PodRunning, v1.PodSucceeded}
totalResourcesConsumedForPodPhases := clusterstateapi.EmptyResource()
for _, phase := range podPhases {
totalResourcesConsumedForPodPhases.Add(GetPodResourcesByPhase(phase, pods.Items))
}
pending := int32(FilterPods(pods.Items, v1.PodPending))
succeeded := int32(FilterPods(pods.Items, v1.PodSucceeded))
failed := int32(FilterPods(pods.Items, v1.PodFailed))
podsConditionMap := PendingPodsFailedSchd(pods.Items)
klog.V(10).Infof("[UpdateQueueJobStatus] There are %d pods of AppWrapper %s: pending %d, running %d, succeeded %d, failed %d, pendingpodsfailedschd %d, total resource consumed %v",
len(pods.Items), queuejob.Name, pending, running, succeeded, failed, len(podsConditionMap), totalResourcesConsumedForPodPhases)

queuejob.Status.Pending = pending
queuejob.Status.Running = running
queuejob.Status.Succeeded = succeeded
queuejob.Status.Failed = failed
// Total resources by all running pods
queuejob.Status.TotalGPU = int32(totalResourcesConsumedForPodPhases.GPU)
queuejob.Status.TotalCPU = int32(totalResourcesConsumedForPodPhases.MilliCPU)
queuejob.Status.TotalMemory = int32(totalResourcesConsumedForPodPhases.Memory)

queuejob.Status.PendingPodConditions = nil
for podName, cond := range podsConditionMap {
podCond := GeneratePodFailedCondition(podName, cond)
queuejob.Status.PendingPodConditions = append(queuejob.Status.PendingPodConditions, podCond)
}

return nil
}
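
The comment above notes that this change is meant to ease a later port to controller-runtime. As a rough, hedged sketch of what that could look like (not part of this commit — the appWrapperReconciler type, its mcad field, and the wiring are assumptions for illustration only), the same status refresh might eventually be driven from a Reconcile loop:

package queuejob

import (
	"context"

	ctrl "sigs.k8s.io/controller-runtime"
	"sigs.k8s.io/controller-runtime/pkg/client"

	arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
)

// appWrapperReconciler is a hypothetical controller-runtime reconciler; it is
// not part of this repository and exists here only to illustrate how
// UpdateQueueJobStatus could be reused after such a port.
type appWrapperReconciler struct {
	client.Client
	mcad *XController // assumed handle to the existing queue job controller
}

func (r *appWrapperReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.Result, error) {
	aw := &arbv1.AppWrapper{}
	if err := r.Get(ctx, req.NamespacedName, aw); err != nil {
		// Ignore not-found errors: the AppWrapper may already be gone.
		return ctrl.Result{}, client.IgnoreNotFound(err)
	}
	// Refresh pod counts and resource totals using the method added above.
	if err := r.mcad.UpdateQueueJobStatus(aw); err != nil {
		return ctrl.Result{}, err
	}
	// Persist the refreshed status subresource.
	return ctrl.Result{}, r.Status().Update(ctx, aw)
}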

// allocatableCapacity calculates the capacity available on each node by subtracting resources
// consumed by existing pods.
// For a large cluster with thousands of nodes and hundreds of thousands of pods this
Expand Down Expand Up @@ -217,20 +256,20 @@ func NewJobController(config *rest.Config, serverOption *options.ServerOption) *

cc.genericresources = genericresource.NewAppWrapperGenericResource(config)

cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)
//cc.qjobResControls = map[arbv1.ResourceType]queuejobresources.Interface{}
//RegisterAllQueueJobResourceTypes(&cc.qjobRegisteredResources)

// initialize pod sub-resource control
resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
if err != nil {
klog.Errorf("fail to create queuejob resource control")
return nil
}
if !found {
klog.Errorf("queuejob resource type Pod not found")
return nil
}
cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod
// resControlPod, found, err := cc.qjobRegisteredResources.InitQueueJobResource(arbv1.ResourceTypePod, config)
// if err != nil {
// klog.Errorf("fail to create queuejob resource control")
// return nil
// }
// if !found {
// klog.Errorf("queuejob resource type Pod not found")
// return nil
// }
// cc.qjobResControls[arbv1.ResourceTypePod] = resControlPod

appWrapperClient, err := clientset.NewForConfig(cc.config)
if err != nil {
Expand Down Expand Up @@ -816,7 +855,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust

}

err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
err := qjm.UpdateQueueJobStatus(value)
if err != nil {
klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
}
@@ -843,7 +882,7 @@ func (qjm *XController) getAggregatedAvailableResourcesPriority(unallocatedClust
klog.V(10).Infof("[getAggAvaiResPri] Subtract all resources %+v in genericItem=%T for job %s which can-run is set to: %v but state is still pending.", qjv, genericItem, value.Name, value.Status.CanRun)
}

err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(value)
err := qjm.UpdateQueueJobStatus(value)
if err != nil {
klog.Warningf("[getAggAvaiResPri] Error updating pod status counts for AppWrapper job: %s, err=%+v", value.Name, err)
}
Expand Down Expand Up @@ -1458,7 +1497,7 @@ func (qjm *XController) backoff(ctx context.Context, q *arbv1.AppWrapper, reason
func (cc *XController) Run(stopCh <-chan struct{}) {
go cc.appwrapperInformer.Informer().Run(stopCh)

go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)
//go cc.qjobResControls[arbv1.ResourceTypePod].Run(stopCh)

cache.WaitForCacheSync(stopCh, cc.appWrapperSynced)

Expand Down Expand Up @@ -1508,7 +1547,7 @@ func (qjm *XController) UpdateQueueJobs() {
}
}
if (newjob.Status.State == arbv1.AppWrapperStateActive || newjob.Status.State == arbv1.AppWrapperStateRunningHoldCompletion) && containsCompletionStatus {
err := qjm.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(newjob)
err := qjm.UpdateQueueJobStatus(newjob)
if err != nil {
klog.Errorf("[UpdateQueueJobs] Error updating pod status counts for AppWrapper job: %s, err=%+v", newjob.Name, err)
continue
Expand Down Expand Up @@ -1911,7 +1950,7 @@ func (cc *XController) syncQueueJob(ctx context.Context, qj *arbv1.AppWrapper) e
awNew := qj.DeepCopy()
// we call sync to update pods running, pending,...
if qj.Status.State == arbv1.AppWrapperStateActive {
err := cc.qjobResControls[arbv1.ResourceTypePod].UpdateQueueJobStatus(awNew)
err := cc.UpdateQueueJobStatus(awNew)
if err != nil {
klog.Errorf("[syncQueueJob] Error updating pod status counts for AppWrapper job: %s, err=%+v", qj.Name, err)
return err
63 changes: 63 additions & 0 deletions pkg/controller/queuejob/utils.go
@@ -17,10 +17,14 @@ limitations under the License.
package queuejob

import (
"strings"

corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"

arbv1 "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/apis/controller/v1beta1"
clusterstateapi "github.com/project-codeflare/multi-cluster-app-dispatcher/pkg/controller/clusterstate/api"
)

func GetXQJFullName(qj *arbv1.AppWrapper) string {
@@ -77,3 +81,62 @@ func getIndexOfMatchedCondition(aw *arbv1.AppWrapper, condType arbv1.AppWrapperC
}
return index
}

// PendingPodsFailedSchd returns, for each pending pod that failed scheduling, its failed PodScheduled conditions.
func PendingPodsFailedSchd(pods []v1.Pod) map[string][]v1.PodCondition {
var podCondition = make(map[string][]v1.PodCondition)
for i := range pods {
if pods[i].Status.Phase == v1.PodPending {
for _, cond := range pods[i].Status.Conditions {
// Hack: ignore pending pods due to co-scheduler FailedScheduling event
// this exists until coscheduler performance issue is resolved.
if cond.Type == v1.PodScheduled && cond.Status == v1.ConditionFalse && cond.Reason == v1.PodReasonUnschedulable {
if strings.Contains(cond.Message, "pgName") && strings.Contains(cond.Message, "last") && strings.Contains(cond.Message, "failed") && strings.Contains(cond.Message, "deny") {
// ignore co-scheduled pending pods for coscheduler version:0.22.6
continue
} else if strings.Contains(cond.Message, "optimistic") && strings.Contains(cond.Message, "rejection") && strings.Contains(cond.Message, "PostFilter") ||
strings.Contains(cond.Message, "cannot") && strings.Contains(cond.Message, "find") && strings.Contains(cond.Message, "enough") && strings.Contains(cond.Message, "sibling") {
// ignore co-scheduled pending pods for coscheduler version:0.23.10
continue
} else {
podName := pods[i].Name
podCondition[podName] = append(podCondition[podName], *cond.DeepCopy())
}
}
}
}
}
return podCondition
}

// FilterPods returns the number of pods in the given phase.
func FilterPods(pods []v1.Pod, phase v1.PodPhase) int {
result := 0
for i := range pods {
if phase == pods[i].Status.Phase {
result++
}
}
return result
}

// GetPodResourcesByPhase returns the total resources requested by pods in the given phase.
func GetPodResourcesByPhase(phase v1.PodPhase, pods []v1.Pod) *clusterstateapi.Resource {
req := clusterstateapi.EmptyResource()
for i := range pods {
if pods[i].Status.Phase == phase {
for _, c := range pods[i].Spec.Containers {
req.Add(clusterstateapi.NewResource(c.Resources.Requests))
}
}
}
return req
}

// GeneratePodFailedCondition wraps a pod name and its conditions into an AppWrapper PendingPodSpec.
func GeneratePodFailedCondition(podName string, podCondition []v1.PodCondition) arbv1.PendingPodSpec {
return arbv1.PendingPodSpec{
PodName: podName,
Conditions: podCondition,
}
}
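
To see how the helpers above fit together, here is a minimal, hedged usage sketch (not shipped with this commit; the pod names and resource values are invented for illustration) built only from the functions added to utils.go and standard Kubernetes API types:

package queuejob

import (
	"fmt"

	v1 "k8s.io/api/core/v1"
	"k8s.io/apimachinery/pkg/api/resource"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
)

// ExampleStatusCounts exercises the helpers on two hand-built pods:
// one running pod with a CPU request and one pending pod that failed scheduling.
func ExampleStatusCounts() {
	running := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "worker-0"},
		Spec: v1.PodSpec{Containers: []v1.Container{{
			Resources: v1.ResourceRequirements{Requests: v1.ResourceList{
				v1.ResourceCPU: resource.MustParse("500m"),
			}},
		}}},
		Status: v1.PodStatus{Phase: v1.PodRunning},
	}
	pending := v1.Pod{
		ObjectMeta: metav1.ObjectMeta{Name: "worker-1"},
		Status: v1.PodStatus{
			Phase: v1.PodPending,
			Conditions: []v1.PodCondition{{
				Type:    v1.PodScheduled,
				Status:  v1.ConditionFalse,
				Reason:  v1.PodReasonUnschedulable,
				Message: "0/3 nodes are available: insufficient cpu.",
			}},
		},
	}
	pods := []v1.Pod{running, pending}

	// Count pods per phase and sum the resources requested by running pods.
	fmt.Println(FilterPods(pods, v1.PodRunning))                      // 1
	fmt.Println(GetPodResourcesByPhase(v1.PodRunning, pods).MilliCPU) // 500

	// Collect failed-scheduling conditions and wrap them for the AppWrapper status.
	for name, conds := range PendingPodsFailedSchd(pods) {
		fmt.Println(GeneratePodFailedCondition(name, conds).PodName) // worker-1
	}
}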
29 changes: 0 additions & 29 deletions pkg/controller/queuejobresources/interfaces.go

This file was deleted.

