diff --git a/internal/event/event.go b/internal/event/event.go index 9d7053e5..b84843a7 100644 --- a/internal/event/event.go +++ b/internal/event/event.go @@ -15,6 +15,10 @@ import ( // ErrSuperfluousStateChange indicates a superfluous state change being ignored and stopping further processing. var ErrSuperfluousStateChange = errors.New("ignoring superfluous state change") +// ErrSuperfluousMuteUnmuteEvent indicates that a superfluous mute or unmute event is being ignored and is +// triggered when trying to mute/unmute an already muted/unmuted incident. +var ErrSuperfluousMuteUnmuteEvent = errors.New("ignoring superfluous (un)mute event") + // Event received of a specified Type for internal processing. // // The JSON struct tags are being used to unmarshal a JSON representation received from the listener.Listener. Some @@ -34,6 +38,9 @@ type Event struct { Username string `json:"username"` Message string `json:"message"` + Mute types.Bool `json:"mute"` + MuteReason string `json:"mute_reason"` + ID int64 `json:"-"` } @@ -49,7 +56,9 @@ const ( TypeFlappingEnd = "flapping-end" TypeFlappingStart = "flapping-start" TypeIncidentAge = "incident-age" + TypeMute = "mute" TypeState = "state" + TypeUnmute = "unmute" ) // Validate validates the current event state. 
@@ -66,6 +75,15 @@ func (e *Event) Validate() error { if e.Severity != SeverityNone && e.Type != TypeState { return fmt.Errorf("invalid event: if 'severity' is set, 'type' must be set to %q", TypeState) } + if e.Type == TypeMute && (!e.Mute.Valid || !e.Mute.Bool) { + return fmt.Errorf("invalid event: 'mute' must be true if 'type' is set to %q", TypeMute) + } + if e.Type == TypeUnmute && (!e.Mute.Valid || e.Mute.Bool) { + return fmt.Errorf("invalid event: 'mute' must be false if 'type' is set to %q", TypeUnmute) + } + if e.Mute.Valid && e.Mute.Bool && e.MuteReason == "" { + return fmt.Errorf("invalid event: 'mute_reason' must not be empty if 'mute' is set") + } switch e.Type { case "": @@ -80,13 +98,21 @@ func (e *Event) Validate() error { TypeFlappingEnd, TypeFlappingStart, TypeIncidentAge, - TypeState: + TypeMute, + TypeState, + TypeUnmute: return nil default: return fmt.Errorf("invalid event: unsupported event type %q", e.Type) } } +// SetMute alters the event mute and mute reason. +func (e *Event) SetMute(muted bool, reason string) { + e.Mute = types.Bool{Valid: true, Bool: muted} + e.MuteReason = reason +} + func (e *Event) String() string { return fmt.Sprintf("[time=%s type=%q severity=%s]", e.Time, e.Type, e.Severity.String()) } @@ -146,13 +172,15 @@ func (e *Event) Sync(ctx context.Context, tx *sqlx.Tx, db *database.DB, objectId // EventRow represents a single event database row and isn't an in-memory representation of an event. 
type EventRow struct { - ID int64 `db:"id"` - Time types.UnixMilli `db:"time"` - ObjectID types.Binary `db:"object_id"` - Type types.String `db:"type"` - Severity Severity `db:"severity"` - Username types.String `db:"username"` - Message types.String `db:"message"` + ID int64 `db:"id"` + Time types.UnixMilli `db:"time"` + ObjectID types.Binary `db:"object_id"` + Type types.String `db:"type"` + Severity Severity `db:"severity"` + Username types.String `db:"username"` + Message types.String `db:"message"` + Mute types.Bool `db:"mute"` + MuteReason types.String `db:"mute_reason"` } // TableName implements the contracts.TableNamer interface. @@ -162,11 +190,13 @@ func (er *EventRow) TableName() string { func NewEventRow(e *Event, objectId types.Binary) *EventRow { return &EventRow{ - Time: types.UnixMilli(e.Time), - ObjectID: objectId, - Type: utils.ToDBString(e.Type), - Severity: e.Severity, - Username: utils.ToDBString(e.Username), - Message: utils.ToDBString(e.Message), + Time: types.UnixMilli(e.Time), + ObjectID: objectId, + Type: utils.ToDBString(e.Type), + Severity: e.Severity, + Username: utils.ToDBString(e.Username), + Message: utils.ToDBString(e.Message), + Mute: e.Mute, + MuteReason: utils.ToDBString(e.MuteReason), } } diff --git a/internal/icinga2/api_responses.go b/internal/icinga2/api_responses.go index 27c48da4..b1d4fe40 100644 --- a/internal/icinga2/api_responses.go +++ b/internal/icinga2/api_responses.go @@ -84,13 +84,16 @@ type CheckResult struct { // // NOTE: // - An empty Service field indicates a host downtime. +// - If a downtime was added by a ScheduledDowntime object, ConfigOwner is set to the name of that object and the +// downtime can only be cancelled by its owner. Otherwise, it is empty and indicates a user-created downtime (via API and/or UI). 
// // https://icinga.com/docs/icinga-2/latest/doc/09-object-types/#objecttype-downtime type Downtime struct { - Host string `json:"host_name"` - Service string `json:"service_name"` - Author string `json:"author"` - Comment string `json:"comment"` + Host string `json:"host_name"` + Service string `json:"service_name"` + Author string `json:"author"` + Comment string `json:"comment"` + ConfigOwner string `json:"config_owner"` // RemoveTime is used to indicate whether a downtime was ended automatically or cancelled prematurely by a user. // It is set to zero time for the former case, otherwise to the timestamp at which time has been cancelled. @@ -132,6 +135,7 @@ type HostServiceRuntimeAttributes struct { LastStateChange UnixFloat `json:"last_state_change"` DowntimeDepth int `json:"downtime_depth"` Acknowledgement int `json:"acknowledgement"` + IsFlapping bool `json:"flapping"` AcknowledgementLastChange UnixFloat `json:"acknowledgement_last_change"` } @@ -180,15 +184,19 @@ type StateChange struct { Acknowledgement bool `json:"acknowledgement"` } -// AcknowledgementSet represents the Icinga 2 API Event Stream AcknowledgementSet response for acknowledgements set on hosts/services. +// Acknowledgement represents the Icinga 2 API Event Stream AcknowledgementSet or AcknowledgementCleared +// response for acknowledgements set/cleared on/from hosts/services. // // NOTE: // - An empty Service field indicates a host acknowledgement. // - State might be StateHost{Up,Down} for hosts or StateService{Ok,Warning,Critical,Unknown} for services. // - StateType might be StateTypeSoft or StateTypeHard. 
+// - EventType is either set to typeAcknowledgementSet or typeAcknowledgementCleared +// - Author and Comment fields are always empty when EventType is set to typeAcknowledgementCleared // // https://icinga.com/docs/icinga-2/latest/doc/12-icinga2-api/#event-stream-type-acknowledgementset -type AcknowledgementSet struct { +// https://icinga.com/docs/icinga-2/latest/doc/12-icinga2-api/#event-stream-type-acknowledgementcleared +type Acknowledgement struct { Timestamp UnixFloat `json:"timestamp"` Host string `json:"host"` Service string `json:"service"` @@ -196,22 +204,7 @@ type AcknowledgementSet struct { StateType int `json:"state_type"` Author string `json:"author"` Comment string `json:"comment"` -} - -// AcknowledgementCleared represents the Icinga 2 API Event Stream AcknowledgementCleared response for acknowledgements cleared on hosts/services. -// -// NOTE: -// - An empty Service field indicates a host acknowledgement. -// - State might be StateHost{Up,Down} for hosts or StateService{Ok,Warning,Critical,Unknown} for services. -// - StateType might be StateTypeSoft or StateTypeHard. -// -// https://icinga.com/docs/icinga-2/latest/doc/12-icinga2-api/#event-stream-type-acknowledgementcleared -type AcknowledgementCleared struct { - Timestamp UnixFloat `json:"timestamp"` - Host string `json:"host"` - Service string `json:"service"` - State int `json:"state"` - StateType int `json:"state_type"` + EventType string `json:"type"` } // CommentAdded represents the Icinga 2 API Event Stream CommentAdded response for added host/service comments. @@ -266,13 +259,21 @@ type DowntimeTriggered struct { // // NOTE: // - An empty Service field indicates a host being in flapping state. +// - State includes the current state of the Checkable at the point in time at which it enters or exits the flapping state. +// - CurrentFlapping indicates the current flapping value of a Checkable in percent. 
+// - ThresholdLow is the low/min flapping threshold value set by the user (CurrentFlapping < ThresholdLow = flapping end). +// - ThresholdHigh is the high/max flapping threshold value set by the user (CurrentFlapping > ThresholdHigh = flapping start). // // https://icinga.com/docs/icinga-2/latest/doc/12-icinga2-api/#event-stream-type-flapping type Flapping struct { - Timestamp UnixFloat `json:"timestamp"` - Host string `json:"host"` - Service string `json:"service"` - IsFlapping bool `json:"is_flapping"` + Timestamp UnixFloat `json:"timestamp"` + Host string `json:"host"` + Service string `json:"service"` + IsFlapping bool `json:"is_flapping"` + State int `json:"state"` + CurrentFlapping int `json:"current_flapping"` + ThresholdLow int `json:"threshold_low"` + ThresholdHigh int `json:"threshold_high"` } // ObjectCreatedDeleted represents the Icinga 2 API stream object created/deleted response. @@ -319,10 +320,8 @@ func UnmarshalEventStreamResponse(bytes []byte) (any, error) { switch responseType { case typeStateChange: resp = new(StateChange) - case typeAcknowledgementSet: - resp = new(AcknowledgementSet) - case typeAcknowledgementCleared: - resp = new(AcknowledgementCleared) + case typeAcknowledgementSet, typeAcknowledgementCleared: + resp = new(Acknowledgement) case typeCommentAdded: resp = new(CommentAdded) case typeCommentRemoved: diff --git a/internal/icinga2/api_responses_test.go b/internal/icinga2/api_responses_test.go index 13d99718..80cab35c 100644 --- a/internal/icinga2/api_responses_test.go +++ b/internal/icinga2/api_responses_test.go @@ -180,12 +180,13 @@ func TestObjectQueriesResult_UnmarshalJSON(t *testing.T) { Name: "docker-master!load!c27b27c2-e0ab-45ff-8b9b-e95f29851eb0", Type: "Downtime", Attrs: Downtime{ - Host: "docker-master", - Service: "load", - Author: "icingaadmin", - Comment: "Scheduled downtime for backup", - RemoveTime: UnixFloat(time.UnixMilli(0)), - IsFixed: true, + Host: "docker-master", + Service: "load", + Author: "icingaadmin", + 
Comment: "Scheduled downtime for backup", + ConfigOwner: "docker-master!load!backup-downtime", + RemoveTime: UnixFloat(time.UnixMilli(0)), + IsFixed: true, }, }, }, @@ -373,19 +374,20 @@ func TestApiResponseUnmarshal(t *testing.T) { { name: "acknowledgementset-host", jsonData: `{"acknowledgement_type":1,"author":"icingaadmin","comment":"working on it","expiry":0,"host":"dummy-805","notify":true,"persistent":false,"state":1,"state_type":1,"timestamp":1697201074.579106,"type":"AcknowledgementSet"}`, - expected: &AcknowledgementSet{ + expected: &Acknowledgement{ Timestamp: UnixFloat(time.UnixMicro(1697201074579106)), Host: "dummy-805", State: StateHostDown, StateType: StateTypeHard, Author: "icingaadmin", Comment: "working on it", + EventType: typeAcknowledgementSet, }, }, { name: "acknowledgementset-service", jsonData: `{"acknowledgement_type":1,"author":"icingaadmin","comment":"will be fixed soon","expiry":0,"host":"docker-master","notify":true,"persistent":false,"service":"ssh","state":2,"state_type":1,"timestamp":1697201107.64792,"type":"AcknowledgementSet"}`, - expected: &AcknowledgementSet{ + expected: &Acknowledgement{ Timestamp: UnixFloat(time.UnixMicro(1697201107647920)), Host: "docker-master", Service: "ssh", @@ -393,27 +395,30 @@ func TestApiResponseUnmarshal(t *testing.T) { StateType: StateTypeHard, Author: "icingaadmin", Comment: "will be fixed soon", + EventType: typeAcknowledgementSet, }, }, { name: "acknowledgementcleared-host", jsonData: `{"acknowledgement_type":0,"host":"dummy-805","state":1,"state_type":1,"timestamp":1697201082.440148,"type":"AcknowledgementCleared"}`, - expected: &AcknowledgementCleared{ + expected: &Acknowledgement{ Timestamp: UnixFloat(time.UnixMicro(1697201082440148)), Host: "dummy-805", State: StateHostDown, StateType: StateTypeHard, + EventType: typeAcknowledgementCleared, }, }, { name: "acknowledgementcleared-service", jsonData: 
`{"acknowledgement_type":0,"host":"docker-master","service":"ssh","state":2,"state_type":1,"timestamp":1697201110.220349,"type":"AcknowledgementCleared"}`, - expected: &AcknowledgementCleared{ + expected: &Acknowledgement{ Timestamp: UnixFloat(time.UnixMicro(1697201110220349)), Host: "docker-master", Service: "ssh", State: StateServiceCritical, StateType: StateTypeHard, + EventType: typeAcknowledgementCleared, }, }, { diff --git a/internal/icinga2/client.go b/internal/icinga2/client.go index 4ed665d9..387635e0 100644 --- a/internal/icinga2/client.go +++ b/internal/icinga2/client.go @@ -161,20 +161,20 @@ func (client *Client) fetchExtraTagsFor(ctx context.Context, host, service strin } extraTags := make(map[string]string) - hostGroups, err := client.fetchHostServiceGroups(ctx, host, "") + queryResult, err := client.fetchCheckable(ctx, host, "") if err != nil { return nil, err } - for _, hostGroup := range hostGroups { + for _, hostGroup := range queryResult.Attrs.Groups { extraTags["hostgroup/"+hostGroup] = "" } if service != "" { - serviceGroups, err := client.fetchHostServiceGroups(ctx, host, service) + queryResult, err := client.fetchCheckable(ctx, host, service) if err != nil { return nil, err } - for _, serviceGroup := range serviceGroups { + for _, serviceGroup := range queryResult.Attrs.Groups { extraTags["servicegroup/"+serviceGroup] = "" } } @@ -237,23 +237,31 @@ func (client *Client) buildHostServiceEvent(ctx context.Context, result CheckRes } // buildAcknowledgementEvent from the given fields. 
-func (client *Client) buildAcknowledgementEvent( - ctx context.Context, host, service, author, comment string, clearEvent bool, -) (*event.Event, error) { - ev, err := client.buildCommonEvent(ctx, host, service) +func (client *Client) buildAcknowledgementEvent(ctx context.Context, ack *Acknowledgement) (*event.Event, error) { + ev, err := client.buildCommonEvent(ctx, ack.Host, ack.Service) if err != nil { return nil, err } - if clearEvent { + ev.Username = ack.Author + if ack.EventType == typeAcknowledgementCleared { ev.Type = event.TypeAcknowledgementCleared + ev.Message = "Acknowledgement cleared" + + queryResult, err := client.fetchCheckable(ctx, ack.Host, ack.Service) + if err != nil { + return nil, err + } + if !isMuted(queryResult) { + ev.Message = queryResult.Attrs.LastCheckResult.Output + ev.SetMute(false, "Acknowledgement cleared") + } } else { ev.Type = event.TypeAcknowledgementSet + ev.Message = ack.Comment + ev.SetMute(true, fmt.Sprintf("Checkable acknowledged by %q: %s", ack.Author, ack.Comment)) } - ev.Username = author - ev.Message = comment - return ev, nil } @@ -264,30 +272,72 @@ func (client *Client) buildDowntimeEvent(ctx context.Context, d Downtime, startE return nil, err } + var reason string if startEvent { ev.Type = event.TypeDowntimeStart + ev.SetMute(true, "Checkable is in downtime") + ev.Message = d.Comment } else if !d.WasCancelled() { ev.Type = event.TypeDowntimeEnd + reason = "Downtime expired" } else { ev.Type = event.TypeDowntimeRemoved + if d.ConfigOwner != "" { + reason = fmt.Sprintf("Downtime was cancelled by config owner (%s)", d.ConfigOwner) + } else { + reason = "Downtime was cancelled by user" + } } ev.Username = d.Author - ev.Message = d.Comment + if ev.Type != event.TypeDowntimeStart { + ev.Message = reason + + queryResult, err := client.fetchCheckable(ctx, d.Host, d.Service) + if err != nil { + return nil, err + } + if !isMuted(queryResult) { + // When a downtime is cancelled/expired and there's no other active 
downtime/ack, we're going to send some + // notifications if there's still an active incident. Therefore, we need the most recent CheckResult of + // that Checkable to use it for the notifications. + ev.Message = queryResult.Attrs.LastCheckResult.Output + ev.SetMute(false, reason) + } + } return ev, nil } // buildFlappingEvent from the given fields. -func (client *Client) buildFlappingEvent(ctx context.Context, host, service string, isFlapping bool) (*event.Event, error) { - ev, err := client.buildCommonEvent(ctx, host, service) +func (client *Client) buildFlappingEvent(ctx context.Context, flapping *Flapping) (*event.Event, error) { + ev, err := client.buildCommonEvent(ctx, flapping.Host, flapping.Service) if err != nil { return nil, err } - ev.Type = event.TypeFlappingStart - if !isFlapping { + if flapping.IsFlapping { + ev.Type = event.TypeFlappingStart + ev.SetMute(true, fmt.Sprintf( + "Checkable started flapping (Current flapping value %d%% > high threshold %d%%)", + flapping.CurrentFlapping, flapping.ThresholdHigh, + )) + } else { + reason := fmt.Sprintf( + "Checkable stopped flapping (Current flapping value %d%% < low threshold %d%%)", + flapping.CurrentFlapping, flapping.ThresholdLow, + ) ev.Type = event.TypeFlappingEnd + ev.Message = reason + + queryResult, err := client.fetchCheckable(ctx, flapping.Host, flapping.Service) + if err != nil { + return nil, err + } + if !isMuted(queryResult) { + ev.Message = queryResult.Attrs.LastCheckResult.Output + ev.SetMute(false, reason) + } } return ev, nil diff --git a/internal/icinga2/client_api.go b/internal/icinga2/client_api.go index 577d867a..2e3c3fad 100644 --- a/internal/icinga2/client_api.go +++ b/internal/icinga2/client_api.go @@ -149,8 +149,8 @@ func (client *Client) queryObjectsApiQuery(ctx context.Context, objType string, }) } -// fetchHostServiceGroups fetches all Host or, if service is not empty, Service groups. 
-func (client *Client) fetchHostServiceGroups(ctx context.Context, host, service string) ([]string, error) { +// fetchCheckable fetches the Checkable config state of the given Host/Service name from the Icinga 2 API. +func (client *Client) fetchCheckable(ctx context.Context, host, service string) (*ObjectQueriesResult[HostServiceRuntimeAttributes], error) { objType, objName := "host", host if service != "" { objType = "service" @@ -171,7 +171,7 @@ func (client *Client) fetchHostServiceGroups(ctx context.Context, host, service objName, objType, len(objQueriesResults)) } - return objQueriesResults[0].Attrs.Groups, nil + return &objQueriesResults[0], nil } // fetchAcknowledgementComment fetches an Acknowledgement Comment for a Host (empty service) or for a Service at a Host. @@ -228,23 +228,26 @@ func (client *Client) checkMissedChanges(ctx context.Context, objType string, ca return err } - var stateChangeEvents, acknowledgementEvents int + var stateChangeEvents, muteEvents, unmuteEvents int defer func() { client.Logger.Debugw("Querying API emitted events", zap.String("object type", objType), zap.Int("state changes", stateChangeEvents), - zap.Int("acknowledgements", acknowledgementEvents)) + zap.Int("mute_events", muteEvents), + zap.Int("unmute_events", unmuteEvents)) }() for _, objQueriesResult := range objQueriesResults { - var hostName, serviceName string + var hostName, serviceName, objectName string switch objQueriesResult.Type { case "Host": hostName = objQueriesResult.Attrs.Name + objectName = hostName case "Service": hostName = objQueriesResult.Attrs.Host serviceName = objQueriesResult.Attrs.Name + objectName = hostName + "!" 
+ serviceName default: return fmt.Errorf("querying API delivered a wrong object type %q", objQueriesResult.Type) @@ -256,46 +259,77 @@ func (client *Client) checkMissedChanges(ctx context.Context, objType string, ca continue } - // First: State change event - ev, err := client.buildHostServiceEvent( - ctx, - objQueriesResult.Attrs.LastCheckResult, objQueriesResult.Attrs.State, - hostName, serviceName) - if err != nil { - return fmt.Errorf("failed to construct Event from Host/Service response, %w", err) + attrs := objQueriesResult.Attrs + var fakeEv *event.Event + if attrs.Acknowledgement != AcknowledgementNone { + ackComment, err := client.fetchAcknowledgementComment(ctx, hostName, serviceName, attrs.AcknowledgementLastChange.Time()) + if err != nil { + return fmt.Errorf("fetching acknowledgement comment for %q failed, %w", objectName, err) + } + + ack := &Acknowledgement{Host: hostName, Service: serviceName, Author: ackComment.Author, Comment: ackComment.Text} + // We do not need to fake ACK set events as they are handled correctly by an incident and any + // redundant/successive ACK set events are discarded accordingly. 
+ ack.EventType = typeAcknowledgementSet + fakeEv, err = client.buildAcknowledgementEvent(ctx, ack) + if err != nil { + return fmt.Errorf("failed to construct Event from Acknowledgement response, %w", err) + } + } else if isMuted(&objQueriesResult) { + fakeEv, err = client.buildCommonEvent(ctx, hostName, serviceName) + if err != nil { + return fmt.Errorf("failed to construct checkable fake mute event: %w", err) + } + + fakeEv.Type = event.TypeMute + if attrs.IsFlapping { + fakeEv.SetMute(true, "Checkable is flapping, but we missed the Icinga 2 FlappingStart event") + } else { + fakeEv.SetMute(true, "Checkable is in downtime, but we missed the Icinga 2 DowntimeStart event") + } + } else { + // This could potentially produce numerous superfluous database (event table) entries if we generate such + // dummy events after each Icinga 2 / Notifications reload, thus they are being identified as such in + // incident#ProcessEvent() and Client.CallbackFn and suppressed accordingly. + fakeEv, err = client.buildCommonEvent(ctx, hostName, serviceName) + if err != nil { + return fmt.Errorf("failed to construct checkable fake unmute event: %w", err) + } + + fakeEv.Type = event.TypeUnmute + fakeEv.SetMute(false, "All mute reasons of the checkable are cleared, but we missed the appropriate unmute event") } + + fakeEv.Message = attrs.LastCheckResult.Output select { + case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{fakeEv, attrs.LastStateChange.Time()}}: + if fakeEv.Type == event.TypeUnmute { + unmuteEvents++ + } else { + muteEvents++ + } case <-ctx.Done(): return ctx.Err() - case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time()}}: - stateChangeEvents++ - } - - // Second: Optional acknowledgement event - if objQueriesResult.Attrs.Acknowledgement == 0 { - continue - } - - ackComment, err := client.fetchAcknowledgementComment( - ctx, - hostName, serviceName, - objQueriesResult.Attrs.AcknowledgementLastChange.Time()) - 
if err != nil { - return fmt.Errorf("fetching acknowledgement comment for %v failed, %w", ev, err) } - ev, err = client.buildAcknowledgementEvent( - ctx, - hostName, serviceName, - ackComment.Author, ackComment.Text, false) + ev, err := client.buildHostServiceEvent(ctx, attrs.LastCheckResult, attrs.State, hostName, serviceName) if err != nil { - return fmt.Errorf("failed to construct Event from Acknowledgement response, %w", err) + return fmt.Errorf("failed to construct Event from Host/Service response, %w", err) } select { case <-ctx.Done(): return ctx.Err() - case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{ev, objQueriesResult.Attrs.LastStateChange.Time()}}: - acknowledgementEvents++ + case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{ev, attrs.LastStateChange.Time()}}: + stateChangeEvents++ + if fakeEv.Type == event.TypeAcknowledgementSet { + select { + // Retry the AckSet event so that the author of the ack is set as the incident + // manager if there was no existing incident before the above state change event. 
+ case catchupEventCh <- &catchupEventMsg{eventMsg: &eventMsg{fakeEv, attrs.LastStateChange.Time()}}: + case <-ctx.Done(): + return ctx.Err() + } + } } } return nil @@ -473,11 +507,8 @@ func (client *Client) listenEventStream() error { ev, err = client.buildHostServiceEvent(client.Ctx, respT.CheckResult, respT.State, respT.Host, respT.Service) evTime = respT.Timestamp.Time() - case *AcknowledgementSet: - ev, err = client.buildAcknowledgementEvent(client.Ctx, respT.Host, respT.Service, respT.Author, respT.Comment, false) - evTime = respT.Timestamp.Time() - case *AcknowledgementCleared: - ev, err = client.buildAcknowledgementEvent(client.Ctx, respT.Host, respT.Service, "", "", true) + case *Acknowledgement: + ev, err = client.buildAcknowledgementEvent(client.Ctx, respT) evTime = respT.Timestamp.Time() // case *CommentAdded: // case *CommentRemoved: @@ -506,7 +537,7 @@ func (client *Client) listenEventStream() error { ev, err = client.buildDowntimeEvent(client.Ctx, respT.Downtime, true) evTime = respT.Timestamp.Time() case *Flapping: - ev, err = client.buildFlappingEvent(client.Ctx, respT.Host, respT.Service, respT.IsFlapping) + ev, err = client.buildFlappingEvent(client.Ctx, respT) evTime = respT.Timestamp.Time() case *ObjectCreatedDeleted: if err = client.deleteExtraTagsCacheFor(respT); err == nil { diff --git a/internal/icinga2/launcher.go b/internal/icinga2/launcher.go index d4ba17bb..58d8491d 100644 --- a/internal/icinga2/launcher.go +++ b/internal/icinga2/launcher.go @@ -133,6 +133,8 @@ func (launcher *Launcher) launch(src *config.Source) { switch { case errors.Is(err, event.ErrSuperfluousStateChange): l.Debugw("Stopped processing event with superfluous state change", zap.Error(err)) + case errors.Is(err, event.ErrSuperfluousMuteUnmuteEvent): + l.Debugw("Stopped processing event with superfluous (un)mute object", zap.Error(err)) case err != nil: l.Errorw("Cannot process event", zap.Error(err)) default: diff --git a/internal/icinga2/util.go 
b/internal/icinga2/util.go index 3cd772ab..0af260b6 100644 --- a/internal/icinga2/util.go +++ b/internal/icinga2/util.go @@ -16,3 +16,8 @@ import ( func rawurlencode(s string) string { return strings.ReplaceAll(url.QueryEscape(s), "+", "%20") } + +// isMuted returns true if the given checkable is either in Downtime, Flapping or acknowledged, otherwise false. +func isMuted(checkable *ObjectQueriesResult[HostServiceRuntimeAttributes]) bool { + return checkable.Attrs.IsFlapping || checkable.Attrs.Acknowledgement != AcknowledgementNone || checkable.Attrs.DowntimeDepth != 0 +} diff --git a/internal/incident/history_event_type.go b/internal/incident/history_event_type.go index e40a750a..2c5aa305 100644 --- a/internal/incident/history_event_type.go +++ b/internal/incident/history_event_type.go @@ -11,6 +11,8 @@ const ( HistoryEventTypeNull HistoryEventType = iota Opened + Muted + Unmuted IncidentSeverityChanged RuleMatched EscalationTriggered @@ -21,6 +23,8 @@ const ( var historyTypeByName = map[string]HistoryEventType{ "opened": Opened, + "muted": Muted, + "unmuted": Unmuted, "incident_severity_changed": IncidentSeverityChanged, "rule_matched": RuleMatched, "escalation_triggered": EscalationTriggered, diff --git a/internal/incident/incident.go b/internal/incident/incident.go index f3d82ccc..ddad5995 100644 --- a/internal/incident/incident.go +++ b/internal/incident/incident.go @@ -44,6 +44,10 @@ type Incident struct { // be reached solely based on the incident aging, so no more timer is necessary and timer stores nil. timer *time.Timer + // isMuted indicates whether the current Object was already muted before the ongoing event.Event being processed. + // This prevents us from generating multiple muted histories when receiving several events that mute our Object. 
+ isMuted bool + db *database.DB logger *zap.SugaredLogger runtimeConfig *config.RuntimeConfig @@ -117,6 +121,17 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { i.runtimeConfig.RLock() defer i.runtimeConfig.RUnlock() + // These event types are not like the others used to mute an object/incident, such as DowntimeStart, which + // uniquely identify themselves why an incident is being muted, but are rather super generic types, and as + // such, we are ignoring superfluous ones that don't have any effect on that incident. + if i.isMuted && ev.Type == event.TypeMute { + i.logger.Debugw("Ignoring superfluous mute event", zap.String("event", ev.String())) + return event.ErrSuperfluousMuteUnmuteEvent + } else if !i.isMuted && ev.Type == event.TypeUnmute { + i.logger.Debugw("Ignoring superfluous unmute event", zap.String("event", ev.String())) + return event.ErrSuperfluousMuteUnmuteEvent + } + tx, err := i.db.BeginTxx(ctx, nil) if err != nil { i.logger.Errorw("Can't start a db transaction", zap.Error(err)) @@ -147,6 +162,11 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return errors.New("can't insert incident event to the database") } + if err := i.handleMuteUnmute(ctx, tx, ev); err != nil { + i.logger.Errorw("Cannot insert incident muted history", zap.String("event", ev.String()), zap.Error(err)) + return errors.New("cannot insert incident muted history") + } + switch ev.Type { case event.TypeState: if !isNew { @@ -173,11 +193,18 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { } case event.TypeAcknowledgementSet: if err := i.processAcknowledgementEvent(ctx, tx, ev); err != nil { + if errors.Is(err, errSuperfluousAckEvent) { + // That ack error type indicates that the acknowledgement author was already a manager, thus + // we can safely ignore that event and return without even committing the DB transaction. 
+ return nil + } + return err } } - notifications, err := i.addPendingNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) + var notifications []*NotificationEntry + notifications, err = i.generateNotifications(ctx, tx, ev, i.getRecipientsChannel(ev.Time)) if err != nil { return err } @@ -188,6 +215,9 @@ func (i *Incident) ProcessEvent(ctx context.Context, ev *event.Event) error { return errors.New("can't commit db transaction") } + // We've just committed the DB transaction and can safely update the incident muted flag. + i.isMuted = i.Object.IsMuted() + return i.notifyContacts(ctx, ev, notifications) } @@ -241,7 +271,7 @@ func (i *Incident) RetriggerEscalations(ev *event.Event) { channels.LoadFromEscalationRecipients(escalation, ev.Time, i.isRecipientNotifiable) } - notifications, err = i.addPendingNotifications(ctx, tx, ev, channels) + notifications, err = i.generateNotifications(ctx, tx, ev, channels) return err }) @@ -346,6 +376,31 @@ func (i *Incident) processIncidentOpenedEvent(ctx context.Context, tx *sqlx.Tx, return nil } +// handleMuteUnmute generates an incident Muted or Unmuted history based on the Object state. +// Returns an error if fails to insert the generated history to the database. +func (i *Incident) handleMuteUnmute(ctx context.Context, tx *sqlx.Tx, ev *event.Event) error { + if i.isMuted == i.Object.IsMuted() { + return nil + } + + hr := &HistoryRow{IncidentID: i.Id, EventID: utils.ToDBInt(ev.ID), Time: types.UnixMilli(time.Now())} + logger := i.logger.With(zap.String("event", ev.String())) + if i.Object.IsMuted() { + hr.Type = Muted + // Since the object may have already been muted with previous events before this incident even + // existed, we have to use the mute reason from this object and not from the ongoing event. 
+ hr.Message = i.Object.MuteReason + logger.Infow("Muting incident", zap.String("reason", i.Object.MuteReason.String)) + } else { + hr.Type = Unmuted + // On the other hand, if an object is unmuted, its mute reason is already reset, and we can't access it anymore. + hr.Message = utils.ToDBString(ev.MuteReason) + logger.Infow("Unmuting incident", zap.String("reason", ev.MuteReason)) + } + + return hr.Sync(ctx, i.db, tx) +} + // evaluateRules evaluates all the configured rules for this *incident.Object and // generates history entries for each matched rule. // Returns error on database failure. @@ -563,6 +618,10 @@ func (i *Incident) notifyContact(contact *recipient.Contact, ev *event.Event, ch return nil } +// errSuperfluousAckEvent is returned when the same ack author submits two successive ack set events on an incident. +// This error is only used within this incident package. +var errSuperfluousAckEvent = errors.New("superfluous acknowledgement set event, author is already a manager") + // processAcknowledgementEvent processes the given ack event. // Promotes the ack author to incident.RoleManager if it's not already the case and generates a history entry. // Returns error on database failure. 
@@ -583,7 +642,8 @@ func (i *Incident) processAcknowledgementEvent(ctx context.Context, tx *sqlx.Tx, if oldRole == RoleManager { // The user is already a manager - return nil + i.logger.Debugw("Ignoring acknowledgement-set event, author is already a manager", zap.String("author", ev.Username)) + return errSuperfluousAckEvent } } else { i.Recipients[recipientKey] = &RecipientState{Role: newRole} diff --git a/internal/incident/incidents.go b/internal/incident/incidents.go index d6386d30..e35e4f34 100644 --- a/internal/incident/incidents.go +++ b/internal/incident/incidents.go @@ -11,6 +11,7 @@ import ( "github.com/icinga/icinga-notifications/internal/event" "github.com/icinga/icinga-notifications/internal/object" "github.com/icinga/icinga-notifications/internal/utils" + "github.com/jmoiron/sqlx" "github.com/pkg/errors" "go.uber.org/zap" "golang.org/x/sync/errgroup" @@ -119,6 +120,7 @@ func LoadOpenIncidents(ctx context.Context, db *database.DB, logger *logging.Log for _, i := range incidentsById { i.Object = object.GetFromCache(i.ObjectID) + i.isMuted = i.Object.IsMuted() i.logger = logger.With(zap.String("object", i.Object.DisplayName()), zap.String("incident", i.String())) @@ -208,6 +210,11 @@ func ProcessEvent( runtimeConfig *config.RuntimeConfig, ev *event.Event, ) error { + var wasObjectMuted bool + if obj := object.GetFromCache(object.ID(ev.SourceId, ev.Tags)); obj != nil { + wasObjectMuted = obj.IsMuted() + } + obj, err := object.FromEvent(ctx, db, ev) if err != nil { return fmt.Errorf("cannot sync event object: %w", err) @@ -227,10 +234,23 @@ func ProcessEvent( if currentIncident == nil { switch { - // ignore non-state event without incident case ev.Severity == event.SeverityNone: - return fmt.Errorf("%q does not have an active incident, ignoring %q event from source %d", - obj.DisplayName(), ev.Type, ev.SourceId) + // We need to ignore superfluous mute and unmute events here, as would be the case with an existing + // incident, otherwise the event stream 
catch-up phase will generate useless events after each + // Icinga 2 reload and overwhelm the database with the very same mute/unmute events. + if wasObjectMuted && ev.Type == event.TypeMute { + return event.ErrSuperfluousMuteUnmuteEvent + } else if !wasObjectMuted && ev.Type == event.TypeUnmute { + return event.ErrSuperfluousMuteUnmuteEvent + } + + // There is no active incident, but the event appears to be relevant, so try to persist it in the DB. + err = utils.RunInTx(ctx, db, func(tx *sqlx.Tx) error { return ev.Sync(ctx, tx, db, obj.ID) }) + if err != nil { + return errors.New("cannot sync non-state event to the database") + } + + return nil case ev.Severity != event.SeverityOK: panic(fmt.Sprintf("cannot process event %v with a non-OK state %v without a known incident", ev, ev.Severity)) default: diff --git a/internal/incident/notification_state.go b/internal/incident/notification_state.go index b28eae60..36196839 100644 --- a/internal/incident/notification_state.go +++ b/internal/incident/notification_state.go @@ -9,15 +9,17 @@ type NotificationState int const ( NotificationStateNull NotificationState = iota + NotificationStateSuppressed NotificationStatePending NotificationStateSent NotificationStateFailed ) var notificationStatTypeByName = map[string]NotificationState{ - "pending": NotificationStatePending, - "sent": NotificationStateSent, - "failed": NotificationStateFailed, + "suppressed": NotificationStateSuppressed, + "pending": NotificationStatePending, + "sent": NotificationStateSent, + "failed": NotificationStateFailed, } var notificationStateTypeToName = func() map[NotificationState]string { diff --git a/internal/incident/sync.go b/internal/incident/sync.go index f6a64d43..2645a7a7 100644 --- a/internal/incident/sync.go +++ b/internal/incident/sync.go @@ -135,11 +135,16 @@ func (i *Incident) AddRuleMatched(ctx context.Context, tx *sqlx.Tx, r *rule.Rule return err } -// addPendingNotifications inserts pending notification incident history of the given 
recipients. -func (i *Incident) addPendingNotifications( +// generateNotifications generates incident notification histories of the given recipients. +// +// This function will just insert NotificationStateSuppressed incident histories and return an empty slice if +// the current Object is muted, otherwise a slice of pending *NotificationEntry(ies) that can be used to update +// the corresponding histories after the actual notifications have been sent out. +func (i *Incident) generateNotifications( ctx context.Context, tx *sqlx.Tx, ev *event.Event, contactChannels rule.ContactChannels, ) ([]*NotificationEntry, error) { var notifications []*NotificationEntry + suppress := i.isMuted && i.Object.IsMuted() for contact, channels := range contactChannels { for chID := range channels { hr := &HistoryRow{ @@ -150,23 +155,28 @@ func (i *Incident) addPendingNotifications( Type: Notified, ChannelID: utils.ToDBInt(chID), NotificationState: NotificationStatePending, + Message: utils.ToDBString(ev.Message), + } + if suppress { + hr.NotificationState = NotificationStateSuppressed } if err := hr.Sync(ctx, i.db, tx); err != nil { - i.logger.Errorw( - "Failed to insert contact pending notification incident history", - zap.String("contact", contact.String()), zap.Error(err), - ) + i.logger.Errorw("Failed to insert incident notification history", + zap.String("contact", contact.FullName), zap.Bool("incident_muted", i.Object.IsMuted()), + zap.Error(err)) - return nil, errors.New("can't insert contact pending notification incident history") + return nil, errors.New("cannot insert incident notification history") } - notifications = append(notifications, &NotificationEntry{ - HistoryRowID: hr.ID, - ContactID: contact.ID, - State: NotificationStatePending, - ChannelID: chID, - }) + if !suppress { + notifications = append(notifications, &NotificationEntry{ + HistoryRowID: hr.ID, + ContactID: contact.ID, + State: NotificationStatePending, + ChannelID: chID, + }) + } } } diff --git 
a/internal/listener/listener.go b/internal/listener/listener.go index 566d9049..c7d26ae6 100644 --- a/internal/listener/listener.go +++ b/internal/listener/listener.go @@ -121,6 +121,10 @@ func (l *Listener) ProcessEvent(w http.ResponseWriter, req *http.Request) { ev.SourceId = source.ID if ev.Type == "" { ev.Type = event.TypeState + } else if !ev.Mute.Valid && ev.Type == event.TypeMute { + ev.SetMute(true, ev.MuteReason) + } else if !ev.Mute.Valid && ev.Type == event.TypeUnmute { + ev.SetMute(false, ev.MuteReason) } if err := ev.Validate(); err != nil { diff --git a/internal/object/db_types.go b/internal/object/db_types.go index 324f8af5..70ad1ffb 100644 --- a/internal/object/db_types.go +++ b/internal/object/db_types.go @@ -28,7 +28,8 @@ func (e *IdTagRow) TableName() string { // Upsert implements the contracts.Upserter interface. func (o *Object) Upsert() interface{} { return struct { - Name string `db:"name"` - URL types.String `db:"url"` + Name string `db:"name"` + URL types.String `db:"url"` + MuteReason types.String `db:"mute_reason"` }{} } diff --git a/internal/object/object.go b/internal/object/object.go index f58fefd6..5f4daa05 100644 --- a/internal/object/object.go +++ b/internal/object/object.go @@ -4,6 +4,7 @@ import ( "bytes" "context" "crypto/sha256" + "database/sql" "encoding/binary" "encoding/hex" "encoding/json" @@ -25,10 +26,11 @@ var ( ) type Object struct { - ID types.Binary `db:"id"` - SourceID int64 `db:"source_id"` - Name string `db:"name"` - URL types.String `db:"url"` + ID types.Binary `db:"id"` + SourceID int64 `db:"source_id"` + Name string `db:"name"` + URL types.String `db:"url"` + MuteReason types.String `db:"mute_reason"` Tags map[string]string `db:"-"` ExtraTags map[string]string `db:"-"` @@ -38,7 +40,7 @@ type Object struct { // New creates a new object from the given event. 
func New(db *database.DB, ev *event.Event) *Object { - return &Object{ + obj := &Object{ SourceID: ev.SourceId, Name: ev.Name, db: db, @@ -46,6 +48,11 @@ func New(db *database.DB, ev *event.Event) *Object { Tags: ev.Tags, ExtraTags: ev.ExtraTags, } + if ev.Mute.Valid && ev.Mute.Bool { + obj.MuteReason = types.String{NullString: sql.NullString{String: ev.MuteReason, Valid: true}} + } + + return obj } // GetFromCache fetches an object from the global object cache store matching the given ID. @@ -131,6 +138,14 @@ func FromEvent(ctx context.Context, db *database.DB, ev *event.Event) (*Object, newObject.ExtraTags = ev.ExtraTags newObject.Name = ev.Name newObject.URL = utils.ToDBString(ev.URL) + if ev.Mute.Valid { + if ev.Mute.Bool { + newObject.MuteReason = utils.ToDBString(ev.MuteReason) + } else { + // The ongoing event unmutes the object, so reset the mute reason to null. + newObject.MuteReason = types.String{} + } + } } tx, err := db.BeginTxx(ctx, nil) @@ -179,6 +194,11 @@ func FromEvent(ctx context.Context, db *database.DB, ev *event.Event) (*Object, return object, nil } +// IsMuted returns whether the current object is muted by its source. +func (o *Object) IsMuted() bool { + return o.MuteReason.Valid +} + func (o *Object) DisplayName() string { if o.Name != "" { return o.Name diff --git a/schema/pgsql/schema.sql b/schema/pgsql/schema.sql index b5df6a4d..4002b281 100644 --- a/schema/pgsql/schema.sql +++ b/schema/pgsql/schema.sql @@ -4,6 +4,8 @@ CREATE TYPE boolenum AS ENUM ( 'n', 'y' ); CREATE TYPE incident_history_event_type AS ENUM ( -- Order to be honored for events with identical millisecond timestamps. 
'opened', + 'muted', + 'unmuted', 'incident_severity_changed', 'rule_matched', 'escalation_triggered', @@ -12,7 +14,7 @@ CREATE TYPE incident_history_event_type AS ENUM ( 'notified' ); CREATE TYPE rotation_type AS ENUM ( '24-7', 'partial', 'multi' ); -CREATE TYPE notification_state_type AS ENUM ( 'pending', 'sent', 'failed' ); +CREATE TYPE notification_state_type AS ENUM ( 'suppressed', 'pending', 'sent', 'failed' ); -- IPL ORM renders SQL queries with LIKE operators for all suggestions in the search bar, -- which fails for numeric and enum types on PostgreSQL. Just like in Icinga DB Web. @@ -197,6 +199,8 @@ CREATE TABLE object ( name text NOT NULL, url text, + -- mute_reason indicates whether an object is currently muted by its source, and its non-zero value is mapped to true. + mute_reason text, CHECK (length(id) = 256/8), @@ -229,7 +233,9 @@ CREATE TYPE event_type AS ENUM ( 'flapping-end', 'flapping-start', 'incident-age', - 'state' + 'mute', + 'state', + 'unmute' ); CREATE TYPE severity AS ENUM ('ok', 'debug', 'info', 'notice', 'warning', 'err', 'crit', 'alert', 'emerg'); @@ -241,6 +247,8 @@ CREATE TABLE event ( severity severity, message text, username citext, + mute boolenum, + mute_reason text, CONSTRAINT pk_event PRIMARY KEY (id) ); diff --git a/schema/pgsql/upgrades/031.sql b/schema/pgsql/upgrades/031.sql new file mode 100644 index 00000000..3a818dd4 --- /dev/null +++ b/schema/pgsql/upgrades/031.sql @@ -0,0 +1,12 @@ +ALTER TYPE event_type ADD VALUE 'mute' BEFORE 'state'; +ALTER TYPE event_type ADD VALUE 'unmute'; + +ALTER TYPE incident_history_event_type ADD VALUE 'muted' AFTER 'opened'; +ALTER TYPE incident_history_event_type ADD VALUE 'unmuted' AFTER 'muted'; +ALTER TYPE notification_state_type ADD VALUE 'suppressed' BEFORE 'pending'; + +ALTER TABLE object ADD COLUMN mute_reason text; + +ALTER TABLE event + ADD COLUMN mute boolenum, + ADD COLUMN mute_reason text;