diff --git a/changes/24771-mdm-deadlock-fixes b/changes/24771-mdm-deadlock-fixes
new file mode 100644
index 000000000000..708576683328
--- /dev/null
+++ b/changes/24771-mdm-deadlock-fixes
@@ -0,0 +1 @@
+Fixed potential deadlocks when deploying Apple configuration profiles.
diff --git a/server/datastore/mysql/apple_mdm.go b/server/datastore/mysql/apple_mdm.go
index 5cbd19783a2d..d39dc93886d2 100644
--- a/server/datastore/mysql/apple_mdm.go
+++ b/server/datastore/mysql/apple_mdm.go
@@ -2814,11 +2814,16 @@ func (ds *Datastore) UpdateOrDeleteHostMDMAppleProfile(ctx context.Context, prof
 		status = &fleet.MDMDeliveryVerified
 	}
 
-	_, err := ds.writer(ctx).ExecContext(ctx, `
+	// We need to run this update with retry due to potential deadlocks with BulkSetPendingMDMHostProfiles.
+	// Deadlock seen in 2024/12/12 loadtest: https://docs.google.com/document/d/1-Q6qFTd7CDm-lh7MVRgpNlNNJijk6JZ4KO49R1fp80U
+	err := ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error {
+		_, err := tx.ExecContext(ctx, `
          UPDATE host_mdm_apple_profiles
          SET status = ?, operation_type = ?, detail = ?
          WHERE host_uuid = ? AND command_uuid = ?
 `, status, profile.OperationType, detail, profile.HostUUID, profile.CommandUUID)
+		return err
+	})
 	return err
 }
 
diff --git a/server/datastore/mysql/common_mysql/retry.go b/server/datastore/mysql/common_mysql/retry.go
new file mode 100644
index 000000000000..c12059dd68a3
--- /dev/null
+++ b/server/datastore/mysql/common_mysql/retry.go
@@ -0,0 +1,93 @@
+package common_mysql
+
+import (
+	"context"
+	"database/sql"
+	"errors"
+	"time"
+
+	"github.com/VividCortex/mysqlerr"
+	"github.com/cenkalti/backoff/v4"
+	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
+	"github.com/go-kit/log"
+	"github.com/go-sql-driver/mysql"
+	"github.com/jmoiron/sqlx"
+)
+
+var DoRetryErr = errors.New("fleet datastore retry")
+
+type TxFn func(tx sqlx.ExtContext) error
+
+// WithRetryTxx provides a common way to commit/rollback a TxFn wrapped in a retry with exponential backoff.
+func WithRetryTxx(ctx context.Context, db *sqlx.DB, fn TxFn, logger log.Logger) error {
+	operation := func() error {
+		tx, err := db.BeginTxx(ctx, nil)
+		if err != nil {
+			return ctxerr.Wrap(ctx, err, "create transaction")
+		}
+
+		defer func() {
+			if p := recover(); p != nil {
+				if err := tx.Rollback(); err != nil {
+					logger.Log("err", err, "msg", "error encountered during transaction panic rollback")
+				}
+				panic(p)
+			}
+		}()
+
+		if err := fn(tx); err != nil {
+			rbErr := tx.Rollback()
+			if rbErr != nil && rbErr != sql.ErrTxDone {
+				// Consider rollback errors to be non-retryable.
+				return backoff.Permanent(ctxerr.Wrapf(ctx, err, "got err '%s' rolling back after err", rbErr.Error()))
+			}
+
+			if retryableError(err) {
+				return err
+			}
+
+			// Consider any other errors to be non-retryable.
+			return backoff.Permanent(err)
+		}
+
+		if err := tx.Commit(); err != nil {
+			err = ctxerr.Wrap(ctx, err, "commit transaction")
+
+			if retryableError(err) {
+				return err
+			}
+
+			return backoff.Permanent(err)
+		}
+
+		return nil
+	}
+
+	expBo := backoff.NewExponentialBackOff()
+	// The MySQL innodb_lock_wait_timeout default is 50 seconds, so a transaction can wait on a lock for many seconds.
+	// We set a higher MaxElapsedTime to increase the probability that the transaction will be retried.
+	// This reduces the number of retryable 'Deadlock found' errors. However, with a loaded DB, we will still see
+	// 'Context cancelled' errors when the server drops long-lasting connections.
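+	// With the backoff/v4 defaults (500ms initial interval, 1.5x multiplier, 0.5
+	// randomization factor), the five retries configured below fire after roughly
+	// 0.5s, 0.75s, 1.1s, 1.7s, and 2.5s of waiting, well under the one-minute cap.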
+	expBo.MaxElapsedTime = 1 * time.Minute
+	bo := backoff.WithMaxRetries(expBo, 5)
+	return backoff.Retry(operation, bo)
+}
+
+// retryableError determines whether a MySQL error can be retried. By default
+// errors are considered non-retryable. Only errors that we know have a
+// possibility of succeeding on a retry should return true in this function.
+func retryableError(err error) bool {
+	base := ctxerr.Cause(err)
+	if b, ok := base.(*mysql.MySQLError); ok {
+		switch b.Number {
+		// Consider lock related errors to be retryable
+		case mysqlerr.ER_LOCK_DEADLOCK, mysqlerr.ER_LOCK_WAIT_TIMEOUT:
+			return true
+		}
+	}
+	if errors.Is(err, DoRetryErr) {
+		return true
+	}
+
+	return false
+}
diff --git a/server/datastore/mysql/mysql.go b/server/datastore/mysql/mysql.go
index 356dabb9c643..d5d0c0372f84 100644
--- a/server/datastore/mysql/mysql.go
+++ b/server/datastore/mysql/mysql.go
@@ -14,15 +14,14 @@ import (
 	"sync"
 	"time"
 
-	"github.com/VividCortex/mysqlerr"
 	"github.com/WatchBeam/clock"
 	"github.com/XSAM/otelsql"
-	"github.com/cenkalti/backoff/v4"
 	"github.com/doug-martin/goqu/v9"
 	"github.com/doug-martin/goqu/v9/exp"
 	"github.com/fleetdm/fleet/v4/server/config"
 	"github.com/fleetdm/fleet/v4/server/contexts/ctxdb"
 	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
+	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
 	"github.com/fleetdm/fleet/v4/server/datastore/mysql/migrations/data"
 	"github.com/fleetdm/fleet/v4/server/datastore/mysql/migrations/tables"
 	"github.com/fleetdm/fleet/v4/server/fleet"
@@ -175,8 +174,6 @@ func (ds *Datastore) NewSCEPDepot() (scep_depot.Depot, error) {
 	return newSCEPDepot(ds.primary.DB, ds)
 }
 
-type txFn func(tx sqlx.ExtContext) error
-
 type entity struct {
 	name string
 }
@@ -190,88 +187,12 @@ var (
 	usersTable = entity{"users"}
 )
 
-var doRetryErr = errors.New("fleet datastore retry")
-
-// retryableError determines whether a MySQL error can be retried. By default
-// errors are considered non-retryable. Only errors that we know have a
-// possibility of succeeding on a retry should return true in this function.
-func retryableError(err error) bool {
-	base := ctxerr.Cause(err)
-	if b, ok := base.(*mysql.MySQLError); ok {
-		switch b.Number {
-		// Consider lock related errors to be retryable
-		case mysqlerr.ER_LOCK_DEADLOCK, mysqlerr.ER_LOCK_WAIT_TIMEOUT:
-			return true
-		}
-	}
-	if errors.Is(err, doRetryErr) {
-		return true
-	}
-
-	return false
-}
-
-func (ds *Datastore) withRetryTxx(ctx context.Context, fn txFn) (err error) {
-	return withRetryTxx(ctx, ds.writer(ctx), fn, ds.logger)
-}
-
-// withRetryTxx provides a common way to commit/rollback a txFn wrapped in a retry with exponential backoff
-func withRetryTxx(ctx context.Context, db *sqlx.DB, fn txFn, logger log.Logger) (err error) {
-	operation := func() error {
-		tx, err := db.BeginTxx(ctx, nil)
-		if err != nil {
-			return ctxerr.Wrap(ctx, err, "create transaction")
-		}
-
-		defer func() {
-			if p := recover(); p != nil {
-				if err := tx.Rollback(); err != nil {
-					logger.Log("err", err, "msg", "error encountered during transaction panic rollback")
-				}
-				panic(p)
-			}
-		}()
-
-		if err := fn(tx); err != nil {
-			rbErr := tx.Rollback()
-			if rbErr != nil && rbErr != sql.ErrTxDone {
-				// Consider rollback errors to be non-retryable
-				return backoff.Permanent(ctxerr.Wrapf(ctx, err, "got err '%s' rolling back after err", rbErr.Error()))
-			}
-
-			if retryableError(err) {
-				return err
-			}
-
-			// Consider any other errors to be non-retryable
-			return backoff.Permanent(err)
-		}
-
-		if err := tx.Commit(); err != nil {
-			err = ctxerr.Wrap(ctx, err, "commit transaction")
-
-			if retryableError(err) {
-				return err
-			}
-
-			return backoff.Permanent(err)
-		}
-
-		return nil
-	}
-
-	expBo := backoff.NewExponentialBackOff()
-	// MySQL innodb_lock_wait_timeout default is 50 seconds, so transaction can be waiting for a lock for several seconds.
-	// Setting a higher MaxElapsedTime to increase probability that transaction will be retried.
-	// This will reduce the number of retryable 'Deadlock found' errors. However, with a loaded DB, we will still see
-	// 'Context cancelled' errors when the server drops long-lasting connections.
-	expBo.MaxElapsedTime = 1 * time.Minute
-	bo := backoff.WithMaxRetries(expBo, 5)
-	return backoff.Retry(operation, bo)
+func (ds *Datastore) withRetryTxx(ctx context.Context, fn common_mysql.TxFn) (err error) {
+	return common_mysql.WithRetryTxx(ctx, ds.writer(ctx), fn, ds.logger)
 }
 
 // withTx provides a common way to commit/rollback a txFn
-func (ds *Datastore) withTx(ctx context.Context, fn txFn) (err error) {
+func (ds *Datastore) withTx(ctx context.Context, fn common_mysql.TxFn) (err error) {
 	tx, err := ds.writer(ctx).BeginTxx(ctx, nil)
 	if err != nil {
 		return ctxerr.Wrap(ctx, err, "create transaction")
 	}
diff --git a/server/datastore/mysql/nanomdm_storage.go b/server/datastore/mysql/nanomdm_storage.go
index 585bc68949dd..964f03ab738e 100644
--- a/server/datastore/mysql/nanomdm_storage.go
+++ b/server/datastore/mysql/nanomdm_storage.go
@@ -10,6 +10,7 @@ import (
 
 	abmctx "github.com/fleetdm/fleet/v4/server/contexts/apple_bm"
 	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
+	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
 	"github.com/fleetdm/fleet/v4/server/fleet"
 	"github.com/fleetdm/fleet/v4/server/mdm/assets"
 	nanodep_client "github.com/fleetdm/fleet/v4/server/mdm/nanodep/client"
@@ -125,7 +126,7 @@ func (s *NanoMDMStorage) EnqueueDeviceLockCommand(
 	cmd *mdm.Command,
 	pin string,
 ) error {
-	return withRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
+	return common_mysql.WithRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
 		if err := enqueueCommandDB(ctx, tx, []string{host.UUID}, cmd); err != nil {
 			return err
 		}
@@ -154,7 +155,7 @@ func (s *NanoMDMStorage) EnqueueDeviceLockCommand(
 
 // EnqueueDeviceWipeCommand enqueues a EraseDevice command for the given host.
 func (s *NanoMDMStorage) EnqueueDeviceWipeCommand(ctx context.Context, host *fleet.Host, cmd *mdm.Command) error {
-	return withRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
+	return common_mysql.WithRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
 		if err := enqueueCommandDB(ctx, tx, []string{host.UUID}, cmd); err != nil {
 			return err
 		}
diff --git a/server/datastore/mysql/operating_systems.go b/server/datastore/mysql/operating_systems.go
index b1d8e295d250..73160b28e468 100644
--- a/server/datastore/mysql/operating_systems.go
+++ b/server/datastore/mysql/operating_systems.go
@@ -6,6 +6,7 @@ import (
 	"errors"
 
 	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
+	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
 	"github.com/fleetdm/fleet/v4/server/fleet"
 	"github.com/jmoiron/sqlx"
 )
@@ -93,7 +94,7 @@ func newOperatingSystemDB(ctx context.Context, tx sqlx.ExtContext, hostOS fleet.
 	case err == nil:
 		return storedOS, nil
 	case errors.Is(err, sql.ErrNoRows):
-		return nil, doRetryErr
+		return nil, common_mysql.DoRetryErr
 	default:
 		return nil, ctxerr.Wrap(ctx, err, "get new operating system")
 	}
diff --git a/server/datastore/mysql/policies.go b/server/datastore/mysql/policies.go
index b1fa426365e6..bb4b0f9d9eae 100644
--- a/server/datastore/mysql/policies.go
+++ b/server/datastore/mysql/policies.go
@@ -13,6 +13,7 @@ import (
 	"golang.org/x/text/unicode/norm"
 
 	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
+	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
 	"github.com/fleetdm/fleet/v4/server/fleet"
 	"github.com/fleetdm/fleet/v4/server/ptr"
 	kitlog "github.com/go-kit/log"
@@ -238,7 +239,7 @@ func cleanupPolicy(
 	}
 	if _, isDB := extContext.(*sqlx.DB); isDB {
 		// wrapping in a retry to avoid deadlocks with the cleanups_then_aggregation cron job
-		err = withRetryTxx(ctx, extContext.(*sqlx.DB), fn, logger)
+		err = common_mysql.WithRetryTxx(ctx, extContext.(*sqlx.DB), fn, logger)
 	} else {
 		err = fn(extContext)
 	}
diff --git a/server/mdm/apple/profile_verifier.go b/server/mdm/apple/profile_verifier.go
index d8428f54e80f..ac4048340fa7 100644
--- a/server/mdm/apple/profile_verifier.go
+++ b/server/mdm/apple/profile_verifier.go
@@ -3,6 +3,7 @@ package apple_mdm
 import (
 	"context"
 
+	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
 	"github.com/fleetdm/fleet/v4/server/fleet"
 	"github.com/fleetdm/fleet/v4/server/mdm"
 )
@@ -118,11 +119,15 @@ func HandleHostMDMProfileInstallResult(ctx context.Context, ds fleet.ProfileVeri
 	}
 
 	// otherwise update status and detail as usual
-	return ds.UpdateOrDeleteHostMDMAppleProfile(ctx, &fleet.HostMDMAppleProfile{
+	err := ds.UpdateOrDeleteHostMDMAppleProfile(ctx, &fleet.HostMDMAppleProfile{
 		CommandUUID:   cmdUUID,
 		HostUUID:      hostUUID,
 		Status:        status,
 		Detail:        detail,
 		OperationType: fleet.MDMOperationTypeInstall,
 	})
+	if err != nil {
+		return ctxerr.Wrap(ctx, err, "updating host MDM Apple profile install result")
+	}
+	return nil
 }
diff --git a/server/mdm/nanomdm/storage/mysql/mysql.go b/server/mdm/nanomdm/storage/mysql/mysql.go
index 628ab42c965f..9a9e3ae4a650 100644
--- a/server/mdm/nanomdm/storage/mysql/mysql.go
+++ b/server/mdm/nanomdm/storage/mysql/mysql.go
@@ -10,6 +10,7 @@ import (
 	"os"
 	"time"
 
+	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
 	"github.com/fleetdm/fleet/v4/server/fleet"
 	"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/cryptoutil"
 	"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/mdm"
@@ -113,7 +114,7 @@ func New(opts ...Option) (*MySQLStorage, error) {
 	mysqlStore := &MySQLStorage{db: cfg.db, logger: cfg.logger, rm: cfg.rm}
 	if cfg.reader == nil {
 		mysqlStore.reader = func(ctx context.Context) fleet.DBReader {
-			return sqlx.NewDb(mysqlStore.db, "mysql")
+			return sqlx.NewDb(mysqlStore.db, "")
 		}
 	} else {
 		mysqlStore.reader = cfg.reader
@@ -337,7 +338,10 @@ func (s *MySQLStorage) updateLastSeenBatch(ctx context.Context, ids []string) {
 		return
 	}
 
-	_, err = s.db.ExecContext(ctx, stmt, args...)
+	err = common_mysql.WithRetryTxx(ctx, sqlx.NewDb(s.db, ""), func(tx sqlx.ExtContext) error {
+		_, err := tx.ExecContext(ctx, stmt, args...)
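+		// Propagating the statement error lets WithRetryTxx roll back and
+		// retry on deadlock and lock-wait-timeout errors.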
+		return err
+	}, loggerWrapper{s.logger})
 	if err != nil {
 		s.logger.Info("msg", "error batch updating nano_enrollments.last_seen_at", "err", err)
 	}
diff --git a/server/mdm/nanomdm/storage/mysql/queue.go b/server/mdm/nanomdm/storage/mysql/queue.go
index bce893253d2d..9654d01f88fa 100644
--- a/server/mdm/nanomdm/storage/mysql/queue.go
+++ b/server/mdm/nanomdm/storage/mysql/queue.go
@@ -8,11 +8,14 @@ import (
 	"strings"
 
 	"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
+	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
 	"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/mdm"
 	"github.com/google/uuid"
+	"github.com/jmoiron/sqlx"
+	"github.com/micromdm/nanolib/log"
 )
 
-func enqueue(ctx context.Context, tx *sql.Tx, ids []string, cmd *mdm.Command) error {
+func enqueue(ctx context.Context, tx sqlx.ExtContext, ids []string, cmd *mdm.Command) error {
 	if len(ids) < 1 {
 		return errors.New("no id(s) supplied to queue command to")
 	}
@@ -50,18 +53,22 @@ func enqueue(ctx context.Context, tx *sql.Tx, ids []string, cmd *mdm.Command) er
 	return nil
 }
 
+type loggerWrapper struct {
+	logger log.Logger
+}
+
+func (l loggerWrapper) Log(keyvals ...interface{}) error {
+	l.logger.Info(keyvals...)
+	return nil
+}
+
 func (m *MySQLStorage) EnqueueCommand(ctx context.Context, ids []string, cmd *mdm.Command) (map[string]error, error) {
-	tx, err := m.db.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	if err = enqueue(ctx, tx, ids, cmd); err != nil {
-		if rbErr := tx.Rollback(); rbErr != nil {
-			return nil, fmt.Errorf("rollback error: %w; while trying to handle error: %v", rbErr, err)
-		}
-		return nil, err
-	}
-	return nil, tx.Commit()
+	// We need to retry because this transaction may deadlock with updates to nano_enrollments.last_seen_at.
+	// Deadlock seen in 2024/12/12 loadtest: https://docs.google.com/document/d/1-Q6qFTd7CDm-lh7MVRgpNlNNJijk6JZ4KO49R1fp80U
+	err := common_mysql.WithRetryTxx(ctx, sqlx.NewDb(m.db, ""), func(tx sqlx.ExtContext) error {
+		return enqueue(ctx, tx, ids, cmd)
+	}, loggerWrapper{m.logger})
+	return nil, err
 }
 
 func (m *MySQLStorage) deleteCommand(ctx context.Context, tx *sql.Tx, id, uuid string) error {
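Note: a minimal caller-side sketch of the new common_mysql.WithRetryTxx helper. The package, function name, table, and column below are illustrative only and not part of this change; it assumes an open *sqlx.DB and a go-kit logger, and that the closure is safe to re-execute, since it may run in several fresh transactions before one commits.

	package example

	import (
		"context"

		"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
		"github.com/go-kit/log"
		"github.com/jmoiron/sqlx"
	)

	// markHostForRefetch is a hypothetical example of wrapping a single write
	// in a transaction that is retried on lock contention.
	func markHostForRefetch(ctx context.Context, db *sqlx.DB, logger log.Logger, hostID uint) error {
		return common_mysql.WithRetryTxx(ctx, db, func(tx sqlx.ExtContext) error {
			_, err := tx.ExecContext(ctx, `UPDATE hosts SET refetch_requested = 1 WHERE id = ?`, hostID)
			return err // a deadlock or lock-wait timeout here triggers a retry
		}, logger)
	}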