Skip to content

Commit

Permalink
custome etcd port
Browse files Browse the repository at this point in the history
Signed-off-by: pixiake <[email protected]>

(cherry picked from commit 1dd1e54)
Signed-off-by: pixiake <[email protected]>
  • Loading branch information
pixiake committed Jul 26, 2024
1 parent 5ca5c05 commit 19d546b
Show file tree
Hide file tree
Showing 6 changed files with 47 additions and 15 deletions.
18 changes: 18 additions & 0 deletions cmd/kk/apis/kubekey/v1alpha2/etcd_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type EtcdCluster struct {
Type string `yaml:"type" json:"type,omitempty"`
// ExternalEtcd describes how to connect to an external etcd cluster when type is set to external
External ExternalEtcd `yaml:"external" json:"external,omitempty"`
Port *int `yaml:"port" json:"port,omitempty"`
PeerPort *int `yaml:"peerPort" json:"peerPort,omitempty"`
BackupDir string `yaml:"backupDir" json:"backupDir,omitempty"`
BackupPeriod int `yaml:"backupPeriod" json:"backupPeriod,omitempty"`
KeepBackupNumber int `yaml:"keepBackupNumber" json:"keepBackupNumber,omitempty"`
Expand Down Expand Up @@ -57,3 +59,19 @@ type ExternalEtcd struct {
// KeyFile is an SSL key file used to secure etcd communication.
KeyFile string `yaml:"keyFile" json:"keyFile,omitempty"`
}

// GetPort returns the port of etcd cluster
func (e *EtcdCluster) GetPort() int {
if e.Port == nil {
return 2379
}
return *e.Port
}

// GetPeerPort returns the peer port of etcd cluster
func (e *EtcdCluster) GetPeerPort() int {
if e.PeerPort == nil {
return 2380
}
return *e.PeerPort
}
28 changes: 21 additions & 7 deletions cmd/kk/pkg/etcd/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,12 @@ func (g *GetStatus) Execute(runtime connector.Runtime) error {

if v, ok := g.PipelineCache.Get(common.ETCDCluster); ok {
c := v.(*EtcdCluster)
c.peerAddresses = append(c.peerAddresses, fmt.Sprintf("%s=https://%s:2380", etcdName, host.GetInternalIPv4Address()))
c.peerAddresses = append(c.peerAddresses, fmt.Sprintf("%s=https://%s:%d", etcdName, host.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPeerPort()))
c.clusterExist = true
// type: *EtcdCluster
g.PipelineCache.Set(common.ETCDCluster, c)
} else {
cluster.peerAddresses = append(cluster.peerAddresses, fmt.Sprintf("%s=https://%s:2380", etcdName, host.GetInternalIPv4Address()))
cluster.peerAddresses = append(cluster.peerAddresses, fmt.Sprintf("%s=https://%s:%d", etcdName, host.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPeerPort()))
cluster.clusterExist = true
g.PipelineCache.Set(common.ETCDCluster, cluster)
}
Expand Down Expand Up @@ -169,7 +169,7 @@ type GenerateAccessAddress struct {
func (g *GenerateAccessAddress) Execute(runtime connector.Runtime) error {
var addrList []string
for _, host := range runtime.GetHostsByRole(common.ETCD) {
addrList = append(addrList, fmt.Sprintf("https://%s:2379", host.GetInternalIPv4Address()))
addrList = append(addrList, fmt.Sprintf("https://%s:%d", host.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPort()))
}

accessAddresses := strings.Join(addrList, ",")
Expand Down Expand Up @@ -227,7 +227,17 @@ func (g *GenerateConfig) Execute(runtime connector.Runtime) error {
if v, ok := g.PipelineCache.Get(common.ETCDCluster); ok {
cluster := v.(*EtcdCluster)

cluster.peerAddresses = append(cluster.peerAddresses, fmt.Sprintf("%s=https://%s:2380", etcdName, host.GetInternalIPv4Address()))
peerAddressesMap := make(map[string]string, len(cluster.peerAddresses))
for _, v := range cluster.peerAddresses {
peerAddressesMap[v] = v
}

newPeerAddress := fmt.Sprintf("%s=https://%s:%d", etcdName, host.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPeerPort())

if _, ok := peerAddressesMap[newPeerAddress]; !ok {
cluster.peerAddresses = append(cluster.peerAddresses, newPeerAddress)
}

g.PipelineCache.Set(common.ETCDCluster, cluster)

if !cluster.clusterExist {
Expand Down Expand Up @@ -276,8 +286,10 @@ func (r *RefreshConfig) Execute(runtime connector.Runtime) error {
return err
}
}

return nil
}

return errors.New("get etcd cluster status by pipeline cache failed")
}

Expand All @@ -297,6 +309,8 @@ func refreshConfig(KubeConf *common.KubeConf, runtime connector.Runtime, endpoin
"Name": etcdName,
"Ip": host.GetInternalIPv4Address(),
"Hostname": host.GetName(),
"Port": KubeConf.Cluster.Etcd.GetPort(),
"PeerPort": KubeConf.Cluster.Etcd.GetPeerPort(),
"State": state,
"PeerAddresses": strings.Join(endpoints, ","),
"UnsupportedArch": UnsupportedArch,
Expand Down Expand Up @@ -341,7 +355,7 @@ func (j *JoinMember) Execute(runtime connector.Runtime) error {
"export ETCDCTL_CA_FILE='/etc/ssl/etcd/ssl/ca.pem';"+
"%s/etcdctl --endpoints=%s member add %s %s",
host.GetName(), host.GetName(), common.BinDir, cluster.accessAddresses, etcdName,
fmt.Sprintf("https://%s:2380", host.GetInternalIPv4Address()))
fmt.Sprintf("https://%s:%d", host.GetInternalIPv4Address(), j.KubeConf.Cluster.Etcd.GetPeerPort()))

if _, err := runtime.GetRunner().SudoCmd(joinMemberCmd, true); err != nil {
return errors.Wrap(errors.WithStack(err), "add etcd member failed")
Expand Down Expand Up @@ -375,7 +389,7 @@ func (c *CheckMember) Execute(runtime connector.Runtime) error {
if err != nil {
return errors.Wrap(errors.WithStack(err), "list etcd member failed")
}
if !strings.Contains(memberList, fmt.Sprintf("https://%s:2379", host.GetInternalIPv4Address())) {
if !strings.Contains(memberList, fmt.Sprintf("https://%s:%d", host.GetInternalIPv4Address(), c.KubeConf.Cluster.Etcd.GetPort())) {
return errors.Wrap(errors.WithStack(err), "add etcd member failed")
}
} else {
Expand Down Expand Up @@ -405,7 +419,7 @@ func (b *BackupETCD) Execute(runtime connector.Runtime) error {
Dst: filepath.Join(b.KubeConf.Cluster.Etcd.BackupScriptDir, "etcd-backup.sh"),
Data: util.Data{
"Hostname": runtime.RemoteHost().GetName(),
"Etcdendpoint": fmt.Sprintf("https://%s:2379", runtime.RemoteHost().GetInternalIPv4Address()),
"Etcdendpoint": fmt.Sprintf("https://%s:%d", runtime.RemoteHost().GetInternalIPv4Address(), b.KubeConf.Cluster.Etcd.GetPort()),
"DataDir": b.KubeConf.Cluster.Etcd.DataDir,
"Backupdir": b.KubeConf.Cluster.Etcd.BackupDir,
"KeepbackupNumber": b.KubeConf.Cluster.Etcd.KeepBackupNumber + 1,
Expand Down
10 changes: 5 additions & 5 deletions cmd/kk/pkg/etcd/templates/etcd_env.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,13 @@ ETCD_DATA_DIR={{ .DataDir }}
{{- else }}
ETCD_DATA_DIR=/var/lib/etcd
{{- end }}
ETCD_ADVERTISE_CLIENT_URLS=https://{{ .Ip }}:2379
ETCD_INITIAL_ADVERTISE_PEER_URLS=https://{{ .Ip }}:2380
ETCD_ADVERTISE_CLIENT_URLS=https://{{ .Ip }}:{{ .Port }}
ETCD_INITIAL_ADVERTISE_PEER_URLS=https://{{ .Ip }}:{{ .PeerPort }}
ETCD_INITIAL_CLUSTER_STATE={{ .State }}
ETCD_METRICS=basic
ETCD_LISTEN_CLIENT_URLS=https://{{ .Ip }}:2379,https://127.0.0.1:2379
ETCD_LISTEN_CLIENT_URLS=https://{{ .Ip }}:{{ .Port }},https://127.0.0.1:{{ .Port }}
ETCD_INITIAL_CLUSTER_TOKEN=k8s_etcd
ETCD_LISTEN_PEER_URLS=https://{{ .Ip }}:2380
ETCD_LISTEN_PEER_URLS=https://{{ .Ip }}:{{ .PeerPort }}
ETCD_NAME={{ .Name }}
ETCD_PROXY=off
ETCD_ENABLE_V2=true
Expand Down Expand Up @@ -95,7 +95,7 @@ ETCD_PEER_KEY_FILE=/etc/ssl/etcd/ssl/member-{{ .Hostname }}-key.pem
ETCD_PEER_CLIENT_CERT_AUTH=true
# CLI settings
ETCDCTL_ENDPOINTS=https://127.0.0.1:2379
ETCDCTL_ENDPOINTS=https://127.0.0.1:{{ .Port }}
ETCDCTL_CACERT=/etc/ssl/etcd/ssl/ca.pem
ETCDCTL_KEY=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}-key.pem
ETCDCTL_CERT=/etc/ssl/etcd/ssl/admin-{{ .Hostname }}.pem
Expand Down
2 changes: 1 addition & 1 deletion cmd/kk/pkg/k3s/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ func (g *GenerateK3sServiceEnv) Execute(runtime connector.Runtime) error {
}
default:
for _, node := range runtime.GetHostsByRole(common.ETCD) {
endpoint := fmt.Sprintf("https://%s:%s", node.GetInternalIPv4Address(), kubekeyapiv1alpha2.DefaultEtcdPort)
endpoint := fmt.Sprintf("https://%s:%d", node.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPort())
endpointsList = append(endpointsList, endpoint)
}
externalEtcd.Endpoints = endpointsList
Expand Down
2 changes: 1 addition & 1 deletion cmd/kk/pkg/k8e/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -253,7 +253,7 @@ func (g *GenerateK8eServiceEnv) Execute(runtime connector.Runtime) error {
}
default:
for _, node := range runtime.GetHostsByRole(common.ETCD) {
endpoint := fmt.Sprintf("https://%s:%s", node.GetInternalIPv4Address(), kubekeyapiv1alpha2.DefaultEtcdPort)
endpoint := fmt.Sprintf("https://%s:%d", node.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPort())
endpointsList = append(endpointsList, endpoint)
}
externalEtcd.Endpoints = endpointsList
Expand Down
2 changes: 1 addition & 1 deletion cmd/kk/pkg/kubernetes/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func (g *GenerateKubeadmConfig) Execute(runtime connector.Runtime) error {
switch g.KubeConf.Cluster.Etcd.Type {
case kubekeyv1alpha2.KubeKey:
for _, host := range runtime.GetHostsByRole(common.ETCD) {
endpoint := fmt.Sprintf("https://%s:%s", host.GetInternalIPv4Address(), kubekeyv1alpha2.DefaultEtcdPort)
endpoint := fmt.Sprintf("https://%s:%d", host.GetInternalIPv4Address(), g.KubeConf.Cluster.Etcd.GetPort())
endpointsList = append(endpointsList, endpoint)
}
externalEtcd.Endpoints = endpointsList
Expand Down

0 comments on commit 19d546b

Please sign in to comment.