diff --git a/op/reboot.go b/op/reboot.go index 03817c8a..d870d3ae 100644 --- a/op/reboot.go +++ b/op/reboot.go @@ -568,6 +568,13 @@ func drainBackOff(ctx context.Context, inf cke.Infrastructure, entry *cke.Reboot "name": entry.Node, log.FnError: err, }) + etcdEntry, err := inf.Storage().GetRebootsEntry(ctx, entry.Index) + if err != nil { + return err + } + if etcdEntry.Status == cke.RebootStatusCancelled { + return nil + } entry.Status = cke.RebootStatusQueued entry.LastTransitionTime = time.Now().Truncate(time.Second).UTC() entry.DrainBackOffCount++ diff --git a/op/reboot_decide.go b/op/reboot_decide.go index c72e6b88..6f8f388c 100644 --- a/op/reboot_decide.go +++ b/op/reboot_decide.go @@ -196,12 +196,10 @@ func ChooseDrainedNodes(c *cke.Cluster, apiServers map[string]bool, rqEntries [] return nil } } - if len(workerInProgress) >= maxConcurrentReboots { - return nil - } else if len(workerInProgress)+len(workerDrainable) <= maxConcurrentReboots { - return workerDrainable + if len(workerInProgress) < maxConcurrentReboots && len(workerDrainable) > 0 { + return workerDrainable[:1] } else { - return workerDrainable[:maxConcurrentReboots-len(workerInProgress)] + return nil } } diff --git a/server/strategy.go b/server/strategy.go index c02704e5..bc5e5c46 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -104,7 +104,7 @@ func DecideOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain } // 11. Reboot nodes if reboot request has been arrived to the reboot queue, and the number of unreachable nodes is less than a threshold. - if ops, phaseReboot := rebootOps(c, constraints, rebootArgs, nf); phaseReboot { + if ops := rebootOps(c, constraints, rebootArgs, nf); len(ops) > 0 { if !nf.EtcdIsGood() { log.Warn("cannot reboot nodes because etcd cluster is not responding and in-sync", nil) return nil, cke.PhaseRebootNodes @@ -871,26 +871,33 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain return ops, phaseRepair } -func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOpsRebootArgs, nf *NodeFilter) (ops []cke.Operator, phaseReboot bool) { +func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOpsRebootArgs, nf *NodeFilter) (ops []cke.Operator) { if len(rebootArgs.RQEntries) == 0 { - return nil, false + return nil } if len(c.Reboot.RebootCommand) == 0 { log.Warn("reboot command is not specified in the cluster configuration", nil) - return nil, false + return nil } if len(c.Reboot.BootCheckCommand) == 0 { log.Warn("boot check command is not specified in the cluster configuration", nil) - return nil, false + return nil } if len(rebootArgs.RebootCancelled) > 0 { - phaseReboot = true ops = append(ops, op.RebootCancelOp(rebootArgs.RebootCancelled)) - return ops, phaseReboot + } + if len(rebootArgs.RebootDequeued) > 0 { + ops = append(ops, op.RebootDequeueOp(rebootArgs.RebootDequeued)) + } + if len(ops) > 0 { + return ops + } + + if len(rebootArgs.DrainCompleted) > 0 { + ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot)) } if len(rebootArgs.NewlyDrained) > 0 { - phaseReboot = true sshCheckNodes := make([]*cke.Node, 0, len(nf.cluster.Nodes)) for _, node := range nf.cluster.Nodes { if !rebootProcessing(rebootArgs.RQEntries, node.Address) { @@ -903,23 +910,11 @@ func rebootOps(c *cke.Cluster, constraints *cke.Constraints, rebootArgs DecideOp ops = append(ops, op.RebootDrainStartOp(nf.HealthyAPIServer(), rebootArgs.NewlyDrained, &c.Reboot)) } } - if len(rebootArgs.DrainCompleted) > 0 { - phaseReboot = true - ops = append(ops, op.RebootRebootOp(nf.HealthyAPIServer(), rebootArgs.DrainCompleted, &c.Reboot)) - } if len(rebootArgs.DrainTimedout) > 0 { - phaseReboot = true ops = append(ops, op.RebootDrainTimeoutOp(rebootArgs.DrainTimedout)) } - if len(rebootArgs.RebootDequeued) > 0 { - phaseReboot = true - ops = append(ops, op.RebootDequeueOp(rebootArgs.RebootDequeued)) - } - if len(ops) > 0 { - phaseReboot = true - } - return ops, phaseReboot + return ops } func rebootUncordonOp(cs *cke.ClusterStatus, rqEntries []*cke.RebootQueueEntry, nf *NodeFilter) cke.Operator {