From 84c02517d6f330b6f2bc93753893e82cb9f4a5c2 Mon Sep 17 00:00:00 2001 From: Haytham AbuelFutuh Date: Thu, 21 May 2020 13:07:17 -0700 Subject: [PATCH] Counting objects of kind --- cmd/controller/cmd/root.go | 7 ++++ config.yaml | 3 +- .../nodes/task/k8s/plugin_manager.go | 38 +++++++++++++++++++ pkg/controller/workers.go | 23 ++++++----- 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/cmd/controller/cmd/root.go b/cmd/controller/cmd/root.go index 315df6533..24bf51818 100644 --- a/cmd/controller/cmd/root.go +++ b/cmd/controller/cmd/root.go @@ -215,6 +215,13 @@ func executeRootCmd(cfg *config2.Config) { logger.Fatalf(ctx, "Failed to initialize controller run-time manager. Error: %v", err) } + go func() { + err = mgr.Start(ctx.Done()) + if err != nil { + logger.Fatalf(ctx, "Failed to start manager. Error: %v", err) + } + }() + c, err := controller.New(ctx, cfg, kubeClient, flyteworkflowClient, flyteworkflowInformerFactory, mgr, propellerScope) if err != nil { diff --git a/config.yaml b/config.yaml index b086508ed..5a9c4e3e9 100644 --- a/config.yaml +++ b/config.yaml @@ -31,6 +31,7 @@ tasks: - container - K8S-ARRAY - qubole-hive-executor + - spark # Sample plugins config plugins: # All k8s plugins default configuration @@ -62,7 +63,7 @@ storage: endpoint: http://localhost:30084 region: us-east-1 secret-key: miniostorage - type: minio + type: mem container: "my-s3-bucket" event: type: admin diff --git a/pkg/controller/nodes/task/k8s/plugin_manager.go b/pkg/controller/nodes/task/k8s/plugin_manager.go index 42f6556a2..11722a7bc 100644 --- a/pkg/controller/nodes/task/k8s/plugin_manager.go +++ b/pkg/controller/nodes/task/k8s/plugin_manager.go @@ -3,9 +3,12 @@ package k8s import ( "context" "fmt" + "reflect" "strings" "time" + "k8s.io/client-go/tools/cache" + "github.com/lyft/flytepropeller/pkg/controller/nodes/task/backoff" v1 "k8s.io/api/core/v1" @@ -417,8 +420,31 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry handler.Funcs{ CreateFunc: func(evt event.CreateEvent, q2 workqueue.RateLimitingInterface) { logger.Debugf(context.Background(), "Create received for %s, ignoring.", evt.Meta.GetName()) + i, err := iCtx.KubeClient().GetCache().GetInformer(entry.ResourceToWatch) + if err != nil { + panic(err) + } + + si, casted := i.(cache.SharedIndexInformer) + if !casted { + panic(fmt.Errorf("wrong type. Actual: %v", reflect.TypeOf(i))) + } + + logger.Infof(ctx, "Found items in store [%v]", len(si.GetStore().List())) }, UpdateFunc: func(evt event.UpdateEvent, q2 workqueue.RateLimitingInterface) { + i, err := iCtx.KubeClient().GetCache().GetInformer(entry.ResourceToWatch) + if err != nil { + panic(err) + } + + si, casted := i.(cache.SharedIndexInformer) + if !casted { + panic(fmt.Errorf("wrong type. Actual: %v", reflect.TypeOf(i))) + } + + logger.Infof(ctx, "Found items in store [%v] for kind [%v]", len(si.GetStore().List()), entry.ResourceToWatch) + if evt.MetaNew == nil { logger.Warn(context.Background(), "Received an Update event with nil MetaNew.") } else if evt.MetaOld == nil || evt.MetaOld.GetResourceVersion() != evt.MetaNew.GetResourceVersion() { @@ -466,6 +492,18 @@ func NewPluginManager(ctx context.Context, iCtx pluginsCore.SetupContext, entry return nil, err } + i, err := iCtx.KubeClient().GetCache().GetInformer(entry.ResourceToWatch) + if err != nil { + panic(err) + } + + si, casted := i.(cache.SharedIndexInformer) + if !casted { + panic(fmt.Errorf("wrong type. Actual: %v", reflect.TypeOf(i))) + } + + logger.Infof(ctx, "Found items in store [%v]", len(si.GetStore().List())) + return &PluginManager{ id: entry.ID, plugin: entry.Plugin, diff --git a/pkg/controller/workers.go b/pkg/controller/workers.go index 7b6303c81..06347e0c0 100644 --- a/pkg/controller/workers.go +++ b/pkg/controller/workers.go @@ -3,7 +3,6 @@ package controller import ( "context" "fmt" - "runtime/pprof" "time" "github.com/lyft/flytestdlib/contextutils" @@ -139,17 +138,17 @@ func (w *WorkerPool) Run(ctx context.Context, threadiness int, synced ...cache.I } logger.Infof(ctx, "Starting workers [%d]", threadiness) - // Launch workers to process FlyteWorkflow resources - for i := 0; i < threadiness; i++ { - w.metrics.FreeWorkers.Inc() - logger.Infof(ctx, "Starting worker [%d]", i) - workerLabel := fmt.Sprintf("worker-%v", i) - go func() { - workerCtx := contextutils.WithGoroutineLabel(ctx, workerLabel) - pprof.SetGoroutineLabels(workerCtx) - w.runWorker(workerCtx) - }() - } + //// Launch workers to process FlyteWorkflow resources + //for i := 0; i < threadiness; i++ { + // w.metrics.FreeWorkers.Inc() + // logger.Infof(ctx, "Starting worker [%d]", i) + // workerLabel := fmt.Sprintf("worker-%v", i) + // go func() { + // workerCtx := contextutils.WithGoroutineLabel(ctx, workerLabel) + // pprof.SetGoroutineLabels(workerCtx) + // w.runWorker(workerCtx) + // }() + //} w.workQueue.Start(ctx) logger.Info(ctx, "Started workers")