From 1f5875f5d86098916d48e9efbd4a7991503fe5e4 Mon Sep 17 00:00:00 2001 From: Erwin de Haan Date: Mon, 26 Jun 2023 13:45:26 +0200 Subject: [PATCH] Reduced the number of prints and updated how flags are handled --- README.md | 23 ++++++++++++ cmd/root.go | 14 +++---- cmd/wait.go | 62 +++++++++---------------------- flags/flags.go | 61 ++++++++++++++++++++++++++++++ kube/client.go | 94 ---------------------------------------------- pkg/utils.go | 23 ++++++++++++ pkg/waitables.go | 97 ++++++++++++++++++++++++++++++++++-------------- utils/strings.go | 16 ++++++++ 8 files changed, 216 insertions(+), 174 deletions(-) create mode 100644 flags/flags.go delete mode 100644 kube/client.go diff --git a/README.md b/README.md index ac8fa5f..96a7597 100644 --- a/README.md +++ b/README.md @@ -23,6 +23,29 @@ For jobs it wait until the `Completed` condition is true. For services it will wait until all pods that match the service selector are Ready and Available (like above). +## Example + +``` +$ kube-wait-for-multi default,job,some-job default,service,some-service default,pod,some-pod-88bb5f7bb-wx4f7 +``` +Wait for the job `some-job` to complete, the service `some-service` to have all available pods and the pod `some-pod-88bb5f7bb-wx4f7` to be ready and available. + +The program will also wait when a service does not exist yet. +``` +$ kube-wait-for-multi default,job,some-job default,job,test default,service,service1 default,service,service2 +Starting with namespaces: [default] +Starting informers... +wait status +└── [❔] namespace/default + ├── [✅] service/service1: Available + ├── [❔] service/service2: Unavailable + ├── [✅] job/some-job: Complete + └── [❌] job/test: NotComplete +[... some time later ...] +wait status +└── [✅] namespace/default +``` + ## Docker Hosted on Docker Hub: https://hub.docker.com/r/erayan/k8s-wait-for-multi diff --git a/cmd/root.go b/cmd/root.go index c7196ab..d88d2b7 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -18,7 +18,8 @@ package cmd import ( "os" - "time" + + "github.com/erayan/k8s-wait-for-multi/flags" "github.com/spf13/cobra" "k8s.io/cli-runtime/pkg/genericclioptions" @@ -30,6 +31,7 @@ var ( KubeResourceBuilderFlags *genericclioptions.ResourceBuilderFlags KubernetesPrintFlags *genericclioptions.PrintFlags KubernetesGetPrintFlags *kubectlget.PrintFlags + WaitForConfigFlags *flags.ConfigFlags ) // rootCmd represents the base command when called without any subcommands @@ -40,7 +42,7 @@ var rootCmd = &cobra.Command{ This is an implementation of k8s-wait-for that allows you to wait for multiple items in one process. This uses informers to get the status updates for all the items that this application is waiting for. -You can omit the NAMESPACE and KIND, they default to the value of the --namespace flag and pod respectively. Supported string for KIND are service, job and pod.`, +You can omit the NAMESPACE and KIND, they default to the value of the --namespace flag and 'pod' respectively. Supported strings for KIND are service, job and pod.`, RunE: wait, Version: version, } @@ -57,12 +59,8 @@ func init() { KubernetesConfigFlags.AddFlags(rootCmd.PersistentFlags()) - rootCmd.Flags().BoolP("version", "v", false, "Display version info") - - rootCmd.Flags().Bool("no-collapse", false, "Do not collapse the status tree for done subtrees") - - rootCmd.Flags().Bool("no-tree", false, "Do not print the status as a tree") + WaitForConfigFlags = flags.NewConfigFlags() - rootCmd.PersistentFlags().DurationP("timeout", "t", time.Duration(600*time.Second), "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h). Default is 10 minutes") + WaitForConfigFlags.AddFlags(rootCmd.Flags()) } diff --git a/cmd/wait.go b/cmd/wait.go index 351ae14..6bf25fd 100644 --- a/cmd/wait.go +++ b/cmd/wait.go @@ -44,28 +44,10 @@ var mu sync.Mutex var waits *pkg.Waitables func wait(cmd *cobra.Command, args []string) error { - isVersion, err := cmd.Flags().GetBool("version") - - if err != nil { - return err - } - - if isVersion { + if *WaitForConfigFlags.PrintVersion { return printVersion(cmd, args) } - noCollapseTree, err := cmd.Flags().GetBool("no-collapse") - - if err != nil { - return err - } - - noTree, err := cmd.Flags().GetBool("no-tree") - - if err != nil { - return err - } - if len(args) < 1 { return errors.New("command needs one or more arguments to wait for") } @@ -74,7 +56,9 @@ func wait(cmd *cobra.Command, args []string) error { KubernetesConfigFlags.Namespace = pointer.String("default") } - waits = pkg.NewWaitables() + waits = pkg.NewWaitables(WaitForConfigFlags) + + waits.Start() timeout, err := cmd.Flags().GetDuration("timeout") @@ -155,11 +139,7 @@ func wait(cmd *cobra.Command, args []string) error { return errors.New("not enough arguments") } - if noTree { - log.Println(waits.GetStatusString()) - } else { - log.Println(waits.GetStatusTreeString(noCollapseTree)) - } + waits.PrintStatus() if waits.HasServices() { svc_informer, err := cc.GetInformerForKind(timeoutCtx, schema.FromAPIVersionAndKind("v1", "Service")) @@ -169,13 +149,13 @@ func wait(cmd *cobra.Command, args []string) error { svc_informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventAddService, obj.(*corev1.Service), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventAddService, obj.(*corev1.Service)) }, UpdateFunc: func(obj interface{}, newObj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventUpdateService, newObj.(*corev1.Service), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventUpdateService, newObj.(*corev1.Service)) }, DeleteFunc: func(obj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventDeleteService, obj.(*corev1.Service), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventDeleteService, obj.(*corev1.Service)) }, }) } @@ -188,13 +168,13 @@ func wait(cmd *cobra.Command, args []string) error { pod_informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventAddPod, obj.(*corev1.Pod), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventAddPod, obj.(*corev1.Pod)) }, UpdateFunc: func(obj interface{}, newObj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventUpdatePod, newObj.(*corev1.Pod), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventUpdatePod, newObj.(*corev1.Pod)) }, DeleteFunc: func(obj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventDeletePod, obj.(*corev1.Pod), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventDeletePod, obj.(*corev1.Pod)) }, }) } @@ -207,34 +187,32 @@ func wait(cmd *cobra.Command, args []string) error { job_informer.AddEventHandler(toolscache.ResourceEventHandlerFuncs{ AddFunc: func(obj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventAddJob, obj.(*batchv1.Job), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventAddJob, obj.(*batchv1.Job)) }, UpdateFunc: func(obj interface{}, newObj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventUpdateJob, newObj.(*batchv1.Job), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventUpdateJob, newObj.(*batchv1.Job)) }, DeleteFunc: func(obj interface{}) { - handleEvent(timeoutCtx, waits.ProcessEventDeleteJob, obj.(*batchv1.Job), !noCollapseTree, noTree) + handleEvent(timeoutCtx, waits.ProcessEventDeleteJob, obj.(*batchv1.Job)) }, }) } - log.Printf("Starting informers...") - err = cc.Start(timeoutCtx) if err != nil { return err } - log.Printf("Shutdown informers.") + + waits.Done() return nil } func processCompletion() { - log.Printf("All items have completed or are ready") cancelFn() } -func handleEvent[V *corev1.Pod | *corev1.Service | *batchv1.Job](ctx context.Context, f func(ctx context.Context, obj V) (bool, error), obj V, collapseTree bool, noTree bool) { +func handleEvent[V *corev1.Pod | *corev1.Service | *batchv1.Job](ctx context.Context, f func(ctx context.Context, obj V) (bool, error), obj V) { mu.Lock() defer mu.Unlock() @@ -244,11 +222,7 @@ func handleEvent[V *corev1.Pod | *corev1.Service | *batchv1.Job](ctx context.Con } if matches { - if noTree { - log.Println(waits.GetStatusString()) - } else { - log.Println(waits.GetStatusTreeString(collapseTree)) - } + waits.PrintStatus() if waits.IsDone() { processCompletion() diff --git a/flags/flags.go b/flags/flags.go new file mode 100644 index 0000000..53355f3 --- /dev/null +++ b/flags/flags.go @@ -0,0 +1,61 @@ +/* + * Copyright 2023 The k8s-wait-for-multi authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package flags + +import ( + "time" + + "github.com/spf13/pflag" + + utilpointer "k8s.io/utils/pointer" +) + +type ConfigFlags struct { + PrintVersion *bool + PrintTree *bool + PrintCollapsedTree *bool + + Timeout *time.Duration +} + +func NewConfigFlags() *ConfigFlags { + return &ConfigFlags{ + PrintVersion: utilpointer.Bool(false), + PrintTree: utilpointer.Bool(true), + PrintCollapsedTree: utilpointer.Bool(true), + + Timeout: utilpointer.Duration(time.Duration(600 * time.Second)), + } +} + +func (f *ConfigFlags) AddFlags(flags *pflag.FlagSet) { + if f.Timeout != nil { + flags.DurationVarP(f.Timeout, "timeout", "t", *f.Timeout, "The length of time to wait before ending watch, zero means never. Any other values should contain a corresponding time unit (e.g. 1s, 2m, 3h)") + } + + if f.PrintVersion != nil { + flags.BoolVarP(f.PrintVersion, "version", "v", *f.PrintVersion, "Display version info") + } + + if f.PrintTree != nil { + flags.BoolVar(f.PrintTree, "print-tree", *f.PrintTree, "Print the status as a tree") + } + + if f.PrintCollapsedTree != nil { + flags.BoolVar(f.PrintCollapsedTree, "print-collapsed-tree", *f.PrintCollapsedTree, "Collapse the status tree for done subtrees") + } +} diff --git a/kube/client.go b/kube/client.go deleted file mode 100644 index 0cebc4d..0000000 --- a/kube/client.go +++ /dev/null @@ -1,94 +0,0 @@ -/* - * Copyright 2023 The k8s-wait-for-multi authors. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package kube - -import ( - "log" - - "k8s.io/apimachinery/pkg/runtime" - utilruntime "k8s.io/apimachinery/pkg/util/runtime" - "k8s.io/cli-runtime/pkg/genericclioptions" - clientgoscheme "k8s.io/client-go/kubernetes/scheme" - "k8s.io/utils/pointer" - "sigs.k8s.io/controller-runtime/pkg/client" -) - -var ( - scheme = runtime.NewScheme() - c client.Client - ww client.WithWatch -) - -func init() { - utilruntime.Must(clientgoscheme.AddToScheme(scheme)) -} - -func Client(flags *genericclioptions.ConfigFlags, rbFlags *genericclioptions.ResourceBuilderFlags) (client.Client, error) { - if c != nil { - return c, nil - } - - kubeconfig, err := flags.ToRESTConfig() - if err != nil { - log.Fatal(err) - return nil, err - } - - kubeclient, err := client.New(kubeconfig, client.Options{Scheme: scheme}) - if err != nil { - log.Fatal(err) - return nil, err - } - - if rbFlags != nil { - if rbFlags.AllNamespaces == nil || !*rbFlags.AllNamespaces { - if flags.Namespace != nil && len(*flags.Namespace) > 0 { - log.Printf("Running with namespace %s.\n", *flags.Namespace) - kubeclient = client.NewNamespacedClient(kubeclient, *flags.Namespace) - } else { - log.Println("Running with namespace default.") - kubeclient = client.NewNamespacedClient(kubeclient, "default") - } - } else { - log.Println("Running with all namespaces.") - rbFlags.AllNamespaces = pointer.Bool(true) - } - } - c = kubeclient - return c, nil -} - -func ClientWithWatch(flags *genericclioptions.ConfigFlags) (client.WithWatch, error) { - if ww != nil { - return ww, nil - } - - kubeconfig, err := flags.ToRESTConfig() - if err != nil { - log.Fatal(err) - return nil, err - } - - kubeclient, err := client.NewWithWatch(kubeconfig, client.Options{Scheme: scheme}) - if err != nil { - log.Fatal(err) - return nil, err - } - - ww = kubeclient - return ww, nil -} diff --git a/pkg/utils.go b/pkg/utils.go index c1d2704..481818d 100644 --- a/pkg/utils.go +++ b/pkg/utils.go @@ -34,3 +34,26 @@ func metaInSlice(a treeprint.MetaValue, list []treeprint.MetaValue) bool { } return false } + +func getNodesStatus(item *treeprint.Node, collapseTree bool) treeprint.MetaValue { + if len(item.Nodes) > 0 && item.Meta == TreeStatusUnknown { + status := []treeprint.MetaValue{} + + for _, node := range item.Nodes { + status = append(status, getNodesStatus(node, collapseTree)) + } + if metaInSlice(TreeStatusUnknown, status) { + item.Meta = TreeStatusUnknown + } else if metaInSlice(TreeStatusNotDone, status) { + item.Meta = TreeStatusNotDone + } else { + if collapseTree { + item.Nodes = nil + } + item.Meta = TreeStatusDone + } + // branch nodes + } + + return item.Meta +} diff --git a/pkg/waitables.go b/pkg/waitables.go index f04bdb4..549cceb 100644 --- a/pkg/waitables.go +++ b/pkg/waitables.go @@ -18,8 +18,11 @@ package pkg import ( "fmt" + "log" "strings" + "time" + "github.com/erayan/k8s-wait-for-multi/flags" "github.com/erayan/k8s-wait-for-multi/pkg/items" "github.com/erayan/k8s-wait-for-multi/utils" "github.com/xlab/treeprint" @@ -34,6 +37,14 @@ import ( type Waitables struct { cache.Cache + printTree bool + printCollapsedTree bool + + ticker *time.Ticker + queuedPrints int + tickerDone chan bool + tickerFinished chan bool + UnprocessablePodEvents map[types.UID]Event Services items.NamespacedServiceCollection @@ -109,7 +120,19 @@ func (w *Waitables) IsDone() bool { return s && p && j } -func (w *Waitables) GetStatusString() string { +func (w *Waitables) PrintStatus() { + w.queuedPrints += 1 +} + +func (w *Waitables) internalPrintStatus() { + if w.printTree { + log.Println(w.getStatusTreeString()) + } else { + log.Println(w.getStatusString()) + } +} + +func (w *Waitables) getStatusString() string { items := []string{} for ns, nsitems := range w.Services { for n, val := range nsitems { @@ -135,7 +158,7 @@ func (w *Waitables) GetStatusString() string { return fmt.Sprintf("Waiting for: %s", strings.Join(items, ", ")) } -func (w *Waitables) GetStatusTreeString(collapseTree bool) string { +func (w *Waitables) getStatusTreeString() string { tree := treeprint.NewWithRoot("wait status") @@ -193,35 +216,12 @@ func (w *Waitables) GetStatusTreeString(collapseTree bool) string { // if you need to iterate over the whole tree // call `VisitAll` from your top root node. tree.VisitAll(func(item *treeprint.Node) { - GetNodesStatus(item, collapseTree) + getNodesStatus(item, w.printCollapsedTree) }) return tree.String() } -func GetNodesStatus(item *treeprint.Node, collapseTree bool) treeprint.MetaValue { - if len(item.Nodes) > 0 && item.Meta == TreeStatusUnknown { - status := []treeprint.MetaValue{} - - for _, node := range item.Nodes { - status = append(status, GetNodesStatus(node, collapseTree)) - } - if metaInSlice(TreeStatusUnknown, status) { - item.Meta = TreeStatusUnknown - } else if metaInSlice(TreeStatusNotDone, status) { - item.Meta = TreeStatusNotDone - } else { - if collapseTree { - item.Nodes = nil - } - item.Meta = TreeStatusDone - } - // branch nodes - } - - return item.Meta -} - func (w *Waitables) SetPodReadyFromPod(pod *corev1.Pod) { w.Pods[pod.Namespace][pod.Name].WithReadyFromPod(pod) } @@ -281,16 +281,57 @@ func (w *Waitables) GetAllNamespaces() []string { return namespaces } +func (w *Waitables) Done() { + w.ticker.Stop() + w.tickerDone <- true + <-w.tickerFinished +} + +func (w *Waitables) Start() { + go func() { + for { + if w.Ticker() { + break + } + } + w.tickerFinished <- true + }() +} + +func (w *Waitables) Ticker() bool { + select { + case <-w.tickerDone: + w.internalPrintStatus() + return true + case <-w.ticker.C: + if w.queuedPrints > 0 { + w.queuedPrints = 0 + w.internalPrintStatus() + } + } + return false +} + func (w *Waitables) WithCache(c cache.Cache) *Waitables { w.Cache = c return w } -func NewWaitables() *Waitables { - return &Waitables{ +func NewWaitables(c *flags.ConfigFlags) *Waitables { + w := &Waitables{ UnprocessablePodEvents: map[types.UID]Event{}, Services: items.NamespacedServiceCollection{}, Pods: items.NamespacedPodCollection{}, Jobs: items.NamespacedJobCollection{}, + + ticker: time.NewTicker(250 * time.Millisecond), + queuedPrints: 0, + tickerDone: make(chan bool), + tickerFinished: make(chan bool), + + printTree: *c.PrintTree, + printCollapsedTree: *c.PrintCollapsedTree, } + + return w } diff --git a/utils/strings.go b/utils/strings.go index bcde9f5..b7a0f72 100644 --- a/utils/strings.go +++ b/utils/strings.go @@ -1,3 +1,19 @@ +/* + * Copyright 2023 The k8s-wait-for-multi authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package utils func StringInSlice(a string, list []string) bool {