From feccd8c0e10e1fa060018feadde66bc6550e3644 Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Sat, 19 Oct 2024 03:35:57 +0300 Subject: [PATCH 1/3] Add structured logging properly use a centralized logger wrapper to work with controller and virt-kubelet Signed-off-by: galal-hussein --- go.mod | 4 +- go.sum | 3 -- k3k-kubelet/kubelet.go | 48 ++++++--------------- k3k-kubelet/main.go | 31 +++++++++----- main.go | 57 ++++++++++++++++--------- pkg/controller/cluster/cluster.go | 55 +++++++++++------------- pkg/controller/cluster/pod.go | 32 ++++++++------ pkg/controller/clusterset/clusterset.go | 16 ++++--- pkg/controller/clusterset/node.go | 16 ++++--- pkg/controller/controller.go | 7 --- pkg/log/zap.go | 51 ++++++++++++++++++++++ 11 files changed, 185 insertions(+), 135 deletions(-) create mode 100644 pkg/log/zap.go diff --git a/go.mod b/go.mod index b0ebaf5..73272bb 100644 --- a/go.mod +++ b/go.mod @@ -17,6 +17,7 @@ replace ( ) require ( + github.com/go-logr/zapr v1.3.0 github.com/prometheus/client_model v0.6.1 github.com/rancher/dynamiclistener v1.27.5 github.com/sirupsen/logrus v1.9.3 @@ -30,8 +31,6 @@ require ( k8s.io/apimachinery v0.31.1 k8s.io/apiserver v0.31.0 k8s.io/client-go v0.31.0 - k8s.io/klog v1.0.0 - k8s.io/klog/v2 v2.130.1 k8s.io/metrics v0.29.1 k8s.io/utils v0.0.0-20240711033017-18e509b52bc8 sigs.k8s.io/controller-runtime v0.17.5 @@ -123,6 +122,7 @@ require ( gopkg.in/yaml.v3 v3.0.1 // indirect k8s.io/apiextensions-apiserver v0.29.2 // indirect k8s.io/component-base v0.29.2 // indirect + k8s.io/klog/v2 v2.130.1 // indirect k8s.io/kms v0.29.2 // indirect k8s.io/kube-openapi v0.0.0-20231010175941-2dd684a91f00 // indirect sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.28.0 // indirect diff --git a/go.sum b/go.sum index 1fd87dd..10b86bc 100644 --- a/go.sum +++ b/go.sum @@ -57,7 +57,6 @@ github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBd github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/fsnotify/fsnotify v1.7.0 h1:8JEhPFa5W2WU7YfeZzPNqzMP6Lwt7L2715Ggo0nosvA= github.com/fsnotify/fsnotify v1.7.0/go.mod h1:40Bi/Hjc2AVfZrqy+aj+yEI+/bRxZnMJyTJwOpGvigM= -github.com/go-logr/logr v0.1.0/go.mod h1:ixOQHD9gLJUVQQ2ZOR7zLEifBX6tGkNJF4QyIY7sIas= github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.4.1 h1:pKouT5E8xu9zeFC39JXRDukb6JFQPXM5p5I91188VAQ= github.com/go-logr/logr v1.4.1/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= @@ -396,8 +395,6 @@ k8s.io/client-go v0.29.1 h1:19B/+2NGEwnFLzt0uB5kNJnfTsbV8w6TgQRz9l7ti7A= k8s.io/client-go v0.29.1/go.mod h1:TDG/psL9hdet0TI9mGyHJSgRkW3H9JZk2dNEUS7bRks= k8s.io/component-base v0.29.2 h1:lpiLyuvPA9yV1aQwGLENYyK7n/8t6l3nn3zAtFTJYe8= k8s.io/component-base v0.29.2/go.mod h1:BfB3SLrefbZXiBfbM+2H1dlat21Uewg/5qtKOl8degM= -k8s.io/klog v1.0.0 h1:Pt+yjF5aB1xDSVbau4VsWe+dQNzA0qv1LlXdC2dF6Q8= -k8s.io/klog v1.0.0/go.mod h1:4Bi6QPql/J/LkTDqv7R/cd3hPo4k2DG6Ptcz060Ez5I= k8s.io/klog/v2 v2.130.1 h1:n9Xl7H1Xvksem4KFG4PYbdQCQxqc/tTUyrgXaOhHSzk= k8s.io/klog/v2 v2.130.1/go.mod h1:3Jpz1GvMt720eyJH1ckRHK1EDfpxISzJ7I9OYgaDtPE= k8s.io/kms v0.29.2 h1:MDsbp98gSlEQs7K7dqLKNNTwKFQRYYvO4UOlBOjNy6Y= diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go index 6a04b3e..7c1bf02 100644 --- a/k3k-kubelet/kubelet.go +++ b/k3k-kubelet/kubelet.go @@ -6,7 +6,6 @@ import ( "crypto/x509" "fmt" "net/http" - "os" "time" certutil "github.com/rancher/dynamiclistener/cert" @@ -16,6 +15,7 @@ import ( 
"github.com/rancher/k3k/pkg/controller/cluster/server" "github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap" "github.com/rancher/k3k/pkg/controller/kubeconfig" + k3klog "github.com/rancher/k3k/pkg/log" "github.com/virtual-kubelet/virtual-kubelet/log" "github.com/virtual-kubelet/virtual-kubelet/node" "github.com/virtual-kubelet/virtual-kubelet/node/nodeutil" @@ -32,7 +32,10 @@ import ( ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) -var scheme = runtime.NewScheme() +var ( + scheme = runtime.NewScheme() + k3kKubeletName = "k3k-kubelet" +) func init() { _ = clientgoscheme.AddToScheme(scheme) @@ -46,9 +49,10 @@ type kubelet struct { hostClient ctrlruntimeclient.Client virtClient kubernetes.Interface node *nodeutil.Node + logger *k3klog.Logger } -func newKubelet(ctx context.Context, c *config) (*kubelet, error) { +func newKubelet(ctx context.Context, c *config, logger *k3klog.Logger) (*kubelet, error) { hostConfig, err := clientcmd.BuildConfigFromFlags("", c.HostConfigPath) if err != nil { return nil, err @@ -75,6 +79,7 @@ func newKubelet(ctx context.Context, c *config) (*kubelet, error) { hostConfig: hostConfig, hostClient: hostClient, virtClient: virtClient, + logger: logger.Named(k3kKubeletName), }, nil } @@ -92,30 +97,19 @@ func (k *kubelet) registerNode(ctx context.Context, srvPort, namespace, name, ho func (k *kubelet) start(ctx context.Context) { go func() { - logger, err := zap.NewProduction() - if err != nil { - fmt.Println("unable to create logger:", err.Error()) - os.Exit(1) - } - wrapped := LogWrapper{ - *logger.Sugar(), - } - ctx = log.WithLogger(ctx, &wrapped) + ctx = log.WithLogger(ctx, k.logger) if err := k.node.Run(ctx); err != nil { - fmt.Println("node errored when running:", err.Error()) - os.Exit(1) + k.logger.Fatalw("node errored when running", zap.Error(err)) } }() if err := k.node.WaitReady(context.Background(), time.Minute*1); err != nil { - fmt.Println("node was not ready within timeout of 1 minute:", err.Error()) - os.Exit(1) + k.logger.Fatalw("node was not ready within timeout of 1 minute", zap.Error(err)) } <-k.node.Done() if err := k.node.Err(); err != nil { - fmt.Println("node stopped with an error:", err.Error()) - os.Exit(1) + k.logger.Fatalw("node stopped with an error", zap.Error(err)) } - fmt.Println("node exited without an error") + k.logger.Info("node exited without an error") } func (k *kubelet) newProviderFunc(namespace, name, hostname string) nodeutil.NewProviderFunc { @@ -255,19 +249,3 @@ func loadTLSConfig(ctx context.Context, hostClient ctrlruntimeclient.Client, clu Certificates: []tls.Certificate{clientCert}, }, nil } - -type LogWrapper struct { - zap.SugaredLogger -} - -func (l *LogWrapper) WithError(err error) log.Logger { - return l -} - -func (l *LogWrapper) WithField(string, interface{}) log.Logger { - return l -} - -func (l *LogWrapper) WithFields(field log.Fields) log.Logger { - return l -} diff --git a/k3k-kubelet/main.go b/k3k-kubelet/main.go index f653344..6b534b5 100644 --- a/k3k-kubelet/main.go +++ b/k3k-kubelet/main.go @@ -2,16 +2,19 @@ package main import ( "context" - "fmt" "os" + "github.com/rancher/k3k/pkg/log" "github.com/sirupsen/logrus" "github.com/urfave/cli" + "go.uber.org/zap" ) var ( configFile string cfg config + logger *log.Logger + debug bool ) func main() { @@ -69,6 +72,16 @@ func main() { EnvVar: "CONFIG_FILE", Value: "/etc/rancher/k3k/config.yaml", }, + cli.BoolFlag{ + Name: "debug", + Usage: "Enable debug logging", + Destination: &debug, + EnvVar: "DEBUG", + }, + } + app.Before = func(clx 
*cli.Context) error { + logger = log.New(debug) + return nil } app.Action = run if err := app.Run(os.Args); err != nil { @@ -77,25 +90,21 @@ func main() { } func run(clx *cli.Context) { + ctx := context.Background() if err := cfg.parse(configFile); err != nil { - fmt.Printf("failed to parse config file %s: %v", configFile, err) - os.Exit(1) + logger.Fatalw("failed to parse config file", "path", configFile, zap.Error(err)) } if err := cfg.validate(); err != nil { - fmt.Printf("failed to validate config: %v", err) - os.Exit(1) + logger.Fatalw("failed to validate config", zap.Error(err)) } - ctx := context.Background() - k, err := newKubelet(ctx, &cfg) + k, err := newKubelet(ctx, &cfg, logger) if err != nil { - fmt.Printf("failed to create new virtual kubelet instance: %v", err) - os.Exit(1) + logger.Fatalw("failed to create new virtual kubelet instance", zap.Error(err)) } if err := k.registerNode(ctx, cfg.KubeletPort, cfg.ClusterNamespace, cfg.ClusterName, cfg.AgentHostname); err != nil { - fmt.Printf("failed to register new node: %v", err) - os.Exit(1) + logger.Fatalw("failed to register new node", zap.Error(err)) } k.start(ctx) diff --git a/main.go b/main.go index 6b23ec0..cb3875b 100644 --- a/main.go +++ b/main.go @@ -6,16 +6,19 @@ import ( "fmt" "os" + "github.com/go-logr/zapr" "github.com/rancher/k3k/cli/cmds" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" "github.com/rancher/k3k/pkg/controller/cluster" "github.com/rancher/k3k/pkg/controller/clusterset" + "github.com/rancher/k3k/pkg/log" "github.com/urfave/cli" + "go.uber.org/zap" "k8s.io/apimachinery/pkg/runtime" clientgoscheme "k8s.io/client-go/kubernetes/scheme" "k8s.io/client-go/tools/clientcmd" - "k8s.io/klog/v2" ctrl "sigs.k8s.io/controller-runtime" + ctrlruntimelog "sigs.k8s.io/controller-runtime/pkg/log" "sigs.k8s.io/controller-runtime/pkg/manager" ) @@ -30,6 +33,8 @@ var ( clusterCIDR string sharedAgentImage string kubeconfig string + debug bool + logger *log.Logger flags = []cli.Flag{ cli.StringFlag{ Name: "kubeconfig", @@ -46,10 +51,16 @@ var ( cli.StringFlag{ Name: "shared-agent-image", EnvVar: "SHARED_AGENT_IMAGE", - Usage: "K3K Virtual Kubelet image ", + Usage: "K3K Virtual Kubelet image", Value: "rancher/k3k:k3k-kubelet-dev", Destination: &sharedAgentImage, }, + cli.BoolFlag{ + Name: "debug", + EnvVar: "DEBUG", + Usage: "Debug level logging", + Destination: &debug, + }, } ) @@ -63,9 +74,12 @@ func main() { app.Flags = flags app.Action = run app.Version = version + " (" + gitCommit + ")" - + app.Before = func(clx *cli.Context) error { + logger = log.New(debug) + return nil + } if err := app.Run(os.Args); err != nil { - klog.Fatal(err) + logger.Fatalw("Failed to run k3k controller", zap.Error(err)) } } @@ -75,7 +89,7 @@ func run(clx *cli.Context) error { restConfig, err := clientcmd.BuildConfigFromFlags("", kubeconfig) if err != nil { - return fmt.Errorf("Failed to create config from kubeconfig file: %v", err) + return fmt.Errorf("failed to create config from kubeconfig file: %v", err) } mgr, err := ctrl.NewManager(restConfig, manager.Options{ @@ -83,33 +97,34 @@ func run(clx *cli.Context) error { }) if err != nil { - return fmt.Errorf("Failed to create new controller runtime manager: %v", err) + return fmt.Errorf("failed to create new controller runtime manager: %v", err) } - if err := cluster.Add(ctx, mgr, sharedAgentImage); err != nil { - return fmt.Errorf("Failed to add the new cluster controller: %v", err) + + ctrlruntimelog.SetLogger(zapr.NewLogger(logger.Desugar().WithOptions(zap.AddCallerSkip(1)))) + 
logger.Info("adding cluster controller") + if err := cluster.Add(ctx, mgr, sharedAgentImage, logger); err != nil { + return fmt.Errorf("failed to add the new cluster controller: %v", err) } - if err := cluster.AddPodController(ctx, mgr); err != nil { - return fmt.Errorf("Failed to add the new cluster controller: %v", err) + logger.Info("adding etcd pod controller") + if err := cluster.AddPodController(ctx, mgr, logger); err != nil { + return fmt.Errorf("failed to add the new cluster controller: %v", err) } - klog.Info("adding clusterset controller") - if err := clusterset.Add(ctx, mgr, clusterCIDR); err != nil { - return fmt.Errorf("Failed to add the clusterset controller: %v", err) + + logger.Info("adding clusterset controller") + if err := clusterset.Add(ctx, mgr, clusterCIDR, logger); err != nil { + return fmt.Errorf("failed to add the clusterset controller: %v", err) } if clusterCIDR == "" { - klog.Info("adding networkpolicy node controller") - if err := clusterset.AddNodeController(ctx, mgr); err != nil { - return fmt.Errorf("Failed to add the clusterset node controller: %v", err) + logger.Info("adding networkpolicy node controller") + if err := clusterset.AddNodeController(ctx, mgr, logger); err != nil { + return fmt.Errorf("failed to add the clusterset node controller: %v", err) } } - if err := cluster.AddPodController(ctx, mgr); err != nil { - return fmt.Errorf("Failed to add the new cluster controller: %v", err) - } - if err := mgr.Start(ctx); err != nil { - return fmt.Errorf("Failed to start the manager: %v", err) + return fmt.Errorf("failed to start the manager: %v", err) } return nil diff --git a/pkg/controller/cluster/cluster.go b/pkg/controller/cluster/cluster.go index f1657c6..f59f53c 100644 --- a/pkg/controller/cluster/cluster.go +++ b/pkg/controller/cluster/cluster.go @@ -8,16 +8,16 @@ import ( "time" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" - k3kcontroller "github.com/rancher/k3k/pkg/controller" "github.com/rancher/k3k/pkg/controller/cluster/agent" "github.com/rancher/k3k/pkg/controller/cluster/server" "github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap" + "github.com/rancher/k3k/pkg/log" + "go.uber.org/zap" v1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" - "k8s.io/klog" ctrl "sigs.k8s.io/controller-runtime" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -45,15 +45,17 @@ type ClusterReconciler struct { Client ctrlruntimeclient.Client Scheme *runtime.Scheme SharedAgentImage string + logger *log.Logger } // Add adds a new controller to the manager -func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage string) error { +func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage string, logger *log.Logger) error { // initialize a new Reconciler reconciler := ClusterReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), SharedAgentImage: sharedAgentImage, + logger: logger.Named(clusterController), } return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.Cluster{}). 
@@ -64,36 +66,29 @@ func Add(ctx context.Context, mgr manager.Manager, sharedAgentImage string) erro } func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { - var ( cluster v1alpha1.Cluster podList v1.PodList ) - + log := c.logger.With("Cluster", req.NamespacedName) if err := c.Client.Get(ctx, req.NamespacedName, &cluster); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } - if cluster.DeletionTimestamp.IsZero() { if !controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) { controllerutil.AddFinalizer(&cluster, clusterFinalizerName) if err := c.Client.Update(ctx, &cluster); err != nil { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to add cluster finalizer", err) + return reconcile.Result{}, err } } - - klog.Infof("enqueue cluster [%s]", cluster.Name) - if err := c.createCluster(ctx, &cluster); err != nil { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to create cluster", err) - } - return reconcile.Result{}, nil + log.Info("enqueue cluster") + return reconcile.Result{}, c.createCluster(ctx, &cluster, log) } // remove finalizer from the server pods and update them. matchingLabels := ctrlruntimeclient.MatchingLabels(map[string]string{"role": "server"}) listOpts := &ctrlruntimeclient.ListOptions{Namespace: cluster.Namespace} matchingLabels.ApplyToList(listOpts) - if err := c.Client.List(ctx, &podList, listOpts); err != nil { return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } @@ -101,26 +96,24 @@ func (c *ClusterReconciler) Reconcile(ctx context.Context, req reconcile.Request if controllerutil.ContainsFinalizer(&pod, etcdPodFinalizerName) { controllerutil.RemoveFinalizer(&pod, etcdPodFinalizerName) if err := c.Client.Update(ctx, &pod); err != nil { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to remove etcd finalizer", err) + return reconcile.Result{}, err } } } - if controllerutil.ContainsFinalizer(&cluster, clusterFinalizerName) { // remove finalizer from the cluster and update it. 
controllerutil.RemoveFinalizer(&cluster, clusterFinalizerName) if err := c.Client.Update(ctx, &cluster); err != nil { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to remove cluster finalizer", err) + return reconcile.Result{}, err } } - klog.Infof("deleting cluster [%s]", cluster.Name) - + log.Info("deleting cluster") return reconcile.Result{}, nil } -func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1.Cluster) error { +func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1.Cluster, log *zap.SugaredLogger) error { if err := c.validate(cluster); err != nil { - klog.Errorf("invalid change: %v", err) + log.Errorw("invalid change", zap.Error(err)) return nil } s := server.New(cluster, c.Client) @@ -133,7 +126,7 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1 } } if err := c.Client.Update(ctx, cluster); err != nil { - return k3kcontroller.LogAndReturnErr("failed to update cluster with persistence type", err) + return err } cluster.Status.ClusterCIDR = cluster.Spec.ClusterCIDR @@ -146,35 +139,35 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1 cluster.Status.ServiceCIDR = defaultClusterServiceCIDR } - klog.Infof("creating cluster service") + log.Info("creating cluster service") serviceIP, err := c.createClusterService(ctx, cluster, s) if err != nil { - return k3kcontroller.LogAndReturnErr("failed to create cluster service", err) + return err } if err := c.createClusterConfigs(ctx, cluster, s, serviceIP); err != nil { - return k3kcontroller.LogAndReturnErr("failed to create cluster configs", err) + return err } // creating statefulsets in case the user chose a persistence type other than ephermal if err := c.server(ctx, cluster, s); err != nil { - return k3kcontroller.LogAndReturnErr("failed to create servers", err) + return err } if err := c.agent(ctx, cluster, serviceIP); err != nil { - return k3kcontroller.LogAndReturnErr("failed to create agents", err) + return err } if cluster.Spec.Expose != nil { if cluster.Spec.Expose.Ingress != nil { serverIngress, err := s.Ingress(ctx, c.Client) if err != nil { - return k3kcontroller.LogAndReturnErr("failed to create ingress object", err) + return err } if err := c.Client.Create(ctx, serverIngress); err != nil { if !apierrors.IsAlreadyExists(err) { - return k3kcontroller.LogAndReturnErr("failed to create server ingress", err) + return err } } } @@ -182,12 +175,12 @@ func (c *ClusterReconciler) createCluster(ctx context.Context, cluster *v1alpha1 bootstrapSecret, err := bootstrap.Generate(ctx, cluster, serviceIP) if err != nil { - return k3kcontroller.LogAndReturnErr("failed to generate new kubeconfig", err) + return err } if err := c.Client.Create(ctx, bootstrapSecret); err != nil { if !apierrors.IsAlreadyExists(err) { - return k3kcontroller.LogAndReturnErr("failed to create kubeconfig secret", err) + return err } } diff --git a/pkg/controller/cluster/pod.go b/pkg/controller/cluster/pod.go index f087869..0a62883 100644 --- a/pkg/controller/cluster/pod.go +++ b/pkg/controller/cluster/pod.go @@ -15,16 +15,16 @@ import ( "github.com/rancher/k3k/pkg/controller/cluster/server" "github.com/rancher/k3k/pkg/controller/cluster/server/bootstrap" "github.com/rancher/k3k/pkg/controller/kubeconfig" - "github.com/sirupsen/logrus" + "github.com/rancher/k3k/pkg/log" "go.etcd.io/etcd/api/v3/v3rpc/rpctypes" clientv3 "go.etcd.io/etcd/client/v3" + "go.uber.org/zap" apps "k8s.io/api/apps/v1" v1 "k8s.io/api/core/v1" apierrors 
"k8s.io/apimachinery/pkg/api/errors" "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/util/retry" - "k8s.io/klog" ctrl "sigs.k8s.io/controller-runtime" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" "sigs.k8s.io/controller-runtime/pkg/controller" @@ -41,14 +41,16 @@ const ( type PodReconciler struct { Client ctrlruntimeclient.Client Scheme *runtime.Scheme + logger *log.Logger } // Add adds a new controller to the manager -func AddPodController(ctx context.Context, mgr manager.Manager) error { +func AddPodController(ctx context.Context, mgr manager.Manager, logger *log.Logger) error { // initialize a new Reconciler reconciler := PodReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + logger: logger.Named(podController), } return ctrl.NewControllerManagedBy(mgr). @@ -61,9 +63,11 @@ func AddPodController(ctx context.Context, mgr manager.Manager) error { } func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := p.logger.With("Pod", req.NamespacedName) + s := strings.Split(req.Name, "-") if len(s) < 1 { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to get cluster namespace", nil) + return reconcile.Result{}, nil } if s[0] != "k3k" { return reconcile.Result{}, nil @@ -84,15 +88,15 @@ func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } for _, pod := range podList.Items { - klog.Infof("Handle etcd server pod [%s/%s]", pod.Namespace, pod.Name) - if err := p.handleServerPod(ctx, cluster, &pod); err != nil { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to handle etcd pod", err) + log.Infof("Handle etcd server pod [%s/%s]", pod.Namespace, pod.Name) + if err := p.handleServerPod(ctx, cluster, &pod, log); err != nil { + return reconcile.Result{}, err } } return reconcile.Result{}, nil } -func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cluster, pod *v1.Pod) error { +func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cluster, pod *v1.Pod, log *zap.SugaredLogger) error { if _, ok := pod.Labels["role"]; ok { if pod.Labels["role"] != "server" { return nil @@ -112,7 +116,7 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl } return nil } - tlsConfig, err := p.getETCDTLS(&cluster) + tlsConfig, err := p.getETCDTLS(&cluster, log) if err != nil { return err } @@ -127,7 +131,7 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl return err } - if err := removePeer(ctx, client, pod.Name, pod.Status.PodIP); err != nil { + if err := removePeer(ctx, client, pod.Name, pod.Status.PodIP, log); err != nil { return err } // remove our finalizer from the list and update it. 
@@ -146,8 +150,8 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl return nil } -func (p *PodReconciler) getETCDTLS(cluster *v1alpha1.Cluster) (*tls.Config, error) { - klog.Infof("generating etcd TLS client certificate for cluster [%s]", cluster.Name) +func (p *PodReconciler) getETCDTLS(cluster *v1alpha1.Cluster, log *zap.SugaredLogger) (*tls.Config, error) { + log.Infow("generating etcd TLS client certificate", "Cluster", cluster.Namespace+"/"+cluster.Name) token := cluster.Spec.Token endpoint := fmt.Sprintf("%s.%s", server.ServiceName(cluster.Name), cluster.Namespace) var b *bootstrap.ControlRuntimeBootstrap @@ -184,7 +188,7 @@ func (p *PodReconciler) getETCDTLS(cluster *v1alpha1.Cluster) (*tls.Config, erro } // removePeer removes a peer from the cluster. The peer name and IP address must both match. -func removePeer(ctx context.Context, client *clientv3.Client, name, address string) error { +func removePeer(ctx context.Context, client *clientv3.Client, name, address string, log *zap.SugaredLogger) error { ctx, cancel := context.WithTimeout(ctx, memberRemovalTimeout) defer cancel() members, err := client.MemberList(ctx) @@ -202,7 +206,7 @@ func removePeer(ctx context.Context, client *clientv3.Client, name, address stri return err } if u.Hostname() == address { - logrus.Infof("Removing name=%s id=%d address=%s from etcd", member.Name, member.ID, address) + log.Infow("Removing member from etcd", "name", member.Name, "id", member.ID, "address", address) _, err := client.MemberRemove(ctx, member.ID) if errors.Is(err, rpctypes.ErrGRPCMemberNotFound) { return nil diff --git a/pkg/controller/clusterset/clusterset.go b/pkg/controller/clusterset/clusterset.go index ea81e2a..6eaed18 100644 --- a/pkg/controller/clusterset/clusterset.go +++ b/pkg/controller/clusterset/clusterset.go @@ -2,10 +2,10 @@ package clusterset import ( "context" - "fmt" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" k3kcontroller "github.com/rancher/k3k/pkg/controller" + "github.com/rancher/k3k/pkg/log" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" @@ -29,15 +29,17 @@ type ClusterSetReconciler struct { Client ctrlruntimeclient.Client Scheme *runtime.Scheme ClusterCIDR string + logger *log.Logger } // Add adds a new controller to the manager -func Add(ctx context.Context, mgr manager.Manager, clusterCIDR string) error { +func Add(ctx context.Context, mgr manager.Manager, clusterCIDR string, logger *log.Logger) error { // initialize a new Reconciler reconciler := ClusterSetReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), ClusterCIDR: clusterCIDR, + logger: logger.Named(clusterSetController), } return ctrl.NewControllerManagedBy(mgr). For(&v1alpha1.ClusterSet{}). 
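// Illustrative sketch, not part of the patch: the conversions above swap
// printf-style messages (klog.Infof, logrus.Infof) for zap's structured
// variants. Infow takes a message plus alternating key/value pairs, which
// the JSON encoder emits as separate, queryable fields rather than one
// preformatted string. The values below are made up for the comparison:

package main

import "go.uber.org/zap"

func main() {
	log := zap.NewExample().Sugar() // stand-in for the patch's named logger

	// Before: one opaque, preformatted message.
	log.Infof("Removing name=%s id=%d address=%s from etcd", "k3k-server-0", 42, "10.0.0.7")

	// After: each field is addressable by log tooling.
	// {"level":"info","msg":"Removing member from etcd","name":"k3k-server-0","id":42,"address":"10.0.0.7"}
	log.Infow("Removing member from etcd", "name", "k3k-server-0", "id", 42, "address", "10.0.0.7")
}
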
@@ -50,23 +52,25 @@ func Add(ctx context.Context, mgr manager.Manager, clusterCIDR string) error { func (c *ClusterSetReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { var ( clusterSet v1alpha1.ClusterSet + log = c.logger.With("ClusterSet", req.NamespacedName) ) if err := c.Client.Get(ctx, types.NamespacedName{Name: req.Name, Namespace: req.Namespace}, &clusterSet); err != nil { - return reconcile.Result{}, fmt.Errorf("unable to get the clusterset: %w", err) + return reconcile.Result{}, err } if !clusterSet.Spec.DisableNetworkPolicy { + log.Info("Creating NetworkPolicy") setNetworkPolicy, err := netpol(ctx, c.ClusterCIDR, &clusterSet, c.Client) if err != nil { - return reconcile.Result{}, fmt.Errorf("unable to make a networkpolicy for cluster set: %w", err) + return reconcile.Result{}, err } if err := c.Client.Create(ctx, setNetworkPolicy); err != nil { if apierrors.IsAlreadyExists(err) { if err := c.Client.Update(ctx, setNetworkPolicy); err != nil { - return reconcile.Result{}, fmt.Errorf("unable to update networkpolicy for clusterset: %w", err) + return reconcile.Result{}, err } } - return reconcile.Result{}, fmt.Errorf("unable to create networkpolicy for clusterset: %w", err) + return reconcile.Result{}, err } } // TODO: Add resource quota for clustersets diff --git a/pkg/controller/clusterset/node.go b/pkg/controller/clusterset/node.go index fe2a7c9..9fc64fc 100644 --- a/pkg/controller/clusterset/node.go +++ b/pkg/controller/clusterset/node.go @@ -4,7 +4,8 @@ import ( "context" "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" - k3kcontroller "github.com/rancher/k3k/pkg/controller" + "github.com/rancher/k3k/pkg/log" + "go.uber.org/zap" v1 "k8s.io/api/core/v1" networkingv1 "k8s.io/api/networking/v1" "k8s.io/apimachinery/pkg/runtime" @@ -23,14 +24,16 @@ type NodeReconciler struct { Client ctrlruntimeclient.Client Scheme *runtime.Scheme ClusterCIDR string + logger *log.Logger } // AddNodeController adds a new controller to the manager -func AddNodeController(ctx context.Context, mgr manager.Manager) error { +func AddNodeController(ctx context.Context, mgr manager.Manager, logger *log.Logger) error { // initialize a new Reconciler reconciler := NodeReconciler{ Client: mgr.GetClient(), Scheme: mgr.GetScheme(), + logger: logger.Named(nodeController), } return ctrl.NewControllerManagedBy(mgr). 
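// Illustrative sketch, not part of the patch: one behavior worth noting in
// the ClusterSet Reconcile above is that when Create fails with
// AlreadyExists and the fallback Update succeeds, the function still falls
// through and returns the original Create error, so the request is requeued
// as a failure. A create-or-update shape that returns cleanly on the update
// path, using the same client and apierrors helpers:

package clusterset

import (
	"context"

	networkingv1 "k8s.io/api/networking/v1"
	apierrors "k8s.io/apimachinery/pkg/api/errors"
	ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client"
)

// createOrUpdateNetworkPolicy creates the policy and falls back to an update
// when it already exists; a nil error from Create passes straight through.
func createOrUpdateNetworkPolicy(ctx context.Context, c ctrlruntimeclient.Client, np *networkingv1.NetworkPolicy) error {
	err := c.Create(ctx, np)
	if apierrors.IsAlreadyExists(err) {
		return c.Update(ctx, np)
	}
	return err
}
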
@@ -43,33 +46,36 @@ func AddNodeController(ctx context.Context, mgr manager.Manager) error { } func (n *NodeReconciler) Reconcile(ctx context.Context, req reconcile.Request) (reconcile.Result, error) { + log := n.logger.With("Node", req.NamespacedName) var clusterSetList v1alpha1.ClusterSetList if err := n.Client.List(ctx, &clusterSetList); err != nil { - return reconcile.Result{}, k3kcontroller.LogAndReturnErr("failed to list clusterSets", err) + return reconcile.Result{}, err } if len(clusterSetList.Items) <= 0 { return reconcile.Result{}, nil } - if err := n.ensureNetworkPolicies(ctx, clusterSetList); err != nil { + if err := n.ensureNetworkPolicies(ctx, clusterSetList, log); err != nil { return reconcile.Result{}, err } return reconcile.Result{}, nil } -func (n *NodeReconciler) ensureNetworkPolicies(ctx context.Context, clusterSetList v1alpha1.ClusterSetList) error { +func (n *NodeReconciler) ensureNetworkPolicies(ctx context.Context, clusterSetList v1alpha1.ClusterSetList, log *zap.SugaredLogger) error { var setNetworkPolicy *networkingv1.NetworkPolicy for _, cs := range clusterSetList.Items { if cs.Spec.DisableNetworkPolicy { continue } var err error + log.Infow("Updating NetworkPolicy for ClusterSet", "name", cs.Name, "namespace", cs.Namespace) setNetworkPolicy, err = netpol(ctx, "", &cs, n.Client) if err != nil { return err } + log.Debugw("New NetworkPolicy for clusterset", "name", cs.Name, "namespace", cs.Namespace) if err := n.Client.Update(ctx, setNetworkPolicy); err != nil { return err } diff --git a/pkg/controller/controller.go b/pkg/controller/controller.go index 29a6d26..2d179b6 100644 --- a/pkg/controller/controller.go +++ b/pkg/controller/controller.go @@ -10,7 +10,6 @@ import ( "github.com/rancher/k3k/pkg/apis/k3k.io/v1alpha1" v1 "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/util/wait" - "k8s.io/klog" ctrlruntimeclient "sigs.k8s.io/controller-runtime/pkg/client" ) @@ -31,12 +30,6 @@ var Backoff = wait.Backoff{ func K3SImage(cluster *v1alpha1.Cluster) string { return k3SImageName + ":" + cluster.Spec.Version } - -func LogAndReturnErr(errString string, err error) error { - klog.Errorf("%s: %v", errString, err) - return err -} - func nodeAddress(node *v1.Node) string { var externalIP string var internalIP string diff --git a/pkg/log/zap.go b/pkg/log/zap.go new file mode 100644 index 0000000..acccbc2 --- /dev/null +++ b/pkg/log/zap.go @@ -0,0 +1,51 @@ +package log + +import ( + "os" + + "github.com/virtual-kubelet/virtual-kubelet/log" + "go.uber.org/zap" + "go.uber.org/zap/zapcore" + ctrlruntimezap "sigs.k8s.io/controller-runtime/pkg/log/zap" +) + +type Logger struct { + *zap.SugaredLogger +} + +func New(debug bool) *Logger { + return &Logger{newZappLogger(debug).Sugar()} +} + +func (k *Logger) WithError(err error) log.Logger { + return k +} + +func (k *Logger) WithField(string, interface{}) log.Logger { + return k +} + +func (k *Logger) WithFields(field log.Fields) log.Logger { + return k +} + +func (k *Logger) Named(name string) *Logger { + k.SugaredLogger = k.SugaredLogger.Named(name) + return k +} + +func newZappLogger(debug bool) *zap.Logger { + encCfg := zap.NewProductionEncoderConfig() + encCfg.TimeKey = "timestamp" + encCfg.EncodeTime = zapcore.ISO8601TimeEncoder + + lvl := zap.NewAtomicLevelAt(zap.InfoLevel) + if debug { + lvl = zap.NewAtomicLevelAt(zap.DebugLevel) + } + + encoder := zapcore.NewJSONEncoder(encCfg) + core := zapcore.NewCore(&ctrlruntimezap.KubeAwareEncoder{Encoder: encoder}, zapcore.AddSync(os.Stderr), lvl) + + return zap.New(core) +} From 
954c3bfdf8650a64d6cee3f2a3866e9f16b03290 Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Tue, 22 Oct 2024 00:13:17 +0300 Subject: [PATCH 2/3] Fix some log messages Signed-off-by: galal-hussein --- pkg/controller/cluster/pod.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/controller/cluster/pod.go b/pkg/controller/cluster/pod.go index 0a62883..4919ad0 100644 --- a/pkg/controller/cluster/pod.go +++ b/pkg/controller/cluster/pod.go @@ -88,7 +88,7 @@ func (p *PodReconciler) Reconcile(ctx context.Context, req reconcile.Request) (r return reconcile.Result{}, ctrlruntimeclient.IgnoreNotFound(err) } for _, pod := range podList.Items { - log.Infof("Handle etcd server pod [%s/%s]", pod.Namespace, pod.Name) + log.Info("Handle etcd server pod") if err := p.handleServerPod(ctx, cluster, &pod, log); err != nil { return reconcile.Result{}, err } @@ -151,9 +151,9 @@ func (p *PodReconciler) handleServerPod(ctx context.Context, cluster v1alpha1.Cl } func (p *PodReconciler) getETCDTLS(cluster *v1alpha1.Cluster, log *zap.SugaredLogger) (*tls.Config, error) { - log.Infow("generating etcd TLS client certificate", "Cluster", cluster.Namespace+"/"+cluster.Name) + log.Infow("generating etcd TLS client certificate", "Cluster", cluster.Name, "Namespace", cluster.Namespace) token := cluster.Spec.Token - endpoint := fmt.Sprintf("%s.%s", server.ServiceName(cluster.Name), cluster.Namespace) + endpoint := server.ServiceName(cluster.Name) + "." + cluster.Namespace var b *bootstrap.ControlRuntimeBootstrap if err := retry.OnError(k3kcontroller.Backoff, func(err error) bool { return true From d09a015a6446cd7cf1f26378ac2adb3c5e879ff7 Mon Sep 17 00:00:00 2001 From: galal-hussein Date: Tue, 22 Oct 2024 00:37:56 +0300 Subject: [PATCH 3/3] fixes Signed-off-by: galal-hussein --- k3k-kubelet/kubelet.go | 2 +- main.go | 2 +- pkg/log/zap.go | 18 +++++++++--------- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/k3k-kubelet/kubelet.go b/k3k-kubelet/kubelet.go index 7c1bf02..f6b05d0 100644 --- a/k3k-kubelet/kubelet.go +++ b/k3k-kubelet/kubelet.go @@ -109,7 +109,7 @@ func (k *kubelet) start(ctx context.Context) { if err := k.node.Err(); err != nil { k.logger.Fatalw("node stopped with an error", zap.Error(err)) } - k.logger.Info("node exited without an error") + k.logger.Info("node exited successfully") } func (k *kubelet) newProviderFunc(namespace, name, hostname string) nodeutil.NewProviderFunc { diff --git a/main.go b/main.go index cb3875b..b0911f4 100644 --- a/main.go +++ b/main.go @@ -79,7 +79,7 @@ func main() { return nil } if err := app.Run(os.Args); err != nil { - logger.Fatalw("Failed to run k3k controller", zap.Error(err)) + logger.Fatalw("failed to run k3k controller", zap.Error(err)) } } diff --git a/pkg/log/zap.go b/pkg/log/zap.go index acccbc2..1e5599d 100644 --- a/pkg/log/zap.go +++ b/pkg/log/zap.go @@ -17,21 +17,21 @@ func New(debug bool) *Logger { return &Logger{newZappLogger(debug).Sugar()} } -func (k *Logger) WithError(err error) log.Logger { - return k +func (l *Logger) WithError(err error) log.Logger { + return l } -func (k *Logger) WithField(string, interface{}) log.Logger { - return k +func (l *Logger) WithField(string, interface{}) log.Logger { + return l } -func (k *Logger) WithFields(field log.Fields) log.Logger { - return k +func (l *Logger) WithFields(field log.Fields) log.Logger { + return l } -func (k *Logger) Named(name string) *Logger { - k.SugaredLogger = k.SugaredLogger.Named(name) - return k +func (l *Logger) Named(name string) *Logger { + l.SugaredLogger 
= l.SugaredLogger.Named(name) + return l } func newZappLogger(debug bool) *zap.Logger {
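
Taken together, the three patches funnel every component through the new
pkg/log wrapper: both CLIs build it once in app.Before, main.go adapts it for
controller-runtime through zapr, and the kubelet hands it to virtual-kubelet
via log.WithLogger. The wrapper satisfies virtual-kubelet's log.Logger almost
for free: the embedded *zap.SugaredLogger supplies the leveled methods (Info,
Infof, and so on), and the three With* stubs cover the rest, at the cost of
silently dropping the fields and errors they are given. One caveat remains in
the final version: Named rewrites l.SugaredLogger in place and returns the
same pointer, so callers that share one *Logger also share whichever name was
applied last (and zap appends name segments, so repeated calls produce chains
like "a.b"). A non-mutating variant, sketched under the same package layout:

package log

import "go.uber.org/zap"

type Logger struct {
	*zap.SugaredLogger
}

// Named returns a child wrapper and leaves the receiver untouched, so
// sibling controllers keep distinct logger names.
func (l *Logger) Named(name string) *Logger {
	return &Logger{l.SugaredLogger.Named(name)}
}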