Skip to content

Commit

Permalink
ensure replicas are fully up
Browse files: browse the repository at this point in the history
  • Loading branch information
stlava committed Jun 11, 2024
1 parent fcaa5f1 commit 69963be
Showing 1 changed file with 37 additions and 4 deletions.
41 changes: 37 additions & 4 deletions testbed/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package testbed

import (
"bytes"
"fmt"
"log"
"time"

Expand Down Expand Up @@ -52,6 +53,7 @@ func NewCluster(startport uint16) *Cluster {
cl.Node[3].DoSure("CLUSTER REPLICATE", cl.Node[0].NodeId)
cl.Node[4].DoSure("CLUSTER REPLICATE", cl.Node[1].NodeId)
cl.Node[5].DoSure("CLUSTER REPLICATE", cl.Node[2].NodeId)

cl.WaitClusterOk()

return cl
Expand All @@ -78,13 +80,14 @@ func (cl *Cluster) Start() {
// WaitClusterOk wait for cluster configuration to be stable.
func (cl *Cluster) WaitClusterOk() {
i := 0
t := time.AfterFunc(10*time.Second, func() { panic("cluster didn't stabilize") })
t := time.AfterFunc(30*time.Second, func() { panic("cluster didn't stabilize") })
defer t.Stop()
for !cl.ClusterOk() {
if i++; i == 10 {
if i++; i == 15 {
cl.AttemptFailover()
fmt.Println("did failover")
}
time.Sleep(100 * time.Millisecond)
time.Sleep(1000 * time.Millisecond)
}
}

Expand All @@ -97,27 +100,39 @@ func (cl *Cluster) ClusterOk() bool {
}
}
var hashsum uint64
var goodNode *Node

for i := range cl.Node {
if !cl.Node[i].RunningNow() {
continue
}

goodNode = &cl.Node[i]

// Check cluster state
res := cl.Node[i].Do("CLUSTER INFO")
buf, ok := res.([]byte)
if !ok {
return false
}

if !bytes.Contains(buf, []byte("cluster_state:ok")) {
return false
}

// Check replica link
res = cl.Node[i].Do("INFO REPLICATION")
buf, ok = res.([]byte)

if !ok {
return false
}
if !bytes.Contains(buf, []byte("role:master")) &&

if bytes.Contains(buf, []byte("role:slave")) &&
!bytes.Contains(buf, []byte("master_link_status:up")) {
return false
}

res = cl.Node[i].Do("CLUSTER NODES")
buf, ok = res.([]byte)
if !ok {
Expand Down Expand Up @@ -148,6 +163,24 @@ func (cl *Cluster) ClusterOk() bool {
}
hashsum = hash
}

// Ensure nodes seen by cluster slots match number of running nodes
resp := goodNode.Do("CLUSTER SLOTS")
ranges, err := redisclusterutil.ParseSlotsInfo(resp)
if err != nil {
return false
}

assignedNodes := 0
for _, r := range ranges {
assignedNodes += len(r.Addrs)
}

if assignedNodes < 6 && assignedNodes != len(cl.Node)-len(stopped) {
fmt.Println("running nodes ver live nodes error", len(cl.Node)-len(stopped), assignedNodes)
return false
}

return true
}

Expand Down

0 comments on commit 69963be

Please sign in to comment.