Fixed potential deadlocks when deploying Apple configuration profiles (#24777) (#24797)

Cherry pick for #24771

Fixes deadlocks found in load testing:

https://docs.google.com/document/d/1-Q6qFTd7CDm-lh7MVRgpNlNNJijk6JZ4KO49R1fp80U/edit?tab=t.0
- Added retries to the statements prone to deadlocks (see the sketch below).
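
The fix follows one pattern throughout: a statement that used to run directly on the writer connection now runs inside a retried transaction, so lock errors back off and retry instead of failing the request. A minimal sketch of that pattern (the `markProfileFailed` helper and the hard-coded `'failed'` status are illustrative only, not part of this diff):

```go
// Illustrative sketch: wrap a deadlock-prone UPDATE in ds.withRetryTxx so that
// ER_LOCK_DEADLOCK / ER_LOCK_WAIT_TIMEOUT errors are retried with exponential backoff.
func (ds *Datastore) markProfileFailed(ctx context.Context, hostUUID, cmdUUID string) error {
	return ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error {
		_, err := tx.ExecContext(ctx, `
			UPDATE host_mdm_apple_profiles
			SET status = 'failed'
			WHERE host_uuid = ? AND command_uuid = ?`,
			hostUUID, cmdUUID)
		return err
	})
}
```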

# Checklist for submitter

- [x] Changes file added for user-visible changes in `changes/`,
`orbit/changes/` or `ee/fleetd-chrome/changes`.
- [x] Manual QA for all new/changed functionality

(cherry picked from commit 1e5da18)
getvictor authored Dec 16, 2024
1 parent d4b91e3 commit 3f36aff
Showing 10 changed files with 142 additions and 103 deletions.
1 change: 1 addition & 0 deletions changes/24771-mdm-deadlock-fixes
@@ -0,0 +1 @@
Fixed potential deadlocks when deploying Apple configuration profiles.
7 changes: 6 additions & 1 deletion server/datastore/mysql/apple_mdm.go
@@ -2814,11 +2814,16 @@ func (ds *Datastore) UpdateOrDeleteHostMDMAppleProfile(ctx context.Context, prof
status = &fleet.MDMDeliveryVerified
}

_, err := ds.writer(ctx).ExecContext(ctx, `
// We need to run with retry due to potential deadlocks with BulkSetPendingMDMHostProfiles.
// Deadlock seen in 2024/12/12 loadtest: https://docs.google.com/document/d/1-Q6qFTd7CDm-lh7MVRgpNlNNJijk6JZ4KO49R1fp80U
err := ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error {
_, err := tx.ExecContext(ctx, `
UPDATE host_mdm_apple_profiles
SET status = ?, operation_type = ?, detail = ?
WHERE host_uuid = ? AND command_uuid = ?
`, status, profile.OperationType, detail, profile.HostUUID, profile.CommandUUID)
return err
})
return err
}

93 changes: 93 additions & 0 deletions server/datastore/mysql/common_mysql/retry.go
@@ -0,0 +1,93 @@
package common_mysql

import (
"context"
"database/sql"
"errors"
"time"

"github.com/VividCortex/mysqlerr"
"github.com/cenkalti/backoff/v4"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/go-kit/log"
"github.com/go-sql-driver/mysql"
"github.com/jmoiron/sqlx"
)

var DoRetryErr = errors.New("fleet datastore retry")

type TxFn func(tx sqlx.ExtContext) error

// WithRetryTxx provides a common way to commit/rollback a TxFn wrapped in a retry with exponential backoff.
func WithRetryTxx(ctx context.Context, db *sqlx.DB, fn TxFn, logger log.Logger) error {
operation := func() error {
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "create transaction")
}

defer func() {
if p := recover(); p != nil {
if err := tx.Rollback(); err != nil {
logger.Log("err", err, "msg", "error encountered during transaction panic rollback")
}
panic(p)
}
}()

if err := fn(tx); err != nil {
rbErr := tx.Rollback()
if rbErr != nil && rbErr != sql.ErrTxDone {
// Consider rollback errors to be non-retryable
return backoff.Permanent(ctxerr.Wrapf(ctx, err, "got err '%s' rolling back after err", rbErr.Error()))
}

if retryableError(err) {
return err
}

// Consider any other errors to be non-retryable
return backoff.Permanent(err)
}

if err := tx.Commit(); err != nil {
err = ctxerr.Wrap(ctx, err, "commit transaction")

if retryableError(err) {
return err
}

return backoff.Permanent(err)
}

return nil
}

expBo := backoff.NewExponentialBackOff()
// The MySQL innodb_lock_wait_timeout default is 50 seconds, so a transaction can wait on a lock for several seconds.
// We set a higher MaxElapsedTime to increase the probability that the transaction will be retried.
// This reduces the number of retryable 'Deadlock found' errors. However, with a loaded DB, we will still see
// 'Context cancelled' errors when the server drops long-lived connections.
expBo.MaxElapsedTime = 1 * time.Minute
bo := backoff.WithMaxRetries(expBo, 5)
return backoff.Retry(operation, bo)
}

// retryableError determines whether a MySQL error can be retried. By default
// errors are considered non-retryable. Only errors that we know have a
// possibility of succeeding on a retry should return true in this function.
func retryableError(err error) bool {
base := ctxerr.Cause(err)
if b, ok := base.(*mysql.MySQLError); ok {
switch b.Number {
// Consider lock related errors to be retryable
case mysqlerr.ER_LOCK_DEADLOCK, mysqlerr.ER_LOCK_WAIT_TIMEOUT:
return true
}
}
if errors.Is(err, DoRetryErr) {
return true
}

return false
}
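
A minimal usage sketch of the new `WithRetryTxx` helper, assuming a `*sqlx.DB` handle and a go-kit logger; the `touchLastSeen` function and the `hosts` table/column below are hypothetical, for illustration only (not part of the diff):

```go
package example

import (
	"context"

	"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
	"github.com/go-kit/log"
	"github.com/jmoiron/sqlx"
)

// touchLastSeen runs each attempt in its own transaction. Lock-related errors
// (ER_LOCK_DEADLOCK, ER_LOCK_WAIT_TIMEOUT) and DoRetryErr are retried with
// exponential backoff (up to 5 retries, 1 minute max elapsed time); any other
// error is returned immediately.
func touchLastSeen(ctx context.Context, db *sqlx.DB, logger log.Logger, hostUUID string) error {
	return common_mysql.WithRetryTxx(ctx, db, func(tx sqlx.ExtContext) error {
		_, err := tx.ExecContext(ctx, `UPDATE hosts SET last_seen_at = NOW() WHERE uuid = ?`, hostUUID)
		return err
	}, logger)
}
```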
87 changes: 4 additions & 83 deletions server/datastore/mysql/mysql.go
@@ -14,15 +14,14 @@ import (
"sync"
"time"

"github.com/VividCortex/mysqlerr"
"github.com/WatchBeam/clock"
"github.com/XSAM/otelsql"
"github.com/cenkalti/backoff/v4"
"github.com/doug-martin/goqu/v9"
"github.com/doug-martin/goqu/v9/exp"
"github.com/fleetdm/fleet/v4/server/config"
"github.com/fleetdm/fleet/v4/server/contexts/ctxdb"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/migrations/data"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/migrations/tables"
"github.com/fleetdm/fleet/v4/server/fleet"
@@ -175,8 +174,6 @@ func (ds *Datastore) NewSCEPDepot() (scep_depot.Depot, error) {
return newSCEPDepot(ds.primary.DB, ds)
}

type txFn func(tx sqlx.ExtContext) error

type entity struct {
name string
}
@@ -190,88 +187,12 @@ var (
usersTable = entity{"users"}
)

var doRetryErr = errors.New("fleet datastore retry")

// retryableError determines whether a MySQL error can be retried. By default
// errors are considered non-retryable. Only errors that we know have a
// possibility of succeeding on a retry should return true in this function.
func retryableError(err error) bool {
base := ctxerr.Cause(err)
if b, ok := base.(*mysql.MySQLError); ok {
switch b.Number {
// Consider lock related errors to be retryable
case mysqlerr.ER_LOCK_DEADLOCK, mysqlerr.ER_LOCK_WAIT_TIMEOUT:
return true
}
}
if errors.Is(err, doRetryErr) {
return true
}

return false
}

func (ds *Datastore) withRetryTxx(ctx context.Context, fn txFn) (err error) {
return withRetryTxx(ctx, ds.writer(ctx), fn, ds.logger)
}

// withRetryTxx provides a common way to commit/rollback a txFn wrapped in a retry with exponential backoff
func withRetryTxx(ctx context.Context, db *sqlx.DB, fn txFn, logger log.Logger) (err error) {
operation := func() error {
tx, err := db.BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "create transaction")
}

defer func() {
if p := recover(); p != nil {
if err := tx.Rollback(); err != nil {
logger.Log("err", err, "msg", "error encountered during transaction panic rollback")
}
panic(p)
}
}()

if err := fn(tx); err != nil {
rbErr := tx.Rollback()
if rbErr != nil && rbErr != sql.ErrTxDone {
// Consider rollback errors to be non-retryable
return backoff.Permanent(ctxerr.Wrapf(ctx, err, "got err '%s' rolling back after err", rbErr.Error()))
}

if retryableError(err) {
return err
}

// Consider any other errors to be non-retryable
return backoff.Permanent(err)
}

if err := tx.Commit(); err != nil {
err = ctxerr.Wrap(ctx, err, "commit transaction")

if retryableError(err) {
return err
}

return backoff.Permanent(err)
}

return nil
}

expBo := backoff.NewExponentialBackOff()
// MySQL innodb_lock_wait_timeout default is 50 seconds, so transaction can be waiting for a lock for several seconds.
// Setting a higher MaxElapsedTime to increase probability that transaction will be retried.
// This will reduce the number of retryable 'Deadlock found' errors. However, with a loaded DB, we will still see
// 'Context cancelled' errors when the server drops long-lasting connections.
expBo.MaxElapsedTime = 1 * time.Minute
bo := backoff.WithMaxRetries(expBo, 5)
return backoff.Retry(operation, bo)
func (ds *Datastore) withRetryTxx(ctx context.Context, fn common_mysql.TxFn) (err error) {
return common_mysql.WithRetryTxx(ctx, ds.writer(ctx), fn, ds.logger)
}

// withTx provides a common way to commit/rollback a txFn
func (ds *Datastore) withTx(ctx context.Context, fn txFn) (err error) {
func (ds *Datastore) withTx(ctx context.Context, fn common_mysql.TxFn) (err error) {
tx, err := ds.writer(ctx).BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "create transaction")
5 changes: 3 additions & 2 deletions server/datastore/mysql/nanomdm_storage.go
@@ -10,6 +10,7 @@ import (

abmctx "github.com/fleetdm/fleet/v4/server/contexts/apple_bm"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/mdm/assets"
nanodep_client "github.com/fleetdm/fleet/v4/server/mdm/nanodep/client"
@@ -125,7 +126,7 @@ func (s *NanoMDMStorage) EnqueueDeviceLockCommand(
cmd *mdm.Command,
pin string,
) error {
return withRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
return common_mysql.WithRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
if err := enqueueCommandDB(ctx, tx, []string{host.UUID}, cmd); err != nil {
return err
}
@@ -154,7 +155,7 @@ func (s *NanoMDMStorage) EnqueueDeviceLockCommand(

// EnqueueDeviceWipeCommand enqueues a EraseDevice command for the given host.
func (s *NanoMDMStorage) EnqueueDeviceWipeCommand(ctx context.Context, host *fleet.Host, cmd *mdm.Command) error {
return withRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
return common_mysql.WithRetryTxx(ctx, s.db, func(tx sqlx.ExtContext) error {
if err := enqueueCommandDB(ctx, tx, []string{host.UUID}, cmd); err != nil {
return err
}
3 changes: 2 additions & 1 deletion server/datastore/mysql/operating_systems.go
@@ -6,6 +6,7 @@ import (
"errors"

"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/jmoiron/sqlx"
)
@@ -93,7 +94,7 @@ func newOperatingSystemDB(ctx context.Context, tx sqlx.ExtContext, hostOS fleet.
case err == nil:
return storedOS, nil
case errors.Is(err, sql.ErrNoRows):
return nil, doRetryErr
return nil, common_mysql.DoRetryErr
default:
return nil, ctxerr.Wrap(ctx, err, "get new operating system")
}
3 changes: 2 additions & 1 deletion server/datastore/mysql/policies.go
@@ -13,6 +13,7 @@ import (
"golang.org/x/text/unicode/norm"

"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/ptr"
kitlog "github.com/go-kit/log"
@@ -238,7 +239,7 @@ func cleanupPolicy(
}
if _, isDB := extContext.(*sqlx.DB); isDB {
// wrapping in a retry to avoid deadlocks with the cleanups_then_aggregation cron job
err = withRetryTxx(ctx, extContext.(*sqlx.DB), fn, logger)
err = common_mysql.WithRetryTxx(ctx, extContext.(*sqlx.DB), fn, logger)
} else {
err = fn(extContext)
}
7 changes: 6 additions & 1 deletion server/mdm/apple/profile_verifier.go
@@ -3,6 +3,7 @@ package apple_mdm
import (
"context"

"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/mdm"
)
@@ -118,11 +119,15 @@ func HandleHostMDMProfileInstallResult(ctx context.Context, ds fleet.ProfileVeri
}

// otherwise update status and detail as usual
return ds.UpdateOrDeleteHostMDMAppleProfile(ctx, &fleet.HostMDMAppleProfile{
err := ds.UpdateOrDeleteHostMDMAppleProfile(ctx, &fleet.HostMDMAppleProfile{
CommandUUID: cmdUUID,
HostUUID: hostUUID,
Status: status,
Detail: detail,
OperationType: fleet.MDMOperationTypeInstall,
})
if err != nil {
return ctxerr.Wrap(ctx, err, "updating host MDM Apple profile install result")
}
return nil
}
8 changes: 6 additions & 2 deletions server/mdm/nanomdm/storage/mysql/mysql.go
@@ -10,6 +10,7 @@ import (
"os"
"time"

"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
"github.com/fleetdm/fleet/v4/server/fleet"
"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/cryptoutil"
"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/mdm"
@@ -113,7 +114,7 @@ func New(opts ...Option) (*MySQLStorage, error) {
mysqlStore := &MySQLStorage{db: cfg.db, logger: cfg.logger, rm: cfg.rm}
if cfg.reader == nil {
mysqlStore.reader = func(ctx context.Context) fleet.DBReader {
return sqlx.NewDb(mysqlStore.db, "mysql")
return sqlx.NewDb(mysqlStore.db, "")
}
} else {
mysqlStore.reader = cfg.reader
@@ -337,7 +338,10 @@ func (s *MySQLStorage) updateLastSeenBatch(ctx context.Context, ids []string) {
return
}

_, err = s.db.ExecContext(ctx, stmt, args...)
err = common_mysql.WithRetryTxx(ctx, sqlx.NewDb(s.db, ""), func(tx sqlx.ExtContext) error {
_, err := tx.ExecContext(ctx, stmt, args...)
return err
}, loggerWrapper{s.logger})
if err != nil {
s.logger.Info("msg", "error batch updating nano_enrollments.last_seen_at", "err", err)
}
31 changes: 19 additions & 12 deletions server/mdm/nanomdm/storage/mysql/queue.go
@@ -8,11 +8,14 @@ import (
"strings"

"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/fleetdm/fleet/v4/server/datastore/mysql/common_mysql"
"github.com/fleetdm/fleet/v4/server/mdm/nanomdm/mdm"
"github.com/google/uuid"
"github.com/jmoiron/sqlx"
"github.com/micromdm/nanolib/log"
)

func enqueue(ctx context.Context, tx *sql.Tx, ids []string, cmd *mdm.Command) error {
func enqueue(ctx context.Context, tx sqlx.ExtContext, ids []string, cmd *mdm.Command) error {
if len(ids) < 1 {
return errors.New("no id(s) supplied to queue command to")
}
@@ -50,18 +53,22 @@ func enqueue(ctx context.Context, tx *sql.Tx, ids []string, cmd *mdm.Command) er
return nil
}

type loggerWrapper struct {
logger log.Logger
}

func (l loggerWrapper) Log(keyvals ...interface{}) error {
l.logger.Info(keyvals...)
return nil
}

func (m *MySQLStorage) EnqueueCommand(ctx context.Context, ids []string, cmd *mdm.Command) (map[string]error, error) {
tx, err := m.db.BeginTx(ctx, nil)
if err != nil {
return nil, err
}
if err = enqueue(ctx, tx, ids, cmd); err != nil {
if rbErr := tx.Rollback(); rbErr != nil {
return nil, fmt.Errorf("rollback error: %w; while trying to handle error: %v", rbErr, err)
}
return nil, err
}
return nil, tx.Commit()
// We need to retry because this transaction may deadlock with updates to nano_enrollment.last_seen_at
// Deadlock seen in 2024/12/12 loadtest: https://docs.google.com/document/d/1-Q6qFTd7CDm-lh7MVRgpNlNNJijk6JZ4KO49R1fp80U
err := common_mysql.WithRetryTxx(ctx, sqlx.NewDb(m.db, ""), func(tx sqlx.ExtContext) error {
return enqueue(ctx, tx, ids, cmd)
}, loggerWrapper{m.logger})
return nil, err
}

func (m *MySQLStorage) deleteCommand(ctx context.Context, tx *sql.Tx, id, uuid string) error {
