From 02580c973abb08391da0a0e56242aac08fe51a7f Mon Sep 17 00:00:00 2001 From: Felix Hao Date: Wed, 2 Jan 2019 14:29:42 +0800 Subject: [PATCH] Version 1.5.3 (#42) * fix pipe read * fix unit test * fix changelog typo --- CHANGELOG.md | 5 +++- proto/memcache/binary/node_conn.go | 2 +- proto/memcache/node_conn.go | 3 +-- proto/memcache/proxy_conn.go | 14 +++++----- proto/memcache/proxy_conn_test.go | 1 + proto/memcache/request.go | 4 +-- proto/memcache/request_test.go | 4 +-- proto/pipe.go | 5 +++- proto/pipe_test.go | 42 +++++++++++++++++++++++++++--- 9 files changed, 61 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 279f0d38..a1a2583c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,10 +1,13 @@ # Overlord +## Version 1.5.3 +1. fix pipe read when one err. + ## Version 1.5.2 1. max redirects 5. ## Version 1.5.1 -1. reset sub message only in nedd. +1. reset sub message only in need. ## Version 1.5.0 1. refactor message pipeline. diff --git a/proto/memcache/binary/node_conn.go b/proto/memcache/binary/node_conn.go index 1a1512c1..85c8dcd4 100644 --- a/proto/memcache/binary/node_conn.go +++ b/proto/memcache/binary/node_conn.go @@ -91,6 +91,7 @@ func (n *nodeConn) Read(m *proto.Message) (err error) { err = errors.WithStack(ErrAssertReq) return } + mcr.data = mcr.data[:0] REREAD: var bs []byte if bs, err = n.br.ReadExact(requestHeaderLen); err == bufio.ErrBufferFull { @@ -120,7 +121,6 @@ REREADData: err = errors.WithStack(err) return } - mcr.data = mcr.data[:0] mcr.data = append(mcr.data, data...) return } diff --git a/proto/memcache/node_conn.go b/proto/memcache/node_conn.go index cba2a1b4..f3e3583e 100644 --- a/proto/memcache/node_conn.go +++ b/proto/memcache/node_conn.go @@ -84,6 +84,7 @@ func (n *nodeConn) Read(m *proto.Message) (err error) { err = errors.WithStack(ErrAssertReq) return } + mcr.data = mcr.data[:0] REREAD: var bs []byte if bs, err = n.br.ReadLine(); err == bufio.ErrBufferFull { @@ -97,7 +98,6 @@ REREAD: return } if _, ok := withValueTypes[mcr.rTp]; !ok || bytes.Equal(bs, endBytes) || bytes.Equal(bs, errorBytes) { - mcr.data = mcr.data[:0] mcr.data = append(mcr.data, bs...) return } @@ -119,7 +119,6 @@ REREADData: err = errors.WithStack(err) return } - mcr.data = mcr.data[:0] mcr.data = append(mcr.data, bs...) mcr.data = append(mcr.data, data...) return diff --git a/proto/memcache/proxy_conn.go b/proto/memcache/proxy_conn.go index e2744f22..fed5f399 100644 --- a/proto/memcache/proxy_conn.go +++ b/proto/memcache/proxy_conn.go @@ -340,14 +340,14 @@ func revSpacIdx(bs []byte) int { // Encode encode response and write into writer. func (p *proxyConn) Encode(m *proto.Message) (err error) { + if me := m.Err(); me != nil { + se := errors.Cause(me).Error() + _ = p.bw.Write(serverErrorBytes) + _ = p.bw.Write([]byte(se)) + err = p.bw.Write(crlfBytes) + return + } if !m.IsBatch() { - if me := m.Err(); me != nil { - se := errors.Cause(me).Error() - _ = p.bw.Write(serverErrorBytes) - _ = p.bw.Write([]byte(se)) - err = p.bw.Write(crlfBytes) - return - } mcr, ok := m.Request().(*MCRequest) if !ok { _ = p.bw.Write(serverErrorBytes) diff --git a/proto/memcache/proxy_conn_test.go b/proto/memcache/proxy_conn_test.go index 8a4714fe..eb4c0ce8 100644 --- a/proto/memcache/proxy_conn_test.go +++ b/proto/memcache/proxy_conn_test.go @@ -236,6 +236,7 @@ func TestEncodeErr(t *testing.T) { msg.Type = proto.CacheTypeMemcache msg.WithRequest(&mockReq{}) msg.WithRequest(&mockReq{}) // NOTE: batch + msg.Batch() err = p.Encode(msg) assert.NoError(t, err) diff --git a/proto/memcache/request.go b/proto/memcache/request.go index ef0af30e..5907788f 100644 --- a/proto/memcache/request.go +++ b/proto/memcache/request.go @@ -226,9 +226,9 @@ func NewReq() *MCRequest { // Put put req back to pool. func (r *MCRequest) Put() { - r.data = nil r.rTp = RequestTypeUnknown - r.key = nil + r.key = r.key[:0] + r.data = r.data[:0] msgPool.Put(r) } diff --git a/proto/memcache/request_test.go b/proto/memcache/request_test.go index f0b741a3..b7948c52 100644 --- a/proto/memcache/request_test.go +++ b/proto/memcache/request_test.go @@ -46,6 +46,6 @@ func TestMCRequestFuncsOk(t *testing.T) { req.Put() assert.Equal(t, RequestTypeUnknown, req.rTp) - assert.Nil(t, req.key) - assert.Nil(t, req.data) + assert.Equal(t, []byte{}, req.key) + assert.Equal(t, []byte{}, req.data) } diff --git a/proto/pipe.go b/proto/pipe.go index 6ea54a7f..a11f1c31 100644 --- a/proto/pipe.go +++ b/proto/pipe.go @@ -148,7 +148,10 @@ func (mp *msgPipe) pipe() { } var rerr error for i := 0; i < mp.count; i++ { - if rerr = nc.Read(mp.batch[i]); rerr != nil { + if rerr == nil { + rerr = nc.Read(mp.batch[i]) + } // NOTE: no else!!! + if rerr != nil { mp.batch[i].WithError(rerr) } mp.batch[i].Done() diff --git a/proto/pipe_test.go b/proto/pipe_test.go index ba4a6c08..f0d2054b 100644 --- a/proto/pipe_test.go +++ b/proto/pipe_test.go @@ -2,6 +2,7 @@ package proto import ( "crypto/rand" + "errors" "sync" "testing" "time" @@ -10,12 +11,20 @@ import ( ) type mockNodeConn struct { - closed bool + closed bool + count, num int + err error } func (n *mockNodeConn) Write(*Message) error { return nil } -func (n *mockNodeConn) Read(*Message) error { return nil } -func (n *mockNodeConn) Flush() error { return nil } +func (n *mockNodeConn) Read(*Message) error { + if n.count == n.num { + return n.err + } + n.count++ + return nil +} +func (n *mockNodeConn) Flush() error { return nil } func (n *mockNodeConn) Close() error { n.closed = true return nil @@ -55,4 +64,31 @@ func TestPipe(t *testing.T) { time.Sleep(10 * time.Millisecond) assert.True(t, nc1.closed) assert.True(t, nc2.closed) + + const whenErrNum = 3 + nc3 := &mockNodeConn{} + nc3.num = whenErrNum + nc3.err = errors.New("some error") + ncp3 := NewNodeConnPipe(1, func() NodeConn { + return nc3 + }) + wg = &sync.WaitGroup{} + var msgs []*Message + for i := 0; i < 10; i++ { + m := getMsg() + m.WithRequest(&mockRequest{}) + m.WithWaitGroup(wg) + ncp3.Push(m) + msgs = append(msgs, m) + } + wg.Wait() + ncp3.Close() + time.Sleep(10 * time.Millisecond) + for i, msg := range msgs { + if i < whenErrNum { + assert.NoError(t, msg.Err()) + } else { + assert.EqualError(t, msg.Err(), "some error") + } + } }