V1.32.0+k3s1 #11478

Merged
merged 3 commits on Dec 20, 2024
2 changes: 1 addition & 1 deletion Dockerfile.dapper
@@ -1,4 +1,4 @@
ARG GOLANG=golang:1.22.9-alpine3.20
ARG GOLANG=golang:1.23.3-alpine3.20
FROM ${GOLANG}

# Set proxy environment variables
2 changes: 1 addition & 1 deletion Dockerfile.local
@@ -1,4 +1,4 @@
ARG GOLANG=golang:1.22.9-alpine3.19
ARG GOLANG=golang:1.23.3-alpine3.20
FROM ${GOLANG} AS infra

ARG http_proxy
2 changes: 1 addition & 1 deletion Dockerfile.manifest
@@ -1,4 +1,4 @@
ARG GOLANG=golang:1.22.9-alpine3.20
ARG GOLANG=golang:1.23.3-alpine3.20
FROM ${GOLANG}

COPY --from=plugins/manifest:1.2.3 /bin/* /bin/
2 changes: 1 addition & 1 deletion Dockerfile.test
@@ -1,4 +1,4 @@
ARG GOLANG=golang:1.22.9-alpine3.20
ARG GOLANG=golang:1.23.3-alpine3.20
FROM ${GOLANG} AS test-base

RUN apk -U --no-cache add bash jq
190 changes: 95 additions & 95 deletions go.mod

Large diffs are not rendered by default.

292 changes: 148 additions & 144 deletions go.sum

Large diffs are not rendered by default.

8 changes: 8 additions & 0 deletions manifests/rolebindings.yaml
@@ -23,6 +23,14 @@ rules:
- nodes
verbs:
- get
- list
- watch
- apiGroups:
- ""
resources:
- nodes/status
verbs:
- patch
Reviewer comment (Member):
This matches the RBAC from upstream flannel: https://github.com/flannel-io/flannel/blob/master/chart/kube-flannel/templates/rbac.yaml#L6-L21

I'm not super stoked on all agents being able to patch each other's status, but it seems like this is how flannel works at the moment.

- apiGroups:
- ""
resources:
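
For illustration only (not part of this PR): as the reviewer comment notes, the new `nodes/status` + `patch` rule mirrors upstream flannel's RBAC, and it authorizes the kind of node-status update flannel's kube subnet manager performs — for example, clearing the NetworkUnavailable condition once the overlay network is up. A minimal client-go sketch of such a patch; the kubeconfig path and node name are placeholders:

```go
package main

import (
	"context"
	"fmt"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/types"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

func main() {
	// Placeholder kubeconfig path; a k3s agent would use its own credentials.
	cfg, err := clientcmd.BuildConfigFromFlags("", "/var/lib/rancher/k3s/agent/k3scontroller.kubeconfig")
	if err != nil {
		panic(err)
	}
	client := kubernetes.NewForConfigOrDie(cfg)

	// Patching the status subresource is what requires the "patch" verb on nodes/status.
	patch := []byte(`{"status":{"conditions":[{"type":"NetworkUnavailable","status":"False","reason":"FlannelIsUp","message":"Flannel is running on this node"}]}}`)
	if _, err := client.CoreV1().Nodes().Patch(context.TODO(), "my-node", types.StrategicMergePatchType, patch, metav1.PatchOptions{}, "status"); err != nil {
		panic(err)
	}
	fmt.Println("patched node status")
}
```
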
29 changes: 22 additions & 7 deletions pkg/agent/flannel/setup.go
@@ -9,10 +9,12 @@ import (
goruntime "runtime"
"strings"

"github.com/k3s-io/k3s/pkg/agent/util"
agentutil "github.com/k3s-io/k3s/pkg/agent/util"
"github.com/k3s-io/k3s/pkg/daemons/config"
"github.com/k3s-io/k3s/pkg/util"
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
authorizationv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
@@ -64,9 +66,22 @@ func Prepare(ctx context.Context, nodeConfig *config.Node) error {
return createFlannelConf(nodeConfig)
}

func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInterface) error {
func Run(ctx context.Context, nodeConfig *config.Node) error {
logrus.Infof("Starting flannel with backend %s", nodeConfig.FlannelBackend)
if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, nodes); err != nil {

if err := util.WaitForRBACReady(ctx, nodeConfig.AgentConfig.KubeConfigK3sController, util.DefaultAPIServerReadyTimeout, authorizationv1.ResourceAttributes{
Reviewer comment (Member):
This avoids having flannel spam the log with errors while we wait for the RBAC manifest to be applied.

Verb: "list",
Resource: "nodes",
}, ""); err != nil {
return errors.Wrap(err, "flannel failed to wait for RBAC")
}

coreClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigK3sController)
if err != nil {
return err
}

if err := waitForPodCIDR(ctx, nodeConfig.AgentConfig.NodeName, coreClient.CoreV1().Nodes()); err != nil {
return errors.Wrap(err, "flannel failed to wait for PodCIDR assignment")
}

@@ -75,7 +90,7 @@ func Run(ctx context.Context, nodeConfig *config.Node, nodes typedcorev1.NodeInt
return errors.Wrap(err, "failed to check netMode for flannel")
}
go func() {
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConfFile, nodeConfig.AgentConfig.KubeConfigKubelet, nodeConfig.FlannelIPv6Masq, netMode)
err := flannel(ctx, nodeConfig.FlannelIface, nodeConfig.FlannelConfFile, nodeConfig.AgentConfig.KubeConfigK3sController, nodeConfig.FlannelIPv6Masq, netMode)
if err != nil && !errors.Is(err, context.Canceled) {
logrus.Errorf("flannel exited: %v", err)
os.Exit(1)
@@ -123,7 +138,7 @@ func createCNIConf(dir string, nodeConfig *config.Node) error {

if nodeConfig.AgentConfig.FlannelCniConfFile != "" {
logrus.Debugf("Using %s as the flannel CNI conf", nodeConfig.AgentConfig.FlannelCniConfFile)
return util.CopyFile(nodeConfig.AgentConfig.FlannelCniConfFile, p, false)
return agentutil.CopyFile(nodeConfig.AgentConfig.FlannelCniConfFile, p, false)
}

cniConfJSON := cniConf
@@ -138,7 +153,7 @@ func createCNIConf(dir string, nodeConfig *config.Node) error {
cniConfJSON = strings.ReplaceAll(cniConfJSON, "%SERVICE_CIDR%", nodeConfig.AgentConfig.ServiceCIDR.String())
}

return util.WriteFile(p, cniConfJSON)
return agentutil.WriteFile(p, cniConfJSON)
}

func createFlannelConf(nodeConfig *config.Node) error {
@@ -235,7 +250,7 @@ func createFlannelConf(nodeConfig *config.Node) error {
confJSON = strings.ReplaceAll(confJSON, "%backend%", backendConf)

logrus.Debugf("The flannel configuration is %s", confJSON)
return util.WriteFile(nodeConfig.FlannelConfFile, confJSON)
return agentutil.WriteFile(nodeConfig.FlannelConfFile, confJSON)
}

// fundNetMode returns the mode (ipv4, ipv6 or dual-stack) in which flannel is operating
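
The reviewer comment in this file explains that the agent now waits for the RBAC manifest to be applied before starting flannel, so the log is not flooded with authorization errors. The helper used is util.WaitForRBACReady from the k3s util package; as a rough, hypothetical sketch of the idea (not the actual k3s implementation), such a wait can be built by polling a SelfSubjectAccessReview until the API server reports the requested verb/resource as allowed:

```go
package example

import (
	"context"
	"time"

	authorizationv1 "k8s.io/api/authorization/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/apimachinery/pkg/util/wait"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/clientcmd"
)

// waitForRBAC polls until the credentials in kubeconfig are allowed to perform
// the given action, or the timeout expires.
func waitForRBAC(ctx context.Context, kubeconfig string, attrs authorizationv1.ResourceAttributes, timeout time.Duration) error {
	cfg, err := clientcmd.BuildConfigFromFlags("", kubeconfig)
	if err != nil {
		return err
	}
	client, err := kubernetes.NewForConfig(cfg)
	if err != nil {
		return err
	}
	return wait.PollUntilContextTimeout(ctx, 2*time.Second, timeout, true, func(ctx context.Context) (bool, error) {
		sar := &authorizationv1.SelfSubjectAccessReview{
			Spec: authorizationv1.SelfSubjectAccessReviewSpec{
				ResourceAttributes: &attrs,
			},
		}
		resp, err := client.AuthorizationV1().SelfSubjectAccessReviews().Create(ctx, sar, metav1.CreateOptions{})
		if err != nil {
			// Treat transient errors as "not ready yet" and keep polling quietly.
			return false, nil
		}
		return resp.Status.Allowed, nil
	})
}
```

With a check like this, flannel only starts once {Verb: "list", Resource: "nodes"} succeeds against the k3s-controller kubeconfig, which is the condition the Run function above waits on.
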
7 changes: 4 additions & 3 deletions pkg/agent/run.go
@@ -177,17 +177,18 @@ func run(ctx context.Context, cfg cmds.Agent, proxy proxy.Proxy) error {
return errors.Wrap(err, "failed to wait for apiserver ready")
}

coreClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
// Use the kubelet kubeconfig to update annotations on the local node
kubeletClient, err := util.GetClientSet(nodeConfig.AgentConfig.KubeConfigKubelet)
if err != nil {
return err
}

if err := configureNode(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
if err := configureNode(ctx, nodeConfig, kubeletClient.CoreV1().Nodes()); err != nil {
return err
}

if !nodeConfig.NoFlannel {
if err := flannel.Run(ctx, nodeConfig, coreClient.CoreV1().Nodes()); err != nil {
if err := flannel.Run(ctx, nodeConfig); err != nil {
return err
}
}
18 changes: 5 additions & 13 deletions pkg/cloudprovider/servicelb.go
@@ -2,12 +2,13 @@ package cloudprovider

import (
"context"
"encoding/json"
"fmt"
"sort"
"strconv"
"strings"
"time"
"encoding/json"

"sigs.k8s.io/yaml"

"github.com/k3s-io/k3s/pkg/util"
@@ -27,11 +28,9 @@
"k8s.io/apimachinery/pkg/util/intstr"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apimachinery/pkg/util/wait"
utilfeature "k8s.io/apiserver/pkg/util/feature"
"k8s.io/client-go/util/retry"
"k8s.io/cloud-provider/names"
servicehelper "k8s.io/cloud-provider/service/helpers"
"k8s.io/kubernetes/pkg/features"
utilsnet "k8s.io/utils/net"
utilsptr "k8s.io/utils/ptr"
)
@@ -563,7 +562,7 @@ func (k *k3s) newDaemonSet(svc *core.Service) (*apps.DaemonSet, error) {
Name: "DEST_IPS",
ValueFrom: &core.EnvVarSource{
FieldRef: &core.ObjectFieldSelector{
FieldPath: getHostIPsFieldPath(),
FieldPath: "status.hostIPs",
},
},
},
@@ -710,8 +709,8 @@ func (k *k3s) getPriorityClassName(svc *core.Service) string {
return k.LBDefaultPriorityClassName
}

// getTolerations retrieves the tolerations from a service's annotations.
// It parses the tolerations from a JSON or YAML string stored in the annotations.
// getTolerations retrieves the tolerations from a service's annotations.
// It parses the tolerations from a JSON or YAML string stored in the annotations.
func (k *k3s) getTolerations(svc *core.Service) ([]core.Toleration, error) {
tolerationsStr, ok := svc.Annotations[tolerationsAnnotation]
if !ok {
Expand Down Expand Up @@ -778,10 +777,3 @@ func ingressToString(ingresses []core.LoadBalancerIngress) []string {
}
return parts
}

func getHostIPsFieldPath() string {
if utilfeature.DefaultFeatureGate.Enabled(features.PodHostIPs) {
return "status.hostIPs"
}
return "status.hostIP"
}
2 changes: 0 additions & 2 deletions pkg/daemons/agent/agent_linux.go
@@ -141,8 +141,6 @@ func kubeletArgs(cfg *config.Agent) map[string]string {
argsMap["node-ip"] = cfg.NodeIP
}
} else {
// Cluster is using the embedded CCM, we know that the feature-gate will be enabled there as well.
argsMap["feature-gates"] = util.AddFeatureGate(argsMap["feature-gates"], "CloudDualStackNodeIPs=true")
if nodeIPs := util.JoinIPs(cfg.NodeIPs); nodeIPs != "" {
argsMap["node-ip"] = util.JoinIPs(cfg.NodeIPs)
}
2 changes: 0 additions & 2 deletions pkg/daemons/agent/agent_windows.go
@@ -106,8 +106,6 @@ func kubeletArgs(cfg *config.Agent) map[string]string {
argsMap["node-ip"] = cfg.NodeIP
}
} else {
// Cluster is using the embedded CCM, we know that the feature-gate will be enabled there as well.
argsMap["feature-gates"] = util.AddFeatureGate(argsMap["feature-gates"], "CloudDualStackNodeIPs=true")
if nodeIPs := util.JoinIPs(cfg.NodeIPs); nodeIPs != "" {
argsMap["node-ip"] = util.JoinIPs(cfg.NodeIPs)
}
22 changes: 11 additions & 11 deletions pkg/daemons/control/deps/deps.go
@@ -29,8 +29,8 @@ import (
"github.com/sirupsen/logrus"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/util/sets"
"k8s.io/apiserver/pkg/apis/apiserver"
apiserverconfigv1 "k8s.io/apiserver/pkg/apis/apiserver/v1"
apiserverv1beta1 "k8s.io/apiserver/pkg/apis/apiserver/v1beta1"
"k8s.io/apiserver/pkg/authentication/user"
"k8s.io/client-go/util/keyutil"
)
@@ -785,19 +785,19 @@ func genEncryptionConfigAndState(controlConfig *config.Control) error {
}

func genEgressSelectorConfig(controlConfig *config.Control) error {
var clusterConn apiserver.Connection
var clusterConn apiserverv1beta1.Connection

if controlConfig.EgressSelectorMode == config.EgressSelectorModeDisabled {
clusterConn = apiserver.Connection{
ProxyProtocol: apiserver.ProtocolDirect,
clusterConn = apiserverv1beta1.Connection{
ProxyProtocol: apiserverv1beta1.ProtocolDirect,
}
} else {
clusterConn = apiserver.Connection{
ProxyProtocol: apiserver.ProtocolHTTPConnect,
Transport: &apiserver.Transport{
TCP: &apiserver.TCPTransport{
clusterConn = apiserverv1beta1.Connection{
ProxyProtocol: apiserverv1beta1.ProtocolHTTPConnect,
Transport: &apiserverv1beta1.Transport{
TCP: &apiserverv1beta1.TCPTransport{
URL: fmt.Sprintf("https://%s:%d", controlConfig.BindAddressOrLoopback(false, true), controlConfig.SupervisorPort),
TLSConfig: &apiserver.TLSConfig{
TLSConfig: &apiserverv1beta1.TLSConfig{
CABundle: controlConfig.Runtime.ServerCA,
ClientKey: controlConfig.Runtime.ClientKubeAPIKey,
ClientCert: controlConfig.Runtime.ClientKubeAPICert,
@@ -807,12 +807,12 @@ func genEgressSelectorConfig(controlConfig *config.Control) error {
}
}

egressConfig := apiserver.EgressSelectorConfiguration{
egressConfig := apiserverv1beta1.EgressSelectorConfiguration{
TypeMeta: metav1.TypeMeta{
Kind: "EgressSelectorConfiguration",
APIVersion: "apiserver.k8s.io/v1beta1",
},
EgressSelections: []apiserver.EgressSelection{
EgressSelections: []apiserverv1beta1.EgressSelection{
{
Name: "cluster",
Connection: clusterConn,
87 changes: 84 additions & 3 deletions pkg/daemons/control/server.go
@@ -19,7 +19,14 @@ import (
"github.com/pkg/errors"
"github.com/sirupsen/logrus"
authorizationv1 "k8s.io/api/authorization/v1"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
k8sruntime "k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/watch"
typedcorev1 "k8s.io/client-go/kubernetes/typed/core/v1"
"k8s.io/client-go/tools/cache"
toolswatch "k8s.io/client-go/tools/watch"
cloudproviderapi "k8s.io/cloud-provider/api"
logsapi "k8s.io/component-base/logs/api/v1"
"k8s.io/kubernetes/pkg/kubeapiserver/authorizer/modes"
"k8s.io/kubernetes/pkg/registry/core/node"
@@ -157,8 +164,36 @@ func scheduler(ctx context.Context, cfg *config.Control) error {

args := config.GetArgs(argsMap, cfg.ExtraSchedulerAPIArgs)

schedulerNodeReady := make(chan struct{})

go func() {
defer close(schedulerNodeReady)

apiReadyLoop:
for {
select {
case <-ctx.Done():
return
case <-cfg.Runtime.APIServerReady:
break apiReadyLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for API server to become available to start kube-scheduler")
}
}

// If we're running the embedded cloud controller, wait for it to untaint at least one
Reviewer comment (@brandond, Dec 19, 2024):
Moved this here from pkg/daemons/executor/embed.go so that we can use the kube-scheduler's own kubeconfig to wait for ready nodes. Without parsing component args, the executor only knows the location of agent kubeconfigs.

// node (usually, the local node) before starting the scheduler to ensure that it
// finds a node that is ready to run pods during its initial scheduling loop.
if !cfg.DisableCCM {
logrus.Infof("Waiting for untainted node")
if err := waitForUntaintedNode(ctx, runtime.KubeConfigScheduler); err != nil {
logrus.Fatalf("failed to wait for untained node: %v", err)
}
}
}()

logrus.Infof("Running kube-scheduler %s", config.ArgString(args))
return executor.Scheduler(ctx, cfg.Runtime.APIServerReady, args)
return executor.Scheduler(ctx, schedulerNodeReady, args)
}

func apiServer(ctx context.Context, cfg *config.Control) error {
@@ -323,7 +358,6 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
"authentication-kubeconfig": runtime.KubeConfigCloudController,
"node-status-update-frequency": "1m0s",
"bind-address": cfg.Loopback(false),
"feature-gates": "CloudDualStackNodeIPs=true",
}
if cfg.NoLeaderElect {
argsMap["leader-elect"] = "false"
@@ -359,7 +393,7 @@ func cloudControllerManager(ctx context.Context, cfg *config.Control) error {
case <-cfg.Runtime.APIServerReady:
break apiReadyLoop
case <-time.After(30 * time.Second):
logrus.Infof("Waiting for API server to become available")
logrus.Infof("Waiting for API server to become available to start cloud-controller-manager")
}
}

@@ -449,3 +483,50 @@ func promise(f func() error) <-chan error {
}()
return c
}

// waitForUntaintedNode watches nodes, waiting to find one not tainted as
// uninitialized by the external cloud provider.
func waitForUntaintedNode(ctx context.Context, kubeConfig string) error {

restConfig, err := util.GetRESTConfig(kubeConfig)
if err != nil {
return err
}
coreClient, err := typedcorev1.NewForConfig(restConfig)
if err != nil {
return err
}
nodes := coreClient.Nodes()

lw := &cache.ListWatch{
ListFunc: func(options metav1.ListOptions) (object k8sruntime.Object, e error) {
return nodes.List(ctx, options)
},
WatchFunc: func(options metav1.ListOptions) (i watch.Interface, e error) {
return nodes.Watch(ctx, options)
},
}

condition := func(ev watch.Event) (bool, error) {
if node, ok := ev.Object.(*v1.Node); ok {
return getCloudTaint(node.Spec.Taints) == nil, nil
}
return false, errors.New("event object not of type v1.Node")
}

if _, err := toolswatch.UntilWithSync(ctx, lw, &v1.Node{}, nil, condition); err != nil {
return errors.Wrap(err, "failed to wait for untainted node")
}
return nil
}

// getCloudTaint returns the external cloud provider taint, if present.
// Cribbed from k8s.io/cloud-provider/controllers/node/node_controller.go
func getCloudTaint(taints []v1.Taint) *v1.Taint {
for _, taint := range taints {
if taint.Key == cloudproviderapi.TaintExternalCloudProvider {
return &taint
}
}
return nil
}
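
For context (illustrative only, not part of the PR): the taint that waitForUntaintedNode watches for is the standard external-cloud-provider taint, node.cloudprovider.kubernetes.io/uninitialized, which the embedded cloud controller removes once it has initialized the node. A small hypothetical test sketch of the helper above:

```go
package control

import (
	"testing"

	v1 "k8s.io/api/core/v1"
	cloudproviderapi "k8s.io/cloud-provider/api"
)

func TestGetCloudTaint(t *testing.T) {
	// A freshly registered node still carries the uninitialized taint...
	tainted := []v1.Taint{{
		Key:    cloudproviderapi.TaintExternalCloudProvider, // "node.cloudprovider.kubernetes.io/uninitialized"
		Value:  "true",
		Effect: v1.TaintEffectNoSchedule,
	}}
	if getCloudTaint(tainted) == nil {
		t.Fatal("expected cloud taint to be detected")
	}
	// ...and once the cloud-controller-manager initializes it the taint is gone,
	// which is the condition that lets the kube-scheduler start.
	if getCloudTaint(nil) != nil {
		t.Fatal("expected no cloud taint on an initialized node")
	}
}
```
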