From b88904f9cf07708ee6ab679e2d60de2e6954a3b5 Mon Sep 17 00:00:00 2001 From: bob Date: Fri, 20 Jan 2023 18:43:31 +0100 Subject: [PATCH] Fix change to import and support sidecar cherry-pick. datadog:patch --- .github/workflows/dd-build.yml | 2 +- pkg/kubelet/kubelet.go | 34 +- .../kuberuntime/kuberuntime_manager.go | 308 +++++++++++++++++- pkg/kubelet/status/status_manager.go | 3 +- test/e2e/node/pods.go | 8 +- 5 files changed, 332 insertions(+), 23 deletions(-) diff --git a/.github/workflows/dd-build.yml b/.github/workflows/dd-build.yml index a3523871aaf3a..4d8478bbfce77 100644 --- a/.github/workflows/dd-build.yml +++ b/.github/workflows/dd-build.yml @@ -18,7 +18,7 @@ jobs: - name: Set up Go uses: actions/setup-go@v2 with: - go-version: 1.15 + go-version: 1.21 - name: Build env: KUBE_BUILD_PLATFORMS: ${{ matrix.platform }} diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c7b74922307ca..8d81782cae35f 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -34,12 +34,12 @@ import ( cadvisorapi "github.com/google/cadvisor/info/v1" "github.com/google/go-cmp/cmp" libcontaineruserns "github.com/opencontainers/runc/libcontainer/userns" - "github.com/opencontainers/selinux/go-selinux" + selinux "github.com/opencontainers/selinux/go-selinux" "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/trace" "k8s.io/client-go/informers" - "k8s.io/mount-utils" + mount "k8s.io/mount-utils" "k8s.io/utils/integer" netutils "k8s.io/utils/net" @@ -63,7 +63,7 @@ import ( "k8s.io/component-helpers/apimachinery/lease" internalapi "k8s.io/cri-api/pkg/apis" runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" pluginwatcherapi "k8s.io/kubelet/pkg/apis/pluginregistration/v1" statsapi "k8s.io/kubelet/pkg/apis/stats/v1alpha1" podutil "k8s.io/kubernetes/pkg/api/v1/pod" @@ -2640,6 +2640,18 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // to the pod manager. 
kl.podManager.UpdatePod(pod) + pod, mirrorPod, wasMirror := kl.podManager.GetPodAndMirrorPod(pod) + if wasMirror { + if pod == nil { + klog.V(2).InfoS("Unable to find pod for mirror pod, skipping", "mirrorPod", klog.KObj(mirrorPod), "mirrorPodUID", mirrorPod.UID) + continue + } + // Static pods should be reconciled the same way as regular pods + } + + // TODO: reconcile being calculated in the config manager is questionable, and avoiding + // extra syncs may no longer be necessary. Reevaluate whether Reconcile and Sync can be + // merged (after resolving the next two TODOs). sidecarsStatus := status.GetSidecarsStatus(pod) klog.Infof("Pod: %s, status: Present=%v,Ready=%v,ContainersWaiting=%v", format.Pod(pod), sidecarsStatus.SidecarsPresent, sidecarsStatus.SidecarsReady, sidecarsStatus.ContainersWaiting) @@ -2648,14 +2660,22 @@ func (kl *Kubelet) HandlePodReconcile(pods []*v1.Pod) { // be different than Sync, or if there is a better place for it. For instance, we have // needsReconcile in kubelet/config, here, and in status_manager. 
if status.NeedToReconcilePodReadiness(pod) { - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } else if sidecarsStatus.ContainersWaiting { // if containers aren't running and the sidecars are all ready trigger a sync so that the containers get started if sidecarsStatus.SidecarsPresent && sidecarsStatus.SidecarsReady { klog.Infof("Pod: %s: sidecars: sidecars are ready, dispatching work", format.Pod(pod)) - mirrorPod, _ := kl.podManager.GetMirrorPodByPod(pod) - kl.dispatchWork(pod, kubetypes.SyncPodSync, mirrorPod, start) + kl.podWorkers.UpdatePod(UpdatePodOptions{ + Pod: pod, + MirrorPod: mirrorPod, + UpdateType: kubetypes.SyncPodSync, + StartTime: start, + }) } } diff --git a/pkg/kubelet/kuberuntime/kuberuntime_manager.go b/pkg/kubelet/kuberuntime/kuberuntime_manager.go index 06457ffde91c1..431ac77888736 100644 --- a/pkg/kubelet/kuberuntime/kuberuntime_manager.go +++ b/pkg/kubelet/kuberuntime/kuberuntime_manager.go @@ -29,7 +29,7 @@ import ( "github.com/google/go-cmp/cmp" "go.opentelemetry.io/otel/trace" crierror "k8s.io/cri-api/pkg/errors" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" @@ -549,6 +549,285 @@ func isSidecar(pod *v1.Pod, containerName string) bool { return pod.Annotations[fmt.Sprintf("sidecars.lyft.net/container-lifecycle-%s", containerName)] == "Sidecar" } +func isInPlacePodVerticalScalingAllowed(pod *v1.Pod) bool { + if !utilfeature.DefaultFeatureGate.Enabled(features.InPlacePodVerticalScaling) { + return false + } + if types.IsStaticPod(pod) { + return false + } + return true +} + +func (m *kubeGenericRuntimeManager) computePodResizeAction(pod *v1.Pod, containerIdx int, kubeContainerStatus *kubecontainer.Status, changes *podActions) bool { + container := 
pod.Spec.Containers[containerIdx] + if container.Resources.Limits == nil || len(pod.Status.ContainerStatuses) == 0 { + return true + } + + // Determine if the *running* container needs resource update by comparing v1.Spec.Resources (desired) + // with v1.Status.Resources / runtime.Status.Resources (last known actual). + // Proceed only when kubelet has accepted the resize a.k.a v1.Spec.Resources.Requests == v1.Status.AllocatedResources. + // Skip if runtime containerID doesn't match pod.Status containerID (container is restarting) + apiContainerStatus, exists := podutil.GetContainerStatus(pod.Status.ContainerStatuses, container.Name) + if !exists || apiContainerStatus.State.Running == nil || apiContainerStatus.Resources == nil || + kubeContainerStatus.State != kubecontainer.ContainerStateRunning || + kubeContainerStatus.ID.String() != apiContainerStatus.ContainerID || + !cmp.Equal(container.Resources.Requests, apiContainerStatus.AllocatedResources) { + return true + } + + desiredMemoryLimit := container.Resources.Limits.Memory().Value() + desiredCPULimit := container.Resources.Limits.Cpu().MilliValue() + desiredCPURequest := container.Resources.Requests.Cpu().MilliValue() + currentMemoryLimit := apiContainerStatus.Resources.Limits.Memory().Value() + currentCPULimit := apiContainerStatus.Resources.Limits.Cpu().MilliValue() + currentCPURequest := apiContainerStatus.Resources.Requests.Cpu().MilliValue() + // Runtime container status resources (from CRI), if set, supersedes v1(api) container status resources. 
+ if kubeContainerStatus.Resources != nil { + if kubeContainerStatus.Resources.MemoryLimit != nil { + currentMemoryLimit = kubeContainerStatus.Resources.MemoryLimit.Value() + } + if kubeContainerStatus.Resources.CPULimit != nil { + currentCPULimit = kubeContainerStatus.Resources.CPULimit.MilliValue() + } + if kubeContainerStatus.Resources.CPURequest != nil { + currentCPURequest = kubeContainerStatus.Resources.CPURequest.MilliValue() + } + } + + // Note: cgroup doesn't support memory request today, so we don't compare that. If canAdmitPod called during + // handlePodResourcesResize finds 'fit', then desiredMemoryRequest == currentMemoryRequest. + if desiredMemoryLimit == currentMemoryLimit && desiredCPULimit == currentCPULimit && desiredCPURequest == currentCPURequest { + return true + } + + desiredResources := containerResources{ + memoryLimit: desiredMemoryLimit, + memoryRequest: apiContainerStatus.AllocatedResources.Memory().Value(), + cpuLimit: desiredCPULimit, + cpuRequest: desiredCPURequest, + } + currentResources := containerResources{ + memoryLimit: currentMemoryLimit, + memoryRequest: apiContainerStatus.Resources.Requests.Memory().Value(), + cpuLimit: currentCPULimit, + cpuRequest: currentCPURequest, + } + + resizePolicy := make(map[v1.ResourceName]v1.ResourceResizeRestartPolicy) + for _, pol := range container.ResizePolicy { + resizePolicy[pol.ResourceName] = pol.RestartPolicy + } + determineContainerResize := func(rName v1.ResourceName, specValue, statusValue int64) (resize, restart bool) { + if specValue == statusValue { + return false, false + } + if resizePolicy[rName] == v1.RestartContainer { + return true, true + } + return true, false + } + markContainerForUpdate := func(rName v1.ResourceName, specValue, statusValue int64) { + cUpdateInfo := containerToUpdateInfo{ + apiContainerIdx: containerIdx, + kubeContainerID: kubeContainerStatus.ID, + desiredContainerResources: desiredResources, + currentContainerResources: &currentResources, + } + // Order the 
container updates such that resource decreases are applied before increases + switch { + case specValue > statusValue: // append + changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], cUpdateInfo) + case specValue < statusValue: // prepend + changes.ContainersToUpdate[rName] = append(changes.ContainersToUpdate[rName], containerToUpdateInfo{}) + copy(changes.ContainersToUpdate[rName][1:], changes.ContainersToUpdate[rName]) + changes.ContainersToUpdate[rName][0] = cUpdateInfo + } + } + resizeMemLim, restartMemLim := determineContainerResize(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) + resizeCPULim, restartCPULim := determineContainerResize(v1.ResourceCPU, desiredCPULimit, currentCPULimit) + resizeCPUReq, restartCPUReq := determineContainerResize(v1.ResourceCPU, desiredCPURequest, currentCPURequest) + if restartCPULim || restartCPUReq || restartMemLim { + // resize policy requires this container to restart + changes.ContainersToKill[kubeContainerStatus.ID] = containerToKillInfo{ + name: kubeContainerStatus.Name, + container: &pod.Spec.Containers[containerIdx], + message: fmt.Sprintf("Container %s resize requires restart", container.Name), + } + changes.ContainersToStart = append(changes.ContainersToStart, containerIdx) + changes.UpdatePodResources = true + return false + } else { + if resizeMemLim { + markContainerForUpdate(v1.ResourceMemory, desiredMemoryLimit, currentMemoryLimit) + } + if resizeCPULim { + markContainerForUpdate(v1.ResourceCPU, desiredCPULimit, currentCPULimit) + } else if resizeCPUReq { + markContainerForUpdate(v1.ResourceCPU, desiredCPURequest, currentCPURequest) + } + } + return true +} + +func (m *kubeGenericRuntimeManager) doPodResizeAction(pod *v1.Pod, podStatus *kubecontainer.PodStatus, podContainerChanges podActions, result kubecontainer.PodSyncResult) { + pcm := m.containerManager.NewPodContainerManager() + //TODO(vinaykul,InPlacePodVerticalScaling): Figure out best way to get enforceMemoryQoS value 
(parameter #4 below) in platform-agnostic way + podResources := cm.ResourceConfigForPod(pod, m.cpuCFSQuota, uint64((m.cpuCFSQuotaPeriod.Duration)/time.Microsecond), false) + if podResources == nil { + klog.ErrorS(nil, "Unable to get resource configuration", "pod", pod.Name) + result.Fail(fmt.Errorf("Unable to get resource configuration processing resize for pod %s", pod.Name)) + return + } + setPodCgroupConfig := func(rName v1.ResourceName, setLimitValue bool) error { + var err error + switch rName { + case v1.ResourceCPU: + podCpuResources := &cm.ResourceConfig{CPUPeriod: podResources.CPUPeriod} + if setLimitValue == true { + podCpuResources.CPUQuota = podResources.CPUQuota + } else { + podCpuResources.CPUShares = podResources.CPUShares + } + err = pcm.SetPodCgroupConfig(pod, rName, podCpuResources) + case v1.ResourceMemory: + err = pcm.SetPodCgroupConfig(pod, rName, podResources) + } + if err != nil { + klog.ErrorS(err, "Failed to set cgroup config", "resource", rName, "pod", pod.Name) + } + return err + } + // Memory and CPU are updated separately because memory resizes may be ordered differently than CPU resizes. + // If resize results in net pod resource increase, set pod cgroup config before resizing containers. + // If resize results in net pod resource decrease, set pod cgroup config after resizing containers. + // If an error occurs at any point, abort. Let future syncpod iterations retry the unfinished stuff. 
+ resizeContainers := func(rName v1.ResourceName, currPodCgLimValue, newPodCgLimValue, currPodCgReqValue, newPodCgReqValue int64) error { + var err error + if newPodCgLimValue > currPodCgLimValue { + if err = setPodCgroupConfig(rName, true); err != nil { + return err + } + } + if newPodCgReqValue > currPodCgReqValue { + if err = setPodCgroupConfig(rName, false); err != nil { + return err + } + } + if len(podContainerChanges.ContainersToUpdate[rName]) > 0 { + if err = m.updatePodContainerResources(pod, rName, podContainerChanges.ContainersToUpdate[rName]); err != nil { + klog.ErrorS(err, "updatePodContainerResources failed", "pod", format.Pod(pod), "resource", rName) + return err + } + } + if newPodCgLimValue < currPodCgLimValue { + err = setPodCgroupConfig(rName, true) + } + if newPodCgReqValue < currPodCgReqValue { + if err = setPodCgroupConfig(rName, false); err != nil { + return err + } + } + return err + } + if len(podContainerChanges.ContainersToUpdate[v1.ResourceMemory]) > 0 || podContainerChanges.UpdatePodResources { + if podResources.Memory == nil { + klog.ErrorS(nil, "podResources.Memory is nil", "pod", pod.Name) + result.Fail(fmt.Errorf("podResources.Memory is nil for pod %s", pod.Name)) + return + } + currentPodMemoryConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceMemory) + if err != nil { + klog.ErrorS(err, "GetPodCgroupConfig for memory failed", "pod", pod.Name) + result.Fail(err) + return + } + currentPodMemoryUsage, err := pcm.GetPodCgroupMemoryUsage(pod) + if err != nil { + klog.ErrorS(err, "GetPodCgroupMemoryUsage failed", "pod", pod.Name) + result.Fail(err) + return + } + if currentPodMemoryUsage >= uint64(*podResources.Memory) { + klog.ErrorS(nil, "Aborting attempt to set pod memory limit less than current memory usage", "pod", pod.Name) + result.Fail(fmt.Errorf("Aborting attempt to set pod memory limit less than current memory usage for pod %s", pod.Name)) + return + } + if errResize := resizeContainers(v1.ResourceMemory, 
int64(*currentPodMemoryConfig.Memory), *podResources.Memory, 0, 0); errResize != nil { + result.Fail(errResize) + return + } + } + if len(podContainerChanges.ContainersToUpdate[v1.ResourceCPU]) > 0 || podContainerChanges.UpdatePodResources { + if podResources.CPUQuota == nil || podResources.CPUShares == nil { + klog.ErrorS(nil, "podResources.CPUQuota or podResources.CPUShares is nil", "pod", pod.Name) + result.Fail(fmt.Errorf("podResources.CPUQuota or podResources.CPUShares is nil for pod %s", pod.Name)) + return + } + currentPodCpuConfig, err := pcm.GetPodCgroupConfig(pod, v1.ResourceCPU) + if err != nil { + klog.ErrorS(err, "GetPodCgroupConfig for CPU failed", "pod", pod.Name) + result.Fail(err) + return + } + if errResize := resizeContainers(v1.ResourceCPU, *currentPodCpuConfig.CPUQuota, *podResources.CPUQuota, + int64(*currentPodCpuConfig.CPUShares), int64(*podResources.CPUShares)); errResize != nil { + result.Fail(errResize) + return + } + } +} + +func (m *kubeGenericRuntimeManager) updatePodContainerResources(pod *v1.Pod, resourceName v1.ResourceName, containersToUpdate []containerToUpdateInfo) error { + klog.V(5).InfoS("Updating container resources", "pod", klog.KObj(pod)) + + for _, cInfo := range containersToUpdate { + container := pod.Spec.Containers[cInfo.apiContainerIdx].DeepCopy() + // If updating memory limit, use most recently configured CPU request and limit values. + // If updating CPU request and limit, use most recently configured memory request and limit values. 
+ switch resourceName { + case v1.ResourceMemory: + container.Resources.Limits = v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuLimit, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryLimit, resource.BinarySI), + } + container.Resources.Requests = v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.currentContainerResources.cpuRequest, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(cInfo.desiredContainerResources.memoryRequest, resource.BinarySI), + } + case v1.ResourceCPU: + container.Resources.Limits = v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuLimit, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryLimit, resource.BinarySI), + } + container.Resources.Requests = v1.ResourceList{ + v1.ResourceCPU: *resource.NewMilliQuantity(cInfo.desiredContainerResources.cpuRequest, resource.DecimalSI), + v1.ResourceMemory: *resource.NewQuantity(cInfo.currentContainerResources.memoryRequest, resource.BinarySI), + } + } + if err := m.updateContainerResources(pod, container, cInfo.kubeContainerID); err != nil { + // Log error and abort as container updates need to succeed in the order determined by computePodResizeAction. + // The recovery path is for SyncPod to keep retrying at later times until it succeeds. + klog.ErrorS(err, "updateContainerResources failed", "container", container.Name, "cID", cInfo.kubeContainerID, + "pod", format.Pod(pod), "resourceName", resourceName) + return err + } + // If UpdateContainerResources is error-free, it means desired values for 'resourceName' was accepted by runtime. + // So we update currentContainerResources for 'resourceName', which is our view of most recently configured resources. + // Note: We can't rely on GetPodStatus as runtime may lag in actuating the resource values it just accepted. 
+ switch resourceName { + case v1.ResourceMemory: + cInfo.currentContainerResources.memoryLimit = cInfo.desiredContainerResources.memoryLimit + cInfo.currentContainerResources.memoryRequest = cInfo.desiredContainerResources.memoryRequest + case v1.ResourceCPU: + cInfo.currentContainerResources.cpuLimit = cInfo.desiredContainerResources.cpuLimit + cInfo.currentContainerResources.cpuRequest = cInfo.desiredContainerResources.cpuRequest + } + } + return nil +} + // computePodActions checks whether the pod spec has changed and returns the changes if true. func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod *v1.Pod, podStatus *kubecontainer.PodStatus) podActions { klog.V(5).InfoS("Syncing Pod", "pod", klog.KObj(pod)) @@ -589,6 +868,24 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod * changes.CreateSandbox = false return changes } + + // Get the containers to start, excluding the ones that succeeded if RestartPolicy is OnFailure. + var containersToStart []int + for idx, c := range pod.Spec.Containers { + if pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure && containerSucceeded(&c, podStatus) { + continue + } + containersToStart = append(containersToStart, idx) + } + // We should not create a sandbox for a Pod if initialization is done and there is no container to start. + if len(containersToStart) == 0 { + _, _, done := findNextInitContainerToRun(pod, podStatus) + if done { + changes.CreateSandbox = false + return changes + } + } + if len(pod.Spec.InitContainers) != 0 { // Pod has init containers, return the first one. 
if !utilfeature.DefaultFeatureGate.Enabled(features.SidecarContainers) { @@ -607,14 +904,7 @@ func (m *kubeGenericRuntimeManager) computePodActions(ctx context.Context, pod * } return changes } - // Start all containers by default but exclude the ones that - // succeeded if RestartPolicy is OnFailure - for idx, c := range pod.Spec.Containers { - if containerSucceeded(&c, podStatus) && pod.Spec.RestartPolicy == v1.RestartPolicyOnFailure { - continue - } - changes.ContainersToStart = append(changes.ContainersToStart, idx) - } + changes.ContainersToStart = containersToStart return changes } diff --git a/pkg/kubelet/status/status_manager.go b/pkg/kubelet/status/status_manager.go index 7084f68d5e6d4..fc531150ff8fd 100644 --- a/pkg/kubelet/status/status_manager.go +++ b/pkg/kubelet/status/status_manager.go @@ -35,13 +35,14 @@ import ( "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" utilfeature "k8s.io/apiserver/pkg/util/feature" - "k8s.io/klog/v2" + klog "k8s.io/klog/v2" podutil "k8s.io/kubernetes/pkg/api/v1/pod" "k8s.io/kubernetes/pkg/features" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" "k8s.io/kubernetes/pkg/kubelet/metrics" "k8s.io/kubernetes/pkg/kubelet/status/state" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + kubeutil "k8s.io/kubernetes/pkg/kubelet/util" "k8s.io/kubernetes/pkg/kubelet/util/format" statusutil "k8s.io/kubernetes/pkg/util/pod" ) diff --git a/test/e2e/node/pods.go b/test/e2e/node/pods.go index b934d0aa9c216..6a93c9b76fe18 100644 --- a/test/e2e/node/pods.go +++ b/test/e2e/node/pods.go @@ -29,6 +29,9 @@ import ( "sync" "time" + ginkgo "github.com/onsi/ginkgo/v2" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/common/expfmt" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/api/resource" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" @@ -42,11 +45,6 @@ import ( e2epod "k8s.io/kubernetes/test/e2e/framework/pod" imageutils "k8s.io/kubernetes/test/utils/image" admissionapi 
"k8s.io/pod-security-admission/api" - utilpointer "k8s.io/utils/pointer" - - "github.com/onsi/ginkgo/v2" - "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/common/expfmt" ) var _ = SIGDescribe("Pods Extended", func() {