Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Monitor and alert on errors in cron jobs #24347

Merged
merged 33 commits into from
Dec 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
33 commits
Select commit Hold shift + click to select a range
6a49421
add stack trace to cron job panic recovery
sgress454 Nov 26, 2024
85cdaee
update tests
sgress454 Nov 26, 2024
d18791f
schedule updates
sgress454 Nov 26, 2024
d67a557
add migration
sgress454 Nov 26, 2024
3cbe83f
update cron schedule errors, tests
sgress454 Nov 26, 2024
aa48034
make new topics for p1 and p2 alerts
sgress454 Nov 26, 2024
2afe530
don't change default slack module name
sgress454 Dec 3, 2024
f13e6d6
add new alert
sgress454 Dec 3, 2024
091304c
send to correct topic
sgress454 Dec 3, 2024
7c15ef2
don't change default cron sns topic name
sgress454 Dec 3, 2024
80a165a
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 3, 2024
49b503d
add changelog
sgress454 Dec 3, 2024
cf244ea
lint
sgress454 Dec 3, 2024
5a066a8
revert dogfood tf changes (will make separate PR)
sgress454 Dec 3, 2024
1ac2df1
update migration timestamp
sgress454 Dec 4, 2024
0af018e
add monitor interval var
sgress454 Dec 4, 2024
f08c4c3
use lambda duration, not cron duration
sgress454 Dec 4, 2024
790c134
update schema.sql
sgress454 Dec 4, 2024
4efe16d
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 4, 2024
c4339be
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 5, 2024
cbfc575
handle nulls in test
sgress454 Dec 10, 2024
6fe214d
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 10, 2024
9b4ad8b
update migration
sgress454 Dec 10, 2024
1a1f97a
removed "errors" from GetLatestCronStats return val
sgress454 Dec 11, 2024
0f077ad
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 11, 2024
4979484
update schema.sql
sgress454 Dec 11, 2024
8115750
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 12, 2024
c765339
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 18, 2024
bb56e25
update monitoring readme
sgress454 Dec 18, 2024
72f845f
don't commit lock file
sgress454 Dec 18, 2024
31dd84b
allow test to recover from panic
sgress454 Dec 18, 2024
97cdca2
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 19, 2024
d347593
Merge branch 'main' into 19930-alert-cron-failures
sgress454 Dec 19, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changes/19930-alert-on-cron-errors
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
- Send alert via SNS when a scheduled "cron" job returns errors
- SNS topic for job error alerts can be configured separately from the existing monitor alert by adding "cron_job_failure_monitoring" to sns_topic_arns_map, otherwise defaults to the using the same topic
17 changes: 14 additions & 3 deletions server/datastore/mysql/cron_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package mysql

import (
"context"
"database/sql"
"encoding/json"

"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/fleet"
Expand Down Expand Up @@ -63,10 +65,19 @@ func (ds *Datastore) InsertCronStats(ctx context.Context, statsType fleet.CronSt
return int(id), nil
}

func (ds *Datastore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus) error {
stmt := `UPDATE cron_stats SET status = ? WHERE id = ?`
func (ds *Datastore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error {
stmt := `UPDATE cron_stats SET status = ?, errors = ? WHERE id = ?`

if _, err := ds.writer(ctx).ExecContext(ctx, stmt, status, id); err != nil {
errorsJSON := sql.NullString{}
if len(*cronErrors) > 0 {
b, err := json.Marshal(cronErrors)
if err == nil {
errorsJSON.String = string(b)
errorsJSON.Valid = true
}
}

if _, err := ds.writer(ctx).ExecContext(ctx, stmt, status, errorsJSON, id); err != nil {
return ctxerr.Wrap(ctx, err, "update cron stats")
}

Expand Down
37 changes: 32 additions & 5 deletions server/datastore/mysql/cron_stats_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package mysql

import (
"context"
"database/sql"
"encoding/json"
"errors"
"testing"
"time"

Expand All @@ -10,6 +13,12 @@ import (
"github.com/stretchr/testify/require"
)

type testCronStats struct {
fleet.CronStats
// Errors is a JSON string containing any errors encountered during the run.
Errors sql.NullString `db:"errors"`
}

func TestInsertUpdateCronStats(t *testing.T) {
const (
scheduleName = "test_sched"
Expand All @@ -28,7 +37,10 @@ func TestInsertUpdateCronStats(t *testing.T) {
require.Equal(t, fleet.CronStatsTypeScheduled, res[0].StatsType)
require.Equal(t, fleet.CronStatsStatusPending, res[0].Status)

err = ds.UpdateCronStats(ctx, id, fleet.CronStatsStatusCompleted)
err = ds.UpdateCronStats(ctx, id, fleet.CronStatsStatusCompleted, &fleet.CronScheduleErrors{
"some_job": errors.New("some error"),
"some_other_job": errors.New("some other error"),
})
require.NoError(t, err)

res, err = ds.GetLatestCronStats(ctx, scheduleName)
Expand All @@ -37,6 +49,21 @@ func TestInsertUpdateCronStats(t *testing.T) {
require.Equal(t, id, res[0].ID)
require.Equal(t, fleet.CronStatsTypeScheduled, res[0].StatsType)
require.Equal(t, fleet.CronStatsStatusCompleted, res[0].Status)

var stats []testCronStats
err = sqlx.SelectContext(ctx, ds.reader(ctx), &stats, `SELECT * FROM cron_stats ORDER BY id`)
require.NoError(t, err)
// Make sure we got valid JSON back.
var actualMap map[string]string
err = json.Unmarshal([]byte(stats[0].Errors.String), &actualMap)
require.NoError(t, err)

// Compare the error JSON with the expected object.
expectedJSON := `{"some_job": "some error", "some_other_job": "some other error"}`
var expectedMap map[string]string
err = json.Unmarshal([]byte(expectedJSON), &expectedMap)
require.NoError(t, err)
require.Equal(t, actualMap, expectedMap)
}

func TestGetLatestCronStats(t *testing.T) {
Expand Down Expand Up @@ -171,7 +198,7 @@ func TestCleanupCronStats(t *testing.T) {
require.NoError(t, err)
}

var stats []fleet.CronStats
var stats []testCronStats
err := sqlx.SelectContext(ctx, ds.reader(ctx), &stats, `SELECT * FROM cron_stats ORDER BY id`)
require.NoError(t, err)
require.Len(t, stats, len(cases))
Expand All @@ -183,7 +210,7 @@ func TestCleanupCronStats(t *testing.T) {
err = ds.CleanupCronStats(ctx)
require.NoError(t, err)

stats = []fleet.CronStats{}
stats = []testCronStats{}
err = sqlx.SelectContext(ctx, ds.reader(ctx), &stats, `SELECT * FROM cron_stats ORDER BY id`)
require.NoError(t, err)
require.Len(t, stats, len(cases)-1) // case[7] was deleted because it exceeded max age
Expand Down Expand Up @@ -254,7 +281,7 @@ func TestUpdateAllCronStatsForInstance(t *testing.T) {
require.NoError(t, err)
}

var stats []fleet.CronStats
var stats []testCronStats
err := sqlx.SelectContext(ctx, ds.reader(ctx), &stats, `SELECT * FROM cron_stats ORDER BY id`)
require.NoError(t, err)
require.Len(t, stats, len(cases))
Expand All @@ -267,7 +294,7 @@ func TestUpdateAllCronStatsForInstance(t *testing.T) {
err = ds.UpdateAllCronStatsForInstance(ctx, "inst1", fleet.CronStatsStatusPending, fleet.CronStatsStatusCanceled)
require.NoError(t, err)

stats = []fleet.CronStats{}
stats = []testCronStats{}
err = sqlx.SelectContext(ctx, ds.reader(ctx), &stats, `SELECT * FROM cron_stats ORDER BY id`)
require.NoError(t, err)
require.Len(t, stats, len(cases))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package tables

import (
"database/sql"
"fmt"
)

func init() {
MigrationClient.AddMigration(Up_20241126140021, Down_20241126140021)
}

func Up_20241126140021(tx *sql.Tx) error {
// Add columns
_, err := tx.Exec(`ALTER TABLE cron_stats ADD COLUMN errors JSON`)
if err != nil {
return fmt.Errorf("failed to add errors to cron_stats: %w", err)
}
return nil
}

func Down_20241126140021(tx *sql.Tx) error {
return nil
}
5 changes: 3 additions & 2 deletions server/datastore/mysql/schema.sql

Large diffs are not rendered by default.

19 changes: 19 additions & 0 deletions server/fleet/cron_schedules.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package fleet

import (
"encoding/json"
"fmt"
"net/http"
"sort"
Expand Down Expand Up @@ -147,6 +148,22 @@ func (e triggerNotFoundError) StatusCode() int {
return http.StatusNotFound
}

type CronScheduleErrors map[string]error

func (cse CronScheduleErrors) MarshalJSON() ([]byte, error) {
// Create a temporary map for JSON serialization
stringMap := make(map[string]string)
for key, err := range cse {
if err != nil {
stringMap[key] = err.Error()
} else {
stringMap[key] = ""
}
}
// Serialize the temporary map to JSON
return json.Marshal(stringMap)
}
Comment on lines +151 to +165
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

New type to hold errors from a cron schedule and turn them into a JSON string.


// CronStats represents statistics recorded in connection with a named set of jobs (sometimes
// referred to as a "cron" or "schedule"). Each record represents a separate "run" of the named job set.
type CronStats struct {
Expand All @@ -167,6 +184,8 @@ type CronStats struct {
// Status is the current status of the run. Recognized statuses are "pending", "completed", and
// "expired".
Status CronStatsStatus `db:"status"`
// Errors is a JSON string containing any errors encountered during the run.
Errors string `db:"errors"`
}

// CronStatsType is one of two recognized types of cron stats (i.e. "scheduled" or "triggered")
Expand Down
2 changes: 1 addition & 1 deletion server/fleet/datastore.go
Original file line number Diff line number Diff line change
Expand Up @@ -786,7 +786,7 @@ type Datastore interface {
// InsertCronStats inserts cron stats for the named cron schedule.
InsertCronStats(ctx context.Context, statsType CronStatsType, name string, instance string, status CronStatsStatus) (int, error)
// UpdateCronStats updates the status of the identified cron stats record.
UpdateCronStats(ctx context.Context, id int, status CronStatsStatus) error
UpdateCronStats(ctx context.Context, id int, status CronStatsStatus, cronErrors *CronScheduleErrors) error
// UpdateAllCronStatsForInstance updates all records for the identified instance with the
// specified statuses
UpdateAllCronStatsForInstance(ctx context.Context, instance string, fromStatus CronStatsStatus, toStatus CronStatsStatus) error
Expand Down
6 changes: 3 additions & 3 deletions server/mock/datastore_mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -581,7 +581,7 @@ type GetLatestCronStatsFunc func(ctx context.Context, name string) ([]fleet.Cron

type InsertCronStatsFunc func(ctx context.Context, statsType fleet.CronStatsType, name string, instance string, status fleet.CronStatsStatus) (int, error)

type UpdateCronStatsFunc func(ctx context.Context, id int, status fleet.CronStatsStatus) error
type UpdateCronStatsFunc func(ctx context.Context, id int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error

type UpdateAllCronStatsForInstanceFunc func(ctx context.Context, instance string, fromStatus fleet.CronStatsStatus, toStatus fleet.CronStatsStatus) error

Expand Down Expand Up @@ -4885,11 +4885,11 @@ func (s *DataStore) InsertCronStats(ctx context.Context, statsType fleet.CronSta
return s.InsertCronStatsFunc(ctx, statsType, name, instance, status)
}

func (s *DataStore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus) error {
func (s *DataStore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error {
s.mu.Lock()
s.UpdateCronStatsFuncInvoked = true
s.mu.Unlock()
return s.UpdateCronStatsFunc(ctx, id, status)
return s.UpdateCronStatsFunc(ctx, id, status, &fleet.CronScheduleErrors{})
}

func (s *DataStore) UpdateAllCronStatsForInstance(ctx context.Context, instance string, fromStatus fleet.CronStatsStatus, toStatus fleet.CronStatsStatus) error {
Expand Down
12 changes: 8 additions & 4 deletions server/service/schedule/schedule.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"context"
"fmt"
"os"
"runtime/debug"
"sync"
"time"

Expand Down Expand Up @@ -47,7 +48,8 @@ type Schedule struct {

altLockName string

jobs []Job
jobs []Job
errors fleet.CronScheduleErrors

statsStore CronStatsStore

Expand Down Expand Up @@ -80,7 +82,7 @@ type CronStatsStore interface {
// InsertCronStats inserts cron stats for the named cron schedule
InsertCronStats(ctx context.Context, statsType fleet.CronStatsType, name string, instance string, status fleet.CronStatsStatus) (int, error)
// UpdateCronStats updates the status of the identified cron stats record
UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus) error
UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error
}

// Option allows configuring a Schedule.
Expand Down Expand Up @@ -178,6 +180,7 @@ func New(
sch.logger = log.NewNopLogger()
}
sch.logger = log.With(sch.logger, "instanceID", instanceID)
sch.errors = make(fleet.CronScheduleErrors)
return sch
}

Expand Down Expand Up @@ -461,6 +464,7 @@ func (s *Schedule) runAllJobs() {
for _, job := range s.jobs {
level.Debug(s.logger).Log("msg", "starting", "jobID", job.ID)
if err := runJob(s.ctx, job.Fn); err != nil {
s.errors[job.ID] = err
level.Error(s.logger).Log("err", "running job", "details", err, "jobID", job.ID)
ctxerr.Handle(s.ctx, err)
}
Expand All @@ -472,7 +476,7 @@ func runJob(ctx context.Context, fn JobFn) (err error) {
defer func() {
if os.Getenv("TEST_CRON_NO_RECOVER") != "1" { // for detecting panics in tests
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
err = fmt.Errorf("%v\n%s", r, string(debug.Stack()))
}
}
}()
Expand Down Expand Up @@ -617,7 +621,7 @@ func (s *Schedule) insertStats(statsType fleet.CronStatsType, status fleet.CronS
}

func (s *Schedule) updateStats(id int, status fleet.CronStatsStatus) error {
return s.statsStore.UpdateCronStats(s.ctx, id, status)
return s.statsStore.UpdateCronStats(s.ctx, id, status, &s.errors)
}

func (s *Schedule) getLockName() string {
Expand Down
24 changes: 22 additions & 2 deletions server/service/schedule/schedule_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,9 @@ func TestMultipleSchedules(t *testing.T) {
}

func TestMultipleJobsInOrder(t *testing.T) {
os.Setenv("TEST_CRON_NO_RECOVER", "0")
defer os.Unsetenv("TEST_CRON_NO_RECOVER")

ctx, cancel := context.WithCancel(context.Background())

jobs := make(chan int)
Expand All @@ -211,12 +214,16 @@ func TestMultipleJobsInOrder(t *testing.T) {
}),
WithJob("test_job_2", func(ctx context.Context) error {
jobs <- 2
return errors.New("test_job_2")
return errors.New("whoops")
}),
WithJob("test_job_3", func(ctx context.Context) error {
jobs <- 3
return nil
}),
WithJob("test_job_4", func(ctx context.Context) error {
jobs <- 4
panic("oh no")
}),
)
s.Start()

Expand All @@ -233,7 +240,7 @@ func TestMultipleJobsInOrder(t *testing.T) {
return fmt.Errorf("mismatch id: %d vs %d", job, i)
}
i++
if i == 4 {
if i == 5 {
i = 1
}
case <-time.After(5 * time.Second):
Expand All @@ -253,6 +260,19 @@ func TestMultipleJobsInOrder(t *testing.T) {

err := g.Wait()
require.NoError(t, err)

// There should be errors from 2 jobs.
require.Equal(t, 2, len(s.errors))

// Check that the correct 2 jobs have errors.
test_job_2_err, ok := s.errors["test_job_2"]
require.True(t, ok)
test_job_4_err, ok := s.errors["test_job_4"]
require.True(t, ok)

// Check the errors that were returned.
require.Equal(t, "whoops", test_job_2_err.Error())
require.Contains(t, test_job_4_err.Error(), "oh no\n")
}

func TestConfigReloadCheck(t *testing.T) {
Expand Down
4 changes: 2 additions & 2 deletions server/service/schedule/testing_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (NopStatsStore) InsertCronStats(ctx context.Context, statsType fleet.CronSt
return 0, nil
}

func (NopStatsStore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus) error {
func (NopStatsStore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error {
return nil
}

Expand Down Expand Up @@ -179,7 +179,7 @@ func (m *MockStatsStore) InsertCronStats(ctx context.Context, statsType fleet.Cr
return id, nil
}

func (m *MockStatsStore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus) error {
func (m *MockStatsStore) UpdateCronStats(ctx context.Context, id int, status fleet.CronStatsStatus, cronErrors *fleet.CronScheduleErrors) error {
m.Lock()
defer m.Unlock()

Expand Down
4 changes: 3 additions & 1 deletion terraform/addons/monitoring/.header.md
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ module "monitoring" {
sns_topic_arns_map = {
alb_httpcode_5xx = [var.sns_topic_arn]
cron_monitoring = [var.sns_topic_arn]
cron_job_failure_monitoring = [var.sns_another_topic_arn]
}
mysql_cluster_members = module.main.byo-vpc.rds.cluster_members
# The cloudposse module seems to have a nested list here.
Expand Down Expand Up @@ -85,7 +86,8 @@ Valid targets for `sns_topic_arns_map`:
- alb_helthyhosts
- alb_httpcode_5xx
- backend_response_time
- cron_monitoring
- cron_monitoring (notifications about failures in the cron scheduler)
- cron_job_failure_monitoring (notifications about errors in individual cron jobs - defaults to value of `cron_monitoring`)
- rds_cpu_untilizaton_too_high
- rds_db_event_subscription
- redis_cpu_engine_utilization
Expand Down
Loading
Loading