From b3f803d487d5943a3fe5129cd38307bc83052dd6 Mon Sep 17 00:00:00 2001 From: Yuzuki Mimura <33110971+YZ775@users.noreply.github.com> Date: Mon, 9 Dec 2024 17:13:15 +0900 Subject: [PATCH] add feature to execute user-defined command when repair is successfully finished (#753) --- cluster.go | 3 +++ docs/cluster.md | 16 ++++++++-------- docs/repair.md | 6 ++++++ mtest/cke-cluster.yml | 2 ++ mtest/repair_test.go | 34 ++++++++++++++++++++++++++++++++-- op/repair_execute.go | 16 ++++++++++------ op/repair_finish.go | 43 ++++++++++++++++++++++++++++++++++++++++--- server/strategy.go | 8 ++++---- 8 files changed, 105 insertions(+), 23 deletions(-) diff --git a/cluster.go b/cluster.go index b5e14a50..9cab2ab3 100644 --- a/cluster.go +++ b/cluster.go @@ -304,6 +304,8 @@ type RepairOperation struct { RepairSteps []RepairStep `json:"repair_steps"` HealthCheckCommand []string `json:"health_check_command"` CommandTimeoutSeconds *int `json:"command_timeout_seconds,omitempty"` + SuccessCommand []string `json:"success_command,omitempty"` + SuccessCommandTimeout *int `json:"success_command_timeout,omitempty"` } type RepairStep struct { @@ -319,6 +321,7 @@ const DefaultMaxConcurrentRepairs = 1 const DefaultRepairEvictionTimeoutSeconds = 600 const DefaultRepairHealthCheckCommandTimeoutSeconds = 30 const DefaultRepairCommandTimeoutSeconds = 30 +const DefaultRepairSuccessCommandTimeoutSeconds = 30 // Options is a set of optional parameters for k8s components. type Options struct { diff --git a/docs/cluster.md b/docs/cluster.md index 30f0f96a..79a3ace2 100644 --- a/docs/cluster.md +++ b/docs/cluster.md @@ -124,12 +124,14 @@ The repair configurations control the [repair functionality](repair.md). #### RepairOperation -| Name | Required | Type | Description | -| ------------------------- | -------- | -------------- | --------------------------------------------------------------- | -| `operation` | true | string | Name of repair operation. | -| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). | -| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. | -| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 | +| Name | Required | Type | Description | +| ------------------------- | -------- | -------------- | ----------------------------------------------------------------------------- | +| `operation` | true | string | Name of repair operation. | +| `repair_steps` | true | `[]RepairStep` | Sequences of [repair steps](#repairstep). | +| `health_check_command` | true | array | A command to check repaired machine's health. List of strings. | +| `command_timeout_seconds` | false | \*int | Deadline for health retrieval. Zero means infinity. Default: 30 | +| `success_command` | false | array | A command executed when repair succeeded. List of strings. | +| `success_command_timeout` | false | \*int | Deadline for execution of succcess_command. Zero means infinity. Default: 30 | ##### RepairStep @@ -296,6 +298,4 @@ It should end with either `.conf` or `.conflist`. Fields in `config` may have default values. Some fields are overwritten by CKE. Please see the source code for more details. -[CRI]: https://github.com/kubernetes/kubernetes/blob/242a97307b34076d5d8f5bbeb154fa4d97c9ef1d/docs/devel/container-runtime-interface.md -[log rotation for CRI runtime]: https://github.com/kubernetes/kubernetes/issues/58823 [LabelSelector]: https://kubernetes.io/docs/concepts/overview/working-with-objects/labels/#label-selectors diff --git a/docs/repair.md b/docs/repair.md index 906399b5..092925f3 100644 --- a/docs/repair.md +++ b/docs/repair.md @@ -108,6 +108,12 @@ CKE decides to execute a repair operations if its `operation` matches `OPERATION When CKE executes the check command, it appends the IP address of the target machine to the command. The command should return a string `true` if it evaluates the machine as healthy. +`success_command` and its timeout are used when the machine is evaluated as healthy and the repair operation finishes successfully. +When CKE executes the success command, it appends the IP address of the target machine to the command. +If the repair operation has failed, the command is not executed. +If the `success_command` fails, CKE changes the status of the queue entry to `failed`. +Users can use this command if they want to execute a command as a post-processing of repair operation. + ### Repair steps A repair step is a combination of: diff --git a/mtest/cke-cluster.yml b/mtest/cke-cluster.yml index c7858207..214a24de 100644 --- a/mtest/cke-cluster.yml +++ b/mtest/cke-cluster.yml @@ -28,6 +28,8 @@ repair: command_timeout_seconds: 30 need_drain: true watch_seconds: 30 + success_command: ["sh", "-c", "touch /tmp/mtest-repair-success-$1", "success"] + success_command_timeout_seconds: 30 health_check_command: ["sh", "-c", "test -f /tmp/mtest-repair-$1 && echo true", "health_check"] options: kube-api: diff --git a/mtest/repair_test.go b/mtest/repair_test.go index 668b6d70..9242f2e0 100644 --- a/mtest/repair_test.go +++ b/mtest/repair_test.go @@ -62,6 +62,17 @@ func repairShouldNotProceed() { }).WithTimeout(time.Second * 60).Should(Succeed()) } +func repairSuccessCommandSuccess(node string) { + cmdSuccess := false + for _, host := range []string{host1, host2} { + _, _, err := execAt(host, "docker", "exec", "cke", "test", "-f", "/tmp/mtest-repair-success-"+node) + if err == nil { + cmdSuccess = true + } + } + Expect(cmdSuccess).To(BeTrue()) +} + func testRepairOperations() { // this will run: // - RepairDrainStartOp @@ -110,15 +121,34 @@ func testRepairOperations() { repairQueueAdd(node1) waitRepairSuccess(cluster) nodesShouldBeSchedulable(node1) + repairSuccessCommandSuccess(node1) ckecliSafe("repair-queue", "delete-finished") waitRepairEmpty(cluster) + By("setting erroneous success command") + originalSuccessCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand + cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = []string{"false"} + _, err := ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + + repairQueueAdd(node1) + waitRepairFailure(cluster) + + ckecliSafe("repair-queue", "delete-finished") + waitRepairEmpty(cluster) + + By("restoring success command") + cluster.Repair.RepairProcedures[0].RepairOperations[0].SuccessCommand = originalSuccessCommand + _, err = ckecliClusterSet(cluster) + Expect(err).NotTo(HaveOccurred()) + time.Sleep(time.Second * 3) + By("setting erroneous repair command") originalRepairCommand := cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand - cluster.Repair.RepairProcedures[0].RepairOperations[0].RepairSteps[0].RepairCommand = []string{"false"} - _, err := ckecliClusterSet(cluster) + _, err = ckecliClusterSet(cluster) Expect(err).NotTo(HaveOccurred()) time.Sleep(time.Second * 3) diff --git a/op/repair_execute.go b/op/repair_execute.go index d855551a..4c19382b 100644 --- a/op/repair_execute.go +++ b/op/repair_execute.go @@ -13,14 +13,16 @@ import ( type repairExecuteOp struct { finished bool - entry *cke.RepairQueueEntry - step *cke.RepairStep + entry *cke.RepairQueueEntry + step *cke.RepairStep + cluster *cke.Cluster } -func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep) cke.Operator { +func RepairExecuteOp(entry *cke.RepairQueueEntry, step *cke.RepairStep, cluster *cke.Cluster) cke.Operator { return &repairExecuteOp{ - entry: entry, - step: step, + entry: entry, + step: step, + cluster: cluster, } } @@ -40,6 +42,7 @@ func (o *repairExecuteOp) NextCommand() cke.Commander { timeoutSeconds: o.step.CommandTimeoutSeconds, retries: o.step.CommandRetries, interval: o.step.CommandInterval, + cluster: o.cluster, } } @@ -53,6 +56,7 @@ type repairExecuteCommand struct { timeoutSeconds *int retries *int interval *int + cluster *cke.Cluster } func (c repairExecuteCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { @@ -110,7 +114,7 @@ RETRY: "address": c.entry.Address, "command": strings.Join(c.command, " "), }) - return repairFinish(ctx, inf, c.entry, false) + return repairFinish(ctx, inf, c.entry, false, c.cluster) } func (c repairExecuteCommand) Command() cke.Command { diff --git a/op/repair_finish.go b/op/repair_finish.go index fe889dbc..9a0b205c 100644 --- a/op/repair_finish.go +++ b/op/repair_finish.go @@ -5,6 +5,8 @@ import ( "time" "github.com/cybozu-go/cke" + "github.com/cybozu-go/log" + "github.com/cybozu-go/well" ) type repairFinishOp struct { @@ -12,12 +14,14 @@ type repairFinishOp struct { entry *cke.RepairQueueEntry succeeded bool + cluster *cke.Cluster } -func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool) cke.Operator { +func RepairFinishOp(entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) cke.Operator { return &repairFinishOp{ entry: entry, succeeded: succeeded, + cluster: cluster, } } @@ -34,6 +38,7 @@ func (o *repairFinishOp) NextCommand() cke.Commander { return repairFinishCommand{ entry: o.entry, succeeded: o.succeeded, + cluster: o.cluster, } } @@ -44,10 +49,11 @@ func (o *repairFinishOp) Targets() []string { type repairFinishCommand struct { entry *cke.RepairQueueEntry succeeded bool + cluster *cke.Cluster } func (c repairFinishCommand) Run(ctx context.Context, inf cke.Infrastructure, _ string) error { - return repairFinish(ctx, inf, c.entry, c.succeeded) + return repairFinish(ctx, inf, c.entry, c.succeeded, c.cluster) } func (c repairFinishCommand) Command() cke.Command { @@ -57,9 +63,40 @@ func (c repairFinishCommand) Command() cke.Command { } } -func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool) error { +func repairFinish(ctx context.Context, inf cke.Infrastructure, entry *cke.RepairQueueEntry, succeeded bool, cluster *cke.Cluster) error { if succeeded { entry.Status = cke.RepairStatusSucceeded + //execute Success command + err := func() error { + op, err := entry.GetMatchingRepairOperation(cluster) + if err != nil { + return err + } + if op.SuccessCommand == nil { + return nil + } + ctx := ctx + timeout := cke.DefaultRepairSuccessCommandTimeoutSeconds + if op.SuccessCommandTimeout != nil { + timeout = *op.SuccessCommandTimeout + } + if timeout != 0 { + var cancel context.CancelFunc + ctx, cancel = context.WithTimeout(ctx, time.Second*time.Duration(timeout)) + defer cancel() + } + args := append(op.SuccessCommand[1:], entry.Address) + command := well.CommandContext(ctx, op.SuccessCommand[0], args...) + return command.Run() + }() + if err != nil { + entry.Status = cke.RepairStatusFailed + log.Warn("SuccessCommand failed", map[string]interface{}{ + log.FnError: err, + "index": entry.Index, + "address": entry.Address, + }) + } } else { entry.Status = cke.RepairStatusFailed } diff --git a/server/strategy.go b/server/strategy.go index a29b3c3d..5c849c87 100644 --- a/server/strategy.go +++ b/server/strategy.go @@ -731,7 +731,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain continue } if rqs.RepairCompleted[entry.Address] { - ops = append(ops, op.RepairFinishOp(entry, true)) + ops = append(ops, op.RepairFinishOp(entry, true, c)) continue } switch entry.Status { @@ -826,7 +826,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain // Though ErrRepairStepOutOfRange may be caused by real misconfiguration, // e.g., by decreasing "repair_steps" in cluster.yaml, we treat the error // as the end of the steps for simplicity. - ops = append(ops, op.RepairFinishOp(entry, false)) + ops = append(ops, op.RepairFinishOp(entry, false, c)) continue } @@ -838,7 +838,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain continue } if !(step.NeedDrain && entry.IsInCluster()) { - ops = append(ops, op.RepairExecuteOp(entry, step)) + ops = append(ops, op.RepairExecuteOp(entry, step, c)) continue } // DrainBackOffExpire has been confirmed, so start drain now. @@ -849,7 +849,7 @@ func repairOps(c *cke.Cluster, cs *cke.ClusterStatus, constraints *cke.Constrain continue } if rqs.DrainCompleted[entry.Address] { - ops = append(ops, op.RepairExecuteOp(entry, step)) + ops = append(ops, op.RepairExecuteOp(entry, step, c)) continue } if entry.LastTransitionTime.Before(evictionStartLimit) {