Skip to content

Commit

Permalink
[#1437] Prevent to stop puller on DB failure
Browse files Browse the repository at this point in the history
  • Loading branch information
fivitti committed Jul 24, 2024
1 parent 02ef394 commit ccede78
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 73 deletions.
8 changes: 5 additions & 3 deletions backend/server/agentcomm/puller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,10 +43,12 @@ func NewPeriodicPuller(db *dbops.PgDB, agents ConnectedAgents, pullerName, inter
lastFinishedAt.Store(time.Now())
return err
},
func() (int64, error) {
func() (time.Duration, error) {
interval, err := dbmodel.GetSettingInt(db, intervalSettingName)
return interval, errors.WithMessagef(err, "Problem getting interval setting %s from db",
intervalSettingName)
return time.Duration(interval) * time.Second,
errors.WithMessagef(err,
"Problem getting interval setting %s from db",
intervalSettingName)
},
)
if err != nil {
Expand Down
45 changes: 43 additions & 2 deletions backend/server/agentcomm/puller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,54 @@ func TestReadIntervalFromDatabase(t *testing.T) {
_ = dbmodel.SetSettingInt(db, "kea_hosts_puller_interval", 10)

// Assert
require.EqualValues(t, 1, initialInterval)
require.EqualValues(t, 1*time.Second, initialInterval)
require.Eventually(t, func() bool {
currentInterval := puller.GetInterval()
return currentInterval == 10
return currentInterval == 10*time.Second
}, 5*time.Second, time.Second, "puller didn't update the interval")
}

// Test that the puller doesn't stop when the read interval from the database
// fails.
func TestExecutePullerWhileDatabaseIsDown(t *testing.T) {
// Arrange
db, _, teardown := dbtest.SetupDatabaseTestCase(t)
defer teardown()
_ = dbmodel.InitializeSettings(db, 0)
_ = dbmodel.SetSettingInt(db, "kea_hosts_puller_interval", 1)

var callCount atomic.Uint64
callCount.Store(0)

puller, _ := NewPeriodicPuller(db, nil, "test puller", "kea_hosts_puller_interval",
func() error {
// Increment a counter to check if the puller is still running.
callCount.Add(1)
return nil
})
defer puller.Shutdown()

// Wait for the initial puller execution.
require.Eventually(t, func() bool {
return callCount.Load() > 0
}, 5*time.Second, time.Second)

// Stop the database to simulate a failure.
teardown()

// Get the counter value after the database failure.
callCountAfterFailure := callCount.Load()

// Act & Assert
require.Eventually(t, func() bool {
// Periodic executor updates the interval after the pulling. So, the
// counter is incremented on failure. If the puller is still running,
// the counter should incremented more times.
currentCallCount := callCount.Load()
return currentCallCount >= callCountAfterFailure+2
}, 5*time.Second, time.Second)
}

// Test that the interval setting name is returned properly.
func TestGetIntervalName(t *testing.T) {
// Arrange
Expand Down
6 changes: 3 additions & 3 deletions backend/server/restservice/pullers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ import (
type pullerMetadata interface {
GetName() string
GetIntervalSettingName() string
GetInterval() int64
GetInterval() time.Duration
GetLastInvokedAt() time.Time
GetLastFinishedAt() time.Time
}
Expand All @@ -44,7 +44,7 @@ func (r *RestAPI) GetPullers(ctx context.Context, params settings.GetPullersPara
metadata := &models.Puller{
Name: puller.GetName(),
ID: puller.GetIntervalSettingName(),
Interval: puller.GetInterval(),
Interval: int64(puller.GetInterval().Truncate(time.Second).Seconds()),
LastInvokedAt: strfmt.DateTime(puller.GetLastInvokedAt()),
LastFinishedAt: strfmt.DateTime(puller.GetLastFinishedAt()),
}
Expand Down Expand Up @@ -83,7 +83,7 @@ func (r *RestAPI) GetPuller(ctx context.Context, params settings.GetPullerParams
metadata := &models.Puller{
Name: puller.GetName(),
ID: puller.GetIntervalSettingName(),
Interval: puller.GetInterval(),
Interval: int64(puller.GetInterval().Truncate(time.Second).Seconds()),
LastInvokedAt: strfmt.DateTime(puller.GetLastInvokedAt()),
LastFinishedAt: strfmt.DateTime(puller.GetLastFinishedAt()),
}
Expand Down
95 changes: 39 additions & 56 deletions backend/util/periodicexecutor.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,24 @@ import (
type PeriodicExecutor struct {
name string
executorFunc func() error
interval int64
interval time.Duration
ticker *time.Ticker
active bool
pauseCount uint16
done chan bool
wg *sync.WaitGroup
mutex *sync.Mutex
getIntervalFunc func() (int64, error)
wg sync.WaitGroup
mutex sync.RWMutex
getIntervalFunc func() (time.Duration, error)
}

// Interval is used while the puller is inactive to check if it was re-enabled.
const InactiveInterval int64 = 60
const InactiveInterval time.Duration = 1 * time.Minute

// Creates an instance of a new periodic executor. The periodic executor offers a mechanism
// to periodically trigger an action. This action is supplied as a function instance.
// This function is executed within a goroutine periodically according to the timer
// interval calculated by `getIntervalFunc`. It accepts previous interval and returns next value.
func NewPeriodicExecutor(name string, executorFunc func() error, getIntervalFunc func() (int64, error)) (*PeriodicExecutor, error) {
func NewPeriodicExecutor(name string, executorFunc func() error, getIntervalFunc func() (time.Duration, error)) (*PeriodicExecutor, error) {
log.Printf("Starting %s", name)

interval, err := getIntervalFunc()
Expand All @@ -51,12 +51,10 @@ func NewPeriodicExecutor(name string, executorFunc func() error, getIntervalFunc
periodicExecutor := &PeriodicExecutor{
name: name,
executorFunc: executorFunc,
ticker: time.NewTicker(time.Duration(interval) * time.Second),
ticker: time.NewTicker(interval),
active: active,
pauseCount: 0,
done: make(chan bool),
wg: &sync.WaitGroup{},
mutex: &sync.Mutex{},
interval: interval,
getIntervalFunc: getIntervalFunc,
}
Expand Down Expand Up @@ -95,61 +93,46 @@ func (executor *PeriodicExecutor) Pause() {

// Checks if the executor is currently paused.
func (executor *PeriodicExecutor) Paused() bool {
executor.mutex.Lock()
defer executor.mutex.Unlock()
executor.mutex.RLock()
defer executor.mutex.RUnlock()
return executor.pauseCount > 0
}

// Unpause implementation which optionally locks the executor's mutex.
// This function is internally called by Unpause() and Reset(). Note
// that Reset() locks the mutex on its own so the lock argument is
// set to false in this case.
func (executor *PeriodicExecutor) unpause(lock bool, intervals ...int64) {
if len(intervals) > 1 {
// This should not happen.
panic("Resume accepts one or zero interval values")
}
if lock {
executor.mutex.Lock()
defer executor.mutex.Unlock()
}
// Unpauses the executor. The optional interval parameter may contain
// one interval value which overrides the current interval. If the interval
// is not specified, the current interval is used.
func (executor *PeriodicExecutor) Unpause() {
executor.mutex.Lock()
defer executor.mutex.Unlock()

if executor.pauseCount > 0 {
executor.pauseCount--
}

// Unpause() called for all earlier calls to Pause(), so we can resume
// the executor action.
if executor.pauseCount == 0 {
if len(intervals) > 0 {
// Override the interval.
executor.interval = intervals[0]
}
// Reschedule the timer.
executor.ticker.Reset(time.Duration(executor.interval) * time.Second)
executor.ticker.Reset(executor.interval)
}
}

// Unpauses the executor. The optional interval parameter may contain
// one interval value which overrides the current interval. If the interval
// is not specified, the current interval is used.
func (executor *PeriodicExecutor) Unpause(interval ...int64) {
executor.unpause(true, interval...)
}

// Return the current interval in seconds.
func (executor *PeriodicExecutor) GetInterval() int64 {
executor.mutex.Lock()
defer executor.mutex.Unlock()
// Return the current interval.
func (executor *PeriodicExecutor) GetInterval() time.Duration {
executor.mutex.RLock()
defer executor.mutex.RUnlock()
return executor.interval
}

// Reschedule the executor timer to a new interval. It forcibly stops
// the executor and reschedules to the new interval.
func (executor *PeriodicExecutor) Reset(interval int64) {
func (executor *PeriodicExecutor) reset(interval time.Duration) {
executor.mutex.Lock()
defer executor.mutex.Unlock()
executor.ticker.Stop()

executor.pauseCount = 0
executor.unpause(false, interval)
executor.interval = interval
executor.ticker.Reset(interval)
}

// This function controls the timing of the function execution and captures the
Expand All @@ -167,7 +150,7 @@ func (executor *PeriodicExecutor) executorLoop() {
err := executor.executorFunc()
executor.Unpause()
if err != nil {
log.Errorf("Errors were encountered while pulling data from apps: %+v", err)
log.WithError(err).Errorf("Errors were encountered while pulling data from apps")
}
}
// wait for done signal from shutdown function
Expand All @@ -177,26 +160,26 @@ func (executor *PeriodicExecutor) executorLoop() {
return
}

executor.mutex.RLock()
prevInterval := executor.interval
executor.mutex.RUnlock()

// Check if the interval has changed. If so, recreate the ticker.
interval, err := executor.getIntervalFunc()
nextInterval, err := executor.getIntervalFunc()
if err != nil {
log.Errorf("Problem getting interval: %+v", err)
return
log.WithError(err).Error("Problem getting interval, keep the current value")
nextInterval = prevInterval
}

executor.mutex.Lock()
executorInterval := executor.interval
executor.mutex.Unlock()

if interval <= 0 && executor.active {
if nextInterval <= 0 && executor.active {
// if executor should be disabled but it is active then
if executorInterval != InactiveInterval {
executor.Reset(InactiveInterval)
if prevInterval != InactiveInterval {
executor.reset(InactiveInterval)
}
executor.active = false
} else if interval > 0 && interval != executorInterval {
} else if nextInterval > 0 && nextInterval != prevInterval {
// if executor interval is changed and is not 0 (disabled)
executor.Reset(interval)
executor.reset(nextInterval)
executor.active = true
}
}
Expand Down
22 changes: 13 additions & 9 deletions backend/util/periodicexecutor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ func (executor *testExecutor) mockPull() error {
// Test test verifies that the executor is paused while handler function is
// being invoked.
func TestPausedWhileHandling(t *testing.T) {
getIntervalFunc := func() (int64, error) { return 1, nil }
getIntervalFunc := func() (time.Duration, error) { return 1 * time.Second, nil }

// Create an instance of the test executor which implements our mock function to
// be invoked by the executor under test.
Expand Down Expand Up @@ -79,9 +79,9 @@ func TestPausedWhileHandling(t *testing.T) {
}

// This test verifies that the executor can be paused and resumed.
func TestPauseAndUnapuseOrReset(t *testing.T) {
func TestPauseAndUnpauseOrReset(t *testing.T) {
testCases := []string{"Unpause", "Reset"}
getIntervalFunc := func() (int64, error) { return 1, nil }
getIntervalFunc := func() (time.Duration, error) { return 1 * time.Second, nil }

// The test is almost the same for both cases. The only difference is
// that we call Resume or Reset to start the executor again.
Expand Down Expand Up @@ -124,9 +124,9 @@ func TestPauseAndUnapuseOrReset(t *testing.T) {

// Depending on the test case, use Unpause or Reset to start the executor again.
if tc == "Unpause" {
executor.Unpause(1)
executor.Unpause()
} else {
executor.Reset(1)
executor.reset(1)
}

// This should result in handler function being called.
Expand All @@ -144,8 +144,8 @@ func TestPauseAndUnapuseOrReset(t *testing.T) {
func TestGetInterval(t *testing.T) {
// Arrange
intervalValue := int64(1)
getIntervalFunc := func() (int64, error) {
return atomic.LoadInt64(&intervalValue), nil
getIntervalFunc := func() (time.Duration, error) {
return time.Duration(atomic.LoadInt64(&intervalValue)) * time.Second, nil
}
executor, _ := NewPeriodicExecutor("", func() error { return nil }, getIntervalFunc)
defer executor.Shutdown()
Expand All @@ -155,15 +155,19 @@ func TestGetInterval(t *testing.T) {

// Assert
require.Eventually(t, func() bool {
return executor.GetInterval() == 10
return executor.GetInterval() == 10*time.Second
}, 5*time.Second, time.Second,
"test executor did not update the interval")
}

// Test that the executor name is returned properly.
func TestGetName(t *testing.T) {
// Arrange
executor, _ := NewPeriodicExecutor("foobar", func() error { return nil }, func() (int64, error) { return 1, nil })
executor, _ := NewPeriodicExecutor(
"foobar",
func() error { return nil },
func() (time.Duration, error) { return 1 * time.Second, nil },
)

// Act
name := executor.GetName()
Expand Down

0 comments on commit ccede78

Please sign in to comment.