Skip to content

Commit

Permalink
version 1.5.2
Browse files Browse the repository at this point in the history
1. check max redirects
2. add monkey test
3. fix pinger buffer
  • Loading branch information
felixhao authored Dec 21, 2018
1 parent 14bc9bc commit be21c0f
Show file tree
Hide file tree
Showing 23 changed files with 665 additions and 21 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,8 @@
# Overlord

## Version 1.5.2
1. max redirects 5.

## Version 1.5.1
1. reset sub message only in nedd.

Expand Down
6 changes: 3 additions & 3 deletions cmd/proxy/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (

const (
// VERSION version
VERSION = "1.5.0"
VERSION = "1.5.2"
)

var (
Expand Down Expand Up @@ -160,12 +160,12 @@ func signalHandler() {
var ch = make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGHUP, syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT)
for {
log.Infof("overlord proxy version[%s] already started", VERSION)
log.Infof("overlord proxy version[%s] start serving", VERSION)
si := <-ch
log.Infof("overlord proxy version[%s] signal(%s) stop the process", VERSION, si.String())
switch si {
case syscall.SIGQUIT, syscall.SIGTERM, syscall.SIGINT:
log.Infof("overlord proxy version[%s] already exited", VERSION)
log.Infof("overlord proxy version[%s] exited", VERSION)
return
case syscall.SIGHUP:
default:
Expand Down
2 changes: 1 addition & 1 deletion codecov.sh
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ echo "" > coverage.txt

for d in $(go list ./... | grep -v vendor | grep -v cmd); do
echo "testing for $d ..."
go test -coverprofile=profile.out -covermode=atomic $d
go test -gcflags="-N -l" -coverprofile=profile.out -covermode=atomic $d
if [ -f profile.out ]; then
cat profile.out >> coverage.txt
rm profile.out
Expand Down
1 change: 1 addition & 0 deletions proto/memcache/binary/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (m *mcPinger) Ping() (err error) {
return
}
_ = m.br.Read()
defer m.br.AdvanceTo(0)
head, err := m.br.ReadExact(requestHeaderLen)
if err != nil {
err = errors.WithStack(err)
Expand Down
8 changes: 2 additions & 6 deletions proto/memcache/binary/pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,7 @@ func TestPingerPingEOF(t *testing.T) {
assert.NoError(t, err)

err = pinger.Ping()
assert.Error(t, err)

err = errors.Cause(err)
assert.Equal(t, bufio.ErrBufferFull, err)
assert.NoError(t, err)
}

func TestPingerPing100Ok(t *testing.T) {
Expand All @@ -41,8 +38,7 @@ func TestPingerPing100Ok(t *testing.T) {
}

err := pinger.Ping()
assert.Error(t, err)
_causeEqual(t, bufio.ErrBufferFull, err)
assert.NoError(t, err)
}

func TestPingerFlushErr(t *testing.T) {
Expand Down
1 change: 1 addition & 0 deletions proto/memcache/pinger.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ func (m *mcPinger) Ping() (err error) {
return
}
_ = m.br.Read()
defer m.br.AdvanceTo(0)
var b []byte
if b, err = m.br.ReadLine(); err != nil {
err = errors.WithStack(err)
Expand Down
12 changes: 3 additions & 9 deletions proto/memcache/pinger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,6 @@ package memcache
import (
"testing"

"overlord/lib/bufio"

"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)
Expand All @@ -17,18 +15,15 @@ func TestPingerPingOk(t *testing.T) {
assert.NoError(t, err)
}

func TestPingerPingEOF(t *testing.T) {
func TestPingerPingMore(t *testing.T) {
conn := _createConn(pongBytes)
pinger := NewPinger(conn)

err := pinger.Ping()
assert.NoError(t, err)

err = pinger.Ping()
assert.Error(t, err)

err = errors.Cause(err)
assert.Equal(t, bufio.ErrBufferFull, err)
assert.NoError(t, err)
}

func TestPingerPing100Ok(t *testing.T) {
Expand All @@ -41,8 +36,7 @@ func TestPingerPing100Ok(t *testing.T) {
}

err := pinger.Ping()
assert.Error(t, err)
_causeEqual(t, bufio.ErrBufferFull, err)
assert.NoError(t, err)
}

func TestPingerErr(t *testing.T) {
Expand Down
18 changes: 17 additions & 1 deletion proto/redis/cluster/node_conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

const (
respRedirect = '-'
maxRedirects = 5
)

var (
Expand All @@ -30,6 +31,8 @@ type nodeConn struct {

sb strings.Builder

redirects int

state int32
}

Expand Down Expand Up @@ -66,6 +69,12 @@ func (nc *nodeConn) Read(m *proto.Message) (err error) {
if reply.Type() != respRedirect {
return
}
if nc.redirects >= 5 { // NOTE: check max redirects
if log.V(4) {
log.Infof("Redis Cluster NodeConn key(%s) already max redirects", req.Key())
}
return
}
data := reply.Data()
if !bytes.HasPrefix(data, askBytes) && !bytes.HasPrefix(data, movedBytes) {
return
Expand All @@ -78,15 +87,22 @@ func (nc *nodeConn) Read(m *proto.Message) (err error) {
if err = nc.redirectProcess(m, req, addr, isAsk); err != nil && log.V(2) {
log.Errorf("Redis Cluster NodeConn redirectProcess addr:%s error:%v", addr, err)
}
nc.redirects = 0
return
}

func (nc *nodeConn) redirectProcess(m *proto.Message, req *redis.Request, addr string, isAsk bool) (err error) {
// next redirect
nc.redirects++
if log.V(5) {
log.Infof("Redis Cluster NodeConn key(%s) redirect count(%d)", req.Key(), nc.redirects)
}
// start redirect
nnc := newNodeConn(nc.c, addr)
tmp := nnc.(*nodeConn)
tmp.redirects = nc.redirects // NOTE: for check max redirects
rnc := tmp.nc.(*redis.NodeConn)
defer nnc.Close()
// rnc := rdt.nc
if isAsk {
if err = rnc.Bw().Write(askingResp); err != nil {
err = errors.WithStack(err)
Expand Down
91 changes: 91 additions & 0 deletions proto/redis/cluster/node_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
package cluster

import (
"math/rand"
"reflect"
"testing"

"overlord/lib/bufio"
"overlord/proto"
"overlord/proto/redis"

"github.com/bouk/monkey"
"github.com/stretchr/testify/assert"
)

func TestNodeConnMaxRedirect(t *testing.T) {
monkey.Patch(newNodeConn, func(_ *cluster, addr string) proto.NodeConn {
return &nodeConn{
nc: &redis.NodeConn{},
}
})

nnc := newNodeConn(nil, "")
cnc := nnc.(*nodeConn)
rnc := cnc.nc.(*redis.NodeConn)
bw := &bufio.Writer{}

msgs := proto.GetMsgs(1)
msg := msgs[0]
req := &redis.Request{}
msg.WithRequest(req)
resp := &redis.RESP{}

// msg stub
monkey.PatchInstanceMethod(reflect.TypeOf(msg), "Request", func(*proto.Message) proto.Request {
return req
})
// nc stub
monkey.PatchInstanceMethod(reflect.TypeOf(rnc), "Read", func(_ *redis.NodeConn, _ *proto.Message) error {
return nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(rnc), "Bw", func(_ *redis.NodeConn) *bufio.Writer {
return bw
})
monkey.PatchInstanceMethod(reflect.TypeOf(rnc), "Close", func(_ *redis.NodeConn) error {
return nil
})
// req stub
monkey.PatchInstanceMethod(reflect.TypeOf(req), "IsSupport", func(_ *redis.Request) bool {
return true
})
monkey.PatchInstanceMethod(reflect.TypeOf(req), "IsCtl", func(_ *redis.Request) bool {
return false
})
monkey.PatchInstanceMethod(reflect.TypeOf(req), "Reply", func(_ *redis.Request) *redis.RESP {
return resp
})
monkey.PatchInstanceMethod(reflect.TypeOf(req), "RESP", func(_ *redis.Request) *redis.RESP {
return resp
})
// resp stub
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Type", func(_ *redis.RESP) byte {
return '-'
})
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Data", func(_ *redis.RESP) []byte {
var (
s = [][]byte{
[]byte("ASK 1 127.0.0.1:31234"),
[]byte("MOVED 1 127.0.0.1:31234"),
}
)
return s[rand.Intn(2)]
})
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Encode", func(_ *redis.RESP, _ *bufio.Writer) error {
return nil
})
// bw stub
monkey.PatchInstanceMethod(reflect.TypeOf(bw), "Write", func(_ *bufio.Writer, _ []byte) error {
return nil
})
monkey.PatchInstanceMethod(reflect.TypeOf(bw), "Flush", func(_ *bufio.Writer) error {
return nil
})

for i := 0; i < 10; i++ {
err := nnc.Read(msg)
assert.NoError(t, err)
}

monkey.UnpatchAll()
}
116 changes: 116 additions & 0 deletions proto/redis/cluster/proxy_conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
package cluster

import (
"reflect"
"testing"

"overlord/lib/bufio"
"overlord/proto"
"overlord/proto/redis"

"github.com/bouk/monkey"
"github.com/pkg/errors"
"github.com/stretchr/testify/assert"
)

func TestProxyConn(t *testing.T) {
pc := &proxyConn{
c: &cluster{},
pc: &redis.ProxyConn{},
}
pc.c.fakeNodesBytes = []byte("fake nodes bytes")
pc.c.fakeSlotsBytes = []byte("fake slots bytes")
bw := &bufio.Writer{}
msgs := proto.GetMsgs(1)
msg := msgs[0]
req := &redis.Request{}
msg.WithRequest(req)
resp := &redis.RESP{}

// pc stub
monkey.PatchInstanceMethod(reflect.TypeOf(pc.pc), "Bw", func(_ *redis.ProxyConn) *bufio.Writer {
return bw
})
// msg stub
monkey.PatchInstanceMethod(reflect.TypeOf(msg), "IsBatch", func(*proto.Message) bool {
return false
})
monkey.PatchInstanceMethod(reflect.TypeOf(msg), "Request", func(*proto.Message) proto.Request {
return req
})
// req stub
monkey.PatchInstanceMethod(reflect.TypeOf(req), "IsSupport", func(_ *redis.Request) bool {
return false
})
monkey.PatchInstanceMethod(reflect.TypeOf(req), "IsCtl", func(_ *redis.Request) bool {
return false
})
monkey.PatchInstanceMethod(reflect.TypeOf(req), "RESP", func(_ *redis.Request) *redis.RESP {
return resp
})
// resp stub
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Array", func(_ *redis.RESP) []*redis.RESP {
return []*redis.RESP{resp, resp}
})

// case cmdNodesBytes
var count int
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Data", func(_ *redis.RESP) []byte {
if count == 0 {
count++
return cmdClusterBytes
}
return cmdNodesBytes
})
// bw stub
monkey.PatchInstanceMethod(reflect.TypeOf(bw), "Write", func(_ *bufio.Writer, bs []byte) error {
assert.Equal(t, pc.c.fakeNodesBytes, bs)
return nil
})
err := pc.Encode(msg)
assert.NoError(t, err)

// case cmdSlotsBytes
count = 0
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Data", func(_ *redis.RESP) []byte {
if count == 0 {
count++
return cmdClusterBytes
}
return cmdSlotsBytes
})
// bw stub
monkey.PatchInstanceMethod(reflect.TypeOf(bw), "Write", func(_ *bufio.Writer, bs []byte) error {
assert.Equal(t, pc.c.fakeSlotsBytes, bs)
return nil
})
err = pc.Encode(msg)
assert.NoError(t, err)

// case not support
count = 0
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Data", func(_ *redis.RESP) []byte {
if count == 0 {
count++
return cmdClusterBytes
}
return []byte("")
})
// bw stub
monkey.PatchInstanceMethod(reflect.TypeOf(bw), "Write", func(_ *bufio.Writer, bs []byte) error {
assert.Equal(t, notSupportBytes, bs)
return nil
})
err = pc.Encode(msg)
assert.NoError(t, err)

// resp stub
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Data", func(_ *redis.RESP) []byte {
return cmdClusterBytes
})
monkey.PatchInstanceMethod(reflect.TypeOf(resp), "Array", func(_ *redis.RESP) []*redis.RESP {
return []*redis.RESP{resp}
})
err = pc.Encode(msg)
assert.EqualError(t, errors.Cause(err), ErrInvalidArgument.Error())
}
Loading

0 comments on commit be21c0f

Please sign in to comment.