Skip to content

Commit

Permalink
Merge pull request #744 from cybozu-go/delay-repair-if-non-k8s-node
Browse files Browse the repository at this point in the history
Delay repair of an out-of-cluster unreachable machine
  • Loading branch information
morimoto-cybozu authored Jun 24, 2024
2 parents 2b16d7c + 0d82b02 commit da4e8da
Show file tree
Hide file tree
Showing 5 changed files with 84 additions and 30 deletions.
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ This project employs a versioning scheme described in [RELEASE.md](RELEASE.md#ve

## [Unreleased]

- Delay repair of an out-of-cluster unreachable machine in [#744](https://github.com/cybozu-go/cke/pull/744)

## [1.28.4]

### Changed
Expand Down
6 changes: 5 additions & 1 deletion docs/sabakan-triggered-repair.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,11 @@ As stated above, CKE considers all persisting queue entries as "recent" for simp
A machine may become "UNREACHABLE" very quickly even if it is [being rebooted in a planned manner](reboot.md).
CKE should wait for a while before starting repair operations for a rebooting machine.

A user can [configure the wait time](ckecli.md#ckecli-constraints-set-name-value) as a [constraint `wait-seconds-to-repair-rebooting`](constraints.md)
A user can [configure the wait time](ckecli.md#ckecli-constraints-set-name-value) as a [constraint `wait-seconds-to-repair-rebooting`](constraints.md).

CKE does not manage the reboot operations of out-of-cluster machines.
It cannot distinguish between the reboot and the crash of an out-of-cluster machine.
To avoid filling the repair queue with unnecessary entries, CKE waits for a while also before repairing an out-of-cluster machine.

[sabakan]: https://github.com/cybozu-go/sabakan
[schema]: https://github.com/cybozu-go/sabakan/blob/main/gql/graph/schema.graphqls
2 changes: 1 addition & 1 deletion sabakan/integrate.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,7 +185,7 @@ func (ig integrator) runRepairer(ctx context.Context, clusterStatus *cke.Cluster
return err
}

entries := Repairer(machines, clusterStatus.RepairQueue.Entries, rebootEntries, constraints, time.Now().UTC())
entries := Repairer(machines, clusterStatus.RepairQueue.Entries, rebootEntries, clusterStatus.NodeStatuses, constraints)

for _, entry := range entries {
err := st.RegisterRepairsEntry(ctx, entry)
Expand Down
40 changes: 25 additions & 15 deletions sabakan/repairer.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,24 +2,22 @@ package sabakan

import (
"strings"
"time"

"github.com/cybozu-go/cke"
"github.com/cybozu-go/log"
)

func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootEntries []*cke.RebootQueueEntry, constraints *cke.Constraints, now time.Time) []*cke.RepairQueueEntry {
func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootEntries []*cke.RebootQueueEntry, nodeStatuses map[string]*cke.NodeStatus, constraints *cke.Constraints) []*cke.RepairQueueEntry {
recent := make(map[string]bool)
for _, entry := range repairEntries {
// entry.Operation is ignored when checking duplication
recent[entry.Address] = true
}

rebootLimit := now.Add(time.Duration(-constraints.RepairRebootingSeconds) * time.Second)
rebootingSince := make(map[string]time.Time)
rebooting := make(map[string]bool)
for _, entry := range rebootEntries {
if entry.Status == cke.RebootStatusRebooting {
rebootingSince[entry.Node] = entry.LastTransitionTime // entry.Node denotes IP address
rebooting[entry.Node] = true // entry.Node denotes IP address
}
}

Expand All @@ -32,21 +30,33 @@ func Repairer(machines []Machine, repairEntries []*cke.RepairQueueEntry, rebootE
continue
}

if recent[machine.Spec.IPv4[0]] {
serial := machine.Spec.Serial
address := machine.Spec.IPv4[0]

if recent[address] {
log.Warn("ignore recently-repaired non-healthy machine", map[string]interface{}{
"serial": machine.Spec.Serial,
"address": machine.Spec.IPv4[0],
"serial": serial,
"address": address,
})
continue
}

since, ok := rebootingSince[machine.Spec.IPv4[0]]
if ok && since.After(rebootLimit) && machine.Status.State == StateUnreachable {
log.Info("ignore rebooting unreachable machine", map[string]interface{}{
"serial": machine.Spec.Serial,
"address": machine.Spec.IPv4[0],
})
continue
if machine.Status.State == StateUnreachable && machine.Status.Duration < float64(constraints.RepairRebootingSeconds) {
if rebooting[address] {
log.Info("ignore rebooting unreachable machine", map[string]interface{}{
"serial": serial,
"address": address,
})
continue
}

if _, ok := nodeStatuses[address]; !ok {
log.Info("ignore out-of-cluster unreachable machine", map[string]interface{}{
"serial": serial,
"address": address,
})
continue
}
}

newMachines = append(newMachines, machine)
Expand Down
64 changes: 51 additions & 13 deletions sabakan/repairer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package sabakan

import (
"testing"
"time"

"github.com/cybozu-go/cke"
"github.com/google/go-cmp/cmp"
Expand All @@ -17,10 +16,10 @@ func TestRepairer(t *testing.T) {

machines := []Machine{
{Spec: MachineSpec{Serial: "0000"}, Status: MachineStatus{State: StateUnhealthy}},
{Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnreachable}},
{Spec: MachineSpec{Serial: "2222", IPv4: []string{"2.2.2.2"}, BMC: BMC{Type: "type2"}}, Status: MachineStatus{State: StateUnhealthy}},
{Spec: MachineSpec{Serial: "3333", IPv4: []string{"3.3.3.3"}, BMC: BMC{Type: "type3"}}, Status: MachineStatus{State: StateUnreachable}},
{Spec: MachineSpec{Serial: "4444", IPv4: []string{"4.4.4.4"}, BMC: BMC{Type: "type4"}}, Status: MachineStatus{State: StateUnreachable}},
{Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnreachable, Duration: 30}},
{Spec: MachineSpec{Serial: "2222", IPv4: []string{"2.2.2.2"}, BMC: BMC{Type: "type2"}}, Status: MachineStatus{State: StateUnhealthy, Duration: 30}},
{Spec: MachineSpec{Serial: "3333", IPv4: []string{"3.3.3.3"}, BMC: BMC{Type: "type3"}}, Status: MachineStatus{State: StateUnreachable, Duration: 3000}},
{Spec: MachineSpec{Serial: "4444", IPv4: []string{"4.4.4.4"}, BMC: BMC{Type: "type4"}}, Status: MachineStatus{State: StateUnreachable, Duration: 30}},
}

entries := []*cke.RepairQueueEntry{
Expand All @@ -31,102 +30,141 @@ func TestRepairer(t *testing.T) {
cke.NewRepairQueueEntry("unreachable", "type4", "4.4.4.4"),
}

now := time.Now().UTC()
recent := now.Add(-30 * time.Second)
stale := now.Add(-3000 * time.Second)
rebootEntries := []*cke.RebootQueueEntry{
nil,
{Node: "1.1.1.1", Status: cke.RebootStatusRebooting, LastTransitionTime: recent},
{Node: "2.2.2.2", Status: cke.RebootStatusRebooting, LastTransitionTime: recent},
{Node: "3.3.3.3", Status: cke.RebootStatusRebooting, LastTransitionTime: stale},
{Node: "4.4.4.4", Status: cke.RebootStatusDraining, LastTransitionTime: recent},
{Node: "1.1.1.1", Status: cke.RebootStatusRebooting},
{Node: "2.2.2.2", Status: cke.RebootStatusRebooting},
{Node: "3.3.3.3", Status: cke.RebootStatusRebooting},
{Node: "4.4.4.4", Status: cke.RebootStatusDraining},
}

nodeStatuses := map[string]*cke.NodeStatus{
"1.1.1.1": nil,
"2.2.2.2": nil,
"3.3.3.3": nil,
"4.4.4.4": nil,
}

tests := []struct {
name string
failedMachines []Machine
queuedEntries []*cke.RepairQueueEntry
rebootEntries []*cke.RebootQueueEntry
nodeStatuses map[string]*cke.NodeStatus
expectedEntries []*cke.RepairQueueEntry
}{
{
name: "NoFailedMachine",
failedMachines: []Machine{},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "OneFailedMachine",
failedMachines: []Machine{machines[1]},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[1]},
},
{
name: "IgnoreNoIPAddress",
failedMachines: []Machine{machines[0], machines[1]},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[1]},
},
{
name: "IgnoreRecentlyRepaired",
failedMachines: []Machine{machines[1], machines[2], machines[3]},
queuedEntries: []*cke.RepairQueueEntry{entries[2]},
rebootEntries: nil,
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]},
},
{
name: "IgnoreRecentlyRepairedWithDifferentOperation",
failedMachines: []Machine{machines[1], machines[2], machines[3]},
queuedEntries: []*cke.RepairQueueEntry{cke.NewRepairQueueEntry("unreachable", "type2", "2.2.2.2")},
rebootEntries: nil,
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]},
},
{
name: "IgnoreTooManyFailedMachines",
failedMachines: []Machine{machines[1], machines[2], machines[3]},
queuedEntries: []*cke.RepairQueueEntry{entries[2], entries[4]},
rebootEntries: nil,
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "IgnoreRebootingUnreachableMachine",
failedMachines: []Machine{machines[1]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[1]},
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "RebootingButUnhealthy",
failedMachines: []Machine{machines[2]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[2]},
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[2]},
},
{
name: "RebootingButStale",
failedMachines: []Machine{machines[3]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[3]},
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[3]},
},
{
name: "QueuedButNotRebooting",
failedMachines: []Machine{machines[4]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[4]},
nodeStatuses: nodeStatuses,
expectedEntries: []*cke.RepairQueueEntry{entries[4]},
},
{
name: "IgnoreOutOfClusterUnreachableMachine",
failedMachines: []Machine{machines[1]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[1]},
nodeStatuses: nil,
expectedEntries: []*cke.RepairQueueEntry{},
},
{
name: "OutOfClusterButUnhealthy",
failedMachines: []Machine{machines[2]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[2]},
nodeStatuses: nil,
expectedEntries: []*cke.RepairQueueEntry{entries[2]},
},
{
name: "OutOfClusterButStale",
failedMachines: []Machine{machines[3]},
queuedEntries: []*cke.RepairQueueEntry{},
rebootEntries: []*cke.RebootQueueEntry{rebootEntries[3]},
nodeStatuses: nil,
expectedEntries: []*cke.RepairQueueEntry{entries[3]},
},
}

for _, tt := range tests {
tt := tt
t.Run(tt.name, func(t *testing.T) {
t.Parallel()

entries := Repairer(tt.failedMachines, tt.queuedEntries, tt.rebootEntries, constraints, now)
entries := Repairer(tt.failedMachines, tt.queuedEntries, tt.rebootEntries, tt.nodeStatuses, constraints)
if !cmp.Equal(entries, tt.expectedEntries, cmpopts.EquateEmpty()) {
t.Errorf("!cmp.Equal(entries, tt.newEntries), actual: %v, expected: %v", entries, tt.expectedEntries)
}
Expand Down

0 comments on commit da4e8da

Please sign in to comment.