diff --git a/cmd/kubenest/node-agent/app/client/client.go b/cmd/kubenest/node-agent/app/client/client.go index eda4fbcca..9f3ec64f3 100644 --- a/cmd/kubenest/node-agent/app/client/client.go +++ b/cmd/kubenest/node-agent/app/client/client.go @@ -60,7 +60,7 @@ var ( } wg sync.WaitGroup - wsAddr []string // websocket client connect address list + WsAddr []string // websocket client connect address list filePath string // the server path to save upload file fileName string // local file to upload params []string // New slice to hold multiple command parameters @@ -79,7 +79,7 @@ func cmdCheckRun(cmd *cobra.Command, args []string) error { headers := http.Header{ "Authorization": {"Basic " + auth}, } - for _, addr := range wsAddr { + for _, addr := range WsAddr { wg.Add(1) go func(addr string) { defer wg.Done() @@ -106,15 +106,11 @@ func init() { // #nosec G402 dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true} - ClientCmd.PersistentFlags().StringSliceVarP(&wsAddr, "addr", "a", []string{}, "WebSocket address (e.g., host1:port1,host2:port2)") - err := ClientCmd.MarkPersistentFlagRequired("addr") - if err != nil { - return - } + ClientCmd.PersistentFlags().StringSliceVarP(&WsAddr, "addr", "a", []string{}, "WebSocket address (e.g., host1:port1,host2:port2)") // PreRunE check param ClientCmd.PersistentPreRunE = func(cmd *cobra.Command, args []string) error { - for _, value := range wsAddr { + for _, value := range WsAddr { if _, exists := uniqueValuesMap[value]; exists { return errors.New("duplicate values are not allowed") } @@ -137,7 +133,7 @@ func init() { _ = uploadCmd.MarkFlagRequired("path") ttyCmd.Flags().StringVarP(&operation, "operation", "o", "", "Operation to perform") - err = ttyCmd.MarkFlagRequired("operation") // Ensure 'operation' flag is required for ttyCmd + err := ttyCmd.MarkFlagRequired("operation") // Ensure 'operation' flag is required for ttyCmd if err != nil { return } @@ -157,7 +153,7 @@ func cmdTtyRun(cmd *cobra.Command, args []string) error { } cmdStr := fmt.Sprintf("command=%s", operation) // execute one every wsAddr - for _, addr := range wsAddr { + for _, addr := range WsAddr { wsURL := fmt.Sprintf("wss://%s/tty/?%s", addr, cmdStr) fmt.Println("Executing tty:", cmdStr, "on", addr) err := connectTty(wsURL, headers) @@ -294,7 +290,7 @@ func executeWebSocketCommand(auth string) error { } // execute one every wsAddr - for _, addr := range wsAddr { + for _, addr := range WsAddr { wg.Add(1) go func(addr string) { defer wg.Done() @@ -314,7 +310,7 @@ func uploadFile(filePath, fileName, auth string) error { headers := http.Header{ "Authorization": {"Basic " + auth}, } - for _, addr := range wsAddr { + for _, addr := range WsAddr { wg.Add(1) go func(addr string) { defer wg.Done() diff --git a/cmd/kubenest/node-agent/app/root.go b/cmd/kubenest/node-agent/app/root.go index 69c6dc648..aeb9d2d59 100644 --- a/cmd/kubenest/node-agent/app/root.go +++ b/cmd/kubenest/node-agent/app/root.go @@ -2,6 +2,7 @@ package app import ( "os" + "strings" "github.com/spf13/cobra" "github.com/spf13/viper" @@ -33,6 +34,7 @@ func initConfig() { currentDir, _ := os.Getwd() viper.AddConfigPath(currentDir) viper.AddConfigPath("/srv/node-agent/agent.env") + viper.SetConfigType("toml") // If a agent.env file is found, read it in. 
if err := viper.ReadInConfig(); err != nil { log.Warnf("Load config file error, %s", err) @@ -46,8 +48,27 @@ func initConfig() { } } +func initWebSocketAddr() { + err := viper.BindPFlag("ADDR", client.ClientCmd.PersistentFlags().Lookup("addr")) + if err != nil { + log.Fatalf("Failed to bind flag: %v", err) + return + } + err = viper.BindEnv("ADDR", "ADDR") + if err != nil { + log.Fatalf("Failed to bind env: %v", err) + return + } + // Initialize addr value from viper + log.Infof(strings.Join(viper.AllKeys(), ",")) + if viper.Get("addr") != nil { + client.WsAddr = viper.GetStringSlice("addr") + log.Infof("addr: %v", client.WsAddr) + } +} + func init() { - cobra.OnInitialize(initConfig) + cobra.OnInitialize(initConfig, initWebSocketAddr) RootCmd.PersistentFlags().StringVarP(&user, "user", "u", "", "Username for authentication") RootCmd.PersistentFlags().StringVarP(&password, "password", "p", "", "Password for authentication") diff --git a/deploy/crds/kosmos.io_virtualclusters.yaml b/deploy/crds/kosmos.io_virtualclusters.yaml index aa4f3b3be..f6258fbbb 100644 --- a/deploy/crds/kosmos.io_virtualclusters.yaml +++ b/deploy/crds/kosmos.io_virtualclusters.yaml @@ -170,6 +170,10 @@ spec: updateTime: format: date-time type: string + vipMap: + additionalProperties: + type: string + type: object type: object required: - spec diff --git a/deploy/virtual-cluster-components-manifest-cm.yaml b/deploy/virtual-cluster-components-manifest-cm.yaml index 449040b5a..60f69a752 100644 --- a/deploy/virtual-cluster-components-manifest-cm.yaml +++ b/deploy/virtual-cluster-components-manifest-cm.yaml @@ -4,6 +4,7 @@ data: [ {"name": "kube-proxy", "path": "/kosmos/manifest/kube-proxy/*.yaml"}, {"name": "calico", "path": "/kosmos/manifest/calico/*.yaml"}, + {"name": "keepalived", "path": "/kosmos/manifest/keepalived/*.yaml"}, ] host-core-dns-components: | [ diff --git a/deploy/virtual-cluster-vip-pool-cm.yaml b/deploy/virtual-cluster-vip-pool-cm.yaml new file mode 100644 index 000000000..940c63ab5 --- /dev/null +++ b/deploy/virtual-cluster-vip-pool-cm.yaml @@ -0,0 +1,11 @@ +--- +apiVersion: v1 +kind: ConfigMap +metadata: + name: kosmos-vip-pool + namespace: kosmos-system +data: + vip-config.yaml: | + # can be used for vc, the ip format is 192.168.0.1 or 192.168.0.2-192.168.0.10 + vipPool: + - 192.168.0.1-192.168.0.10 \ No newline at end of file diff --git a/hack/node-agent/init.sh b/hack/node-agent/init.sh index 3a0609861..cc522968e 100644 --- a/hack/node-agent/init.sh +++ b/hack/node-agent/init.sh @@ -1,7 +1,7 @@ #!/usr/bin/env bash -WEB_USER="$WEB_USER" sed -i 's/^WEB_USER=.*/WEB_USER='"$WEB_USER"'/' /app/agent.env -WEB_PASS="$WEB_PASS" sed -i 's/^WEB_PASS=.*/WEB_PASS='"$WEB_PASS"'/' /app/agent.env +WEB_USER="$WEB_USER" sed -i 's/^WEB_USER=.*/WEB_USER="'"$WEB_USER"'"/' /app/agent.env +WEB_PASS="$WEB_PASS" sed -i 's/^WEB_PASS=.*/WEB_PASS="'"$WEB_PASS"'"/' /app/agent.env sha256sum /app/node-agent > /app/node-agent.sum sha256sum /host-path/node-agent >> /app/node-agent.sum rsync -avz /app/ /host-path/ diff --git a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go index cd8b91dda..9f562aa9c 100644 --- a/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go +++ b/pkg/apis/kosmos/v1alpha1/virtualcluster_types.go @@ -112,6 +112,8 @@ type VirtualClusterStatus struct { Port int32 `json:"port,omitempty"` // +optional PortMap map[string]int32 `json:"portMap,omitempty"` + // +optional + VipMap map[string]string `json:"vipMap,omitempty"` } // 
+k8s:deepcopy-gen:interfaces=k8s.io/apimachinery/pkg/runtime.Object diff --git a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go index c375988bc..47bed8a5a 100644 --- a/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go +++ b/pkg/apis/kosmos/v1alpha1/zz_generated.deepcopy.go @@ -1781,6 +1781,13 @@ func (in *VirtualClusterStatus) DeepCopyInto(out *VirtualClusterStatus) { (*out)[key] = val } } + if in.VipMap != nil { + in, out := &in.VipMap, &out.VipMap + *out = make(map[string]string, len(*in)) + for key, val := range *in { + (*out)[key] = val + } + } return } diff --git a/pkg/generated/openapi/zz_generated.openapi.go b/pkg/generated/openapi/zz_generated.openapi.go index eb88b6d23..da095cb80 100644 --- a/pkg/generated/openapi/zz_generated.openapi.go +++ b/pkg/generated/openapi/zz_generated.openapi.go @@ -3115,6 +3115,21 @@ func schema_pkg_apis_kosmos_v1alpha1_VirtualClusterStatus(ref common.ReferenceCa }, }, }, + "vipMap": { + SchemaProps: spec.SchemaProps{ + Type: []string{"object"}, + AdditionalProperties: &spec.SchemaOrBool{ + Allows: true, + Schema: &spec.Schema{ + SchemaProps: spec.SchemaProps{ + Default: "", + Type: []string{"string"}, + Format: "", + }, + }, + }, + }, + }, }, }, }, diff --git a/pkg/kubenest/constants/constant.go b/pkg/kubenest/constants/constant.go index a46013008..897b68049 100644 --- a/pkg/kubenest/constants/constant.go +++ b/pkg/kubenest/constants/constant.go @@ -100,6 +100,17 @@ const ( ApiServerNetworkProxyAdminPortKey = "apiserver-network-proxy-admin-port" VirtualClusterPortNum = 5 + // vip + VipPoolConfigMapName = "kosmos-vip-pool" + VipPoolKey = "vip-config.yaml" + VcVipStatusKey = "vip-key" + VipKeepAlivedNodeLabelKey = "kosmos.io/keepalived-node" + VipKeepAlivedNodeLabelValue = "true" + VipKeepAlivedNodeRoleKey = "kosmos.io/keepalived-role" + VipKeepAlivedNodeRoleMaster = "master" + VipKeepalivedNodeRoleBackup = "backup" + VipKeepAlivedReplicas = 3 + ManifestComponentsConfigMap = "components-manifest-cm" NodePoolConfigmap = "node-pool" NodeVirtualclusterState = "virtualcluster" diff --git a/pkg/kubenest/controller/virtualcluster_init_controller.go b/pkg/kubenest/controller/virtualcluster_init_controller.go index 652a582a2..5f1164318 100644 --- a/pkg/kubenest/controller/virtualcluster_init_controller.go +++ b/pkg/kubenest/controller/virtualcluster_init_controller.go @@ -10,11 +10,11 @@ import ( "github.com/pkg/errors" "gopkg.in/yaml.v3" - corev1 "k8s.io/api/core/v1" apierrors "k8s.io/apimachinery/pkg/api/errors" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/kubernetes" + cs "k8s.io/client-go/kubernetes" "k8s.io/client-go/rest" "k8s.io/client-go/tools/clientcmd" "k8s.io/client-go/tools/record" @@ -33,10 +33,8 @@ import ( "github.com/kosmos.io/kosmos/pkg/apis/kosmos/v1alpha1" "github.com/kosmos.io/kosmos/pkg/generated/clientset/versioned" "github.com/kosmos.io/kosmos/pkg/kubenest/constants" - env "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/env" - "github.com/kosmos.io/kosmos/pkg/kubenest/controller/virtualcluster.node.controller/exector" + "github.com/kosmos.io/kosmos/pkg/kubenest/tasks" "github.com/kosmos.io/kosmos/pkg/kubenest/util" - apiclient "github.com/kosmos.io/kosmos/pkg/kubenest/util/api-client" ) type VirtualClusterInitController struct { @@ -60,6 +58,10 @@ type HostPortPool struct { PortsPool []int32 `yaml:"portsPool"` } +type VipPool struct { + Vips []string `yaml:"vipPool"` +} + const ( 
VirtualClusterControllerFinalizer = "kosmos.io/virtualcluster-controller" RequeueTime = 10 * time.Second @@ -144,6 +146,23 @@ func (c *VirtualClusterInitController) Reconcile(ctx context.Context, request re return reconcile.Result{}, errors.Wrapf(err, "Error update virtualcluster %s status", updatedCluster.Name) } case v1alpha1.AllNodeReady: + name, namespace := request.Name, request.Namespace + // check if the vc enables vip + if len(originalCluster.Status.VipMap) > 0 { + // label node for keepalived + vcClient, err := tasks.GetVcClientset(c.RootClientSet, name, namespace) + if err != nil { + klog.Errorf("Get vc client failed. err: %s", err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Get vc client failed. err: %s", err.Error()) + } + reps, err := c.labelNode(vcClient) + if err != nil { + klog.Errorf("Label node for keepalived failed. err: %s", err.Error()) + return reconcile.Result{}, errors.Wrapf(err, "Label node for keepalived failed. err: %s", err.Error()) + } + klog.V(2).Infof("Labeled %d nodes for keepalived", reps) + } + err := c.ensureAllPodsRunning(updatedCluster, constants.WaitAllPodsRunningTimeoutSeconds*time.Second) if err != nil { klog.Errorf("Check all pods running err: %s", err.Error()) @@ -238,13 +257,23 @@ func (c *VirtualClusterInitController) removeFinalizer(virtualCluster *v1alpha1. // createVirtualCluster assign work nodes, create control plane and create compoennts from manifests func (c *VirtualClusterInitController) createVirtualCluster(virtualCluster *v1alpha1.VirtualCluster, kubeNestOptions *options.KubeNestOptions) error { - klog.V(2).Infof("Reconciling virtual cluster", "name", virtualCluster.Name) + klog.V(2).Infof("Reconciling virtual cluster %s", virtualCluster.Name) //Assign host port _, err := c.AllocateHostPort(virtualCluster) if err != nil { return errors.Wrap(err, "Error in assign host port!") } + // check if vip is enabled + vipPool, err := GetVipFromConfigMap(c.RootClientSet, constants.KosmosNs, constants.VipPoolConfigMapName, constants.VipPoolKey) + if err == nil && vipPool != nil && len(vipPool.Vips) > 0 { + klog.V(2).Infof("Enable vip for virtual cluster %s", virtualCluster.Name) + //Allocate vip + err = c.AllocateVip(virtualCluster, vipPool) + if err != nil { + return errors.Wrap(err, "Error in allocate vip!") + } + } executer, err := NewExecutor(virtualCluster, c.Client, c.Config, kubeNestOptions) if err != nil { @@ -492,13 +521,31 @@ func GetHostPortPoolFromConfigMap(client kubernetes.Interface, ns, cmName, dataK return &hostPool, nil } -// Return false to indicate that the port is not occupied -func (c *VirtualClusterInitController) isPortAllocated(port int32, hostAddress []string) bool { +func GetVipFromConfigMap(client kubernetes.Interface, ns, cmName, key string) (*VipPool, error) { + vipPoolCm, err := client.CoreV1().ConfigMaps(ns).Get(context.TODO(), cmName, metav1.GetOptions{}) + if err != nil { + return nil, err + } + + yamlData, exist := vipPoolCm.Data[key] + if !exist { + return nil, fmt.Errorf("key '%s' not found in vip pool ConfigMap '%s'", key, cmName) + } + + var vipPool VipPool + if err := yaml.Unmarshal([]byte(yamlData), &vipPool); err != nil { + return nil, err + } + + return &vipPool, nil +} + +func (c *VirtualClusterInitController) isPortAllocated(port int32) bool { vcList := &v1alpha1.VirtualClusterList{} err := c.List(context.Background(), vcList) if err != nil { klog.Errorf("list virtual cluster error: %v", err) - return true + return false } for _, vc := range vcList.Items { @@ -516,84 +563,7 @@ func (c 
*VirtualClusterInitController) isPortAllocated(port int32, hostAddress [ } } - ret, err := checkPortOnHostWithAddresses(port, hostAddress) - if err != nil { - klog.Errorf("check port on host error: %v", err) - return true - } - return ret -} - -// Return false to indicate that the port is not occupied -func checkPortOnHostWithAddresses(port int32, hostAddress []string) (bool, error) { - for _, addr := range hostAddress { - flag, err := CheckPortOnHost(addr, port) - if err != nil { - return false, err - } - if flag { - return true, nil - } - } - return false, nil -} - -func findAddress(node corev1.Node) (string, error) { - for _, addr := range node.Status.Addresses { - if addr.Type == corev1.NodeInternalIP { - return addr.Address, nil - } - } - return "", fmt.Errorf("cannot find internal IP address in node addresses, node name: %s", node.GetName()) -} - -// Return false to indicate that the port is not occupied -func CheckPortOnHost(addr string, port int32) (bool, error) { - hostExectorHelper := exector.NewExectorHelper(addr, "") - checkCmd := &exector.CheckExector{ - Port: fmt.Sprintf("%d", port), - } - - var ret *exector.ExectorReturn - err := apiclient.TryRunCommand(func() error { - ret = hostExectorHelper.DoExector(context.TODO().Done(), checkCmd) - if ret.Code != 1000 { - return fmt.Errorf("chekc port failed, err: %s", ret.String()) - } - return nil - }, 3) - - if err != nil { - klog.Errorf("check port on host error! addr:%s, port %d, err: %s", addr, port, err.Error()) - return true, err - } - - if ret.Status != exector.SUCCESS { - return true, fmt.Errorf("pod[%d] is occupied", port) - } else { - return false, nil - } -} - -func (c *VirtualClusterInitController) findHostAddresses() ([]string, error) { - nodes, err := c.RootClientSet.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{ - LabelSelector: env.GetControlPlaneLabel(), - }) - if err != nil { - return nil, err - } - - ret := []string{} - - for _, node := range nodes.Items { - addr, err := findAddress(node) - if err != nil { - return nil, err - } - - ret = append(ret, addr) - } - return ret, nil + return false } // AllocateHostPort allocate host port for virtual cluster @@ -608,20 +578,11 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 if err != nil { return 0, err } - - hostAddress, err := c.findHostAddresses() - if err != nil { - return 0, err - } - ports := func() []int32 { ports := make([]int32, 0) for _, p := range hostPool.PortsPool { - if !c.isPortAllocated(p, hostAddress) { + if !c.isPortAllocated(p) { ports = append(ports, p) - if len(ports) > constants.VirtualClusterPortNum { - break - } } } return ports @@ -640,3 +601,86 @@ func (c *VirtualClusterInitController) AllocateHostPort(virtualCluster *v1alpha1 return 0, err } + +// AllocateVip allocate vip for virtual cluster +// #nosec G602 +func (c *VirtualClusterInitController) AllocateVip(virtualCluster *v1alpha1.VirtualCluster, vipPool *VipPool) error { + c.lock.Lock() + defer c.lock.Unlock() + if len(virtualCluster.Status.VipMap) > 0 { + return nil + } + klog.V(4).InfoS("get vip pool", "vipPool", vipPool) + + vcList := &v1alpha1.VirtualClusterList{} + err := c.List(context.Background(), vcList) + if err != nil { + klog.Errorf("list virtual cluster error: %v", err) + return err + } + var allocatedVips []string + for _, vc := range vcList.Items { + for _, val := range vc.Status.VipMap { + allocatedVips = append(allocatedVips, val) + } + } + + vip, err := util.FindAvailableIP(vipPool.Vips, allocatedVips) + if err != nil { + 
klog.Errorf("find available vip error: %v", err) + return err + } + virtualCluster.Status.VipMap = make(map[string]string) + virtualCluster.Status.VipMap[constants.VcVipStatusKey] = vip + + return err +} + +func (c *VirtualClusterInitController) labelNode(client cs.Interface) (reps int, err error) { + replicas := constants.VipKeepAlivedReplicas + nodes, err := client.CoreV1().Nodes().List(context.TODO(), metav1.ListOptions{}) + if err != nil { + return 0, fmt.Errorf("failed to list nodes, err: %w", err) + } + if len(nodes.Items) == 0 { + return 0, fmt.Errorf("no nodes found") + } + reps = replicas + // select replicas nodes + if replicas > len(nodes.Items) { + reps = len(nodes.Items) + } + randomIndex, err := util.SecureRandomInt(reps) + if err != nil { + klog.Errorf("failed to get random index for master node, err: %v", err) + return 0, err + } + // sub reps as nodes + subNodes := nodes.Items[:reps] + masterNode := nodes.Items[randomIndex] + + // label node + for _, node := range subNodes { + currentNode := node + labels := currentNode.GetLabels() + if currentNode.Name == masterNode.Name { + // label master + labels[constants.VipKeepAlivedNodeRoleKey] = constants.VipKeepAlivedNodeRoleMaster + } else { + // label backup + labels[constants.VipKeepAlivedNodeRoleKey] = constants.VipKeepalivedNodeRoleBackup + } + labels[constants.VipKeepAlivedNodeLabelKey] = constants.VipKeepAlivedNodeLabelValue + + // update label + currentNode.SetLabels(labels) + _, err := client.CoreV1().Nodes().Update(context.TODO(), ¤tNode, metav1.UpdateOptions{}) + if err != nil { + klog.V(2).Infof("Failed to update labels for node %s: %v", currentNode.Name, err) + return 0, err + } + klog.V(2).Infof("Successfully updated labels for node %s", currentNode.Name) + } + klog.V(2).InfoS("[vip] Successfully label all node") + return reps, nil +} diff --git a/pkg/kubenest/init.go b/pkg/kubenest/init.go index 4155dedcb..5e1cb334f 100644 --- a/pkg/kubenest/init.go +++ b/pkg/kubenest/init.go @@ -38,6 +38,7 @@ type initData struct { hostPort int32 hostPortMap map[string]int32 kubeNestOptions *ko.KubeNestOptions + vipMap map[string]string virtualCluster *v1alpha1.VirtualCluster ETCDStorageClass string ETCDUnitSize string @@ -187,6 +188,7 @@ func newRunData(opt *InitOptions) (*initData, error) { externalIP: opt.virtualCluster.Spec.ExternalIP, hostPort: opt.virtualCluster.Status.Port, hostPortMap: opt.virtualCluster.Status.PortMap, + vipMap: opt.virtualCluster.Status.VipMap, kubeNestOptions: opt.KubeNestOptions, virtualCluster: opt.virtualCluster, ETCDUnitSize: opt.KubeNestOptions.ETCDUnitSize, @@ -258,6 +260,10 @@ func (i initData) HostPortMap() map[string]int32 { return i.hostPortMap } +func (i initData) VipMap() map[string]string { + return i.vipMap +} + func (i initData) DynamicClient() *dynamic.DynamicClient { return i.dynamicClient } diff --git a/pkg/kubenest/tasks/anp.go b/pkg/kubenest/tasks/anp.go index f943e0ee0..922f324af 100644 --- a/pkg/kubenest/tasks/anp.go +++ b/pkg/kubenest/tasks/anp.go @@ -311,7 +311,7 @@ func getVcDynamicClient(client clientset.Interface, name, namespace string) (dyn } return dynamicClient, nil } -func getVcClientset(client clientset.Interface, name, namespace string) (clientset.Interface, error) { +func GetVcClientset(client clientset.Interface, name, namespace string) (clientset.Interface, error) { secret, err := client.CoreV1().Secrets(namespace).Get(context.TODO(), fmt.Sprintf("%s-%s", name, constants.AdminConfig), metav1.GetOptions{}) if err != nil { @@ -346,7 +346,7 @@ func 
runUploadProxyAgentCert(r workflow.RunData) error { certsData[c.CertName()] = c.CertData() } } - vcClient, err := getVcClientset(data.RemoteClient(), name, namespace) + vcClient, err := GetVcClientset(data.RemoteClient(), name, namespace) if err != nil { return fmt.Errorf("failed to get virtual cluster client, err: %w", err) } diff --git a/pkg/kubenest/tasks/cert.go b/pkg/kubenest/tasks/cert.go index 266170ab5..832a153fa 100644 --- a/pkg/kubenest/tasks/cert.go +++ b/pkg/kubenest/tasks/cert.go @@ -135,6 +135,7 @@ func mutateCertConfig(data InitData, cc *cert.CertConfig) error { ControlplaneAddr: data.ControlplaneAddress(), ClusterIps: data.ServiceClusterIp(), ExternalIP: data.ExternalIP(), + VipMap: data.VipMap(), }, cc) if err != nil { return err diff --git a/pkg/kubenest/tasks/data.go b/pkg/kubenest/tasks/data.go index edf63616f..af0e20704 100644 --- a/pkg/kubenest/tasks/data.go +++ b/pkg/kubenest/tasks/data.go @@ -23,6 +23,7 @@ type InitData interface { ExternalIP() string HostPort() int32 HostPortMap() map[string]int32 + VipMap() map[string]string DynamicClient() *dynamic.DynamicClient KubeNestOpt() *ko.KubeNestOptions PluginOptions() map[string]string diff --git a/pkg/kubenest/tasks/manifests_components.go b/pkg/kubenest/tasks/manifests_components.go index a68cf846f..c0ae2fe47 100644 --- a/pkg/kubenest/tasks/manifests_components.go +++ b/pkg/kubenest/tasks/manifests_components.go @@ -59,7 +59,7 @@ func applyComponentsManifests(r workflow.RunData) error { if !ok { return errors.New("Virtual cluster manifests-components task invoked with an invalid data struct") } - + keepalivedReplicas := constants.VipKeepAlivedReplicas secret, err := data.RemoteClient().CoreV1().Secrets(data.GetNamespace()).Get(context.TODO(), fmt.Sprintf("%s-%s", data.GetName(), constants.AdminConfig), metav1.GetOptions{}) if err != nil { @@ -83,9 +83,17 @@ func applyComponentsManifests(r workflow.RunData) error { templatedMapping["KUBE_PROXY_KUBECONFIG"] = string(secret.Data[constants.KubeConfig]) imageRepository, _ := util.GetImageMessage() templatedMapping["ImageRepository"] = imageRepository - - for k, v := range data.PluginOptions() { - templatedMapping[k] = v + if data.VipMap() != nil && data.VipMap()[constants.VcVipStatusKey] != "" { + templatedMapping["Vip"] = data.VipMap()[constants.VcVipStatusKey] + // use min replicas + nodeCount := data.VirtualCluster().Spec.PromotePolicies[0].NodeCount + if nodeCount < constants.VipKeepAlivedReplicas { + keepalivedReplicas = int(nodeCount) + } + for k, v := range data.PluginOptions() { + templatedMapping[k] = v + } + templatedMapping["KeepalivedReplicas"] = keepalivedReplicas } for _, component := range components { diff --git a/pkg/kubenest/util/cert/certs.go b/pkg/kubenest/util/cert/certs.go index 908d29c4b..087cedae8 100644 --- a/pkg/kubenest/util/cert/certs.go +++ b/pkg/kubenest/util/cert/certs.go @@ -43,6 +43,7 @@ type AltNamesMutatorConfig struct { ControlplaneAddr string ClusterIps []string ExternalIP string + VipMap map[string]string } func (config *CertConfig) defaultPublicKeyAlgorithm() { @@ -273,6 +274,11 @@ func apiServerAltNamesMutator(cfg *AltNamesMutatorConfig) (*certutil.AltNames, e if len(cfg.ExternalIP) > 0 { appendSANsToAltNames(altNames, []string{cfg.ExternalIP}) } + if len(cfg.VipMap) > 0 { + for _, vip := range cfg.VipMap { + appendSANsToAltNames(altNames, []string{vip}) + } + } if len(cfg.ClusterIps) > 0 { for _, clusterIp := range cfg.ClusterIps { appendSANsToAltNames(altNames, []string{clusterIp}) diff --git a/pkg/kubenest/util/util.go 
b/pkg/kubenest/util/util.go index dacf2fa30..ee0f0535b 100644 --- a/pkg/kubenest/util/util.go +++ b/pkg/kubenest/util/util.go @@ -1,8 +1,10 @@ package util import ( + "crypto/rand" "encoding/base64" "fmt" + "math/big" "net" "strings" @@ -88,3 +90,127 @@ func IPV6First(ipNetStr string) (bool, error) { } return utils.IsIPv6(ipNetStrArray[0]), nil } + +// parseCIDR returns a channel that generates IP addresses in the CIDR range. +func parseCIDR(cidr string) (chan string, error) { + ip, ipnet, err := net.ParseCIDR(cidr) + if err != nil { + return nil, err + } + ch := make(chan string) + go func() { + for ip := ip.Mask(ipnet.Mask); ipnet.Contains(ip); inc(ip) { + ch <- ip.String() + } + close(ch) + }() + return ch, nil +} + +// inc increments an IP address. +func inc(ip net.IP) { + for j := len(ip) - 1; j >= 0; j-- { + ip[j]++ + if ip[j] > 0 { + break + } + } +} + +// parseRange returns a channel that generates IP addresses in the range. +func parseRange(ipRange string) (chan string, error) { + parts := strings.Split(ipRange, "-") + if len(parts) != 2 { + return nil, fmt.Errorf("invalid IP range format: %s", ipRange) + } + startIP := net.ParseIP(parts[0]) + endIP := net.ParseIP(parts[1]) + if startIP == nil || endIP == nil { + return nil, fmt.Errorf("invalid IP address in range: %s", ipRange) + } + + ch := make(chan string) + go func() { + for ip := startIP; !ip.Equal(endIP); inc(ip) { + ch <- ip.String() + } + ch <- endIP.String() + close(ch) + }() + return ch, nil +} + +// parseVIPPool returns a channel that generates IP addresses from the vipPool. +func parseVIPPool(vipPool []string) (chan string, error) { + ch := make(chan string) + go func() { + defer close(ch) + for _, entry := range vipPool { + entry = strings.TrimSpace(entry) + var ipCh chan string + var err error + if strings.Contains(entry, "/") { + ipCh, err = parseCIDR(entry) + } else if strings.Contains(entry, "-") { + ipCh, err = parseRange(entry) + } else { + ip := net.ParseIP(entry) + if ip == nil { + err = fmt.Errorf("invalid IP address: %s", entry) + } else { + ipCh = make(chan string, 1) + ipCh <- entry + close(ipCh) + } + } + if err != nil { + fmt.Println("Error:", err) + return + } + for ip := range ipCh { + ch <- ip + } + } + }() + return ch, nil +} + +// FindAvailableIP finds an available IP address from vipPool that is not in allocatedVips. 
+func FindAvailableIP(vipPool, allocatedVips []string) (string, error) { + allocatedSet := make(map[string]struct{}) + for _, ip := range allocatedVips { + allocatedSet[ip] = struct{}{} + } + + ipCh, err := parseVIPPool(vipPool) + if err != nil { + return "", err + } + + for ip := range ipCh { + if _, allocated := allocatedSet[ip]; !allocated { + return ip, nil + } + } + + return "", fmt.Errorf("no available IP addresses") +} + +// SecureRandomInt returns a uniform random int in [0, n) using crypto/rand +func SecureRandomInt(n int) (int, error) { + bigN := big.NewInt(int64(n)) + randInt, err := rand.Int(rand.Reader, bigN) + if err != nil { + return 0, err + } + return int(randInt.Int64()), nil +} + +func MapContains(big map[string]string, small map[string]string) bool { + for k, v := range small { + if bigV, ok := big[k]; !ok || bigV != v { + return false + } + } + return true +} diff --git a/pkg/kubenest/util/util_test.go b/pkg/kubenest/util/util_test.go new file mode 100644 index 000000000..403785a25 --- /dev/null +++ b/pkg/kubenest/util/util_test.go @@ -0,0 +1,106 @@ +package util + +import ( + "fmt" + "testing" + + "gopkg.in/yaml.v3" +) + +func TestFindAvailableIP(t *testing.T) { + type args struct { + vipPool []string + allocatedVips []string + } + tests := []struct { + name string + args args + want string + wantErr bool + }{ + { + name: "test1", + args: args{ + vipPool: []string{"192.168.0.1", "192.168.0.2", "192.168.0.3"}, + allocatedVips: []string{"192.168.0.1", "192.168.0.2"}, + }, + want: "192.168.0.3", + wantErr: false, + }, + { + name: "test2", + args: args{ + vipPool: []string{ + "192.168.0.1", + "192.168.0.2-192.168.0.10", + "192.168.1.0/24", + "2001:db8::1", + "2001:db8::1-2001:db8::10", + "2001:db8::/64", + }, + allocatedVips: []string{"192.168.0.1", "192.168.0.2"}, + }, + want: "192.168.0.3", + wantErr: false, + }, + { + name: "test3", + args: args{ + vipPool: []string{ + "192.168.6.110-192.168.6.120", + }, + allocatedVips: []string{}, + }, + want: "192.168.6.110", + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := FindAvailableIP(tt.args.vipPool, tt.args.allocatedVips) + fmt.Printf("got vip : %v", got) + if (err != nil) != tt.wantErr { + t.Errorf("FindAvailableIP() error = %v, wantErr %v", err, tt.wantErr) + return + } + if got != tt.want { + t.Errorf("FindAvailableIP() got = %v, want %v", got, tt.want) + } + }) + } +} + +func TestFindAvailableIP2(t *testing.T) { + type HostPortPool struct { + PortsPool []int32 `yaml:"portsPool"` + } + type VipPool struct { + Vip []string `yaml:"vipPool"` + } + var vipPool VipPool + var hostPortPool HostPortPool + yamlData2 := ` +portsPool: + - 33001 + - 33002 + - 33003 + - 33004 + - 33005 + - 33006 + - 33007 + - 33008 + - 33009 + - 33010 +` + yamlData := ` +vipPool: + - 192.168.6.110-192.168.6.120 +` + if err := yaml.Unmarshal([]byte(yamlData), &vipPool); err != nil { + panic(err) + } + if err := yaml.Unmarshal([]byte(yamlData2), &hostPortPool); err != nil { + panic(err) + } + fmt.Printf("vipPool: %v", vipPool) +}
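
A minimal usage sketch (illustrative only, not part of the diff) of how the VIP pieces above are expected to fit together: the vip-config.yaml payload from the kosmos-vip-pool ConfigMap is unmarshalled into the VipPool struct, as GetVipFromConfigMap does, and util.FindAvailableIP walks the pool entries (single IPs, ranges, CIDRs) and returns the first address that no VirtualCluster has recorded in Status.VipMap. The main wrapper, the inline YAML, and the allocated list are assumptions for the example; the struct, helper, and import path come from this change.

package main

import (
	"fmt"

	"gopkg.in/yaml.v3"

	"github.com/kosmos.io/kosmos/pkg/kubenest/util"
)

// VipPool mirrors the struct added in virtualcluster_init_controller.go.
type VipPool struct {
	Vips []string `yaml:"vipPool"`
}

func main() {
	// Example payload of the vip-config.yaml key in the kosmos-vip-pool ConfigMap.
	raw := "vipPool:\n  - 192.168.0.1-192.168.0.10\n"

	var pool VipPool
	if err := yaml.Unmarshal([]byte(raw), &pool); err != nil {
		panic(err)
	}

	// VIPs already recorded in other VirtualCluster Status.VipMap entries.
	allocated := []string{"192.168.0.1", "192.168.0.2"}

	// Picks the first free address from the pool; here 192.168.0.3.
	vip, err := util.FindAvailableIP(pool.Vips, allocated)
	if err != nil {
		panic(err)
	}
	fmt.Println("allocated vip:", vip)
}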