Skip to content

Commit

Permalink
Trace validation test
Browse files Browse the repository at this point in the history
  • Loading branch information
joshuazh-x committed Jun 13, 2024
1 parent ee19bbe commit 3b47a57
Show file tree
Hide file tree
Showing 23 changed files with 2,753 additions and 5,079 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@ verify-mod-tidy:
verify-genproto:
PASSES="genproto" ./scripts/test.sh

.PHONY: trace-validation
trace-validation:
PASSES="trace-validation" ./scripts/test.sh

.PHONY: test
test:
PASSES="unit" ./scripts/test.sh $(GO_TEST_FLAGS)
2 changes: 2 additions & 0 deletions bootstrap.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,5 +76,7 @@ func (rn *RawNode) Bootstrap(peers []Peer) error {
for _, peer := range peers {
rn.raft.applyConfChange(pb.ConfChange{NodeID: peer.ID, Type: pb.ConfChangeAddNode}.AsV2())
}

traceBootstrap(rn.raft)
return nil
}
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,12 +9,14 @@ require (
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.5.4
github.com/stretchr/testify v1.9.0
go.uber.org/zap v1.27.0
)

require (
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/google/go-cmp v0.5.8 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
go.uber.org/multierr v1.10.0 // indirect
google.golang.org/protobuf v1.33.0 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,12 @@ github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsT
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
github.com/yuin/goldmark v1.2.1/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
go.uber.org/goleak v1.3.0 h1:2K3zAYmnTNqV73imy9J1T3WC+gmCePx2hEGkimedGto=
go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
go.uber.org/multierr v1.10.0 h1:S0h4aNzvfcFsC3dRF1jLoaov7oRaKqRGC/pUEJ2yvPQ=
go.uber.org/multierr v1.10.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y=
go.uber.org/zap v1.27.0 h1:aJMhYGrd5QSmlpLMr2MftRKl7t8J8PTZPA732ud/XR8=
go.uber.org/zap v1.27.0/go.mod h1:GB2qFLM7cTU87MWRP2mPIjqfIDnGu+VIO4V/SdhGo2E=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
Expand Down
4 changes: 2 additions & 2 deletions raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -1963,6 +1963,8 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
panic(err)
}

traceConfChangeEvent(cfg, r)

return r.switchToConfig(cfg, trk)
}

Expand All @@ -1973,8 +1975,6 @@ func (r *raft) applyConfChange(cc pb.ConfChangeV2) pb.ConfState {
//
// The inputs usually result from restoring a ConfState or applying a ConfChange.
func (r *raft) switchToConfig(cfg tracker.Config, trk tracker.ProgressMap) pb.ConfState {
traceConfChangeEvent(cfg, r)

r.trk.Config = cfg
r.trk.Progress = trk

Expand Down
274 changes: 274 additions & 0 deletions rafttest/cluster.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,274 @@
// Copyright 2024 The etcd Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package rafttest

import (
"context"
"fmt"
"time"

"go.etcd.io/raft/v3"
"go.etcd.io/raft/v3/raftpb"
)

type clusterConfig struct {

Check failure on line 26 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

type `clusterConfig` is unused (unused)
size int
traceLogger raft.TraceLogger
tickInterval time.Duration
}

type endpoint struct {

Check failure on line 32 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

type `endpoint` is unused (unused)
index int
node *node
}

type getEndpoint struct {

Check failure on line 37 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

type `getEndpoint` is unused (unused)
i int
c chan endpoint
}

type cluster struct {

Check failure on line 42 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

type `cluster` is unused (unused)
nodes map[uint64]*node
network *raftNetwork

stopc chan struct{}
removec chan uint64
addc chan uint64
getc chan getEndpoint
faultc chan func(*cluster)

traceLogger raft.TraceLogger
tickInterval time.Duration
}

func newCluster(c clusterConfig) *cluster {

Check failure on line 56 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

func `newCluster` is unused (unused)
tickInterval := c.tickInterval
if tickInterval == 0 {
tickInterval = 100 * time.Millisecond
}
ids := make([]uint64, c.size)
peers := make([]raft.Peer, c.size)
for i := 0; i < c.size; i++ {
peers[i].ID = uint64(i + 1)
ids[i] = peers[i].ID
}
network := newRaftNetwork(ids...)
nodes := make(map[uint64]*node, c.size)
for i := 0; i < c.size; i++ {
nodes[uint64(i+1)] = startNodeWithConfig(nodeConfig{
id: uint64(i + 1),
peers: peers,
iface: network.nodeNetwork(uint64(i + 1)),
traceLogger: c.traceLogger,
tickInterval: tickInterval,
})
}

cl := &cluster{
nodes: nodes,
network: network,
stopc: make(chan struct{}),
removec: make(chan uint64),
addc: make(chan uint64),
getc: make(chan getEndpoint),
faultc: make(chan func(*cluster)),
traceLogger: c.traceLogger,
tickInterval: tickInterval,
}

cl.waitLeader()

go cl.mgmtLoop()

return cl
}

func (cl *cluster) mgmtLoop() {

Check failure on line 98 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

func `(*cluster).mgmtLoop` is unused (unused)
peers := []raft.Peer{}
for _, n := range cl.nodes {
peers = append(peers, raft.Peer{ID: n.id, Context: []byte{}})
}
for {
select {
case id := <-cl.addc:
cl.network.changeFace(id, true)
peers = append(peers, raft.Peer{ID: id, Context: []byte{}})
node := startNodeWithConfig(nodeConfig{
id: id,
peers: nil,
iface: cl.network.nodeNetwork(id),
traceLogger: cl.traceLogger,
tickInterval: cl.tickInterval,
})
cl.nodes[id] = node
case id := <-cl.removec:
cl.network.changeFace(id, false)
cl.nodes[id].stop()
delete(cl.nodes, id)
for i, p := range peers {
if p.ID == id {
peers = append(peers[:i], peers[i+1:]...)
break
}
}
case gn := <-cl.getc:
i := gn.i % len(peers)
nid := peers[i].ID
gn.c <- endpoint{index: i, node: cl.nodes[nid]}
case <-cl.stopc:
for _, n := range cl.nodes {
n.stop()
}
close(cl.stopc)
return
case f := <-cl.faultc:
cl.network.clearFault()
for _, n := range cl.nodes {
if n.stopped {
n.restart()
}
}
f(cl)
}
}
}

func (cl *cluster) stop() {
cl.stopc <- struct{}{}
<-cl.stopc
}

func (cl *cluster) removeNode(id uint64) {
cl.removec <- id
}

func (cl *cluster) addNode(id uint64) {
cl.addc <- id
}

func (cl *cluster) newClient() *client {
return &client{cluster: cl, epc: make(chan endpoint)}
}

func (cl *cluster) waitLeader() uint64 {

Check failure on line 165 in rafttest/cluster.go

View workflow job for this annotation

GitHub Actions / run

func `(*cluster).waitLeader` is unused (unused)
var l map[uint64]struct{}
var lindex uint64

for {
l = make(map[uint64]struct{})

for i, n := range cl.nodes {
lead := n.Status().SoftState.Lead
if lead != 0 {
l[lead] = struct{}{}
if n.id == lead {
lindex = i
}
}
}

if len(l) == 1 {
return lindex
}
}
}

type client struct {
cluster *cluster
epi int
epc chan endpoint
}

func (cl *client) propose(ctx context.Context, data []byte) error {
ep := cl.getEndpoint()
return ep.Propose(ctx, data)
}

func (cl *client) addNode(ctx context.Context, n uint64) error {
change := raftpb.ConfChangeSingle{
Type: raftpb.ConfChangeAddNode,
NodeID: n,
}
cc := raftpb.ConfChangeV2{
Transition: 0,
Changes: []raftpb.ConfChangeSingle{change},
Context: []byte{},
}

ep := cl.getEndpoint()
if err := ep.ProposeConfChange(ctx, cc); err != nil {
return err
}

toc := time.After(cl.cluster.tickInterval * 50)
for {
select {
case <-toc:
return fmt.Errorf("addNode timeout")
default:
}
st := ep.Status()
if _, exist := st.Config.Voters[0][n]; exist {
break
}
}
cl.cluster.addNode(n)
return nil
}

func (cl *client) removeNode(ctx context.Context, n uint64) error {
change := raftpb.ConfChangeSingle{
Type: raftpb.ConfChangeRemoveNode,
NodeID: n,
}
cc := raftpb.ConfChangeV2{
Transition: 0,
Changes: []raftpb.ConfChangeSingle{change},
Context: []byte{},
}

ep := cl.getEndpoint()
if err := ep.ProposeConfChange(ctx, cc); err != nil {
return err
}

toc := time.After(cl.cluster.tickInterval * 50)
for {
select {
case <-toc:
return fmt.Errorf("removeNode timeout")
default:
}
st := ep.Status()
if _, exist := st.Config.Voters[0][n]; !exist {
break
}
}

cl.cluster.removeNode(n)
return nil
}

func (cl *client) getEndpoint() *node {
// round robin
ge := getEndpoint{
i: cl.epi + 1,
c: cl.epc,
}
cl.cluster.getc <- ge
ep := <-ge.c
cl.epi = ep.index
return ep.node
}
Loading

0 comments on commit 3b47a57

Please sign in to comment.