-
Notifications
You must be signed in to change notification settings - Fork 51
/
etcddb_server.go
151 lines (113 loc) · 3.4 KB
/
etcddb_server.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
package etcddb
import (
"errors"
"fmt"
"github.com/singnet/snet-daemon/v5/config"
"github.com/spf13/viper"
"go.etcd.io/etcd/server/v3/embed"
"go.uber.org/zap"
"net/url"
"time"
)
// EtcdServer bundles an embedded etcd instance with the configuration
// it is started from.
type EtcdServer struct {
	// conf is the configuration the server was constructed with.
	conf *EtcdServerConf
	// etcd is the embedded etcd handle; it is assigned by Start.
	etcd *embed.Etcd
}
// GetConf returns the configuration held by this server.
func (server EtcdServer) GetConf() *EtcdServerConf {
	conf := server.conf
	return conf
}
// IsEtcdServerEnabled reports whether the etcd server is enabled according
// to the daemon-wide viper configuration returned by config.Vip().
// It delegates to IsEtcdServerEnabledInVip and passes both results through.
func IsEtcdServerEnabled() (enabled bool, err error) {
	enabled, err = IsEtcdServerEnabledInVip(config.Vip())
	return enabled, err
}
// IsEtcdServerEnabledInVip reports whether the etcd server is enabled in the
// given viper configuration.
//
// The configuration is loaded with GetEtcdServerConf. If loading fails the
// error is returned and enabled is false. A nil configuration without an
// error is treated as "not enabled" instead of being dereferenced.
func IsEtcdServerEnabledInVip(vip *viper.Viper) (enabled bool, err error) {
	conf, err := GetEtcdServerConf(vip)
	if err != nil {
		return false, err
	}
	// Guard against a missing configuration so callers get a plain
	// "disabled" answer rather than a nil-pointer panic.
	if conf == nil {
		return false, nil
	}
	return conf.Enabled, nil
}
// GetEtcdServer builds an EtcdServer from the daemon-wide viper
// configuration returned by config.Vip(). It delegates to
// GetEtcdServerFromVip and passes both results through unchanged, so the
// returned server is nil whenever that function yields nil.
func GetEtcdServer() (server *EtcdServer, err error) {
	server, err = GetEtcdServerFromVip(config.Vip())
	return server, err
}
// GetEtcdServerFromVip constructs an EtcdServer from the given viper
// configuration. The server is only created here, not started; call Start
// on the result to launch it.
//
// The loaded configuration is logged before any validation. A nil server is
// returned when loading the configuration fails (together with the error),
// when no configuration is available, or when it is not enabled.
func GetEtcdServerFromVip(vip *viper.Viper) (server *EtcdServer, err error) {
	conf, err := GetEtcdServerConf(vip)
	zap.L().Info("Getting payment channel storage sever", zap.Any("conf", conf))

	if err != nil {
		return nil, err
	}
	if conf == nil || !conf.Enabled {
		return nil, nil
	}
	return &EtcdServer{conf: conf}, nil
}
// Start launches the embedded etcd server using the stored configuration.
// On success the running instance is kept on the receiver; on failure the
// error from startEtcdServer is returned and the receiver is left unchanged.
func (server *EtcdServer) Start() (err error) {
	instance, startErr := startEtcdServer(server.conf)
	if startErr != nil {
		return startErr
	}
	server.etcd = instance
	return nil
}
// Close shuts down the embedded etcd server by calling Close on its handle.
//
// It is a no-op when the receiver is nil or when no server instance is
// held (Start was never called or did not succeed). The handle is cleared
// afterwards so that a repeated Close does not touch the already closed
// instance again.
func (server *EtcdServer) Close() {
	if server == nil || server.etcd == nil {
		return
	}
	server.etcd.Close()
	server.etcd = nil
}
// startEtcdServer starts an embedded etcd server configured from conf.
//
// The call blocks until the server signals readiness or until
// conf.StartupTimeout elapses. On timeout the server is asked to stop and
// an error naming conf.ID is returned together with a nil instance. An error
// from embed.StartEtcd is returned as is, also with a nil instance.
func startEtcdServer(conf *EtcdServerConf) (etcd *embed.Etcd, err error) {
	etcd, err = embed.StartEtcd(getEtcdConf(conf))
	if err != nil {
		return nil, err
	}

	// Use an explicit timer so it is released as soon as the server is
	// ready instead of lingering until the timeout would have fired.
	timer := time.NewTimer(conf.StartupTimeout)
	defer timer.Stop()

	select {
	case <-etcd.Server.ReadyNotify():
		return etcd, nil
	case <-timer.C:
		etcd.Server.Stop() // trigger a shutdown of the half-started server
		return nil, errors.New("etcd server took too long to start: " + conf.ID)
	}
}
// getEtcdConf translates the daemon's EtcdServerConf into an embed.Config
// for the embedded etcd server.
//
// The client and peer URLs are built from conf.Scheme, conf.Host and the
// respective port, and each is used for both the listen and the advertise
// setting. The cluster state is always set to "new". The resulting URLs and
// the source configuration are logged.
func getEtcdConf(conf *EtcdServerConf) *embed.Config {
	clientURL := &url.URL{
		Scheme: conf.Scheme,
		Host:   fmt.Sprintf("%s:%d", conf.Host, conf.ClientPort),
	}
	peerURL := &url.URL{
		Scheme: conf.Scheme,
		Host:   fmt.Sprintf("%s:%d", conf.Host, conf.PeerPort),
	}

	// Log the peer URL under "PeerURL"; it previously carried the client URL.
	zap.L().Info("Getting etcd config", zap.Any("PaymentChannelStorageServer", fmt.Sprintf("%+v", conf)),
		zap.Any("ClientURL", clientURL),
		zap.Any("PeerURL", peerURL))

	etcdConf := embed.NewConfig()
	etcdConf.Name = conf.ID
	etcdConf.Dir = conf.DataDir

	// --listen-client-urls
	etcdConf.ListenClientUrls = []url.URL{*clientURL}
	// --advertise-client-urls
	etcdConf.AdvertiseClientUrls = []url.URL{*clientURL}
	// --listen-peer-urls
	etcdConf.ListenPeerUrls = []url.URL{*peerURL}
	// --initial-advertise-peer-urls
	etcdConf.AdvertisePeerUrls = []url.URL{*peerURL}

	// --initial-cluster
	etcdConf.InitialCluster = conf.Cluster
	// --initial-cluster-token
	etcdConf.InitialClusterToken = conf.Token
	// --initial-cluster-state
	etcdConf.ClusterState = embed.ClusterStateFlagNew

	// --log-level
	etcdConf.LogLevel = conf.LogLevel
	// --log-outputs
	etcdConf.LogOutputs = conf.LogOutputs

	return etcdConf
}