From a9c9105c2511de5d62fb7e130da868c60f68cd7e Mon Sep 17 00:00:00 2001 From: Yonas Habteab Date: Thu, 1 Aug 2024 10:00:34 +0200 Subject: [PATCH] Offload evaluable configs from `Incident` to a common place --- internal/config/evaluable_config.go | 137 +++++++++++++ internal/config/evaluable_config_test.go | 243 +++++++++++++++++++++++ internal/incident/incident.go | 199 +++++++++---------- internal/incident/incidents.go | 2 +- 4 files changed, 471 insertions(+), 110 deletions(-) create mode 100644 internal/config/evaluable_config.go create mode 100644 internal/config/evaluable_config_test.go diff --git a/internal/config/evaluable_config.go b/internal/config/evaluable_config.go new file mode 100644 index 00000000..5c753e3c --- /dev/null +++ b/internal/config/evaluable_config.go @@ -0,0 +1,137 @@ +package config + +import ( + "github.com/icinga/icinga-notifications/internal/filter" + "github.com/icinga/icinga-notifications/internal/rule" +) + +// EvalOptions specifies optional callbacks that are executed upon certain filter evaluation events. +type EvalOptions[T, U any] struct { + // OnPreEvaluate can be used to perform arbitrary actions before evaluating the current entry of type "T". + // An entry of type "T" for which this hook returns "false" will be excluded from evaluation. + OnPreEvaluate func(T) bool + + // OnError can be used to perform arbitrary actions on filter evaluation errors. + // The original filter evaluation error is passed to this function as well as the current + // entry of type "T", whose filter evaluation triggered the error. + // + // By default, the filter evaluation doesn't get interrupted if any of them fail, instead it will continue + // evaluating all the remaining entries. However, you can override this behaviour by returning "false" in + // your handler, in which case the filter evaluation is aborted prematurely. + OnError func(T, error) bool + + // OnFilterMatch can be used to perform arbitrary actions after a successful filter evaluation of type "T". + // This callback obtains the current entry of type "T" as an argument, whose filter matched on the filterableTest. + // + // Note, any error returned by the OnFilterMatch hook causes the filter evaluation to be aborted + // immediately before even reaching the remaining ones. + OnFilterMatch func(T) error + + // OnAllConfigEvaluated can be used to perform some post filter evaluation actions. + // This handler receives an arbitrary value, be it a result of any filter evaluation or a made-up one of type "U". + // + // OnAllConfigEvaluated will only be called once all the entries of type "T" are evaluated, though it doesn't + // necessarily depend on the result of the individual entry filter evaluation. If the individual Eval* receivers + // don't return prematurely with an error, this hook is guaranteed to be called in any other cases. However, you + // should be aware, that this hook may not be supported by all Eval* methods. + OnAllConfigEvaluated func(U) +} + +// Evaluable manages an evaluable config types in a centralised and structured way. +// An evaluable config is a config type that allows to evaluate filter expressions in some way. +type Evaluable struct { + Rules map[int64]bool `db:"-"` + RuleEntries map[int64]*rule.Entry `db:"-" json:"-"` +} + +// NewEvaluable returns a fully initialised and ready to use Evaluable type. +func NewEvaluable() *Evaluable { + return &Evaluable{ + Rules: make(map[int64]bool), + RuleEntries: make(map[int64]*rule.Entry), + } +} + +// EvaluateRules evaluates all the configured event rule.Rule(s) for the given filter.Filterable object. +// +// Please note that this function may not always evaluate *all* configured rules from the specified RuntimeConfig, +// as it internally caches all previously matched rules based on their ID. +// +// EvaluateRules allows you to specify EvalOptions and hook up certain filter evaluation steps. +// This function does not support the EvalOptions.OnAllConfigEvaluated callback and will never trigger +// it (if provided). Please refer to the description of the individual EvalOptions to find out more about +// when the hooks get triggered and possible special cases. +// +// Returns an error if any of the provided callbacks return an error, otherwise always nil. +func (e *Evaluable) EvaluateRules(r *RuntimeConfig, filterable filter.Filterable, options EvalOptions[*rule.Rule, any]) error { + for _, ru := range r.Rules { + if !e.Rules[ru.ID] && (options.OnPreEvaluate == nil || options.OnPreEvaluate(ru)) { + matched, err := ru.Eval(filterable) + if err != nil && options.OnError != nil && !options.OnError(ru, err) { + return err + } + if err != nil || !matched { + continue + } + + if options.OnFilterMatch != nil { + if err := options.OnFilterMatch(ru); err != nil { + return err + } + } + + e.Rules[ru.ID] = true + } + } + + return nil +} + +// EvaluateRuleEntries evaluates all the configured rule.Entry for the provided filter.Filterable object. +// +// This function allows you to specify EvalOptions and hook up certain filter evaluation steps. +// Currently, EvaluateRuleEntries fully support all the available EvalOptions. Please refer to the +// description of the individual EvalOptions to find out more about when the hooks get triggered and +// possible special cases. +// +// Returns an error if any of the provided callbacks return an error, otherwise always nil. +func (e *Evaluable) EvaluateRuleEntries(r *RuntimeConfig, filterable filter.Filterable, options EvalOptions[*rule.Entry, any]) error { + retryAfter := rule.RetryNever + + for ruleID := range e.Rules { + ru := r.Rules[ruleID] + if ru == nil { + // It would be appropriate to have a debug log here, but unfortunately we don't have access to a logger. + continue + } + + for _, entry := range ru.Entries { + if options.OnPreEvaluate != nil && !options.OnPreEvaluate(entry) { + continue + } + + if matched, err := entry.Eval(filterable); err != nil { + if options.OnError != nil && !options.OnError(entry, err) { + return err + } + } else if cond, ok := filterable.(*rule.EscalationFilter); !matched && ok { + incidentAgeFilter := cond.ReevaluateAfter(entry.Condition) + retryAfter = min(retryAfter, incidentAgeFilter) + } else if matched { + if options.OnFilterMatch != nil { + if err := options.OnFilterMatch(entry); err != nil { + return err + } + } + + e.RuleEntries[entry.ID] = entry + } + } + } + + if options.OnAllConfigEvaluated != nil { + options.OnAllConfigEvaluated(retryAfter) + } + + return nil +} diff --git a/internal/config/evaluable_config_test.go b/internal/config/evaluable_config_test.go new file mode 100644 index 00000000..59a4a9a4 --- /dev/null +++ b/internal/config/evaluable_config_test.go @@ -0,0 +1,243 @@ +package config + +import ( + "fmt" + "github.com/icinga/icinga-notifications/internal/filter" + "github.com/icinga/icinga-notifications/internal/rule" + "github.com/stretchr/testify/require" + "maps" + "testing" + "time" +) + +const defaultDivisor = 3 + +func TestEvaluableConfig(t *testing.T) { + t.Parallel() + + runtimeConfigTest := new(RuntimeConfig) + runtimeConfigTest.Rules = make(map[int64]*rule.Rule) + for i := 1; i <= 50; i++ { + runtimeConfigTest.Rules[int64(i)] = makeRule(t, i) + } + + t.Run("NewEvaluable", func(t *testing.T) { + t.Parallel() + + e := NewEvaluable() + require.NotNil(t, e, "it should create a fully initialised evaluable config") + require.NotNil(t, e.Rules) + require.NotNil(t, e.RuleEntries) + }) + + t.Run("EvaluateRules", func(t *testing.T) { + t.Parallel() + + runtime := new(RuntimeConfig) + runtime.Rules = maps.Clone(runtimeConfigTest.Rules) + + expectedLen := len(runtime.Rules) / defaultDivisor + options := EvalOptions[*rule.Rule, any]{} + e := NewEvaluable() + assertRules := func(expectedLen *int, expectError bool) { + if expectError { + require.Error(t, e.EvaluateRules(runtime, new(filterableTest), options)) + } else { + require.NoError(t, e.EvaluateRules(runtime, new(filterableTest), options)) + } + require.Len(t, e.Rules, *expectedLen) + } + + assertRules(&expectedLen, false) + maps.DeleteFunc(e.Rules, func(ruleID int64, _ bool) bool { return int(ruleID) > expectedLen/2 }) + + options.OnPreEvaluate = func(r *rule.Rule) bool { + require.Falsef(t, e.Rules[r.ID], "EvaluateRules() shouldn't evaluate %q twice", r.Name) + return true + } + options.OnError = func(r *rule.Rule, err error) bool { + require.EqualError(t, err, `"nonexistent" is not a valid filter key`) + require.Truef(t, r.ID%defaultDivisor != 0, "evaluating rule %q should not fail", r.Name) + return true + } + options.OnFilterMatch = func(r *rule.Rule) error { + require.Falsef(t, e.Rules[r.ID], "EvaluateRules() shouldn't evaluate %q twice", r.Name) + return nil + } + + assertRules(&expectedLen, false) + maps.DeleteFunc(e.Rules, func(ruleID int64, _ bool) bool { return int(ruleID) > expectedLen/2 }) + + lenBeforeError := new(int) + options.OnError = func(r *rule.Rule, err error) bool { + if *lenBeforeError != 0 { + require.Fail(t, "OnError() shouldn't have been called again") + } + + require.EqualError(t, err, `"nonexistent" is not a valid filter key`) + require.Truef(t, r.ID%defaultDivisor != 0, "evaluating rule %q should not fail", r.Name) + + *lenBeforeError = len(e.Rules) + return false // This should let the evaluation fail completely! + } + assertRules(lenBeforeError, true) + maps.DeleteFunc(e.Rules, func(ruleID int64, _ bool) bool { return int(ruleID) > expectedLen/2 }) + + *lenBeforeError = 0 + options.OnError = nil + options.OnFilterMatch = func(r *rule.Rule) error { + if *lenBeforeError != 0 { + require.Fail(t, "OnFilterMatch() shouldn't have been called again") + } + + *lenBeforeError = len(e.Rules) + return fmt.Errorf("OnFilterMatch() failed badly") // This should let the evaluation fail completely! + } + assertRules(lenBeforeError, true) + }) + + t.Run("EvaluateRuleEntries", func(t *testing.T) { + t.Parallel() + + runtime := new(RuntimeConfig) + runtime.Rules = maps.Clone(runtimeConfigTest.Rules) + + e := NewEvaluable() + options := EvalOptions[*rule.Entry, any]{} + + expectedLen := 0 + filterContext := &rule.EscalationFilter{IncidentSeverity: 9} // Event severity "emergency" + assertEntries := func(expectedLen *int, expectError bool) { + if expectError { + require.Error(t, e.EvaluateRuleEntries(runtime, filterContext, options)) + } else { + require.NoError(t, e.EvaluateRuleEntries(runtime, filterContext, options)) + } + require.Len(t, e.RuleEntries, *expectedLen) + e.RuleEntries = make(map[int64]*rule.Entry) + } + + assertEntries(&expectedLen, false) + require.NoError(t, e.EvaluateRules(runtime, new(filterableTest), EvalOptions[*rule.Rule, any]{})) + require.Len(t, e.Rules, len(runtime.Rules)/defaultDivisor) + expectedLen = len(runtime.Rules)/defaultDivisor - 5 // 15/3 => (5) valid entries are going to be deleted below. + + // Drop some random rules from the runtime config to simulate a runtime config deletion! + maps.DeleteFunc(runtime.Rules, func(ruleID int64, _ *rule.Rule) bool { return ruleID > 35 && ruleID%defaultDivisor == 0 }) + + options.OnPreEvaluate = func(re *rule.Entry) bool { + if re.RuleID > 35 && re.RuleID%defaultDivisor == 0 { // Those rules are deleted from our runtime config. + require.Failf(t, "OnPreEvaluate() shouldn't have been called", "rule %d was deleted from runtime config", re.RuleID) + } + + require.Nilf(t, e.RuleEntries[re.ID], "EvaluateRuleEntries() shouldn't evaluate entry %d twice", re.ID) + return true + } + options.OnError = func(re *rule.Entry, err error) bool { + require.EqualError(t, err, `unknown severity "evaluable"`) + require.Truef(t, re.RuleID%defaultDivisor == 0, "evaluating rule entry %d should not fail", re.ID) + return true + } + options.OnFilterMatch = func(re *rule.Entry) error { + require.Nilf(t, e.RuleEntries[re.ID], "OnPreEvaluate() shouldn't evaluate %d twice", re.ID) + return nil + } + assertEntries(&expectedLen, false) + + lenBeforeError := new(int) + options.OnError = func(re *rule.Entry, err error) bool { + if *lenBeforeError != 0 { + require.Fail(t, "OnError() shouldn't have been called again") + } + + require.EqualError(t, err, `unknown severity "evaluable"`) + require.Truef(t, re.RuleID%defaultDivisor == 0, "evaluating rule entry %d should not fail", re.ID) + + *lenBeforeError = len(e.RuleEntries) + return false // This should let the evaluation fail completely! + } + assertEntries(lenBeforeError, true) + + *lenBeforeError = 0 + options.OnError = nil + options.OnFilterMatch = func(re *rule.Entry) error { + if *lenBeforeError != 0 { + require.Fail(t, "OnFilterMatch() shouldn't have been called again") + } + + *lenBeforeError = len(e.RuleEntries) + return fmt.Errorf("OnFilterMatch() failed badly") // This should let the evaluation fail completely! + } + assertEntries(lenBeforeError, true) + + expectedLen = 0 + filterContext.IncidentSeverity = 1 // OK + filterContext.IncidentAge = 5 * time.Minute + + options.OnFilterMatch = nil + options.OnPreEvaluate = func(re *rule.Entry) bool { return re.RuleID < 5 } + options.OnAllConfigEvaluated = func(result any) { + retryAfter := result.(time.Duration) + // The filter string of the escalation condition is incident_age>=10m and the actual incident age is 5m. + require.Equal(t, 5*time.Minute, retryAfter) + } + assertEntries(&expectedLen, false) + }) +} + +func makeRule(t *testing.T, i int) *rule.Rule { + r := new(rule.Rule) + r.ID = int64(i) + r.Name = fmt.Sprintf("rule-%d", i) + r.Entries = make(map[int64]*rule.Entry) + + invalidSeverity, err := filter.Parse("incident_severity=evaluable") + require.NoError(t, err, "parsing incident_severity=evaluable shouldn't fail") + + redundant := new(rule.Entry) + redundant.ID = r.ID * 150 // It must be large enough to avoid colliding with others! + redundant.RuleID = r.ID + redundant.Condition = invalidSeverity + + nonexistent, err := filter.Parse("nonexistent=evaluable") + require.NoError(t, err, "parsing nonexistent=evaluable shouldn't fail") + + r.Entries[redundant.ID] = redundant + r.ObjectFilter = nonexistent + if i%defaultDivisor == 0 { + objCond, err := filter.Parse("host=evaluable") + require.NoError(t, err, "parsing host=evaluable shouldn't fail") + + escalationCond, err := filter.Parse("incident_severity>warning||incident_age>=10m") + require.NoError(t, err, "parsing incident_severity>=ok shouldn't fail") + + entry := new(rule.Entry) + entry.ID = r.ID * 2 + entry.RuleID = r.ID + entry.Condition = escalationCond + + r.ObjectFilter = objCond + r.Entries[entry.ID] = entry + } + + return r +} + +// filterableTest is a test type that simulates a filter evaluation and eliminates +// the need of having to import e.g. the object package. +type filterableTest struct{} + +func (f *filterableTest) EvalEqual(k string, v string) (bool, error) { + if k != "host" { + return false, fmt.Errorf("%q is not a valid filter key", k) + } + + return v == "evaluable", nil +} + +func (f *filterableTest) EvalExists(_ string) bool { return true } +func (f *filterableTest) EvalLess(_ string, _ string) (bool, error) { + panic("Oh dear - you shouldn't have called me") +} +func (f *filterableTest) EvalLike(_, _ string) (bool, error) { return f.EvalLess("", "") } +func (f *filterableTest) EvalLessOrEqual(_, _ string) (bool, error) { return f.EvalLess("", "") } diff --git a/internal/incident/incident.go b/internal/incident/incident.go index 4ede9396..386d4a75 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -19,7 +19,6 @@ import ( "time" ) -type ruleID = int64 type escalationID = int64 type Incident struct { @@ -32,7 +31,6 @@ type Incident struct { Object *object.Object `db:"-"` EscalationState map[escalationID]*EscalationState `db:"-"` - Rules map[ruleID]struct{} `db:"-"` Recipients map[recipient.Key]*RecipientState `db:"-"` // timer calls RetriggerEscalations the next time any escalation could be reached on the incident. @@ -51,6 +49,10 @@ type Incident struct { // It is embedded to allow direct access to its members, such as logger, DB etc. notification.Notifier + // config.Evaluable encapsulates all evaluable configuration types, such as rule.Rule, rule.Entry etc. + // It is embedded to enable direct access to its members. + *config.Evaluable + sync.Mutex } @@ -60,8 +62,8 @@ func NewIncident( i := &Incident{ Object: obj, Notifier: notification.Notifier{DB: db, RuntimeConfig: runtimeConfig, Logger: logger}, + Evaluable: config.NewEvaluable(), EscalationState: map[escalationID]*EscalationState{}, - Rules: map[ruleID]struct{}{}, Recipients: map[recipient.Key]*RecipientState{}, } @@ -156,20 +158,30 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } } - // Check if any (additional) rules match this object. Filters of rules that already have a state don't have - // to be checked again, these rules already matched and stay effective for the ongoing incident. - err = i.evaluateRules(ctx, tx, ev.ID) + // Check if any (additional) rules match this object. Incident filter rules are stateful, which means that + // once they have been matched, they remain effective for the ongoing incident and never need to be rechecked. + err := i.EvaluateRules(i.RuntimeConfig, i.Object, config.EvalOptions[*rule.Rule, any]{ + OnPreEvaluate: func(r *rule.Rule) bool { return r.Type == rule.TypeEscalation }, + OnFilterMatch: func(r *rule.Rule) error { return i.onFilterRuleMatch(ctx, r, tx, ev) }, + OnError: func(r *rule.Rule, err error) bool { + i.Logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) + + // We don't want to stop evaluating the remaining rules just because one of them couldn't be evaluated. + return true + }, + }) if err != nil { return err } + // Reset the evaluated escalations when leaving this function while holding the incident lock, + // otherwise the pointers could be invalidated in the meantime and lead to unexpected behaviour. + defer func() { i.RuleEntries = make(map[int64]*rule.Entry) }() + // Re-evaluate escalations based on the newly evaluated rules. - escalations, err := i.evaluateEscalations(ev.Time) - if err != nil { - return err - } + i.evaluateEscalations(ev.Time) - if err := i.triggerEscalations(ctx, tx, ev, escalations); err != nil { + if err := i.triggerEscalations(ctx, tx, ev); err != nil { return err } case event.TypeAcknowledgementSet: @@ -218,31 +230,30 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { return } - escalations, err := i.evaluateEscalations(ev.Time) - if err != nil { - i.Logger.Errorw("Reevaluating time-based escalations failed", zap.Error(err)) - return - } + // Reset the evaluated escalations when leaving this function while holding the incident lock, + // otherwise the pointers could be invalidated in the meantime and lead to unexpected behaviour. + defer func() { i.RuleEntries = make(map[int64]*rule.Entry) }() - if len(escalations) == 0 { + i.evaluateEscalations(ev.Time) + if len(i.RuleEntries) == 0 { i.Logger.Debug("Reevaluated escalations, no new escalations triggered") return } notifications := make(notification.PendingNotifications) ctx := context.Background() - err = utils.RunInTx(ctx, i.DB, func(tx *sqlx.Tx) error { + err := utils.RunInTx(ctx, i.DB, func(tx *sqlx.Tx) error { err := ev.Sync(ctx, tx, i.DB, i.Object.ID) if err != nil { return err } - if err = i.triggerEscalations(ctx, tx, ev, escalations); err != nil { + if err = i.triggerEscalations(ctx, tx, ev); err != nil { return err } channels := make(rule.ContactChannels) - for _, escalation := range escalations { + for _, escalation := range i.RuleEntries { channels.LoadFromEntryRecipients(escalation, ev.Time, i.isRecipientNotifiable) } @@ -368,54 +379,38 @@ func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event. return hr.Sync(ctx, i.DB, tx) } -// evaluateRules evaluates all the configured rules for this *incident.Object and -// generates history entries for each matched rule. -// Returns error on database failure. -func (i *Incident) evaluateRules(ctx context.Context, tx *sqlx.Tx, eventID int64) error { - if i.Rules == nil { - i.Rules = make(map[int64]struct{}) - } - - for _, r := range i.RuntimeConfig.Rules { - if _, ok := i.Rules[r.ID]; !ok { - matched, err := r.Eval(i.Object) - if err != nil { - i.Logger.Warnw("Failed to evaluate object filter", zap.Object("rule", r), zap.Error(err)) - } - - if err != nil || !matched { - continue - } - - i.Rules[r.ID] = struct{}{} - i.Logger.Infow("Rule matches", zap.Object("rule", r)) +// onFilterRuleMatch records a database entry in the `incident_rule` table that refers to the specified rule.Rule. +// In addition, it generates a RuleMatched Incident History and synchronises it with the database. +// +// This function should only be used as an OnFilterMatch handler that is passed to the Evaluable#EvaluateRules +// function, which only fires when the event rule filter matches on the current Incident Object. +// +// Returns an error if it fails to persist the database entries. +func (i *Incident) onFilterRuleMatch(ctx context.Context, r *rule.Rule, tx *sqlx.Tx, ev *event.Event) error { + i.Logger.Infow("Rule matches", zap.Object("rule", r)) - err = i.AddRuleMatched(ctx, tx, r) - if err != nil { - i.Logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) - return err - } + if err := i.AddRuleMatched(ctx, tx, r); err != nil { + i.Logger.Errorw("Failed to upsert incident rule", zap.Object("rule", r), zap.Error(err)) + return err + } - hr := &HistoryRow{ - IncidentID: i.ID, - Time: types.UnixMilli(time.Now()), - EventID: utils.ToDBInt(eventID), - RuleID: utils.ToDBInt(r.ID), - Type: RuleMatched, - } - if err := hr.Sync(ctx, i.DB, tx); err != nil { - i.Logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) - return err - } - } + hr := &HistoryRow{ + IncidentID: i.ID, + Time: types.UnixMilli(time.Now()), + EventID: utils.ToDBInt(ev.ID), + RuleID: utils.ToDBInt(r.ID), + Type: RuleMatched, + } + if err := hr.Sync(ctx, i.DB, tx); err != nil { + i.Logger.Errorw("Failed to insert rule matched incident history", zap.Object("rule", r), zap.Error(err)) + return err } return nil } // evaluateEscalations evaluates this incidents rule escalations to be triggered if they aren't already. -// Returns the newly evaluated escalations to be triggered or an error on database failure. -func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, error) { +func (i *Incident) evaluateEscalations(eventTime time.Time) { if i.EscalationState == nil { i.EscalationState = make(map[int64]*EscalationState) } @@ -430,61 +425,47 @@ func (i *Incident) evaluateEscalations(eventTime time.Time) ([]*rule.Entry, erro filterContext := &rule.EscalationFilter{IncidentAge: eventTime.Sub(i.StartedAt.Time()), IncidentSeverity: i.Severity} - var escalations []*rule.Entry - retryAfter := rule.RetryNever - - for rID := range i.Rules { - r := i.RuntimeConfig.Rules[rID] - if r == nil { - i.Logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", rID)) - continue - } + // EvaluateRuleEntries only returns an error if one of the provided callback hooks returns + // an error or the OnError handler returns false, and since none of our callbacks return an + // error nor false, we can safely discard the return value here. + _ = i.EvaluateRuleEntries(i.RuntimeConfig, filterContext, config.EvalOptions[*rule.Entry, any]{ + // Prevent reevaluation of an already triggered escalation via the pre run hook. + OnPreEvaluate: func(escalation *rule.Entry) bool { return i.EscalationState[escalation.ID] == nil }, + OnError: func(escalation *rule.Entry, err error) bool { + r := i.RuntimeConfig.Rules[escalation.RuleID] + i.Logger.Warnw("Failed to evaluate escalation condition", zap.Object("rule", r), + zap.Object("escalation", escalation), zap.Error(err)) - // Check if new escalation stages are reached - for _, escalation := range r.Entries { - if _, ok := i.EscalationState[escalation.ID]; !ok { - matched, err := escalation.Eval(filterContext) - if err != nil { - i.Logger.Warnw( - "Failed to evaluate escalation condition", zap.Object("rule", r), - zap.Object("escalation", escalation), zap.Error(err), - ) - } else if !matched { - incidentAgeFilter := filterContext.ReevaluateAfter(escalation.Condition) - retryAfter = min(retryAfter, incidentAgeFilter) - } else { - escalations = append(escalations, escalation) - } + return true + }, + OnAllConfigEvaluated: func(result any) { + retryAfter := result.(time.Duration) + if retryAfter != rule.RetryNever { + // The retryAfter duration is relative to the incident duration represented by the escalation filter, + // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter + // would contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of + // the incident start time here. + nextEvalAt := eventTime.Add(retryAfter) + + i.Logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) + i.timer = time.AfterFunc(retryAfter, func() { + i.Logger.Info("Reevaluating escalations") + + i.RetriggerEscalations(&event.Event{ + Time: nextEvalAt, + Type: event.TypeIncidentAge, + Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())), + }) + }) } - } - } - - if retryAfter != rule.RetryNever { - // The retryAfter duration is relative to the incident duration represented by the escalation filter, - // i.e. if an incident is 15m old and an escalation rule evaluates incident_age>=1h the retryAfter would - // contain 45m (1h - incident age (15m)). Therefore, we have to use the event time instead of the incident - // start time here. - nextEvalAt := eventTime.Add(retryAfter) - - i.Logger.Infow("Scheduling escalation reevaluation", zap.Duration("after", retryAfter), zap.Time("at", nextEvalAt)) - i.timer = time.AfterFunc(retryAfter, func() { - i.Logger.Info("Reevaluating escalations") - - i.RetriggerEscalations(&event.Event{ - Time: nextEvalAt, - Type: event.TypeIncidentAge, - Message: fmt.Sprintf("Incident reached age %v", nextEvalAt.Sub(i.StartedAt.Time())), - }) - }) - } - - return escalations, nil + }, + }) } // triggerEscalations triggers the given escalations and generates incident history items for each of them. // Returns an error on database failure. -func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event, escalations []*rule.Entry) error { - for _, escalation := range escalations { +func (i *Incident) triggerEscalations(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { + for _, escalation := range i.RuleEntries { r := i.RuntimeConfig.Rules[escalation.RuleID] if r == nil { i.Logger.Debugw("Incident refers unknown rule, might got deleted", zap.Int64("rule_id", escalation.RuleID)) diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index 1f130c98..061d1cfe 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -103,7 +103,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log escalation := i.RuntimeConfig.GetRuleEntry(state.RuleEscalationID) if escalation != nil { - i.Rules[escalation.RuleID] = struct{}{} + i.Rules[escalation.RuleID] = true } }) if err != nil {