Skip to content

Commit

Permalink
Show plugin activation status (#1256)
Browse files Browse the repository at this point in the history
* Show plugin activation status
  • Loading branch information
Josef Karasek authored Sep 18, 2023
1 parent 1a9c02b commit 03a50c9
Show file tree
Hide file tree
Showing 13 changed files with 149 additions and 54 deletions.
4 changes: 3 additions & 1 deletion cmd/botkube-agent/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,9 +180,10 @@ func run(ctx context.Context) (err error) {
}()

schedulerChan := make(chan string)
pluginHealthStats := plugin.NewHealthStats(conf.Plugins.RestartPolicy.Threshold)
collector := plugin.NewCollector(logger)
enabledPluginExecutors, enabledPluginSources := collector.GetAllEnabledAndUsedPlugins(conf)
pluginManager := plugin.NewManager(logger, conf.Settings.Log, conf.Plugins, enabledPluginExecutors, enabledPluginSources, schedulerChan)
pluginManager := plugin.NewManager(logger, conf.Settings.Log, conf.Plugins, enabledPluginExecutors, enabledPluginSources, schedulerChan, pluginHealthStats)

err = pluginManager.Start(ctx)
if err != nil {
Expand Down Expand Up @@ -219,6 +220,7 @@ func run(ctx context.Context) (err error) {
BotKubeVersion: botkubeVersion,
RestCfg: kubeConfig,
AuditReporter: auditReporter,
PluginHealthStats: pluginHealthStats,
},
)

Expand Down
14 changes: 7 additions & 7 deletions internal/plugin/health_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ type HealthMonitor struct {
executorsStore *store[executor.Executor]
sourcesStore *store[source.Source]
policy config.PluginRestartPolicy
pluginRestartStats map[string]int
pluginHealthStats *HealthStats
healthCheckInterval time.Duration
}

// NewHealthMonitor returns a new HealthMonitor instance.
func NewHealthMonitor(logger logrus.FieldLogger, logCfg config.Logger, policy config.PluginRestartPolicy, schedulerChan chan string, sourceSupervisorChan, executorSupervisorChan chan pluginMetadata, executorsStore *store[executor.Executor], sourcesStore *store[source.Source], healthCheckInterval time.Duration) *HealthMonitor {
func NewHealthMonitor(logger logrus.FieldLogger, logCfg config.Logger, policy config.PluginRestartPolicy, schedulerChan chan string, sourceSupervisorChan, executorSupervisorChan chan pluginMetadata, executorsStore *store[executor.Executor], sourcesStore *store[source.Source], healthCheckInterval time.Duration, stats *HealthStats) *HealthMonitor {
return &HealthMonitor{
log: logger,
logConfig: logCfg,
Expand All @@ -36,7 +36,7 @@ func NewHealthMonitor(logger logrus.FieldLogger, logCfg config.Logger, policy co
executorSupervisorChan: executorSupervisorChan,
executorsStore: executorsStore,
sourcesStore: sourcesStore,
pluginRestartStats: make(map[string]int),
pluginHealthStats: stats,
healthCheckInterval: healthCheckInterval,
}
}
Expand All @@ -54,7 +54,7 @@ func (m *HealthMonitor) monitorSourcePluginHealth(ctx context.Context) {
case <-ctx.Done():
return
case plugin := <-m.sourceSupervisorChan:
m.log.Infof("Restarting source plugin %q, attempt %d/%d...", plugin.pluginKey, m.pluginRestartStats[plugin.pluginKey]+1, m.policy.Threshold)
m.log.Infof("Restarting source plugin %q, attempt %d/%d...", plugin.pluginKey, m.pluginHealthStats.GetRestartCount(plugin.pluginKey)+1, m.policy.Threshold)
if source, ok := m.sourcesStore.EnabledPlugins.Get(plugin.pluginKey); ok && source.Cleanup != nil {
m.log.Debugf("Releasing resources of source plugin %q...", plugin.pluginKey)
source.Cleanup()
Expand Down Expand Up @@ -86,7 +86,7 @@ func (m *HealthMonitor) monitorExecutorPluginHealth(ctx context.Context) {
case <-ctx.Done():
return
case plugin := <-m.executorSupervisorChan:
m.log.Infof("Restarting executor plugin %q, attempt %d/%d...", plugin.pluginKey, m.pluginRestartStats[plugin.pluginKey]+1, m.policy.Threshold)
m.log.Infof("Restarting executor plugin %q, attempt %d/%d...", plugin.pluginKey, m.pluginHealthStats.GetRestartCount(plugin.pluginKey)+1, m.policy.Threshold)

if executor, ok := m.executorsStore.EnabledPlugins.Get(plugin.pluginKey); ok && executor.Cleanup != nil {
m.log.Infof("Releasing executors of executor plugin %q...", plugin.pluginKey)
Expand All @@ -111,8 +111,8 @@ func (m *HealthMonitor) monitorExecutorPluginHealth(ctx context.Context) {
}

func (m *HealthMonitor) shouldRestartPlugin(plugin string) bool {
restarts := m.pluginRestartStats[plugin]
m.pluginRestartStats[plugin]++
restarts := m.pluginHealthStats.GetRestartCount(plugin)
m.pluginHealthStats.Increment(plugin)

switch m.policy.Type.ToLower() {
case config.KeepAgentRunningWhenThresholdReached.ToLower():
Expand Down
75 changes: 75 additions & 0 deletions internal/plugin/health_stats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
package plugin

import (
"sync"
"time"
)

const (
	// pluginRunning is the status GetStats reports while a plugin's restart
	// count has not exceeded its restart threshold (including plugins that
	// were never restarted).
	pluginRunning = "Running"
	// pluginDeactivated is the status GetStats reports once a plugin's restart
	// count is strictly greater than its restart threshold.
	pluginDeactivated = "Deactivated"
)

// HealthStats holds information about plugin health and restarts.
//
// The embedded RWMutex guards pluginStats. All methods acquire it themselves,
// so callers do not need to lock around individual calls. Note that a
// GetRestartCount followed by Increment is two separate critical sections and
// is therefore not atomic as a pair.
type HealthStats struct {
	sync.RWMutex
	// pluginStats maps a plugin key to its restart record. Plugins that were
	// never restarted have no entry.
	pluginStats map[string]pluginStats
	// globalRestartThreshold is reported for plugins without an entry and is
	// copied into a plugin's record on every Increment.
	globalRestartThreshold int
}

// pluginStats is the per-plugin restart record stored in HealthStats.
type pluginStats struct {
	// restartCount is the number of times Increment was called for the plugin.
	restartCount int
	// restartThreshold is the threshold captured at the last Increment.
	restartThreshold int
	// lastTransitionTime is the RFC 3339 timestamp of the last Increment.
	lastTransitionTime string
}

// NewHealthStats returns a new HealthStats instance that uses threshold as the
// restart threshold for every plugin.
func NewHealthStats(threshold int) *HealthStats {
	return &HealthStats{
		pluginStats:            map[string]pluginStats{},
		globalRestartThreshold: threshold,
	}
}

// Increment increments restart count for a plugin and records the current time
// as its last transition time.
//
// It requires a non-nil receiver; a zero-value HealthStats is usable.
func (h *HealthStats) Increment(plugin string) {
	h.Lock()
	defer h.Unlock()

	// Allow a zero-value HealthStats (not built via NewHealthStats) to be used
	// without panicking on a write to a nil map.
	if h.pluginStats == nil {
		h.pluginStats = map[string]pluginStats{}
	}

	// Indexing a missing key yields the zero pluginStats, so the first
	// increment for a plugin naturally starts from restartCount 0.
	h.pluginStats[plugin] = pluginStats{
		restartCount:       h.pluginStats[plugin].restartCount + 1,
		restartThreshold:   h.globalRestartThreshold,
		lastTransitionTime: time.Now().Format(time.RFC3339),
	}
}

// GetRestartCount returns restart count for a plugin. It returns 0 for a
// plugin that was never restarted and for a nil receiver.
func (h *HealthStats) GetRestartCount(plugin string) int {
	if h == nil {
		return 0
	}
	h.RLock()
	defer h.RUnlock()

	// A missing key yields the zero pluginStats, whose restartCount is 0.
	return h.pluginStats[plugin].restartCount
}

// GetStats returns plugin status, restart count, restart threshold and last
// transition time.
//
// For a plugin without a record it reports "Running", zero restarts, the
// global threshold and an empty timestamp. A nil receiver is tolerated so that
// read-only callers which were not handed a HealthStats do not panic; it
// reports "Running" with zero restarts, zero threshold and an empty timestamp.
func (h *HealthStats) GetStats(plugin string) (status string, restarts int, threshold int, timestamp string) {
	if h == nil {
		return pluginRunning, 0, 0, ""
	}
	h.RLock()
	defer h.RUnlock()

	record, ok := h.pluginStats[plugin]
	if !ok {
		return pluginRunning, 0, h.globalRestartThreshold, ""
	}

	status = pluginRunning
	// Strictly greater: a plugin that has used exactly `threshold` restarts is
	// still considered running.
	if record.restartCount > record.restartThreshold {
		status = pluginDeactivated
	}
	return status, record.restartCount, record.restartThreshold, record.lastTransitionTime
}
3 changes: 2 additions & 1 deletion internal/plugin/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ type pluginMetadata struct {
}

// NewManager returns a new Manager instance.
func NewManager(logger logrus.FieldLogger, logCfg config.Logger, cfg config.PluginManagement, executors, sources []string, schedulerChan chan string) *Manager {
func NewManager(logger logrus.FieldLogger, logCfg config.Logger, cfg config.PluginManagement, executors, sources []string, schedulerChan chan string, stats *HealthStats) *Manager {
sourceSupervisorChan := make(chan pluginMetadata)
executorSupervisorChan := make(chan pluginMetadata)
executorsStore := newStore[executor.Executor]()
Expand Down Expand Up @@ -102,6 +102,7 @@ func NewManager(logger logrus.FieldLogger, logCfg config.Logger, cfg config.Plug
&executorsStore,
&sourcesStore,
cfg.HealthCheckInterval,
stats,
),
}
}
Expand Down
2 changes: 1 addition & 1 deletion internal/plugin/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func TestCollectEnabledRepositories(t *testing.T) {
// given
manager := NewManager(loggerx.NewNoop(), config.Logger{}, config.PluginManagement{
Repositories: tc.definedRepositories,
}, tc.enabledExecutors, tc.enabledSources, make(chan string))
}, tc.enabledExecutors, tc.enabledSources, make(chan string), NewHealthStats(1))

// when
out, err := manager.collectEnabledRepositories()
Expand Down
10 changes: 6 additions & 4 deletions pkg/execute/exec.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (

"github.com/sirupsen/logrus"

"github.com/kubeshop/botkube/internal/plugin"
"github.com/kubeshop/botkube/pkg/bot/interactive"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/execute/alias"
Expand Down Expand Up @@ -52,20 +53,21 @@ func (e *ExecExecutor) FeatureName() FeatureName {
// List returns a tabular representation of Executors
func (e *ExecExecutor) List(_ context.Context, cmdCtx CommandContext) (interactive.CoreMessage, error) {
e.log.Debug("Listing executors...")
return respond(e.TabularOutput(cmdCtx.Conversation.ExecutorBindings), cmdCtx), nil
return respond(e.TabularOutput(cmdCtx.Conversation.ExecutorBindings, cmdCtx.PluginHealthStats), cmdCtx), nil
}

// TabularOutput sorts executor groups by key and returns a printable table
func (e *ExecExecutor) TabularOutput(bindings []string) string {
func (e *ExecExecutor) TabularOutput(bindings []string, stats *plugin.HealthStats) string {
executors := executorsForBindings(e.cfg.Executors, bindings)

buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 5, 0, 1, ' ', 0)
fmt.Fprintf(w, "EXECUTOR\tENABLED\tALIASES")
fmt.Fprintf(w, "EXECUTOR\tENABLED\tALIASES\tRESTARTS\tSTATUS\tLAST_RESTART")
for _, name := range maputil.SortKeys(executors) {
enabled := executors[name]
aliases := alias.ListExactForExecutor(name, e.cfg.Aliases)
fmt.Fprintf(w, "\n%s\t%t\t%s", name, enabled, strings.Join(aliases, ", "))
status, restarts, threshold, timestamp := stats.GetStats(name)
fmt.Fprintf(w, "\n%s\t%t\t%s\t%d/%d\t%s\t%s", name, enabled, strings.Join(aliases, ", "), restarts, threshold, status, timestamp)
}

w.Flush()
Expand Down
20 changes: 11 additions & 9 deletions pkg/execute/exec_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/kubeshop/botkube/internal/loggerx"
"github.com/kubeshop/botkube/internal/plugin"
"github.com/kubeshop/botkube/pkg/config"
)

Expand Down Expand Up @@ -50,9 +51,9 @@ func TestExecutorBindingsExecutor(t *testing.T) {
},
bindings: []string{"kubectl-team-a", "kubectl-team-b"},
expOutput: heredoc.Doc(`
EXECUTOR ENABLED ALIASES
botkube/echo true
botkube/kubectl true k, kc`),
EXECUTOR ENABLED ALIASES RESTARTS STATUS LAST_RESTART
botkube/echo true 0/1 Running
botkube/kubectl true k, kc 0/1 Running`),
},
{
name: "executors and plugins",
Expand Down Expand Up @@ -91,17 +92,18 @@ func TestExecutorBindingsExecutor(t *testing.T) {
},
bindings: []string{"kubectl", "botkube/helm", "botkube/[email protected]"},
expOutput: heredoc.Doc(`
EXECUTOR ENABLED ALIASES
botkube/[email protected] true e
botkube/helm true h
botkube/kubectl true`),
EXECUTOR ENABLED ALIASES RESTARTS STATUS LAST_RESTART
botkube/[email protected] true e 0/1 Running
botkube/helm true h 0/1 Running
botkube/kubectl true 0/1 Running`),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
cmdCtx := CommandContext{
ExecutorFilter: newExecutorTextFilter(""),
Conversation: Conversation{ExecutorBindings: tc.bindings},
ExecutorFilter: newExecutorTextFilter(""),
Conversation: Conversation{ExecutorBindings: tc.bindings},
PluginHealthStats: plugin.NewHealthStats(1),
}
e := NewExecExecutor(loggerx.NewNoop(), tc.cfg)
msg, err := e.List(context.Background(), cmdCtx)
Expand Down
19 changes: 11 additions & 8 deletions pkg/execute/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/sirupsen/logrus"

"github.com/kubeshop/botkube/internal/audit"
"github.com/kubeshop/botkube/internal/plugin"
remoteapi "github.com/kubeshop/botkube/internal/remote"
"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/bot/interactive"
Expand Down Expand Up @@ -58,6 +59,7 @@ type DefaultExecutor struct {
user UserInput
cmdsMapping *CommandMapping
auditReporter audit.AuditReporter
pluginHealthStats *plugin.HealthStats
}

// Execute executes commands and returns output
Expand All @@ -70,14 +72,15 @@ func (e *DefaultExecutor) Execute(ctx context.Context) interactive.CoreMessage {
Debugf("Expanding aliases from command...")

cmdCtx := CommandContext{
ClusterName: e.cfg.Settings.ClusterName,
ExpandedRawCmd: expandedRawCmd,
CommGroupName: e.commGroupName,
User: e.user,
Conversation: e.conversation,
Platform: e.platform,
NotifierHandler: e.notifierHandler,
Mapping: e.cmdsMapping,
ClusterName: e.cfg.Settings.ClusterName,
ExpandedRawCmd: expandedRawCmd,
CommGroupName: e.commGroupName,
User: e.user,
Conversation: e.conversation,
Platform: e.platform,
NotifierHandler: e.notifierHandler,
Mapping: e.cmdsMapping,
PluginHealthStats: e.pluginHealthStats,
}

flags, err := ParseFlags(expandedRawCmd)
Expand Down
4 changes: 4 additions & 0 deletions pkg/execute/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type DefaultExecutorFactory struct {
sourceExecutor *SourceExecutor
cmdsMapping *CommandMapping
auditReporter audit.AuditReporter
pluginHealthStats *plugin.HealthStats
}

// DefaultExecutorFactoryParams contains input parameters for DefaultExecutorFactory.
Expand All @@ -46,6 +47,7 @@ type DefaultExecutorFactoryParams struct {
RestCfg *rest.Config
BotKubeVersion string
AuditReporter audit.AuditReporter
PluginHealthStats *plugin.HealthStats
}

// Executor is an interface for processes to execute commands
Expand Down Expand Up @@ -153,6 +155,7 @@ func NewExecutorFactory(params DefaultExecutorFactoryParams) (*DefaultExecutorFa
sourceExecutor: sourceExecutor,
cmdsMapping: mappings,
auditReporter: params.AuditReporter,
pluginHealthStats: params.PluginHealthStats,
}, nil
}

Expand Down Expand Up @@ -203,6 +206,7 @@ func (f *DefaultExecutorFactory) NewDefault(cfg NewDefaultInput) Executor {
sourceExecutor: f.sourceExecutor,
cmdsMapping: f.cmdsMapping,
auditReporter: f.auditReporter,
pluginHealthStats: f.pluginHealthStats,
user: cfg.User,
notifierHandler: cfg.NotifierHandler,
conversation: cfg.Conversation,
Expand Down
2 changes: 2 additions & 0 deletions pkg/execute/mapping.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"fmt"
"text/tabwriter"

"github.com/kubeshop/botkube/internal/plugin"
"github.com/kubeshop/botkube/pkg/api"
"github.com/kubeshop/botkube/pkg/bot/interactive"
"github.com/kubeshop/botkube/pkg/config"
Expand Down Expand Up @@ -54,6 +55,7 @@ type CommandContext struct {
NotifierHandler NotifierHandler
Mapping *CommandMapping
CmdHeader string
PluginHealthStats *plugin.HealthStats
}

// ProvidedClusterNameEqualOrEmpty returns true when provided cluster name is empty
Expand Down
10 changes: 6 additions & 4 deletions pkg/execute/source.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/sirupsen/logrus"

"github.com/kubeshop/botkube/internal/plugin"
"github.com/kubeshop/botkube/pkg/bot/interactive"
"github.com/kubeshop/botkube/pkg/config"
"github.com/kubeshop/botkube/pkg/execute/command"
Expand Down Expand Up @@ -50,11 +51,11 @@ func (e *SourceExecutor) FeatureName() FeatureName {
// List returns a tabular representation of Executors
func (e *SourceExecutor) List(ctx context.Context, cmdCtx CommandContext) (interactive.CoreMessage, error) {
e.log.Debug("List sources")
return respond(e.TabularOutput(cmdCtx.Conversation.SourceBindings), cmdCtx), nil
return respond(e.TabularOutput(cmdCtx.Conversation.SourceBindings, cmdCtx.PluginHealthStats), cmdCtx), nil
}

// TabularOutput sorts source groups by key and returns a printable table
func (e *SourceExecutor) TabularOutput(bindings []string) string {
func (e *SourceExecutor) TabularOutput(bindings []string, stats *plugin.HealthStats) string {
sources := make(map[string]bool)
for _, b := range bindings {
s, ok := e.cfg.Sources[b]
Expand All @@ -69,10 +70,11 @@ func (e *SourceExecutor) TabularOutput(bindings []string) string {

buf := new(bytes.Buffer)
w := tabwriter.NewWriter(buf, 5, 0, 1, ' ', 0)
fmt.Fprintf(w, "SOURCE\tENABLED")
fmt.Fprintf(w, "SOURCE\tENABLED\tRESTARTS\tSTATUS\tLAST_RESTART")
for _, key := range maputil.SortKeys(sources) {
enabled := sources[key]
fmt.Fprintf(w, "\n%s\t%t", key, enabled)
status, restarts, threshold, timestamp := stats.GetStats(key)
fmt.Fprintf(w, "\n%s\t%t\t%d/%d\t%s\t%s", key, enabled, restarts, threshold, status, timestamp)
}
w.Flush()
return buf.String()
Expand Down
Loading

0 comments on commit 03a50c9

Please sign in to comment.