diff --git a/CHANGELOG.md b/CHANGELOG.md index db87a888..f1ca93d7 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,6 +5,10 @@ This project employs a versioning scheme described in [RELEASE.md](RELEASE.md#ve ## [Unreleased] +### Added + +- Add sabakan-triggered automatic repair functionality in [#725](https://github.com/cybozu-go/cke/pull/725) + ## [1.28.0] ### Changed diff --git a/constraints.go b/constraints.go index 1e110d21..89d4337c 100644 --- a/constraints.go +++ b/constraints.go @@ -8,6 +8,7 @@ type Constraints struct { MinimumWorkers int `json:"minimum-workers"` MaximumWorkers int `json:"maximum-workers"` RebootMaximumUnreachable int `json:"maximum-unreachable-nodes-for-reboot"` + MaximumRepairs int `json:"maximum-repair-queue-entries"` } // Check checks the cluster satisfies the constraints @@ -41,5 +42,6 @@ func DefaultConstraints() *Constraints { MinimumWorkers: 1, MaximumWorkers: 0, RebootMaximumUnreachable: 0, + MaximumRepairs: 0, } } diff --git a/docs/ckecli.md b/docs/ckecli.md index fd325dc0..70afac5c 100644 --- a/docs/ckecli.md +++ b/docs/ckecli.md @@ -67,6 +67,11 @@ $ ckecli [--config FILE] args... - [`ckecli sabakan get-template`](#ckecli-sabakan-get-template) - [`ckecli sabakan set-variables FILE`](#ckecli-sabakan-set-variables-file) - [`ckecli sabakan get-variables`](#ckecli-sabakan-get-variables) +- [`ckecli auto-repair`](#ckecli-auto-repair) + - [`ckecli auto-repair enable|disable`](#ckecli-auto-repair-enabledisable) + - [`ckecli auto-repair is-enabled`](#ckecli-auto-repair-is-enabled) + - [`ckecli auto-repair set-variables FILE`](#ckecli-auto-repair-set-variables-file) + - [`ckecli auto-repair get-variables`](#ckecli-auto-repair-get-variables) - [`ckecli status`](#ckecli-status) ## `ckecli cluster` @@ -91,6 +96,7 @@ Set a constraint on the cluster configuration. - `minimum-workers` - `maximum-workers` - `maximum-unreachable-nodes-for-reboot` +- `maximum-repair-queue-entries` ### `ckecli constraints show` @@ -408,12 +414,32 @@ Get the cluster configuration template. ### `ckecli sabakan set-variables FILE` -Set the query variables to search machines in sabakan. +Set the query variables to search available machines in sabakan. `FILE` should contain JSON as described in [sabakan integration](sabakan-integration.md#variables). ### `ckecli sabakan get-variables` -Get the query variables to search machines in sabakan. +Get the query variables to search available machines in sabakan. + +## `ckecli auto-repair` + +### `ckecli auto-repair enable|disable` + +Enable/Disable [sabakan-triggered automatic repair](sabakan-triggered-repair.md). + +### `ckecli auto-repair is-enabled` + +Show sabakan-triggered automatic repair is enabled or disabled. +It displays `true` or `false`. + +### `ckecli auto-repair set-variables FILE` + +Set the query variables to search non-healthy machines in sabakan. +`FILE` should contain JSON as described in [sabakan-triggered automatic repair](sabakan-triggered-repair.md#query). + +### `ckecli auto-repair get-variables` + +Get the query variables to search non-healthy machines in sabakan. ## `ckecli status` diff --git a/docs/constraints.md b/docs/constraints.md index a451b70a..89ee9c9d 100644 --- a/docs/constraints.md +++ b/docs/constraints.md @@ -12,3 +12,4 @@ Cluster should satisfy these constraints. | `minimum-workers` | int | 1 | The minimum number of worker nodes | | `maximum-workers` | int | 0 | The maximum number of worker nodes. 0 means unlimited. | | `maximum-unreachable-nodes-for-reboot` | int | 0 | The maximum number of unreachable nodes allowed for operating reboot. | +| `maximum-repair-queue-entries` | int | 0 | The maximum number of repair queue entries | diff --git a/docs/sabakan-triggered-repair.md b/docs/sabakan-triggered-repair.md new file mode 100644 index 00000000..7d4c6460 --- /dev/null +++ b/docs/sabakan-triggered-repair.md @@ -0,0 +1,86 @@ +Automatic repair triggered by sabakan +===================================== + +[Sabakan][sabakan] is management software for server machines in a data center. +It stores the status information of machines as well as their spec information. +By referring to machines' status information in sabakan, CKE can initiate the repair of a non-healthy machine. + +This functionality is similar to [sabakan integration](sabakan-integration.md). + +How it works +------------ + +CKE periodically queries sabakan to retrieve machines' status information in a data center. +If CKE finds non-healthy machines, it creates [repair queue entries](repair.md) for those machines. + +The fields of a repair queue entry are determined based on the [information of the non-healthy machine](https://github.com/cybozu-go/sabakan/blob/main/docs/machine.md). +* `address`: `.spec.ipv4[0]` +* `machine_type`: `.spec.bmc.type` +* `operation`: `.status.state` + +Users can configure the query to choose non-healthy machines. +The queries are executed via sabakan [GraphQL `searchMachines`](https://github.com/cybozu-go/sabakan/blob/master/docs/graphql.md) API. + +Query +----- + +CKE uses the following GraphQL query to retrieve machine information from sabakan. + +``` +query ckeSearch($having: MachineParams, $notHaving: MachineParams) { + searchMachines(having: $having, notHaving: $notHaving) { + # snip + } +} +``` + +The following values are used for `$having` and `$notHaving` variables by default. +Users can change these values by [specifying a JSON object](ckecli.md#ckecli-auto-repair-set-variables-file). + +```json +{ + "having": { + "states": ["UNHEALTHY", "UNREACHABLE"] + }, + "notHaving": { + "roles": ["boot"] + } +} +``` + +The type of `$having` and `$notHaving` is `MachineParams`. +Consult [GraphQL schema][schema] for the definition of `MachineParams`. + +Enqueue limiters +---------------- + +### Limiter for a single machine + +In order not to repeat repair operations too quickly for a single unstable machine, CKE checks recent repair queue entries before enqueueing. +If it finds a recent entry for the machine in question, no matter whether the entry has finished or not, it refrains from creating an additional entry. + +CKE considers all persisting queue entries as "recent" for simplicity. +A user should delete a finished repair queue entry for a machine once they consider the machine repaired. +* If a repair queue entry has finished with success and a user considers the machine stable, they should delete the finished entry. +* If a repair queue entry has finished with failure or a user considers the machine unstable, they should repair the machine manually. After the machine gets repaired, they should delete the finished entry. + +### Limiter for a cluster + +Sabakan may occasionally report false-positive non-healthy machines. +If CKE believes all of the failure reports and initiates a lot of repair operations, the Kubernetes cluster will be stuck -- or worse, corrupted. + +Even when the failure reports are correct, it would be good for CKE to refrain from repairing too many machines. +For example, the failure of many servers might be caused by the temporary power failure of a whole server rack. +In that case, CKE should not mark the machines unrepairable as a result of pointless repair operations. +Once the machines are marked unrepairable, sabakan will delete all data on those machines. + +In order not to initiate too many repair operations, CKE checks the number of recent repair queue entries plus the number of new failure reports before enqueueing. +If it finds excessive numbers of entries/reports, no matter whether the entries have finished or not, it refrains from creating an additional entry. + +The maximum number of recent repair queue entries and new failure reports is [configurable](ckecli.md#ckecli-constraints-set-name-value) as a [constraint `maximum-repair-queue-entries`](constraints.md). + +As stated above, CKE considers all persisting queue entries as "recent" for simplicity. + + +[sabakan]: https://github.com/cybozu-go/sabakan +[schema]: https://github.com/cybozu-go/sabakan/blob/master/gql/schema.graphql diff --git a/mtest/ckecli_test.go b/mtest/ckecli_test.go index f8054266..66062c7b 100644 --- a/mtest/ckecli_test.go +++ b/mtest/ckecli_test.go @@ -137,4 +137,13 @@ func testCKECLI() { ckecliSafe("sabakan", "enable") ckecliSafe("sabakan", "get-url") }) + + It("should invoke auto-repair subcommand successfully", func() { + ckecliSafe("auto-repair", "is-enabled") + ckecliSafe("auto-repair", "disable") + ckecliSafe("auto-repair", "enable") + f := remoteTempFile(`{"having":{"states":["UNHEALTHY","UNREACHABLE"]},"notHaving":{"roles":["boot"]}}`) + ckecliSafe("auto-repair", "set-variables", f) + ckecliSafe("auto-repair", "get-variables") + }) } diff --git a/pkg/ckecli/cmd/auto_repair.go b/pkg/ckecli/cmd/auto_repair.go new file mode 100644 index 00000000..e1eeedd1 --- /dev/null +++ b/pkg/ckecli/cmd/auto_repair.go @@ -0,0 +1,16 @@ +package cmd + +import ( + "github.com/spf13/cobra" +) + +// autoRepairCmd represents the auto-repair command +var autoRepairCmd = &cobra.Command{ + Use: "auto-repair", + Short: "auto-repair subcommand", + Long: `auto-repair subcommand`, +} + +func init() { + rootCmd.AddCommand(autoRepairCmd) +} diff --git a/pkg/ckecli/cmd/auto_repair_disable.go b/pkg/ckecli/cmd/auto_repair_disable.go new file mode 100644 index 00000000..78ec0930 --- /dev/null +++ b/pkg/ckecli/cmd/auto_repair_disable.go @@ -0,0 +1,26 @@ +package cmd + +import ( + "context" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var autoRepairDisableCmd = &cobra.Command{ + Use: "disable", + Short: "disable sabakan-triggered automatic repair", + Long: `Disable sabakan-triggered automatic repair.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + return storage.EnableAutoRepair(ctx, false) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + autoRepairCmd.AddCommand(autoRepairDisableCmd) +} diff --git a/pkg/ckecli/cmd/auto_repair_enable.go b/pkg/ckecli/cmd/auto_repair_enable.go new file mode 100644 index 00000000..76f70ff7 --- /dev/null +++ b/pkg/ckecli/cmd/auto_repair_enable.go @@ -0,0 +1,26 @@ +package cmd + +import ( + "context" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var autoRepairEnableCmd = &cobra.Command{ + Use: "enable", + Short: "enable sabakan-triggered automatic repair", + Long: `Enable sabakan-triggered automatic repair.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + return storage.EnableAutoRepair(ctx, true) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + autoRepairCmd.AddCommand(autoRepairEnableCmd) +} diff --git a/pkg/ckecli/cmd/auto_repair_get_variables.go b/pkg/ckecli/cmd/auto_repair_get_variables.go new file mode 100644 index 00000000..44ed25d7 --- /dev/null +++ b/pkg/ckecli/cmd/auto_repair_get_variables.go @@ -0,0 +1,33 @@ +package cmd + +import ( + "context" + "os" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +// autoRepairGetVariablesCmd represents the "auto-repair get-variables" command +var autoRepairGetVariablesCmd = &cobra.Command{ + Use: "get-variables", + Short: "get the query variables to search non-healthy machines in sabakan", + Long: `Get the query variables to search non-healthy machines in sabakan.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + data, err := storage.GetAutoRepairQueryVariables(ctx) + if err != nil { + return err + } + os.Stdout.Write(data) + return nil + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + autoRepairCmd.AddCommand(autoRepairGetVariablesCmd) +} diff --git a/pkg/ckecli/cmd/auto_repair_is_enabled.go b/pkg/ckecli/cmd/auto_repair_is_enabled.go new file mode 100644 index 00000000..65aa6ec3 --- /dev/null +++ b/pkg/ckecli/cmd/auto_repair_is_enabled.go @@ -0,0 +1,32 @@ +package cmd + +import ( + "context" + "fmt" + + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +var autoRepairIsEnabledCmd = &cobra.Command{ + Use: "is-enabled", + Short: "show sabakan-triggered automatic repair status", + Long: `Show whether sabakan-triggered automatic repair is enabled or not. "true" if enabled.`, + + RunE: func(cmd *cobra.Command, args []string) error { + well.Go(func(ctx context.Context) error { + disabled, err := storage.IsAutoRepairDisabled(ctx) + if err != nil { + return err + } + fmt.Println(!disabled) + return nil + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + autoRepairCmd.AddCommand(autoRepairIsEnabledCmd) +} diff --git a/pkg/ckecli/cmd/auto_repair_set_variables.go b/pkg/ckecli/cmd/auto_repair_set_variables.go new file mode 100644 index 00000000..e4f047a3 --- /dev/null +++ b/pkg/ckecli/cmd/auto_repair_set_variables.go @@ -0,0 +1,61 @@ +package cmd + +import ( + "context" + "encoding/json" + "os" + + "github.com/cybozu-go/cke/sabakan" + "github.com/cybozu-go/well" + "github.com/spf13/cobra" +) + +// autoRepairSetVariablesCmd represents the "auto-repair set-variables" command +var autoRepairSetVariablesCmd = &cobra.Command{ + Use: "set-variables FILE", + Short: "set the query variables to search non-healthy machines in sabakan", + Long: `Set the query variables to search non-healthy machines in sabakan. + +FILE should contain a JSON object like this: + + { + "having": { + "labels": [{"name": "foo", "value": "bar"}], + "racks": [0, 1, 2], + "roles": ["worker"], + "states": ["UNREACHABLE"], + "minDaysBeforeRetire": 90 + }, + "notHaving": { + } + } +`, + + Args: cobra.ExactArgs(1), + RunE: func(cmd *cobra.Command, args []string) error { + data, err := os.ReadFile(args[0]) + if err != nil { + return err + } + + vars := new(sabakan.QueryVariables) + err = json.Unmarshal(data, vars) + if err != nil { + return err + } + err = vars.IsValid() + if err != nil { + return err + } + + well.Go(func(ctx context.Context) error { + return storage.SetAutoRepairQueryVariables(ctx, string(data)) + }) + well.Stop() + return well.Wait() + }, +} + +func init() { + autoRepairCmd.AddCommand(autoRepairSetVariablesCmd) +} diff --git a/pkg/ckecli/cmd/constraints_set.go b/pkg/ckecli/cmd/constraints_set.go index 50b42156..36b0d6b0 100644 --- a/pkg/ckecli/cmd/constraints_set.go +++ b/pkg/ckecli/cmd/constraints_set.go @@ -23,6 +23,7 @@ NAME is one of: minimum-workers maximum-workers maximum-unreachable-nodes-for-reboot + maximum-repair-queue-entries VALUE is an integer.`, @@ -53,6 +54,10 @@ VALUE is an integer.`, cstrSet = func(cstr *cke.Constraints) { cstr.RebootMaximumUnreachable = val } + case "maximum-repair-queue-entries": + cstrSet = func(cstr *cke.Constraints) { + cstr.MaximumRepairs = val + } default: return errors.New("no such constraint: " + args[0]) } diff --git a/pkg/ckecli/cmd/sabakan_get_variables.go b/pkg/ckecli/cmd/sabakan_get_variables.go index c5f493e1..53523438 100644 --- a/pkg/ckecli/cmd/sabakan_get_variables.go +++ b/pkg/ckecli/cmd/sabakan_get_variables.go @@ -11,8 +11,8 @@ import ( // sabakanGetVariablesCmd represents the "sabakan get-variables" command var sabakanGetVariablesCmd = &cobra.Command{ Use: "get-variables", - Short: "get the query variables to search machines in sabakan", - Long: `Get the query variables to search machines in sabakan.`, + Short: "get the query variables to search available machines in sabakan", + Long: `Get the query variables to search available machines in sabakan.`, RunE: func(cmd *cobra.Command, args []string) error { well.Go(func(ctx context.Context) error { diff --git a/pkg/ckecli/cmd/sabakan_set_variables.go b/pkg/ckecli/cmd/sabakan_set_variables.go index 0d712e17..011efe60 100644 --- a/pkg/ckecli/cmd/sabakan_set_variables.go +++ b/pkg/ckecli/cmd/sabakan_set_variables.go @@ -13,8 +13,8 @@ import ( // sabakanSetVariablesCmd represents the "sabakan set-variables" command var sabakanSetVariablesCmd = &cobra.Command{ Use: "set-variables FILE", - Short: "set the query variables to search machines in sabakan", - Long: `Set the query variables to search machines in sabakan. + Short: "set the query variables to search available machines in sabakan", + Long: `Set the query variables to search available machines in sabakan. FILE should contain a JSON object like this: diff --git a/sabakan/integrate.go b/sabakan/integrate.go index 31598268..9e3c8b82 100644 --- a/sabakan/integrate.go +++ b/sabakan/integrate.go @@ -2,6 +2,7 @@ package sabakan import ( "context" + "errors" "time" "github.com/cybozu-go/cke" @@ -50,15 +51,18 @@ func (ig integrator) StartWatch(ctx context.Context, ch chan<- struct{}) error { } func (ig integrator) Init(ctx context.Context, leaderKey string) error { - return ig.run(ctx, leaderKey, nil, true) + return ig.runGenerator(ctx, leaderKey, nil, true) } func (ig integrator) Do(ctx context.Context, leaderKey string, clusterStatus *cke.ClusterStatus) error { - return ig.run(ctx, leaderKey, clusterStatus, false) + if err := ig.runGenerator(ctx, leaderKey, clusterStatus, false); err != nil { + return err + } + return ig.runRepairer(ctx, clusterStatus) } -// Do references WaitSecs in ctx to change the wait second value. -func (ig integrator) run(ctx context.Context, leaderKey string, clusterStatus *cke.ClusterStatus, onlyRegenerate bool) error { +// runGenerator references WaitSecs in ctx to change the wait second value. +func (ig integrator) runGenerator(ctx context.Context, leaderKey string, clusterStatus *cke.ClusterStatus, onlyRegenerate bool) error { st := cke.Storage{Client: ig.etcd} disabled, err := st.IsSabakanDisabled(ctx) @@ -78,7 +82,7 @@ func (ig integrator) run(ctx context.Context, leaderKey string, clusterStatus *c return err } - machines, err := Query(ctx, st) + machines, err := QueryAvailable(ctx, st) if err != nil { // the error is either harmless (cke.ErrNotFound) or already // logged by well.HTTPClient. @@ -148,3 +152,41 @@ func (ig integrator) run(ctx context.Context, leaderKey string, clusterStatus *c return st.PutClusterWithTemplateRevision(ctx, newc, rev, leaderKey) } + +func (ig integrator) runRepairer(ctx context.Context, clusterStatus *cke.ClusterStatus) error { + st := cke.Storage{Client: ig.etcd} + + disabled, err := st.IsAutoRepairDisabled(ctx) + if err != nil { + return err + } + if disabled { + return nil + } + + machines, err := QueryNonHealthy(ctx, st) + if err != nil { + if !errors.Is(err, cke.ErrNotFound) { + log.Warn("query failed", map[string]interface{}{ + log.FnError: err, + }) + } + return nil + } + + constraints, err := st.GetConstraints(ctx) + if err != nil { + return err + } + + entries := Repairer(machines, clusterStatus.RepairQueue.Entries, constraints) + + for _, entry := range entries { + err := st.RegisterRepairsEntry(ctx, entry) + if err != nil { + return err + } + } + + return nil +} diff --git a/sabakan/query.go b/sabakan/query.go index 2dc85c8b..a0c0603c 100644 --- a/sabakan/query.go +++ b/sabakan/query.go @@ -38,6 +38,9 @@ query ckeSearch($having: MachineParams = null, ipv4 registerDate retireDate + bmc { + type + } } status { state @@ -122,30 +125,42 @@ func (v QueryVariables) IsValid() error { return nil } +// BMC represents the BMC of a machine registered with sabakan. +type BMC struct { + Type string `json:"type"` +} + +// MachineSpec represents the spec of a machine registered with sabakan. +type MachineSpec struct { + Serial string `json:"serial"` + Labels []struct { + Name string `json:"name"` + Value string `json:"value"` + } `json:"labels"` + Rack int `json:"rack"` + IndexInRack int `json:"indexInRack"` + Role string `json:"role"` + IPv4 []string `json:"ipv4"` + RegisterDate time.Time `json:"registerDate"` + RetireDate time.Time `json:"retireDate"` + BMC BMC `json:"bmc"` +} + +// MachineStatus represents the status of a machine registered with sabakan. +type MachineStatus struct { + State State `json:"state"` + Duration float64 `json:"duration"` +} + // Machine represents a machine registered with sabakan. type Machine struct { - Spec struct { - Serial string `json:"serial"` - Labels []struct { - Name string `json:"name"` - Value string `json:"value"` - } `json:"labels"` - Rack int `json:"rack"` - IndexInRack int `json:"indexInRack"` - Role string `json:"role"` - IPv4 []string `json:"ipv4"` - RegisterDate time.Time `json:"registerDate"` - RetireDate time.Time `json:"retireDate"` - } `json:"spec"` - Status struct { - State State `json:"state"` - Duration float64 `json:"duration"` - } `json:"status"` + Spec MachineSpec `json:"spec"` + Status MachineStatus `json:"status"` } -// Query send a GraphQL query to sabakan. +// QueryAvailable sends a GraphQL query to sabakan to retrieve available machines information. // If sabakan URL is not set, this returns (nil, cke.ErrNotFound). -func Query(ctx context.Context, storage cke.Storage) ([]Machine, error) { +func QueryAvailable(ctx context.Context, storage cke.Storage) ([]Machine, error) { url, err := storage.GetSabakanURL(ctx) if err != nil { return nil, err @@ -168,6 +183,39 @@ func Query(ctx context.Context, storage cke.Storage) ([]Machine, error) { return doQuery(ctx, url, variables, httpClient) } +// QueryNonHealthy sends a GraphQL query to sabakan to retrieve non-healthy machines information. +// If sabakan URL is not set, this returns (nil, cke.ErrNotFound). +func QueryNonHealthy(ctx context.Context, storage cke.Storage) ([]Machine, error) { + url, err := storage.GetSabakanURL(ctx) + if err != nil { + return nil, err + } + + var variables *QueryVariables + varsData, err := storage.GetAutoRepairQueryVariables(ctx) + switch err { + case cke.ErrNotFound: + variables = &QueryVariables{ + Having: &MachineParams{ + States: []State{StateUnhealthy, StateUnreachable}, + }, + NotHaving: &MachineParams{ + Roles: []string{"boot"}, + }, + } + case nil: + variables = new(QueryVariables) + err = json.Unmarshal(varsData, variables) + if err != nil { + return nil, err + } + default: + return nil, err + } + + return doQuery(ctx, url, variables, httpClient) +} + func doQuery(ctx context.Context, url string, vars *QueryVariables, hc *well.HTTPClient) ([]Machine, error) { body := struct { Query string `json:"query"` diff --git a/sabakan/query_test.go b/sabakan/query_test.go index 73c256b7..bcbf9ad2 100644 --- a/sabakan/query_test.go +++ b/sabakan/query_test.go @@ -52,6 +52,10 @@ func testMachine(t *testing.T, m Machine) { t.Error("wrong retire date:", m.Spec.RetireDate.Format(time.RFC3339Nano)) } + if m.Spec.BMC.Type != "iDRAC" { + t.Error("wrong BMC type:", m.Spec.BMC.Type) + } + if m.Status.State != StateHealthy { t.Error("wrong machine state:", m.Status.State) } diff --git a/sabakan/repairer.go b/sabakan/repairer.go new file mode 100644 index 00000000..40046026 --- /dev/null +++ b/sabakan/repairer.go @@ -0,0 +1,57 @@ +package sabakan + +import ( + "strings" + + "github.com/cybozu-go/cke" + "github.com/cybozu-go/log" +) + +func Repairer(machines []Machine, entries []*cke.RepairQueueEntry, constraints *cke.Constraints) []*cke.RepairQueueEntry { + recent := make(map[string]bool) + for _, entry := range entries { + // entry.Operation is ignored when checking duplication + recent[entry.Address] = true + } + + newMachines := make([]Machine, 0, len(machines)) + for _, machine := range machines { + if len(machine.Spec.IPv4) == 0 { + log.Warn("ignore non-healthy machine w/o IPv4 address", map[string]interface{}{ + "serial": machine.Spec.Serial, + }) + continue + } + + if recent[machine.Spec.IPv4[0]] { + log.Warn("ignore recently-repaired non-healthy machine", map[string]interface{}{ + "serial": machine.Spec.Serial, + "address": machine.Spec.IPv4[0], + }) + continue + } + + newMachines = append(newMachines, machine) + } + + if len(entries)+len(newMachines) > constraints.MaximumRepairs { + log.Warn("ignore too many repair requests", nil) + return nil + } + + ret := make([]*cke.RepairQueueEntry, len(newMachines)) + for i, machine := range newMachines { + operation := strings.ToLower(string(machine.Status.State)) + typ := machine.Spec.BMC.Type + address := machine.Spec.IPv4[0] + entry := cke.NewRepairQueueEntry(operation, typ, address) + log.Info("initiate sabakan-triggered automatic repair", map[string]interface{}{ + "serial": machine.Spec.Serial, + "address": address, + "operation": operation, + }) + ret[i] = entry + } + + return ret +} diff --git a/sabakan/repairer_test.go b/sabakan/repairer_test.go new file mode 100644 index 00000000..e7c40bb7 --- /dev/null +++ b/sabakan/repairer_test.go @@ -0,0 +1,87 @@ +package sabakan + +import ( + "testing" + + "github.com/cybozu-go/cke" + "github.com/google/go-cmp/cmp" + "github.com/google/go-cmp/cmp/cmpopts" +) + +func TestRepairer(t *testing.T) { + constraints := &cke.Constraints{ + MaximumRepairs: 3, + } + + machines := []Machine{ + {Spec: MachineSpec{Serial: "0000"}, Status: MachineStatus{State: StateUnhealthy}}, + {Spec: MachineSpec{Serial: "1111", IPv4: []string{"1.1.1.1"}, BMC: BMC{Type: "type1"}}, Status: MachineStatus{State: StateUnhealthy}}, + {Spec: MachineSpec{Serial: "2222", IPv4: []string{"2.2.2.2"}, BMC: BMC{Type: "type2"}}, Status: MachineStatus{State: StateUnhealthy}}, + {Spec: MachineSpec{Serial: "3333", IPv4: []string{"3.3.3.3"}, BMC: BMC{Type: "type3"}}, Status: MachineStatus{State: StateUnreachable}}, + {Spec: MachineSpec{Serial: "4444", IPv4: []string{"4.4.4.4"}, BMC: BMC{Type: "type4"}}, Status: MachineStatus{State: StateUnreachable}}, + } + + entries := []*cke.RepairQueueEntry{ + nil, + cke.NewRepairQueueEntry("unhealthy", "type1", "1.1.1.1"), + cke.NewRepairQueueEntry("unhealthy", "type2", "2.2.2.2"), + cke.NewRepairQueueEntry("unreachable", "type3", "3.3.3.3"), + cke.NewRepairQueueEntry("unreachable", "type4", "4.4.4.4"), + } + + tests := []struct { + name string + failedMachines []Machine + queuedEntries []*cke.RepairQueueEntry + expectedEntries []*cke.RepairQueueEntry + }{ + { + name: "NoFailedMachine", + failedMachines: []Machine{}, + queuedEntries: []*cke.RepairQueueEntry{entries[2]}, + expectedEntries: []*cke.RepairQueueEntry{}, + }, + { + name: "OneFailedMachine", + failedMachines: []Machine{machines[1]}, + queuedEntries: []*cke.RepairQueueEntry{entries[2]}, + expectedEntries: []*cke.RepairQueueEntry{entries[1]}, + }, + { + name: "IgnoreNoIPAddress", + failedMachines: []Machine{machines[0], machines[1]}, + queuedEntries: []*cke.RepairQueueEntry{entries[2]}, + expectedEntries: []*cke.RepairQueueEntry{entries[1]}, + }, + { + name: "IgnoreRecentlyRepaired", + failedMachines: []Machine{machines[1], machines[2], machines[3]}, + queuedEntries: []*cke.RepairQueueEntry{entries[2]}, + expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]}, + }, + { + name: "IgnoreRecentlyRepairedWithDifferentOperation", + failedMachines: []Machine{machines[1], machines[2], machines[3]}, + queuedEntries: []*cke.RepairQueueEntry{cke.NewRepairQueueEntry("unreachable", "type2", "2.2.2.2")}, + expectedEntries: []*cke.RepairQueueEntry{entries[1], entries[3]}, + }, + { + name: "IgnoreTooManyFailedMachines", + failedMachines: []Machine{machines[1], machines[2], machines[3]}, + queuedEntries: []*cke.RepairQueueEntry{entries[2], entries[4]}, + expectedEntries: []*cke.RepairQueueEntry{}, + }, + } + + for _, tt := range tests { + tt := tt + t.Run(tt.name, func(t *testing.T) { + t.Parallel() + + entries := Repairer(tt.failedMachines, tt.queuedEntries, constraints) + if !cmp.Equal(entries, tt.expectedEntries, cmpopts.EquateEmpty()) { + t.Errorf("!cmp.Equal(entries, tt.newEntries), actual: %v, expected: %v", entries, tt.expectedEntries) + } + }) + } +} diff --git a/storage.go b/storage.go index 832d1b25..c43a948d 100644 --- a/storage.go +++ b/storage.go @@ -24,30 +24,32 @@ type RecordChan <-chan *Record // etcd keys and prefixes const ( - KeyCA = "ca/" - KeyConfigVersion = "config-version" - KeyCluster = "cluster" - KeyClusterRevision = "cluster-revision" - KeyConstraints = "constraints" - KeyLeader = "leader/" - KeyRebootsDisabled = "reboots/disabled" - KeyRebootsRunning = "reboots/running" - KeyRebootsPrefix = "reboots/data/" - KeyRebootsWriteIndex = "reboots/write-index" - KeyRecords = "records/" - KeyRecordID = "records" - KeyRepairsDisabled = "repairs/disabled" - KeyRepairsPrefix = "repairs/data/" - KeyRepairsWriteIndex = "repairs/write-index" - KeyResourcePrefix = "resource/" - KeySabakanDisabled = "sabakan/disabled" - KeySabakanQueryVariables = "sabakan/query-variables" - KeySabakanTemplate = "sabakan/template" - KeySabakanURL = "sabakan/url" - KeyServiceAccountCert = "service-account/certificate" - KeyServiceAccountKey = "service-account/key" - KeyStatus = "status" - KeyVault = "vault" + KeyAutoRepairDisabled = "auto-repair/disabled" + KeyAutoRepairQueryVariables = "auto-repair/query-variables" + KeyCA = "ca/" + KeyConfigVersion = "config-version" + KeyCluster = "cluster" + KeyClusterRevision = "cluster-revision" + KeyConstraints = "constraints" + KeyLeader = "leader/" + KeyRebootsDisabled = "reboots/disabled" + KeyRebootsRunning = "reboots/running" + KeyRebootsPrefix = "reboots/data/" + KeyRebootsWriteIndex = "reboots/write-index" + KeyRecords = "records/" + KeyRecordID = "records" + KeyRepairsDisabled = "repairs/disabled" + KeyRepairsPrefix = "repairs/data/" + KeyRepairsWriteIndex = "repairs/write-index" + KeyResourcePrefix = "resource/" + KeySabakanDisabled = "sabakan/disabled" + KeySabakanQueryVariables = "sabakan/query-variables" + KeySabakanTemplate = "sabakan/template" + KeySabakanURL = "sabakan/url" + KeyServiceAccountCert = "service-account/certificate" + KeyServiceAccountKey = "service-account/key" + KeyStatus = "status" + KeyVault = "vault" ) const maxRecords = 1000 @@ -683,6 +685,51 @@ func (s Storage) GetSabakanURL(ctx context.Context) (string, error) { return s.getStringValue(ctx, KeySabakanURL) } +// IsAutoRepairDisabled returns true if sabakan-triggered automatic repair is disabled. +func (s Storage) IsAutoRepairDisabled(ctx context.Context) (bool, error) { + resp, err := s.Get(ctx, KeyAutoRepairDisabled) + if err != nil { + return false, err + } + if resp.Count == 0 { + return false, nil + } + + if bytes.Equal([]byte("true"), resp.Kvs[0].Value) { + return true, nil + } + return false, nil +} + +// EnableAutoRepair enables sabakan-triggered automatic repair when "enable" flag is true. +// When "enable" flag is false, sabakan-triggered repair is disabled. +func (s Storage) EnableAutoRepair(ctx context.Context, enable bool) error { + val := fmt.Sprint(!enable) + _, err := s.Put(ctx, KeyAutoRepairDisabled, val) + return err +} + +// SetAutoRepairQueryVariables sets values of query variables for sabakan-triggered automatic repair. +// Caller must validate the contents. +func (s Storage) SetAutoRepairQueryVariables(ctx context.Context, vars string) error { + _, err := s.Put(ctx, KeyAutoRepairQueryVariables, vars) + return err +} + +// GetAutoRepairQueryVariables gets values of query variables for sabakan-triggered automatic repair +func (s Storage) GetAutoRepairQueryVariables(ctx context.Context) ([]byte, error) { + resp, err := s.Get(ctx, KeyAutoRepairQueryVariables) + if err != nil { + return nil, err + } + + if len(resp.Kvs) == 0 { + return nil, ErrNotFound + } + + return resp.Kvs[0].Value, nil +} + // IsRebootQueueDisabled returns true if reboot queue is disabled. func (s Storage) IsRebootQueueDisabled(ctx context.Context) (bool, error) { resp, err := s.Get(ctx, KeyRebootsDisabled) @@ -915,7 +962,7 @@ func repairsEntryKey(index int64) string { return fmt.Sprintf("%s%016x", KeyRepairsPrefix, index) } -// RegisterRepairssEntry enqueues a repair queue entry to the repair queue. +// RegisterRepairsEntry enqueues a repair queue entry to the repair queue. // "Index" of the entry is retrieved and updated in this method. The given value is ignored. func (s Storage) RegisterRepairsEntry(ctx context.Context, r *RepairQueueEntry) error { RETRY: diff --git a/storage_test.go b/storage_test.go index 43785e5d..aa089f20 100644 --- a/storage_test.go +++ b/storage_test.go @@ -635,6 +635,66 @@ func testStorageSabakan(t *testing.T) { } } +func testStorageAutoRepair(t *testing.T) { + t.Parallel() + + client := newEtcdClient(t) + defer client.Close() + s := Storage{client} + ctx := context.Background() + + _, err := s.GetAutoRepairQueryVariables(ctx) + if err != ErrNotFound { + t.Error(`err != ErrNotFound,`, err) + } + + const vars = `{"having": {"racks": [0, 1, 2]}}` + err = s.SetAutoRepairQueryVariables(ctx, vars) + if err != nil { + t.Fatal(err) + } + + vars2, err := s.GetAutoRepairQueryVariables(ctx) + if err != nil { + t.Fatal(err) + } + if string(vars2) != vars { + t.Error("unexpected query variables:", string(vars2)) + } + + disabled, err := s.IsAutoRepairDisabled(ctx) + if err != nil { + t.Fatal(err) + } + if disabled { + t.Error("sabakan-triggered automatic repair should not be disabled by default") + } + + err = s.EnableAutoRepair(ctx, false) + if err != nil { + t.Fatal(err) + } + disabled, err = s.IsAutoRepairDisabled(ctx) + if err != nil { + t.Fatal(err) + } + if !disabled { + t.Error("sabakan-triggered automatic repair could not be disabled") + } + + err = s.EnableAutoRepair(ctx, true) + if err != nil { + t.Fatal(err) + } + disabled, err = s.IsAutoRepairDisabled(ctx) + if err != nil { + t.Fatal(err) + } + if disabled { + t.Error("sabakan-triggered automatic repair could not be re-enabled") + } +} + func testStorageReboot(t *testing.T) { t.Parallel() @@ -1005,6 +1065,7 @@ func TestStorage(t *testing.T) { t.Run("Maint", testStorageMaint) t.Run("Resource", testStorageResource) t.Run("Sabakan", testStorageSabakan) + t.Run("AutoRepair", testStorageAutoRepair) t.Run("Reboot", testStorageReboot) t.Run("Repair", testStorageRepair) t.Run("Status", testStatus)