Skip to content

Commit

Permalink
ensure replicas are fully up (#17)
Browse files Browse the repository at this point in the history
* ensure replicas are fully up

* cleanup logs
  • Loading branch information
stlava authored Jun 11, 2024
1 parent fcaa5f1 commit be5448e
Showing 1 changed file with 34 additions and 4 deletions.
38 changes: 34 additions & 4 deletions testbed/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func NewCluster(startport uint16) *Cluster {
cl.Node[3].DoSure("CLUSTER REPLICATE", cl.Node[0].NodeId)
cl.Node[4].DoSure("CLUSTER REPLICATE", cl.Node[1].NodeId)
cl.Node[5].DoSure("CLUSTER REPLICATE", cl.Node[2].NodeId)

cl.WaitClusterOk()

return cl
Expand All @@ -78,13 +79,13 @@ func (cl *Cluster) Start() {
// WaitClusterOk wait for cluster configuration to be stable.
func (cl *Cluster) WaitClusterOk() {
i := 0
t := time.AfterFunc(10*time.Second, func() { panic("cluster didn't stabilize") })
t := time.AfterFunc(30*time.Second, func() { panic("cluster didn't stabilize") })
defer t.Stop()
for !cl.ClusterOk() {
if i++; i == 10 {
if i++; i == 15 {
cl.AttemptFailover()
}
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
}
}

Expand All @@ -97,27 +98,39 @@ func (cl *Cluster) ClusterOk() bool {
}
}
var hashsum uint64
var goodNode *Node

for i := range cl.Node {
if !cl.Node[i].RunningNow() {
continue
}

goodNode = &cl.Node[i]

// Check cluster state
res := cl.Node[i].Do("CLUSTER INFO")
buf, ok := res.([]byte)
if !ok {
return false
}

if !bytes.Contains(buf, []byte("cluster_state:ok")) {
return false
}

// Check replica link
res = cl.Node[i].Do("INFO REPLICATION")
buf, ok = res.([]byte)

if !ok {
return false
}
if !bytes.Contains(buf, []byte("role:master")) &&

if bytes.Contains(buf, []byte("role:slave")) &&
!bytes.Contains(buf, []byte("master_link_status:up")) {
return false
}

res = cl.Node[i].Do("CLUSTER NODES")
buf, ok = res.([]byte)
if !ok {
Expand Down Expand Up @@ -148,6 +161,23 @@ func (cl *Cluster) ClusterOk() bool {
}
hashsum = hash
}

// Ensure nodes seen by cluster slots match number of running nodes
resp := goodNode.Do("CLUSTER SLOTS")
ranges, err := redisclusterutil.ParseSlotsInfo(resp)
if err != nil {
return false
}

assignedNodes := 0
for _, r := range ranges {
assignedNodes += len(r.Addrs)
}

if assignedNodes < 6 && assignedNodes != len(cl.Node)-len(stopped) {
return false
}

return true
}

Expand Down

0 comments on commit be5448e

Please sign in to comment.