forked from shunfei/cronsun
-
Notifications
You must be signed in to change notification settings - Fork 0
/
node.go
164 lines (133 loc) · 3.79 KB
/
node.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
package cronsun
import (
"encoding/json"
"fmt"
"os"
"strconv"
"syscall"
"time"
client "github.com/coreos/etcd/clientv3"
"gopkg.in/mgo.v2"
"gopkg.in/mgo.v2/bson"
"github.com/shunfei/cronsun/conf"
"github.com/shunfei/cronsun/log"
)
// Coll_Node is the name of the MongoDB collection that holds node documents.
const Coll_Node = "node"
// Node is a process that executes cron commands.
// It is registered under /cronsun/node/<id>.
type Node struct {
	ID string `bson:"_id" json:"id"` // machine id
	PID string `bson:"pid" json:"pid"` // process pid
	PIDFile string `bson:"-" json:"-"` // excluded from both BSON and JSON encoding
	IP string `bson:"ip" json:"ip"` // node ip
	Hostname string `bson:"hostname" json:"hostname"`
	Version string `bson:"version" json:"version"`
	UpTime time.Time `bson:"up" json:"up"` // start time
	DownTime time.Time `bson:"down" json:"down"` // time of the last shutdown
	Alived bool `bson:"alived" json:"alived"` // whether the node is available
	Connected bool `bson:"-" json:"connected"` // only meaningful when Alived is true; indicates whether the heartbeat is normal
}
// String renders the node as "node[<ID>] pid[<PID>]".
func (n *Node) String() string {
	id, pid := n.ID, n.PID
	return "node[" + id + "] pid[" + pid + "]"
}
// Put writes the node's PID to etcd under the key conf.Config.Node + n.ID,
// forwarding any extra operation options to the client.
func (n *Node) Put(opts ...client.OpOption) (*client.PutResponse, error) {
	key := conf.Config.Node + n.ID
	return DefalutClient.Put(key, n.PID, opts...)
}
// Del deletes the node's etcd key (conf.Config.Node + n.ID).
func (n *Node) Del() (*client.DeleteResponse, error) {
	key := conf.Config.Node + n.ID
	return DefalutClient.Delete(key)
}
// Exist checks whether the node is already registered in etcd.
// It returns the pid stored under the node's key when that process
// accepts signal 0, and -1 when the key is absent, the stored value is not
// a number, or the process cannot be found or signalled.
// A non-nil error is returned only when the etcd read fails or when
// deleting a non-numeric value fails.
func (n *Node) Exist() (pid int, err error) {
	key := conf.Config.Node + n.ID

	resp, err := DefalutClient.Get(key)
	if err != nil {
		return
	}
	if len(resp.Kvs) == 0 {
		return -1, nil
	}

	pid, err = strconv.Atoi(string(resp.Kvs[0].Value))
	if err != nil {
		// The stored value is not a pid: drop the key and report "not registered".
		if _, err = DefalutClient.Delete(key); err != nil {
			return
		}
		return -1, nil
	}

	proc, findErr := os.FindProcess(pid)
	if findErr != nil || proc == nil {
		return -1, nil
	}

	// TODO: systems other than linux/unix are not considered for now.
	if proc.Signal(syscall.Signal(0)) != nil {
		return -1, nil
	}
	return pid, nil
}
// GetNodes returns all nodes by calling GetNodesBy with a nil query.
func GetNodes() (nodes []*Node, err error) {
	nodes, err = GetNodesBy(nil)
	return
}
// GetNodesBy runs query against the node collection and returns every
// matching document decoded into a Node.
func GetNodesBy(query interface{}) (nodes []*Node, err error) {
	fetchAll := func(c *mgo.Collection) error {
		return c.Find(query).All(&nodes)
	}
	err = mgoDB.WithC(Coll_Node, fetchAll)
	return nodes, err
}
// GetNodesByID looks up a single node in the node collection by its id.
func GetNodesByID(id string) (node *Node, err error) {
	err = mgoDB.FindId(Coll_Node, id, &node)
	return node, err
}
func RemoveNode(query interface{}) error {
return mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
return c.Remove(query)
})
}
func ISNodeAlive(id string) (bool, error) {
n := 0
err := mgoDB.WithC(Coll_Node, func(c *mgo.Collection) error {
var e error
n, e = c.Find(bson.M{"_id": id, "alived": true}).Count()
return e
})
return n > 0, err
}
// GetNodeGroups reads every etcd entry under the conf.Config.Group prefix,
// sorted by key in ascending order, and JSON-decodes each value into a Group.
// Decoding stops at the first malformed entry; the returned error names its key.
func GetNodeGroups() (list []*Group, err error) {
	resp, err := DefalutClient.Get(
		conf.Config.Group,
		client.WithPrefix(),
		client.WithSort(client.SortByKey, client.SortAscend),
	)
	if err != nil {
		return
	}

	list = make([]*Group, 0, resp.Count)
	for _, kv := range resp.Kvs {
		group := new(Group)
		if err = json.Unmarshal(kv.Value, group); err != nil {
			err = fmt.Errorf("node.GetGroups(key: %s) error: %s", string(kv.Key), err.Error())
			return
		}
		list = append(list, group)
	}
	return
}
// WatchNode opens an etcd watch on every key under the conf.Config.Node prefix.
func WatchNode() client.WatchChan {
	prefix := conf.Config.Node
	return DefalutClient.Watch(prefix, client.WithPrefix())
}
// On records liveness information in MongoDB after the node instance starts:
// it marks the node alive, stamps the current Version and start time,
// then syncs the node to MongoDB.
func (n *Node) On() {
	n.Alived = true
	n.Version = Version
	n.UpTime = time.Now()
	n.SyncToMgo()
}
// Down clears the liveness information in MongoDB after the node instance
// stops: it marks the node not alive, records the shutdown time,
// then syncs the node to MongoDB.
func (n *Node) Down() {
	n.Alived = false
	n.DownTime = time.Now()
	n.SyncToMgo()
}
// SyncToMgo upserts the node into the node collection, keyed by its ID.
// A failure is logged rather than returned.
func (n *Node) SyncToMgo() {
	selector := bson.M{"_id": n.ID}
	if err := mgoDB.Upsert(Coll_Node, selector, n); err != nil {
		// Log the message as an argument, not as the format string, so that
		// any '%' in the error text is printed literally instead of being
		// interpreted as a formatting verb.
		log.Errorf("%s", err.Error())
	}
}
// RmOldInfo removes node info left behind by old versions (< 0.3.0).
// It removes the MongoDB document whose _id is the node's IP and the etcd
// key conf.Config.Node + IP. Both removals are best-effort: their errors
// are discarded.
func (n *Node) RmOldInfo() {
	legacyID := n.IP
	_ = RemoveNode(bson.M{"_id": legacyID})
	_, _ = DefalutClient.Delete(conf.Config.Node + legacyID)
}