Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Initial networking support for shared mode #154

Merged
merged 5 commits into from
Dec 10, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions k3k-kubelet/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ type config struct {
HostConfigPath string `yaml:"hostConfigPath,omitempty"`
VirtualConfigPath string `yaml:"virtualConfigPath,omitempty"`
KubeletPort string `yaml:"kubeletPort,omitempty"`
AgentIP string `yaml:"agentIP,omitempty"`
ServerIP string `yaml:"serverIP,omitempty"`
}

func (c *config) unmarshalYAML(data []byte) error {
Expand Down Expand Up @@ -51,8 +51,8 @@ func (c *config) unmarshalYAML(data []byte) error {
if c.Token == "" {
c.Token = conf.Token
}
if c.AgentIP == "" {
c.AgentIP = conf.AgentIP
if c.ServerIP == "" {
c.ServerIP = conf.ServerIP
}
return nil
}
Expand Down
89 changes: 89 additions & 0 deletions k3k-kubelet/controller/service.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
package controller

import (
"context"

"github.com/rancher/k3k/k3k-kubelet/translate"
"github.com/rancher/k3k/pkg/log"

v1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
ctrl "sigs.k8s.io/controller-runtime"
ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
)

const (
serviceSyncerController = "service-syncer-controller"
maxConcurrentReconciles = 1
)

// TODO: change into a generic syncer
galal-hussein marked this conversation as resolved.
Show resolved Hide resolved
type ServiceReconciler struct {
virtualClient ctrlruntimeclient.Client
hostClient ctrlruntimeclient.Client
clusterName string
clusterNamespace string
Scheme *runtime.Scheme
logger *log.Logger
Translater translate.ToHostTranslater
}

// AddServiceSyncer adds service syncer controller to the manager of the virtual cluster
func AddServiceSyncer(ctx context.Context, virtMgr, hostMgr manager.Manager, clusterName, clusterNamespace string, logger *log.Logger) error {
translater := translate.ToHostTranslater{
ClusterName: clusterName,
ClusterNamespace: clusterNamespace,
}
// initialize a new Reconciler
reconciler := ServiceReconciler{
virtualClient: virtMgr.GetClient(),
hostClient: hostMgr.GetClient(),
Scheme: virtMgr.GetScheme(),
logger: logger.Named(serviceSyncerController),
Translater: translater,
clusterName: clusterName,
clusterNamespace: clusterNamespace,
}
return ctrl.NewControllerManagedBy(virtMgr).
For(&v1.Service{}).
WithOptions(controller.Options{
MaxConcurrentReconciles: maxConcurrentReconciles,
}).
Complete(&reconciler)
}

func (s *ServiceReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) {
log := s.logger.With("Cluster", s.clusterName, "Service", req.NamespacedName)
// skip kubernetes service
if req.Name == "kubernetes" || req.Name == "kube-dns" {
return reconcile.Result{}, nil
}
var (
virtService v1.Service
hostService v1.Service
)
if err := s.virtualClient.Get(ctx, req.NamespacedName, &virtService); err != nil {
return reconcile.Result{}, err
}
galal-hussein marked this conversation as resolved.
Show resolved Hide resolved
syncedService := s.service(&virtService)
if err := s.hostClient.Get(ctx, types.NamespacedName{Name: syncedService.Name, Namespace: s.clusterNamespace}, &hostService); err != nil {
if apierrors.IsNotFound(err) {
log.Info("creating the service for the first time on the host cluster")
return reconcile.Result{}, s.hostClient.Create(ctx, syncedService)
}
return reconcile.Result{}, err
}
log.Info("updating service on the host cluster")
return reconcile.Result{}, s.hostClient.Update(ctx, syncedService)
galal-hussein marked this conversation as resolved.
Show resolved Hide resolved
}

func (s *ServiceReconciler) service(obj *v1.Service) *v1.Service {
hostService := obj.DeepCopy()
s.Translater.TranslateTo(hostService)
return hostService
}
56 changes: 45 additions & 11 deletions k3k-kubelet/kubelet.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,12 @@ import (
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"net/http"
"time"

certutil "github.com/rancher/dynamiclistener/cert"
k3kkubeletcontroller "github.com/rancher/k3k/k3k-kubelet/controller"
"github.com/rancher/k3k/k3k-kubelet/provider"
"github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1"
"github.com/rancher/k3k/pkg/controller"
Expand All @@ -20,6 +22,7 @@ import (
"github.com/virtual-kubelet/virtual-kubelet/node"
"github.com/virtual-kubelet/virtual-kubelet/node/nodeutil"
"go.uber.org/zap"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apiserver/pkg/authentication/user"
Expand Down Expand Up @@ -50,6 +53,9 @@ type kubelet struct {
name string
port int
hostConfig *rest.Config
virtConfig *rest.Config
agentIP string
dnsIP string
hostClient ctrlruntimeclient.Client
virtClient kubernetes.Interface
hostMgr manager.Manager
Expand Down Expand Up @@ -109,24 +115,50 @@ func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet
},
})

logger.Info("adding service syncer controller")
if k3kkubeletcontroller.AddServiceSyncer(ctx, virtualMgr, hostMgr, c.ClusterName, c.ClusterNamespace, k3klog.New(false)); err != nil {
return nil, fmt.Errorf("failed to add service syncer controller: %v", err)
}

clusterIP, err := clusterIP(ctx, c.AgentHostname, c.ClusterNamespace, hostClient)
if err != nil {
return nil, err
return nil, fmt.Errorf("failed to extract the clusterIP for the server service %s", err.Error())
galal-hussein marked this conversation as resolved.
Show resolved Hide resolved
}

// get the cluster's DNS IP to be injected to pods
var dnsService v1.Service
dnsName := controller.SafeConcatNameWithPrefix(c.ClusterName, "kube-dns")
if err := hostClient.Get(ctx, types.NamespacedName{Name: dnsName, Namespace: c.ClusterNamespace}, &dnsService); err != nil {
return nil, fmt.Errorf("failed to get the DNS service for the cluster %s", err.Error())
}

return &kubelet{
name: c.NodeName,
hostConfig: hostConfig,
hostClient: hostClient,
virtConfig: virtConfig,
virtClient: virtClient,
hostMgr: hostMgr,
virtualMgr: virtualMgr,
virtClient: virtClient,
agentIP: clusterIP,
logger: logger.Named(k3kKubeletName),
token: c.Token,
dnsIP: dnsService.Spec.ClusterIP,
}, nil
}

func (k *kubelet) registerNode(ctx context.Context, ip, srvPort, namespace, name, hostname string) error {
providerFunc := k.newProviderFunc(namespace, name, hostname, ip)
nodeOpts := k.nodeOpts(ctx, srvPort, namespace, name, hostname)
func clusterIP(ctx context.Context, serviceName, clusterNamespace string, hostClient ctrlruntimeclient.Client) (string, error) {
var service v1.Service
serviceKey := types.NamespacedName{Namespace: clusterNamespace, Name: serviceName}
if err := hostClient.Get(ctx, serviceKey, &service); err != nil {
return "", err
}
return service.Spec.ClusterIP, nil
}

func (k *kubelet) registerNode(ctx context.Context, agentIP, srvPort, namespace, name, hostname, serverIP, dnsIP string) error {
providerFunc := k.newProviderFunc(namespace, name, hostname, agentIP, serverIP, dnsIP)
nodeOpts := k.nodeOpts(ctx, srvPort, namespace, name, hostname, agentIP)

var err error
k.node, err = nodeutil.NewNode(k.name, providerFunc, nodeutil.WithClient(k.virtClient), nodeOpts)
Expand Down Expand Up @@ -172,20 +204,20 @@ func (k *kubelet) start(ctx context.Context) {
k.logger.Info("node exited successfully")
}

func (k *kubelet) newProviderFunc(namespace, name, hostname, ip string) nodeutil.NewProviderFunc {
func (k *kubelet) newProviderFunc(namespace, name, hostname, agentIP, serverIP, dnsIP string) nodeutil.NewProviderFunc {
return func(pc nodeutil.ProviderConfig) (nodeutil.Provider, node.NodeProvider, error) {
utilProvider, err := provider.New(*k.hostConfig, k.hostMgr, k.virtualMgr, k.logger, namespace, name)
utilProvider, err := provider.New(*k.hostConfig, k.hostMgr, k.virtualMgr, k.logger, namespace, name, serverIP, dnsIP)
if err != nil {
return nil, nil, fmt.Errorf("unable to make nodeutil provider %w", err)
}
nodeProvider := provider.Node{}

provider.ConfigureNode(pc.Node, hostname, k.port, ip)
provider.ConfigureNode(pc.Node, hostname, k.port, agentIP)
return utilProvider, &nodeProvider, nil
}
}

func (k *kubelet) nodeOpts(ctx context.Context, srvPort, namespace, name, hostname string) nodeutil.NodeOpt {
func (k *kubelet) nodeOpts(ctx context.Context, srvPort, namespace, name, hostname, agentIP string) nodeutil.NodeOpt {
return func(c *nodeutil.NodeConfig) error {
c.HTTPListenAddr = fmt.Sprintf(":%s", srvPort)
// set up the routes
Expand All @@ -195,7 +227,7 @@ func (k *kubelet) nodeOpts(ctx context.Context, srvPort, namespace, name, hostna
}
c.Handler = mux

tlsConfig, err := loadTLSConfig(ctx, k.hostClient, name, namespace, k.name, hostname, k.token)
tlsConfig, err := loadTLSConfig(ctx, k.hostClient, name, namespace, k.name, hostname, k.token, agentIP)
if err != nil {
return fmt.Errorf("unable to get tls config: %w", err)
}
Expand Down Expand Up @@ -265,7 +297,7 @@ func kubeconfigBytes(url string, serverCA, clientCert, clientKey []byte) ([]byte
return clientcmd.Write(*config)
}

func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName, hostname, token string) (*tls.Config, error) {
func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clusterName, clusterNamespace, nodeName, hostname, token, agentIP string) (*tls.Config, error) {
var (
cluster v1alpha1.Cluster
b *bootstrap.ControlRuntimeBootstrap
Expand All @@ -283,8 +315,10 @@ func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clu
}); err != nil {
return nil, fmt.Errorf("unable to decode bootstrap: %w", err)
}
ip := net.ParseIP(agentIP)
altNames := certutil.AltNames{
DNSNames: []string{hostname},
IPs: []net.IP{ip},
}
cert, key, err := kubeconfig.CreateClientCertKey(nodeName, nil, &altNames, []x509.ExtKeyUsage{x509.ExtKeyUsageServerAuth}, 0, b.ServerCA.Content, b.ServerCAKey.Content)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions k3k-kubelet/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func main() {
Usage: "kubelet API port number",
Destination: &cfg.KubeletPort,
EnvVar: "SERVER_PORT",
Value: "9443",
Value: "10250",
},
cli.StringFlag{
Name: "agent-hostname",
Expand All @@ -68,10 +68,10 @@ func main() {
EnvVar: "AGENT_HOSTNAME",
},
cli.StringFlag{
Name: "agent-ip",
Usage: "Agent IP used for registering the virtual kubelet to the cluster",
Destination: &cfg.AgentIP,
EnvVar: "AGENT_IP",
Name: "server-ip",
Usage: "Server IP used for registering the virtual kubelet to the cluster",
Destination: &cfg.ServerIP,
EnvVar: "SERVER_IP",
},
cli.StringFlag{
Name: "config",
Expand Down Expand Up @@ -112,7 +112,7 @@ func run(clx *cli.Context) {
logger.Fatalw("failed to create new virtual kubelet instance", zap.Error(err))
}

if err := k.registerNode(ctx, cfg.AgentIP, cfg.KubeletPort, cfg.ClusterNamespace, cfg.ClusterName, cfg.AgentHostname); err != nil {
if err := k.registerNode(ctx, k.agentIP, cfg.KubeletPort, cfg.ClusterNamespace, cfg.ClusterName, cfg.AgentHostname, cfg.ServerIP, k.dnsIP); err != nil {
logger.Fatalw("failed to register new node", zap.Error(err))
}

Expand Down
64 changes: 56 additions & 8 deletions k3k-kubelet/provider/provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,17 +45,19 @@ type Provider struct {
MetricsClient metricset.Interface
ClusterNamespace string
ClusterName string
serverIP string
dnsIP string
logger *k3klog.Logger
}

func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3klog.Logger, Namespace, Name string) (*Provider, error) {
func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3klog.Logger, namespace, name, serverIP, dnsIP string) (*Provider, error) {
coreClient, err := cv1.NewForConfig(&hostConfig)
if err != nil {
return nil, err
}
translater := translate.ToHostTranslater{
ClusterName: Name,
ClusterNamespace: Namespace,
ClusterName: name,
ClusterNamespace: namespace,
}
p := Provider{
Handler: controller.ControllerHandler{
Expand All @@ -71,9 +73,11 @@ func New(hostConfig rest.Config, hostMgr, virtualMgr manager.Manager, logger *k3
Translater: translater,
ClientConfig: hostConfig,
CoreClient: coreClient,
ClusterNamespace: Namespace,
ClusterName: Name,
ClusterNamespace: namespace,
ClusterName: name,
logger: logger,
serverIP: serverIP,
dnsIP: dnsIP,
}

return &p, nil
Expand Down Expand Up @@ -246,8 +250,11 @@ func (p *Provider) CreatePod(ctx context.Context, pod *corev1.Pod) error {
if err := p.transformTokens(ctx, pod, tPod); err != nil {
return fmt.Errorf("unable to transform tokens for pod %s/%s: %w", pod.Namespace, pod.Name, err)
}
// inject networking information to the pod including the virtual cluster controlplane endpoint
p.configureNetworking(pod.Name, pod.Namespace, tPod)

p.logger.Infow("Creating pod", "Host Namespace", tPod.Namespace, "Host Name", tPod.Name,
"Virtual Namespace", pod.Namespace, "Virtual Name", pod.Name)
"Virtual Namespace", pod.Namespace, "Virtual Name", "env", pod.Name, pod.Spec.Containers[0].Env)
return p.HostClient.Create(ctx, tPod)
}

Expand Down Expand Up @@ -444,7 +451,7 @@ func (p *Provider) pruneUnusedVolumes(ctx context.Context, pod *corev1.Pod) erro
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1.Pod, error) {
p.logger.Infow("got a request for get pod", "Namespace", namespace, "Name", name)
p.logger.Debugw("got a request for get pod", "Namespace", namespace, "Name", name)
hostNamespaceName := types.NamespacedName{
Namespace: p.ClusterNamespace,
Name: p.Translater.TranslateName(namespace, name),
Expand All @@ -463,7 +470,7 @@ func (p *Provider) GetPod(ctx context.Context, namespace, name string) (*corev1.
// concurrently outside of the calling goroutine. Therefore it is recommended
// to return a version after DeepCopy.
func (p *Provider) GetPodStatus(ctx context.Context, namespace, name string) (*corev1.PodStatus, error) {
p.logger.Infow("got a request for pod status", "Namespace", namespace, "Name", name)
p.logger.Debugw("got a request for pod status", "Namespace", namespace, "Name", name)
pod, err := p.GetPod(ctx, namespace, name)
if err != nil {
return nil, fmt.Errorf("unable to get pod for status: %w", err)
Expand Down Expand Up @@ -496,6 +503,47 @@ func (p *Provider) GetPods(ctx context.Context) ([]*corev1.Pod, error) {
return retPods, nil
}

func (p *Provider) configureNetworking(podName, podNamespace string, pod *corev1.Pod) {
// inject networking information to the pod's environment variables
for i := range pod.Spec.Containers {
pod.Spec.Containers[i].Env = append(pod.Spec.Containers[i].Env,
corev1.EnvVar{
Name: "KUBERNETES_PORT_443_TCP",
Value: "tcp://" + p.serverIP + ":6443",
},
corev1.EnvVar{
Name: "KUBERNETES_PORT",
Value: "tcp://" + p.serverIP + ":6443",
},
corev1.EnvVar{
Name: "KUBERNETES_PORT_443_TCP_ADDR",
Value: p.serverIP,
},
corev1.EnvVar{
Name: "KUBERNETES_SERVICE_HOST",
Value: p.serverIP,
},
corev1.EnvVar{
Name: "KUBERNETES_SERVICE_PORT",
Value: "6443",
},
)
}
// injecting cluster DNS IP to the pods except for coredns pod
if !strings.HasPrefix(podName, "coredns") {
pod.Spec.DNSPolicy = corev1.DNSNone
pod.Spec.DNSConfig = &corev1.PodDNSConfig{
Nameservers: []string{
p.dnsIP,
},
Searches: []string{
podNamespace + ".svc.cluster.local", "svc.cluster.local", "cluster.local", "hgalal.az",
galal-hussein marked this conversation as resolved.
Show resolved Hide resolved
},
}
}

}

// getSecretsAndConfigmaps retrieves a list of all secrets/configmaps that are in use by a given pod. Useful
// for removing/seeing which virtual cluster resources need to be in the host cluster.
func getSecretsAndConfigmaps(pod *corev1.Pod) ([]string, []string) {
Expand Down
Loading
Loading